mirror of
https://github.com/ucloud/iotstack-driver-sdk-python.git
synced 2025-04-25 13:48:50 +08:00
496 lines
14 KiB
Python
496 lines
14 KiB
Python
from iotedgedriverlinksdk.nats import (_nat_publish_queue,
|
|
_nat_subscribe_queue, _publish_nats_msg)
|
|
from iotedgedriverlinksdk.exception import (EdgeDriverLinkException,
|
|
EdgeDriverLinkOfflineException,
|
|
EdgeDriverLinkTimeoutException)
|
|
from iotedgedriverlinksdk import _driver_id, getLogger
|
|
from cachetools import TTLCache
|
|
import base64
|
|
import json
|
|
import queue
|
|
import random
|
|
import string
|
|
import threading
|
|
import time
|
|
import re
|
|
|
|
|
|
_logger = getLogger()

# Pending synchronous requests: maps a RequestID to the queue.Queue on which
# _on_broadcast_message() delivers the reply carrying that RequestID.
_action_queue_map = {}

# Registered sub-device objects. _on_message() looks entries up under the key
# "<productSN>.<deviceSN>" when routing an incoming message.
_connect_map = {}

# Holds the 'edge_status' flag written by _set_edge_status(). Entries in a
# cachetools TTLCache expire once older than ttl (30 here; seconds with the
# library's default timer), so the flag disappears unless it is refreshed.
_cache = TTLCache(maxsize=10, ttl=30)
|
|
|
|
|
|
def get_edge_online_status():
    """Report whether the edge is currently considered online.

    Returns:
        True when the module cache holds a truthy 'edge_status' entry,
        False when the entry is missing (never set, or expired) or falsy.
    """
    return bool(_cache.get('edge_status'))
|
|
|
|
|
|
def add_connect_map(key: str, value):
    """Store ``value`` in the connection map under ``key``.

    An existing entry with the same key is replaced. Incoming messages are
    routed by looking up "<productSN>.<deviceSN>" in this map, so ``key``
    is expected to follow that format.
    """
    # Item assignment mutates the module-level dict in place, so no
    # ``global`` declaration is required.
    _connect_map[key] = value
|
|
|
|
|
|
def del_connect_map(key: str):
    """Remove the entry stored under ``key`` from the connection map.

    Raises:
        KeyError: if ``key`` is not present in the map.
    """
    del _connect_map[key]
|
|
|
|
|
|
def _generate_request_id():
|
|
return ''.join(random.sample(string.ascii_letters + string.digits, 16)).lower()
|
|
|
|
|
|
class _device_notify(object):
|
|
def __init__(self, callback):
|
|
self.callback = callback
|
|
|
|
def run(self, message):
|
|
self.callback(message)
|
|
|
|
|
|
# Wrapper around the user's topology-change callback; None until
# set_on_topo_change_callback() has been called.
_on_topo_change_callback = None


def set_on_topo_change_callback(callback):
    """Install ``callback`` as the handler for topology change notifications.

    The broadcast handler invokes it with the decoded message, to which an
    'operation' key of 'add' or 'delete' has been added.
    """
    global _on_topo_change_callback
    notifier = _device_notify(callback)
    _on_topo_change_callback = notifier
|
|
|
|
|
|
# Wrapper around the user's status-change callback; None until
# set_on_status_change_callback() has been called.
_on_status_change_callback = None


def set_on_status_change_callback(callback):
    """Install ``callback`` as the handler for sub-device enable/disable events.

    The broadcast handler invokes it with the decoded message, to which an
    'operation' key of 'enable' or 'disable' has been added.
    """
    global _on_status_change_callback
    notifier = _device_notify(callback)
    _on_status_change_callback = notifier
|
|
|
|
|
|
def get_topo(timeout=5):
    """Request the sub-device topology and wait for the reply.

    Publishes a topo "get" request and blocks until the broadcast handler
    delivers the message carrying the same ``RequestID``.

    Args:
        timeout: Maximum time, in seconds, to wait for the reply.

    Returns:
        The decoded reply message when its ``RetCode`` is 0.

    Raises:
        EdgeDriverLinkTimeoutException: no reply arrived within ``timeout``.
        EdgeDriverLinkException: the reply carried a non-zero ``RetCode``.
    """
    request_id = _generate_request_id()
    # The product/device slots of the topic are filled with a random ID and
    # the literal "123456", exactly as the request has always been sent.
    topic = '/$system/%s/%s/subdev/topo/get' % (
        _generate_request_id(), "123456")
    request = {
        'RequestID': request_id,
        "Params": []
    }

    # Register the reply queue BEFORE publishing: the subscriber thread drops
    # any reply whose RequestID is not yet in the map, so registering after
    # the publish could lose a fast reply and end in a spurious timeout.
    reply_queue = queue.Queue()
    _action_queue_map[request_id] = reply_queue
    try:
        payload = json.dumps(request).encode('utf-8')
        _publish(topic=topic, payload=payload, is_cached=False, duration=0)
        msg = reply_queue.get(timeout=timeout)
    except queue.Empty:
        raise EdgeDriverLinkTimeoutException
    finally:
        # Always unregister; the default avoids a KeyError that would
        # otherwise mask the exception being propagated.
        _action_queue_map.pop(request_id, None)

    if msg['RetCode'] != 0:
        raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
    return msg
|
|
|
|
|
|
def add_topo(product_sn, device_sn, timeout=5):
    """Add a sub-device to the topology and wait for the confirmation.

    Args:
        product_sn: Product serial number of the sub-device.
        device_sn: Device serial number of the sub-device.
        timeout: Maximum time, in seconds, to wait for the reply.

    Raises:
        EdgeDriverLinkOfflineException: the edge is not reported online.
        EdgeDriverLinkTimeoutException: no reply arrived within ``timeout``.
        EdgeDriverLinkException: the reply carried a non-zero ``RetCode``.
    """
    if not get_edge_online_status():
        raise EdgeDriverLinkOfflineException

    request_id = _generate_request_id()
    topic = '/$system/%s/%s/subdev/topo/add' % (
        product_sn, device_sn)
    request = {
        'RequestID': request_id,
        "Params": [
            {
                'ProductSN': product_sn,
                'DeviceSN': device_sn
            }
        ]
    }

    # Register the reply queue BEFORE publishing so that a reply arriving
    # immediately is not dropped by the subscriber thread.
    reply_queue = queue.Queue()
    _action_queue_map[request_id] = reply_queue
    try:
        payload = json.dumps(request).encode('utf-8')
        _publish(topic=topic, payload=payload, is_cached=False, duration=0)
        msg = reply_queue.get(timeout=timeout)
    except queue.Empty:
        raise EdgeDriverLinkTimeoutException
    finally:
        # Unregister on every path; previously a timeout or publish error
        # left the entry in the map forever.
        _action_queue_map.pop(request_id, None)

    if msg['RetCode'] != 0:
        raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
|
|
|
|
|
|
def delete_topo(product_sn, device_sn, timeout=5):
    """Remove a sub-device from the topology and wait for the confirmation.

    Args:
        product_sn: Product serial number of the sub-device.
        device_sn: Device serial number of the sub-device.
        timeout: Maximum time, in seconds, to wait for the reply.

    Raises:
        EdgeDriverLinkOfflineException: the edge is not reported online.
        EdgeDriverLinkTimeoutException: no reply arrived within ``timeout``.
        EdgeDriverLinkException: the reply carried a non-zero ``RetCode``.
    """
    if not get_edge_online_status():
        raise EdgeDriverLinkOfflineException

    request_id = _generate_request_id()
    topic = '/$system/%s/%s/subdev/topo/delete' % (
        product_sn, device_sn)
    request = {
        'RequestID': request_id,
        "Params": [
            {
                'ProductSN': product_sn,
                'DeviceSN': device_sn
            }
        ]
    }

    # Register the reply queue BEFORE publishing so that a reply arriving
    # immediately is not dropped by the subscriber thread.
    reply_queue = queue.Queue()
    _action_queue_map[request_id] = reply_queue
    try:
        payload = json.dumps(request).encode('utf-8')
        _publish(topic=topic, payload=payload, is_cached=False, duration=0)
        msg = reply_queue.get(timeout=timeout)
    except queue.Empty:
        raise EdgeDriverLinkTimeoutException
    finally:
        # Unregister on every path; previously a timeout or publish error
        # left the entry in the map forever.
        _action_queue_map.pop(request_id, None)

    if msg['RetCode'] != 0:
        raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
|
|
|
|
|
|
def register_device(product_sn, device_sn, product_secret, timeout=5):
    """Register a sub-device and wait for the confirmation.

    Args:
        product_sn: Product serial number of the sub-device.
        device_sn: Device serial number of the sub-device.
        product_secret: Product secret, sent as 'ProductSecret' in the request.
        timeout: Maximum time, in seconds, to wait for the reply.

    Raises:
        EdgeDriverLinkOfflineException: the edge is not reported online.
        EdgeDriverLinkTimeoutException: no reply arrived within ``timeout``.
        EdgeDriverLinkException: the reply carried a non-zero ``RetCode``.
    """
    if not get_edge_online_status():
        raise EdgeDriverLinkOfflineException

    request_id = _generate_request_id()
    request = {
        'RequestID': request_id,
        "Params": [
            {
                'ProductSN': product_sn,
                'DeviceSN': device_sn,
                'ProductSecret': product_secret
            }
        ]
    }
    topic = '/$system/%s/%s/subdev/register' % (
        product_sn, device_sn)

    # Register the reply queue BEFORE publishing so that a reply arriving
    # immediately is not dropped by the subscriber thread.
    reply_queue = queue.Queue()
    _action_queue_map[request_id] = reply_queue
    try:
        payload = json.dumps(request).encode('utf-8')
        _publish(topic=topic, payload=payload, is_cached=False, duration=0)
        msg = reply_queue.get(timeout=timeout)
    except queue.Empty:
        raise EdgeDriverLinkTimeoutException
    finally:
        # Unregister on every path; previously a timeout or publish error
        # left the entry in the map forever.
        _action_queue_map.pop(request_id, None)

    if msg['RetCode'] != 0:
        raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
|
|
|
|
|
|
def device_login_async(product_sn, device_sn):
    """Publish a sub-device login request without waiting for any reply.

    Args:
        product_sn: Product serial number of the sub-device.
        device_sn: Device serial number of the sub-device.
    """
    request_id = _generate_request_id()
    device = {'ProductSN': product_sn, 'DeviceSN': device_sn}
    body = json.dumps({'RequestID': request_id, "Params": [device]})
    topic = '/$system/%s/%s/subdev/login' % (product_sn, device_sn)
    _publish(topic=topic, payload=body.encode('utf-8'),
             is_cached=False, duration=0)
|
|
|
|
|
|
def device_logout_async(product_sn, device_sn):
    """Publish a sub-device logout request without waiting for any reply.

    Args:
        product_sn: Product serial number of the sub-device.
        device_sn: Device serial number of the sub-device.
    """
    request_id = _generate_request_id()
    device = {'ProductSN': product_sn, 'DeviceSN': device_sn}
    body = json.dumps({'RequestID': request_id, "Params": [device]})
    topic = '/$system/%s/%s/subdev/logout' % (product_sn, device_sn)
    _publish(topic=topic, payload=body.encode('utf-8'),
             is_cached=False, duration=0)
|
|
|
|
|
|
def device_login_sync(product_sn, device_sn, timeout=5):
    """Publish a sub-device login request and wait for the confirmation.

    Args:
        product_sn: Product serial number of the sub-device.
        device_sn: Device serial number of the sub-device.
        timeout: Maximum time, in seconds, to wait for the reply.

    Raises:
        EdgeDriverLinkTimeoutException: no reply arrived within ``timeout``.
        EdgeDriverLinkException: the reply carried a non-zero ``RetCode``.
    """
    request_id = _generate_request_id()
    topic = '/$system/%s/%s/subdev/login' % (
        product_sn, device_sn)
    request = {
        'RequestID': request_id,
        "Params": [
            {
                'ProductSN': product_sn,
                'DeviceSN': device_sn
            }
        ]
    }

    # Register the reply queue BEFORE publishing so that a reply arriving
    # immediately is not dropped by the subscriber thread.
    reply_queue = queue.Queue()
    _action_queue_map[request_id] = reply_queue
    try:
        payload = json.dumps(request).encode('utf-8')
        _publish(topic=topic, payload=payload, is_cached=False, duration=0)
        msg = reply_queue.get(timeout=timeout)
    except queue.Empty:
        raise EdgeDriverLinkTimeoutException
    finally:
        # Unregister on every path; previously a timeout or publish error
        # left the entry in the map forever.
        _action_queue_map.pop(request_id, None)

    if msg['RetCode'] != 0:
        raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
|
|
|
|
|
|
def device_logout_sync(product_sn, device_sn, timeout=5):
    """Publish a sub-device logout request and wait for the confirmation.

    Args:
        product_sn: Product serial number of the sub-device.
        device_sn: Device serial number of the sub-device.
        timeout: Maximum time, in seconds, to wait for the reply.

    Raises:
        EdgeDriverLinkTimeoutException: no reply arrived within ``timeout``.
        EdgeDriverLinkException: the reply carried a non-zero ``RetCode``.
    """
    request_id = _generate_request_id()
    topic = '/$system/%s/%s/subdev/logout' % (
        product_sn, device_sn)
    request = {
        'RequestID': request_id,
        "Params": [
            {
                'ProductSN': product_sn,
                'DeviceSN': device_sn
            }
        ]
    }

    # Register the reply queue BEFORE publishing so that a reply arriving
    # immediately is not dropped by the subscriber thread.
    reply_queue = queue.Queue()
    _action_queue_map[request_id] = reply_queue
    try:
        payload = json.dumps(request).encode('utf-8')
        _publish(topic=topic, payload=payload, is_cached=False, duration=0)
        msg = reply_queue.get(timeout=timeout)
    except queue.Empty:
        raise EdgeDriverLinkTimeoutException
    finally:
        # Unregister on every path; previously a timeout or publish error
        # left the entry in the map forever.
        _action_queue_map.pop(request_id, None)

    if msg['RetCode'] != 0:
        raise EdgeDriverLinkException(msg['RetCode'], msg['Message'])
|
|
|
|
|
|
def send_message(topic: str, payload: bytes, is_cached=False, duration=0):
    """Publish ``payload`` on ``topic``; public entry point over ``_publish``.

    Args:
        topic: Topic to publish on.
        payload: Raw bytes to send.
        is_cached: Forwarded unchanged to ``_publish``.
        duration: Forwarded unchanged to ``_publish``.
    """
    _publish(topic, payload, is_cached, duration)
|
|
|
|
|
|
# Handle one message received on the "edge.local.broadcast" subject.
#
# `message` is parsed as JSON; its 'topic' field and base64-encoded 'payload'
# field are read, and the payload is decoded as UTF-8 and parsed as JSON too.
# For topics starting with "/$system/" the topic suffix selects the action:
#   .../subdev/topo/notify/add or /delete -> topology-change callback
#   .../subdev/enable or /disable         -> status-change callback
# with msg['operation'] set to the matching word before the callback runs.
# Otherwise msg['RequestID'] is looked up in _action_queue_map and the message
# is put on that queue, which is what a waiting synchronous request reads.
# Any exception is logged with _logger.error and is not re-raised.
#
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# described here is inferred. In particular it cannot be determined which `if`
# each of the two `else:` clauses belongs to -- confirm against upstream.
def _on_broadcast_message(message):
|
|
global _on_topo_change_callback
|
|
global _on_status_change_callback
|
|
global _action_queue_map
|
|
_logger.debug("recv message:{} " .format(str(message)))
|
|
try:
|
|
js = json.loads(message)
|
|
topic = js['topic']
|
|
|
|
# The payload travels base64-encoded; decode it to text before parsing.
data = str(base64.b64decode(js['payload']), "utf-8")
|
|
# _logger.debug("broadcast message payload: " + data)
|
|
|
|
msg = json.loads(data)
|
|
|
|
if isinstance(topic, str) and topic.startswith("/$system/"):
|
|
# on topo change callback
|
|
if topic.endswith("/subdev/topo/notify/add"):
|
|
if _on_topo_change_callback:
|
|
msg['operation'] = 'add'
|
|
_on_topo_change_callback.run(msg)
|
|
|
|
elif topic.endswith("/subdev/topo/notify/delete"):
|
|
if _on_topo_change_callback:
|
|
msg['operation'] = 'delete'
|
|
_on_topo_change_callback.run(msg)
|
|
|
|
# on status change callback
elif topic.endswith('/subdev/enable'):
|
|
if _on_status_change_callback:
|
|
msg['operation'] = 'enable'
|
|
_on_status_change_callback.run(msg)
|
|
|
|
elif topic.endswith('/subdev/disable'):
|
|
if _on_status_change_callback:
|
|
msg['operation'] = 'disable'
|
|
_on_status_change_callback.run(msg)
|
|
|
|
# presumably the fallback of the suffix chain above: treat the message as the
# reply to a pending synchronous request -- TODO confirm nesting
else:
|
|
request_id = msg['RequestID']
|
|
if request_id in _action_queue_map:
|
|
q = _action_queue_map[request_id]
|
|
q.put(msg)
|
|
# NOTE(review): the log text suggests this pairs with the "/$system/" topic
# check rather than with the RequestID lookup -- confirm
else:
|
|
_logger.debug('unknown message topic:{}'.format(topic))
|
|
return
|
|
|
|
# Failures (bad JSON, missing keys, callback errors) are logged, not raised.
except Exception as e:
|
|
_logger.error(e)
|
|
|
|
|
|
# Handle one message received on any subject other than
# "edge.local.broadcast" and "edge.state.reply".
#
# `message` is parsed as JSON. The registered sub-device is looked up in
# _connect_map under "<productSN>.<deviceSN>", and the base64 'payload' field
# is decoded to raw bytes (it is not decoded to text or parsed here).
# A topic starting with "/$system/" that contains "/rrpc/request/" is offered
# to the sub-device's `rrpc` handler; a truthy result is published back on the
# same topic with "/rrpc/request/" replaced once by "/rrpc/response/".
# The sub-device's `callback`, when set, is called with (topic, payload bytes).
# Any exception is logged with _logger.error and is not re-raised.
#
# NOTE(review): indentation was lost in this copy of the file. The nesting
# level of the first `return` decides whether an RRPC request also reaches
# `callback`, and it cannot be determined here -- confirm against upstream.
def _on_message(message):
|
|
global _connect_map
|
|
_logger.debug("recv message: {}".format(str(message)))
|
|
try:
|
|
js = json.loads(message)
|
|
identify = js['productSN'] + \
|
|
'.'+js['deviceSN']
|
|
|
|
topic = js['topic']
|
|
msg = base64.b64decode(js['payload'])
|
|
# _logger.debug("normal message payload: {}".format(str(msg, 'utf-8')))
|
|
if identify in _connect_map:
|
|
sub_dev = _connect_map[identify]
|
|
# find(...) > 0 requires the marker to appear after position 0, which always
# holds once the topic starts with "/$system/".
if isinstance(topic, str) and topic.startswith("/$system/") and topic.find("/rrpc/request/") > 0:
|
|
if sub_dev.rrpc:
|
|
_logger.debug("rrpc request: {}".format(str(message)))
|
|
resp = sub_dev.rrpc(topic, msg)
|
|
if resp:
|
|
_logger.debug("rrpc response: {}".format(str(resp)))
|
|
topic = topic.replace(
|
|
"/rrpc/request/", "/rrpc/response/", 1)
|
|
sub_dev.publish(topic, resp)
|
|
# NOTE(review): nesting level of this return is unknown -- see header note
return
|
|
|
|
if sub_dev.callback:
|
|
sub_dev.callback(topic, msg)
|
|
# presumably pairs with `if identify in _connect_map:` (no sub-device is
# registered under that key) -- TODO confirm nesting
else:
|
|
_logger.error('unknown message topic:{}'.format(topic))
|
|
return
|
|
|
|
# Failures (bad JSON, missing keys, handler errors) are logged, not raised.
except Exception as e:
|
|
_logger.error(e)
|
|
|
|
|
|
def _publish(topic: str, payload: bytes, is_cached=False, duration=0):
    """Wrap ``payload`` in an envelope and hand it to ``_publish_nats_msg``.

    Args:
        topic: Topic stored in the envelope's 'topic' field.
        payload: Raw bytes; sent base64-encoded as text in 'payload'.
        is_cached: Stored in the envelope's 'isCatched' field.
        duration: Stored in the envelope's 'duration' field.

    Raises:
        Exception: any error raised while encoding or publishing is logged
            and then re-raised to the caller.
    """
    try:
        encoded_text = base64.b64encode(payload).decode('utf-8')
        envelope = {
            'src': 'local',
            'topic': topic,
            # Key spelling is part of the wire format; do not "fix" it here.
            'isCatched': is_cached,
            'duration': duration,
            'payload': encoded_text
        }
        _publish_nats_msg(envelope)
    except Exception as err:
        _logger.error(err)
        raise err
|
|
|
|
|
|
def _set_edge_status():
    """Mark the edge as online by storing a True 'edge_status' cache entry.

    Called by the subscriber loop for each "edge.state.reply" message; the
    entry lives in the module TTL cache, so it lapses unless refreshed.
    """
    # Item assignment mutates the module-level cache in place, so no
    # ``global`` declaration is required.
    _cache['edge_status'] = True
|
|
|
|
|
|
def init_subscribe_handler():
    """Dispatch messages from the subscribe queue forever.

    Blocks on ``_nat_subscribe_queue`` and routes each message by subject:
    "edge.local.broadcast" goes to ``_on_broadcast_message``,
    "edge.state.reply" refreshes the online flag via ``_set_edge_status``,
    and every other subject goes to ``_on_message``.

    This function never returns; it is meant to run in a background thread.
    """
    while True:
        msg = _nat_subscribe_queue.get()
        # _logger.debug(msg)
        try:
            subject = msg.subject
            data = msg.data.decode()

            if subject == "edge.local.broadcast":
                _on_broadcast_message(data)
            elif subject == "edge.state.reply":
                _set_edge_status()
            else:
                _on_message(data)
        except Exception as e:
            # One malformed message (e.g. undecodable bytes) must not end the
            # loop: that would silently stop all further message delivery.
            _logger.error(e)
|
|
|
|
|
|
def _get_device_list():
    """Return ``get_device_info()`` of every entry in the connection map.

    Returns:
        A new list with one item per registered sub-device, in the map's
        iteration order; empty when nothing is registered.
    """
    return [sub_dev.get_device_info() for sub_dev in _connect_map.values()]
|
|
|
|
|
|
def fetch_online_status():
    """Send "edge.state.req" requests with the device list forever.

    Each round queues a request containing the driver ID and the current
    device list on ``_nat_publish_queue``, then sleeps. While the edge is
    reported online the pause is the maximum interval (15 s). While it is
    not, the pause grows by one second per round until it reaches that
    maximum; it is never reset to the minimum afterwards.

    This function never returns; it is meant to run in a background thread.
    """
    min_retry_timeout = 1
    max_retry_timeout = 15
    retry_timeout = min_retry_timeout
    while True:
        devices = _get_device_list()
        request = {
            'payload': {
                'driverID': _driver_id,
                'devices': devices
            },
            'subject': 'edge.state.req'
        }
        _nat_publish_queue.put(request)

        if get_edge_online_status():
            pause = max_retry_timeout
        else:
            # Grow the pause by one second per round, capped at the maximum.
            retry_timeout = min(retry_timeout + 1, max_retry_timeout)
            pause = retry_timeout
        time.sleep(pause)
|
|
|
|
|
|
# Background workers, started as a side effect of importing this module.
# Both are daemon threads so they never keep the interpreter alive on exit.
# The daemon flag is passed to the constructor because Thread.setDaemon()
# is deprecated.

# Dispatches every message arriving on the subscribe queue.
_t_sub = threading.Thread(target=init_subscribe_handler, daemon=True)
_t_sub.start()

# Periodically sends the edge-state request that keeps the online flag fresh.
_t_online = threading.Thread(target=fetch_online_status, daemon=True)
_t_online.start()
|