commit e44e3835b1baa0cd81214d743be242c055020b9a Author: ethan.du Date: Mon Dec 7 10:42:06 2020 +0800 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d583f5d --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +build +dist +MANIFEST +uiotedge_driver_link_sdk.egg-info +.pypirc \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..84c1469 --- /dev/null +++ b/README.md @@ -0,0 +1,230 @@ +## API参考文档 + +主要的API参考文档如下: + +#### from uiotedgedriverlinksdk +* **[getLogger()](#getLogger)** + +--- + +#### from uiotedgedriverlinksdk.client +* **[getConfig()](#getConfig)** +* **[Config()](#Config)** +* Config#**[getDeviceInfos()](#getDeviceInfos)** +* Config#**[getDriverInfo()](#getDriverInfo)** + + +--- + +#### from uiotedgedriverlinksdk.exception +* BaseEdgeException +* EdgeDriverLinkException +* EdgeDriverLinkTimeoutException +* EdgeDriverLinkDeviceConfigException +* EdgeDriverLinkDeviceOfflineException +* EdgeDriverLinkOfflineException +* EdgeDriverLinkDeviceProductSecretException + + +--- +#### from uiotedgedriverlinksdk.client + +* **[SubDevice()](#subdevice)** +* SubDevice#**[set_product_sn()](#set_product_sn)** +* SubDevice#**[set_device_sn()](#set_device_sn)** +* SubDevice#**[set_product_secret()](#set_product_secret)** +* SubDevice#**[set_msg_callback()](#set_msg_callback)** +* SubDevice#**[login()](#login)** +* SubDevice#**[logout()](#logout)** +* SubDevice#**[publish()](#publish)** +* SubDevice#**[registerDevice()](#registerDevice)** + +--- + +#### from uiotedgedriverlinksdk.edge + +* **[register_device()](#register)** +* **[set_on_topo_change_callback()](#set_on_topo_change_callback)** +* **[set_on_status_change_callback()](#set_on_status_change_callback)** +* **[get_topo()](#get_topo)** +* **[add_topo()](#add_topo)** +* **[delete_topo()](#delete_topo)** + + +--- +* **[get_edge_online_status()](#get_edge_online_status)** + +--- + +### getLogger() +返回驱动内置logger。 + +--- + +### getConfig() +返回驱动相关配置。 + +--- + +### Config() +基于当前驱动配置字符串构造新的Config对象。 + +--- + +### Config. getDeviceInfos() +返回所有设备相关信息,返回DeviceInfo`List` +DeviceInfo包括如下信息: + +* productSN `str `: 官网申请的productKey。 +* deviceSN `str `: 设备名 +* config`dict `:设备自定义配置 + +--- + +### Config. getDriverInfo() +返回驱动相关信息,返回DriverInfo`List` + +--- + +### SubDevice(product_sn,device_sn, on_msg_callback) +设备接入客户端类, 用户主要通过它上下线设备和主动上报消息 + +* product_sn`str`: 云端分配的ProductSN +* device_sn`str`: 云端分配的DeviceSN +* on_msg_callback`func(topic:str, msg:b'')`: 云端下发消息回调,消息类型 []byte, 例如:` def callbacl(topic:str, msg:b''): print(str(msg,'utf-8')` + +--- + +### SubDevice.set_product_sn(product_sn) +设置子设备的productSN + +* product_sn`str`: 云端分配的ProductSN + + +--- + +### SubDevice.set_device_sn(device_sn) +设备子设备的DeviceSN + +* device_sn`str`: 云端分配的DeviceSN + + +--- + +### SubDevice.set_product_secret(product_secret) +设置子设备的Product Secret + +* product_secret`str`: 云端分配的Product Secret + + +--- + +### SubDevice.set_product_secret(product_secret) +设置子设备的Product Secret + +* product_secret`str`: 云端分配的Product Secret + + +--- + +### SubDevice.set_msg_callback(msg_callback) +设置子设备的接收消息的回调函数 + +* set_msg_callback`func`: 子设备收消息回调,例如:` def callbacl(topic:str, msg:b''): print(str(msg,'utf-8')` + + +--- + +### SubDevice.login(sync=False, timeout=5) +上报上线事件到Link IoT Edge + +* sync`bool`: 是否异步登陆 +* timeout`int`: 等待响应超时时间,单位s(秒) + +--- + +### SubDevice.logout(sync=False, timeout=5) +上报下线事件到Link IoT Edge + +* sync`bool`: 是否异步登陆 +* timeout`int`: 等待响应超时时间,单位s(秒) + +--- + +### SubDevice.publish(topic, payload) +上报消息到Link IoT Edge。异步 + +* topic`str`: 上报消息到Link IoT Edge的mqtt topic。 +* payload`[]byte`: 上报消息到Link IoT Edge的消息内容 + + +--- + + +### SubDevice.registerDevice(timeout) +动态注册一个设备到Link IoT Edge。同步 + +* timeout`int`: 等待响应超时时间,单位s(秒) +--- + + +--- + +### register_device(product_sn, device_sn, product_secret, timeout=5) +上报动态注册到Link IoT Edge。同步执行,等待响应 + +* product_sn`str`: 云端分配的ProductSN +* device_sn`str`: 自定义的DeviceSN +* product_secret`str`: ProductSecret +* timeout`int`: 等待响应超时时间,单位s(秒) + +--- + +### set_on_topo_change_callback(callback) +云端topo信息变化的下发消息的回调函数 + +* callback`func`: 云端下发消息回调,消息类型 {}, 例如: `def callback(msg): print(msg)` + + +--- + +### set_on_status_change_callback(callback) +云端设备启用和禁用信息变化的下发消息的回调函数 + +* callback`func`: 云端下发消息回调,消息类型 {}, 例如: `def callback(msg): print(msg)` + + +--- + +### get_topo(timeout=5) +上报get topo信息到Link IoT Edge。同步执行,等待响应 + +* timeout`int`: 等待响应超时时间,单位s(秒) + +--- + +### add_topo(product_sn, device_sn, timeout=5) +上报add topo信息到Link IoT Edge + +* product_sn`str`: 云端分配的ProductSN +* device_sn`str`: 云端分配的DeviceSN +* timeout`int`: 等待响应超时时间,单位s(秒) + +--- + +### delete_topo(product_sn, device_sn, timeout=5) +上报delete topo信息到Link IoT Edge + +* product_sn`str`: 云端分配的ProductSN +* device_sn`str`: 云端分配的DeviceSN +* timeout`int`: 等待响应超时时间,单位s(秒) + + +--- + + +### get_edge_online_status() +获取网关的在线状态,返回True / False + +--- + diff --git a/examples/demo/demo.zip b/examples/demo/demo.zip new file mode 100644 index 0000000..c88d793 Binary files /dev/null and b/examples/demo/demo.zip differ diff --git a/examples/demo/index.py b/examples/demo/index.py new file mode 100644 index 0000000..996cd72 --- /dev/null +++ b/examples/demo/index.py @@ -0,0 +1,88 @@ +import json +import logging +import time + +from uiotedgedriverlinksdk import getLogger +from uiotedgedriverlinksdk.client import Config, SubDevice +from uiotedgedriverlinksdk.exception import BaseEdgeException + +# 配置log +log = getLogger() +log.setLevel(logging.DEBUG) + +# 主函数 +if __name__ == "__main__": + try: + # 获取驱动及子设备配置信息 + driverConfig = Config().getDriverInfo() + log.info('driver config:{}'.format(driverConfig)) + + # 从驱动配置获取设备数据上报周期 + uploadPeriod = 5 + if "period" in driverConfig.keys() and isinstance(driverConfig['period'], int): + uploadPeriod = int(driverConfig['period']) + + deviceInfoList = Config().getDeviceInfos() + log.info('device list config:{}'.format(deviceInfoList)) + except Exception as e: + log.error('load driver config error: {}'.format(str(e))) + exit(1) + + try: + # 判断是否绑定子设备 + if len(deviceInfoList) < 1: + log.error( + 'subdevice null, please bind sub device for driver') + while True: + time.sleep(60) + + # 取其中一个子设备 + subDeviceInfo = deviceInfoList[0] + + # 获取子设备的ProductSN ,key值为 ‘productSN’ + productSN = subDeviceInfo['productSN'] + # 获取子设备的DeviceSN ,key值为 ‘deviceSN’ + deviceSN = subDeviceInfo['deviceSN'] + + def callback(topic: str, payload: b''): + log.info("recv message from {} : {}".format(topic, str(payload))) + + # 初始化一个子设备对象 + subDevice = SubDevice(product_sn=productSN, + device_sn=deviceSN, on_msg_callback=callback) + # 子设备上线 + subDevice.login() + + # 获取当前子设备的配置 + deviceConfig = subDeviceInfo['config'] + log.info('sub device config:{}'.format(deviceConfig)) + + # 从子设备配置获取子设备上报topic定义 + topic = "/{}/{}/upload".format(productSN, deviceSN) # 此处为默认topic + if 'topic' in deviceConfig and isinstance(deviceConfig['topic'], str): + topic = deviceConfig['topic'].format(productSN, deviceSN) + + # 从子设备配置获取子设备上报参数名称 + param = 'RelayStatus' # 此处定义默认属性名称: RelayStatus + if 'paramName' in deviceConfig and isinstance(deviceConfig['paramName'], str): + param = deviceConfig['paramName'] + + i = 0 + while True: + relayStatus = ("on", "off")[i % 2 == 0] + payload = { + "timestamp": time.time(), + param: relayStatus + } + byts = json.dumps(payload).encode('utf-8') + + subDevice.publish(topic, byts) + log.info("upload {} : {}".format(topic, str(byts))) + + time.sleep(uploadPeriod) + i = i+1 + + except BaseEdgeException: + log.error('Edge Exception: {}'.format(str(e))) + except Exception as e: + log.error('Exception error: {}'.format(str(e))) diff --git a/examples/demo/pack.sh b/examples/demo/pack.sh new file mode 100644 index 0000000..0d6450d --- /dev/null +++ b/examples/demo/pack.sh @@ -0,0 +1,13 @@ +#!/bin/bash -e +if [ -e "./demo.zip" ] ; then + rm ./demo.zip +fi + +mkdir tmp +cp index.py tmp/ +cd tmp +pip3 install -t . uiotedge_driver_link_sdk==0.0.41 #打包驱动SDK +zip -r demo.zip . +cd .. +cp tmp/demo.zip . +rm -rf tmp diff --git a/examples/ws/ws/index.py b/examples/ws/ws/index.py new file mode 100644 index 0000000..41c14c8 --- /dev/null +++ b/examples/ws/ws/index.py @@ -0,0 +1,156 @@ +import datetime +import json +import logging +import signal +import sys + +import tornado.httpserver +import tornado.ioloop +import tornado.web +from tornado.websocket import WebSocketHandler + +from uiotedgedriverlinksdk import getLogger +from uiotedgedriverlinksdk.client import Config, SubDevice, getConfig +from uiotedgedriverlinksdk.edge import (add_topo, delete_topo, get_topo, + register_device, + set_on_status_change_callback, + set_on_topo_change_callback) +from uiotedgedriverlinksdk.exception import ( + BaseEdgeException, EdgeDriverLinkDeviceOfflineException, + EdgeDriverLinkException, EdgeDriverLinkOfflineException, + EdgeDriverLinkTimeoutException) + +log = getLogger() +log.setLevel(logging.DEBUG) + + +class WebSocketSever(WebSocketHandler): + def check_origin(self, origin): + return True + + def set_default_headers(self): + self.set_header("Access-Control-Allow-Origin", "*") # 这个地方可以写域名 + self.set_header("Access-Control-Allow-Headers", "*") + self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS') + + def initialize(self, loop): + self.loop = loop + self.client_id = '' + self.product_sn = '' + self.device_sn = '' + self.client = SubDevice() + + def on_message_callback(self, topic, msg): + self.write_message(str(msg)) + + def open(self): + try: + product_sn = self.get_argument('product_sn') + device_sn = self.get_argument('device_sn') + except tornado.web.MissingArgumentError as e: + self.close(reason=str(e)) + else: + log.info("websocket connect from: {}.{}".format( + product_sn, device_sn)) + self.product_sn = product_sn + self.device_sn = device_sn + self.client_id = product_sn+'.'+device_sn + + try: + self.client.set_product_sn(product_sn) + self.client.set_device_sn(device_sn) + self.client.login() + + self.client.set_msg_callback(self.on_message_callback) + + self.write_message('login success') + + except Exception as e: + self.close(reason=str(e)) + + def on_message(self, message): + try: + if self.client_id == '' or self.client is None: + self.client.logout() + self.close(reason='unknown client identify') + return + + data = json.loads(message) + log.info("websocket [{}] from:{}".format( + data, self.client_id)) + + if 'action' in data: + action = data['action'] + if action == 'add_topo': + add_topo(self.product_sn, self.device_sn) + elif action == 'delete_topo': + delete_topo(self.product_sn, self.device_sn) + elif action == "logout": + self.client.logout() + self.close(reason='client exit') + return + elif action == 'get_topo': + topo = get_topo() + self.write_message(topo) + + elif 'topic' in data and 'payload' in data: + payload = data['payload'] + if isinstance(payload, dict): + byts = json.dumps(payload) + log.info('send time:{}'.format( + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))) + self.client.publish( + topic=data['topic'], payload=byts.encode('utf-8')) + log.info('send time:{}'.format( + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))) + elif isinstance(payload, str): + byts = payload.encode('utf-8') + log.info('send time:{}'.format( + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))) + self.client.publish( + topic=data['topic'], payload=byts) + + else: + print('unknown message') + except Exception as e: + self.client.logout() + self.close(reason=str(e)) + + def allow_draft76(self): + return True + + def on_close(self): + self.client.logout() + log.info("websocket closed from:{}, with reason: {}".format( + self.client_id, self.close_reason)) + + +class Application(tornado.web.Application): + def __init__(self, handlers, setting): + super(Application, self).__init__(handlers, **setting) + + +def main(): + from tornado.platform.asyncio import AnyThreadEventLoopPolicy + import asyncio + + asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) + lo = tornado.ioloop.IOLoop.current() + handlers = [ + (r"/ws", WebSocketSever, dict(loop=lo)) + ] + setting = dict(xsrf_cookies=False) + app = Application(handlers, setting) + app.listen(port=4567) + print("websocket start listen port on:{}".format('4567')) + lo.start() + + +def exit_handler(signum, frame): + sys.exit(0) + + +if __name__ == '__main__': + signal.signal(signal.SIGINT, exit_handler) + signal.signal(signal.SIGTERM, exit_handler) + main() diff --git a/examples/ws/ws/pack.sh b/examples/ws/ws/pack.sh new file mode 100644 index 0000000..4273724 --- /dev/null +++ b/examples/ws/ws/pack.sh @@ -0,0 +1,18 @@ +#!/bin/bash -e + +if [ -e "./ws.zip" ] ; then + rm ./ws.zip +fi + +mkdir tmp +cp index.py tmp + +cd tmp +pip3 install -t . uiotedge_driver_link_sdk==0.0.41 #打包驱动SDK +pip3 install -t . tornado -i https://mirrors.aliyun.com/pypi/simple/ #打包自己的依赖 +zip -r ws.zip . + +cd .. +cp tmp/ws.zip . +rm -rf tmp + diff --git a/examples/ws/ws/ws.zip b/examples/ws/ws/ws.zip new file mode 100644 index 0000000..b5ebe6b Binary files /dev/null and b/examples/ws/ws/ws.zip differ diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..328f5b1 --- /dev/null +++ b/run.sh @@ -0,0 +1,11 @@ +#!/bin/bash -e +python3 setup.py check +python3 setup.py build +sudo python3 setup.py install --force +sudo python3 setup.py sdist bdist_wheel || true +sudo python3 -m twine upload dist/*.whl + +sudo rm -rf build +sudo rm -rf dist +sudo rm -rf uiotedge_driver_link_sdk.egg-info + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..9dde3e7 --- /dev/null +++ b/setup.py @@ -0,0 +1,23 @@ +import sys + +from setuptools import setup + +if not (sys.version_info[0] == 3): + sys.exit("Link IoT Edge only support Python 3") + +setup( + name='uiotedge_driver_link_sdk', + version='0.0.41', + author='ucloud.cn', + url='https://pypi.org/project/uiotedge_driver_link_sdk/', + author_email='joy.zhou@ucloud.cn', + packages=['uiotedgedriverlinksdk'], + platforms="any", + license='Apache 2 License', + install_requires=[ + "asyncio-nats-client>=0.10.0", + "cachetools>=4.0.0" + ], + description="UIoT Edge Driver Link SDK", + long_description="UIoT Edge Driver Link SDK\n https://www.ucloud.cn/site/product/uiot.html" +) diff --git a/uiotedgedriverlinksdk/__init__.py b/uiotedgedriverlinksdk/__init__.py new file mode 100644 index 0000000..d01a2cc --- /dev/null +++ b/uiotedgedriverlinksdk/__init__.py @@ -0,0 +1,154 @@ +import asyncio +import json +import logging +import os +import queue +import sys +import threading +import time + +from nats.aio.client import Client as NATS +from nats.aio.errors import NatsError + +_driver_id = '' +_driver_name = '' +_deviceInfos = [] +_driverInfo = None +_driver_name = '' + +# get Config +_config_path = './etc/uiotedge/config.json' +with open(_config_path, 'r') as load_f: + try: + load_dict = json.load(load_f) + print(str(load_dict)) + # print('----config: {} -------'.format(load_dict)) + + _driver_id = load_dict['driverID'] + + if 'driverName' in load_dict.keys(): + _driver_name = load_dict['driverName'] + else: + _driver_name = _driver_id + + if 'deviceList' in load_dict.keys(): + _deviceInfos = load_dict['deviceList'] + + if 'driverInfo' in load_dict.keys(): + _driverInfo = load_dict['driverInfo'] + except Exception as e: + print('load config file error:{}'.format(e)) + sys.exit(1) + +print("driver_id: {}, driver name:{}".format(_driver_id, _driver_name)) + + +class _Logger(object): + def __init__(self, name): + self.url = os.environ.get( + 'UIOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222' + self.nc = NATS() + self.loop = asyncio.new_event_loop() + self.queue = queue.Queue() + self.name = name + self.logger = logging.getLogger() + format_str = logging.Formatter( + '%(asctime)s - %(levelname)s: %(message)s') + sh = logging.StreamHandler() + sh.setFormatter(format_str) + self.logger.addHandler(sh) + + async def _publish(self): + try: + await self.nc.connect(servers=[self.url], loop=self.loop) + logging.debug('nats for logger connect success') + except Exception as e1: + logging.error(e1) + sys.exit(1) + + while True: + try: + msg = self.queue.get() + bty = json.dumps(msg) + await self.nc.publish(subject='edge.log.upload', + payload=bty.encode('utf-8')) + await self.nc.flush() + except NatsError as e: + logging.error(e) + except Exception as e: + logging.error(e) + + def start(self): + self.loop.run_until_complete(self._publish()) + + def debug(self, msg): + data = { + 'module': self.name, + 'level': 'debug', + 'message': msg, + 'timestamp': int(time.time()), + } + self.queue.put(data) + self.logger.debug(msg) + + def info(self, msg): + data = { + 'module': self.name, + 'level': 'info', + 'message': msg, + 'timestamp': int(time.time()), + } + self.queue.put(data) + self.logger.info(msg) + + def error(self, msg): + data = { + 'module': self.name, + 'level': 'error', + 'message': msg, + 'timestamp': int(time.time()), + } + self.queue.put(data) + self.logger.error(msg) + + def warn(self, msg): + data = { + 'module': self.name, + 'level': 'warn', + 'message': msg, + 'timestamp': int(time.time()), + } + self.queue.put(data) + self.logger.warn(msg) + + def critical(self, msg): + data = { + 'module': self.name, + 'level': 'critical', + 'message': msg, + 'timestamp': int(time.time()), + } + self.queue.put(data) + self.logger.critical(msg) + + def setLevel(self, level): + self.logger.setLevel(level) + + +_uiotedge_logger = _Logger(_driver_name) + + +def _init_logger(): + global _uiotedge_logger + _uiotedge_logger.start() + logging.debug('init logger success') + + +_t_logger = threading.Thread(target=_init_logger) +_t_logger.setDaemon(True) +_t_logger.start() + + +def getLogger(): + global _uiotedge_logger + return _uiotedge_logger diff --git a/uiotedgedriverlinksdk/client.py b/uiotedgedriverlinksdk/client.py new file mode 100644 index 0000000..4817c6f --- /dev/null +++ b/uiotedgedriverlinksdk/client.py @@ -0,0 +1,105 @@ +import json + +from uiotedgedriverlinksdk import _deviceInfos, _driverInfo +from uiotedgedriverlinksdk.edge import (add_connect_map, del_connect_map, + device_login_async, device_login_sync, + device_logout_async, + device_logout_sync, register_device, + send_message) +from uiotedgedriverlinksdk.exception import ( + EdgeDriverLinkDeviceConfigException, EdgeDriverLinkDeviceOfflineException, + EdgeDriverLinkDeviceProductSecretException) + + +class SubDevice(object): + def __init__(self, product_sn: str = '', device_sn: str = '', on_msg_callback=None): + self.device_sn = device_sn + self.product_sn = product_sn + self.product_secret = '' + self.callback = on_msg_callback + self.online = False + if self.product_sn != '' and self.device_sn != '': + self._identity = self.product_sn+'.'+self.device_sn + + def set_product_sn(self, product_sn: str): + self.product_sn = product_sn + if self.product_sn != '' and self.device_sn != '': + self._identity = self.product_sn+'.'+self.device_sn + + def set_device_sn(self, device_sn: str): + self.device_sn = device_sn + if self.product_sn != '' and self.device_sn != '': + self._identity = self.product_sn+'.'+self.device_sn + + def set_product_secret(self, product_secret: str): + self.product_secret = product_secret + + def set_msg_callback(self, msg_callback): + self.callback = msg_callback + + def get_device_info(self): + return { + "productSN": self.product_sn, + "deviceSN": self.device_sn + } + + def registerDevice(self, timeout=5): + if self.product_sn == '' or self.device_sn == '' or self.product_secret == '': + raise EdgeDriverLinkDeviceProductSecretException + + register_device(self.product_sn, self.device_sn, + self.product_secret, timeout=timeout) + + def logout(self, sync=False, timeout=5): + if self.online: + if sync: + device_logout_sync(product_sn=self.product_sn, + device_sn=self.device_sn, + timeout=timeout) + else: + device_logout_async(product_sn=self.product_sn, + device_sn=self.device_sn) + self.online = False + del_connect_map(self._identity) + + def login(self, sync=False, timeout=5): + if self._identity == '': + raise EdgeDriverLinkDeviceConfigException + + add_connect_map(self._identity, self) + + if sync: + device_login_sync(product_sn=self.product_sn, + device_sn=self.device_sn, timeout=timeout) + else: + device_login_async(product_sn=self.product_sn, + device_sn=self.device_sn) + self.online = True + + def publish(self, topic: str, payload: b''): + if self.online: + send_message(topic, payload, is_cached=False, + duration=0) + else: + raise EdgeDriverLinkDeviceOfflineException + + +class Config(object): + def __init__(self, config=None): + self.config = config + + def getDriverInfo(self): + return _driverInfo + + def getDeviceInfos(self): + return _deviceInfos + + +def getConfig(): + if _driverInfo is None: + config = {"deviceList": _deviceInfos} + else: + config = { + "config": _driverInfo, + "deviceList": _deviceInfos} + return json.dumps(config) diff --git a/uiotedgedriverlinksdk/edge.py b/uiotedgedriverlinksdk/edge.py new file mode 100644 index 0000000..6a56333 --- /dev/null +++ b/uiotedgedriverlinksdk/edge.py @@ -0,0 +1,484 @@ +import base64 +import json +import queue +import random +import string +import threading +import time + +from cachetools import TTLCache + +from uiotedgedriverlinksdk import _driver_id, getLogger +from uiotedgedriverlinksdk.exception import (EdgeDriverLinkException, + EdgeDriverLinkOfflineException, + EdgeDriverLinkTimeoutException) +from uiotedgedriverlinksdk.nats import (_nat_publish_queue, + _nat_subscribe_queue, publish_nats_msg) + +_logger = getLogger() +_action_queue_map = {} +_connect_map = {} + +_cache = TTLCache(maxsize=10, ttl=30) + + +def get_edge_online_status(): + global _cache + is_online = _cache.get('edge_status') + if is_online: + return True + return False + + +def add_connect_map(key: str, value): + global _connect_map + _connect_map[key] = value + + +def del_connect_map(key: str): + global _connect_map + _connect_map.pop(key) + + +def _generate_request_id(): + return ''.join(random.sample(string.ascii_letters + string.digits, 16)).lower() + + +class _device_notify(object): + def __init__(self, callback): + self.callback = callback + + def run(self, message): + self.callback(message) + + +_on_topo_change_callback = None + + +def set_on_topo_change_callback(callback): + global _on_topo_change_callback + _on_topo_change_callback = _device_notify(callback) + + +_on_status_change_callback = None + + +def set_on_status_change_callback(callback): + global _on_status_change_callback + _on_status_change_callback = _device_notify(callback) + + +def get_topo(timeout=5): + global _action_queue_map + request_id = _generate_request_id() + topic = '/$system/%s/%s/subdev/topo/get' % ( + _generate_request_id(), "123456") + + get_topo = { + 'RequestID': request_id, + "Params": [] + } + try: + data = json.dumps(get_topo) + _publish(topic=topic, payload=data.encode('utf-8'), + is_cached=False, duration=0) + q = queue.Queue() + _action_queue_map[request_id] = q + + msg = q.get(timeout=5) + if msg['RetCode'] != 0: + raise EdgeDriverLinkException(msg['RetCode'], msg['Message']) + + return msg + + except queue.Empty: + raise EdgeDriverLinkTimeoutException + except EdgeDriverLinkException as e: + raise e + except Exception as e: + raise e + finally: + _action_queue_map.pop(request_id) + + +def add_topo(product_sn, device_sn, timeout=5): + global _action_queue_map + if get_edge_online_status(): + request_id = _generate_request_id() + topic = '/$system/%s/%s/subdev/topo/add' % ( + product_sn, device_sn) + + add_topo = { + 'RequestID': request_id, + "Params": [ + { + 'ProductSN': product_sn, + 'DeviceSN': device_sn + } + ] + } + try: + data = json.dumps(add_topo) + _publish(topic=topic, payload=data.encode('utf-8'), + is_cached=False, duration=0) + q = queue.Queue() + _action_queue_map[request_id] = q + + msg = q.get(timeout=5) + + _action_queue_map.pop(request_id) + if msg['RetCode'] != 0: + raise EdgeDriverLinkException(msg['RetCode'], msg['Message']) + + except queue.Empty: + raise EdgeDriverLinkTimeoutException + except EdgeDriverLinkException as e: + raise e + except Exception as e: + raise e + else: + raise EdgeDriverLinkOfflineException + + +def delete_topo(product_sn, device_sn, timeout=5): + global _action_queue_map + if get_edge_online_status(): + request_id = _generate_request_id() + topic = '/$system/%s/%s/subdev/topo/delete' % ( + product_sn, device_sn) + + delete_topo = { + 'RequestID': request_id, + "Params": [ + { + 'ProductSN': product_sn, + 'DeviceSN': device_sn + } + ] + } + + try: + data = json.dumps(delete_topo) + _publish(topic=topic, payload=data.encode('utf-8'), + is_cached=False, duration=0) + q = queue.Queue() + _action_queue_map[request_id] = q + + msg = q.get(timeout=5) + + _action_queue_map.pop(request_id) + if msg['RetCode'] != 0: + raise EdgeDriverLinkException(msg['RetCode'], msg['Message']) + + except queue.Empty: + raise EdgeDriverLinkTimeoutException + except EdgeDriverLinkException as e: + raise e + except Exception as e: + raise e + else: + raise EdgeDriverLinkOfflineException + + +def register_device(product_sn, device_sn, product_secret, timeout=5): + global _action_queue_map + if get_edge_online_status(): + request_id = _generate_request_id() + register_data = { + 'RequestID': request_id, + "Params": [ + { + 'ProductSN': product_sn, + 'DeviceSN': device_sn, + 'ProductSecret': product_secret + } + ] + } + topic = '/$system/%s/%s/subdev/register' % ( + product_sn, device_sn) + + try: + data = json.dumps(register_data) + _publish(topic=topic, payload=data.encode('utf-8'), + is_cached=False, duration=0) + q = queue.Queue() + _action_queue_map[request_id] = q + + msg = q.get(timeout=timeout) + _action_queue_map.pop(request_id) + if msg['RetCode'] != 0: + raise EdgeDriverLinkException(msg['RetCode'], msg['Message']) + + except queue.Empty: + raise EdgeDriverLinkTimeoutException + except EdgeDriverLinkException as e: + raise e + except Exception as e: + raise e + else: + raise EdgeDriverLinkOfflineException + + +def device_login_async(product_sn, device_sn): + request_id = _generate_request_id() + topic = '/$system/%s/%s/subdev/login' % ( + product_sn, device_sn) + + device_login_msg = { + 'RequestID': request_id, + "Params": [ + { + 'ProductSN': product_sn, + 'DeviceSN': device_sn + } + ] + } + data = json.dumps(device_login_msg) + _publish(topic=topic, payload=data.encode('utf-8'), + is_cached=False, duration=0) + + +def device_logout_async(product_sn, device_sn): + request_id = _generate_request_id() + topic = '/$system/%s/%s/subdev/logout' % ( + product_sn, device_sn) + + device_logout_msg = { + 'RequestID': request_id, + "Params": [ + { + 'ProductSN': product_sn, + 'DeviceSN': device_sn + } + ] + } + data = json.dumps(device_logout_msg) + _publish(topic=topic, payload=data.encode('utf-8'), + is_cached=False, duration=0) + + +def device_login_sync(product_sn, device_sn, timeout=5): + global _action_queue_map + request_id = _generate_request_id() + topic = '/$system/%s/%s/subdev/login' % ( + product_sn, device_sn) + + device_login_msg = { + 'RequestID': request_id, + "Params": [ + { + 'ProductSN': product_sn, + 'DeviceSN': device_sn + } + ] + } + + try: + data = json.dumps(device_login_msg) + _publish(topic=topic, payload=data.encode('utf-8'), + is_cached=False, duration=0) + q = queue.Queue() + _action_queue_map[request_id] = q + + msg = q.get(timeout=timeout) + _action_queue_map.pop(request_id) + if msg['RetCode'] != 0: + raise EdgeDriverLinkException(msg['RetCode'], msg['Message']) + + except queue.Empty: + raise EdgeDriverLinkTimeoutException + except EdgeDriverLinkException as e: + raise e + except Exception as e: + raise e + + +def device_logout_sync(product_sn, device_sn, timeout=5): + global _action_queue_map + request_id = _generate_request_id() + topic = '/$system/%s/%s/subdev/logout' % ( + product_sn, device_sn) + + device_logout_msg = { + 'RequestID': request_id, + "Params": [ + { + 'ProductSN': product_sn, + 'DeviceSN': device_sn + } + ] + } + + try: + data = json.dumps(device_logout_msg) + _publish(topic=topic, payload=data.encode('utf-8'), + is_cached=False, duration=0) + q = queue.Queue() + _action_queue_map[request_id] = q + + msg = q.get(timeout=timeout) + _action_queue_map.pop(request_id) + if msg['RetCode'] != 0: + raise EdgeDriverLinkException(msg['RetCode'], msg['Message']) + + except queue.Empty: + raise EdgeDriverLinkTimeoutException + except EdgeDriverLinkException as e: + raise e + except Exception as e: + raise e + + +def send_message(topic: str, payload: b'', is_cached=False, duration=0): + _publish(topic=topic, payload=payload, + is_cached=is_cached, duration=duration) + + +def _on_broadcast_message(message): + global _on_topo_change_callback + global _on_status_change_callback + global _action_queue_map + _logger.debug("recv message:{} " .format(str(message))) + try: + js = json.loads(message) + topic = js['topic'] + + data = str(base64.b64decode(js['payload']), "utf-8") + # _logger.debug("broadcast message payload: " + data) + + msg = json.loads(data) + + if isinstance(topic, str) and topic.startswith("/$system/"): + # on topo change callback + if topic.endswith("/subdev/topo/notify/add"): + if _on_topo_change_callback: + msg['operation'] = 'add' + _on_topo_change_callback.run(msg) + + elif topic.endswith("/subdev/topo/notify/delete"): + if _on_topo_change_callback: + msg['operation'] = 'delete' + _on_topo_change_callback.run(msg) + + elif topic.endswith('/subdev/enable'): + if _on_status_change_callback: + msg['operation'] = 'enable' + _on_status_change_callback.run(msg) + + elif topic.endswith('/subdev/disable'): + if _on_status_change_callback: + msg['operation'] = 'disable' + _on_status_change_callback.run(msg) + + else: + request_id = msg['RequestID'] + if request_id in _action_queue_map: + q = _action_queue_map[request_id] + q.put(msg) + else: + _logger.debug('unknown message topic:{}'.format(topic)) + return + + except Exception as e: + _logger.error(e) + + +def _on_message(message): + global _connect_map + _logger.debug("recv message: {}".format(str(message))) + try: + js = json.loads(message) + identify = js['productSN'] + \ + '.'+js['deviceSN'] + + topic = js['topic'] + msg = base64.b64decode(js['payload']) + # _logger.debug("normal message payload: {}".format(str(msg, 'utf-8'))) + if identify in _connect_map: + sub_dev = _connect_map[identify] + if sub_dev.callback: + sub_dev.callback(topic, msg) + else: + _logger.error('unknown message topic:{}'.format(topic)) + return + + except Exception as e: + _logger.error(e) + + +def _publish(topic: str, payload: b'', is_cached=False, duration=0): + try: + payload_encode = base64.b64encode(payload) + data = { + 'src': 'local', + 'topic': topic, + 'isCatched': is_cached, + 'duration': duration, + 'payload': str(payload_encode, 'utf-8') + } + publish_nats_msg(data) + except Exception as e: + _logger.error(e) + raise e + + +def _set_edge_status(): + global _cache + _cache['edge_status'] = True + + +def init_subscribe_handler(): + while True: + msg = _nat_subscribe_queue.get() + # _logger.debug(msg) + subject = msg.subject + data = msg.data.decode() + + if subject == "edge.local.broadcast": + _on_broadcast_message(data) + elif subject == "edge.state.reply": + _set_edge_status() + else: + _on_message(data) + + +def _get_device_list(): + global _connect_map + result = [] + for v in _connect_map.values(): + result.append(v.get_device_info()) + + return result + + +def fetch_online_status(): + min_retry_timeout = 1 + max_retry_timeout = 15 + retry_timeout = min_retry_timeout + while True: + device_list = _get_device_list() + data = { + 'payload': { + 'driverID': _driver_id, + 'devices': device_list + }, + 'subject': 'edge.state.req' + } + + _nat_publish_queue.put(data) + + if get_edge_online_status(): + time.sleep(max_retry_timeout) + else: + if retry_timeout < max_retry_timeout: + retry_timeout = retry_timeout + 1 + time.sleep(retry_timeout) + + +_t_sub = threading.Thread(target=init_subscribe_handler) +_t_sub.setDaemon(True) +_t_sub.start() + +_t_online = threading.Thread(target=fetch_online_status) +_t_online.setDaemon(True) +_t_online.start() diff --git a/uiotedgedriverlinksdk/exception.py b/uiotedgedriverlinksdk/exception.py new file mode 100644 index 0000000..bde8ab1 --- /dev/null +++ b/uiotedgedriverlinksdk/exception.py @@ -0,0 +1,39 @@ +class BaseEdgeException(Exception): + def gatherAttrs(self): + return ",".join("{}={}" + .format(k, getattr(self, k)) + for k in self.__dict__.keys()) + + def __str__(self): + return "[{}:{}]".format(self.__class__.__name__, self.gatherAttrs()) + + +class EdgeDriverLinkException(BaseEdgeException): + def __init__(self, code, msg): + self.code = code + self.msg = msg + + +class EdgeDriverLinkTimeoutException(BaseEdgeException): + def __str__(self): + return "[{}:{}]".format(self.__class__.__name__, 'wait response timeout, please check network or edge connect state.') + + +class EdgeDriverLinkDeviceConfigException(BaseEdgeException): + def __str__(self): + return "[{}:{}]".format(self.__class__.__name__, 'device param error, please make sure product_sn and device_sn not null.') + + +class EdgeDriverLinkDeviceOfflineException(BaseEdgeException): + def __str__(self): + return "[{}:{}]".format(self.__class__.__name__, 'device offline, please login first.') + + +class EdgeDriverLinkOfflineException(BaseEdgeException): + def __str__(self): + return "[{}:{}]".format(self.__class__.__name__, 'edge offline, please connect first.') + + +class EdgeDriverLinkDeviceProductSecretException(BaseEdgeException): + def __str__(self): + return "[{}:{}]".format(self.__class__.__name__, 'product secret param error, please make sure product secret not null.') diff --git a/uiotedgedriverlinksdk/nats.py b/uiotedgedriverlinksdk/nats.py new file mode 100644 index 0000000..561316c --- /dev/null +++ b/uiotedgedriverlinksdk/nats.py @@ -0,0 +1,114 @@ +import asyncio +import json +import os +import queue +import random +import signal +import string +import sys +import threading +import time + +from nats.aio.client import Client as NATS +from nats.aio.errors import NatsError +from uiotedgedriverlinksdk import _driver_id, getLogger + +_logger = getLogger() + + +def exit_handler(signum, frame): + sys.exit(0) + + +_nat_publish_queue = queue.Queue() +_nat_subscribe_queue = queue.Queue() + + +class natsClientPub(object): + def __init__(self): + self.url = os.environ.get( + 'UIOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222' + self.nc = NATS() + self.loop = asyncio.new_event_loop() + + async def _publish(self): + global _nat_publish_queue + try: + await self.nc.connect(servers=[self.url], loop=self.loop) + except Exception as e1: + _logger.error(e1) + sys.exit(1) + + while True: + try: + msg = _nat_publish_queue.get() + bty = json.dumps(msg['payload']) + await self.nc.publish(subject=msg['subject'], + payload=bty.encode('utf-8')) + await self.nc.flush() + except NatsError as e: + _logger.error(e) + except Exception as e: + _logger.error(e) + + def start(self): + self.loop.run_until_complete(self._publish()) + # self.loop.run_forever() + + +class natsClientSub(object): + def __init__(self): + self.url = os.environ.get( + 'UIOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222' + self.nc = NATS() + self.loop = asyncio.new_event_loop() + + async def _connect(self): + try: + await self.nc.connect(servers=[self.url], loop=self.loop) + except Exception as e1: + _logger.error(e1) + sys.exit(1) + + async def message_handler(msg): + global _nat_subscribe_queue + # subject = msg.subject + # reply = msg.reply + # data = msg.data.decode() + # sdk_print("Received a message on '{subject} {reply}': {data}".format( + # subject=subject, reply=reply, data=data)) + _nat_subscribe_queue.put(msg) + await self.nc.subscribe("edge.local."+_driver_id, queue=_driver_id, cb=message_handler, is_async=True) + await self.nc.subscribe("edge.local.broadcast", queue=_driver_id, cb=message_handler, is_async=True) + await self.nc.subscribe("edge.state.reply", queue=_driver_id, cb=message_handler, is_async=True) + await self.nc.flush() + + def start(self): + self.loop.run_until_complete(self._connect()) + self.loop.run_forever() + + +def publish_nats_msg(msg): + global _nat_publish_queue + data = { + 'subject': 'edge.router.'+_driver_id, + 'payload': msg + } + _nat_publish_queue.put(data) + + +def start_pub(): + natsClientPub().start() + + +def start_sub(): + natsClientSub().start() + + +_t_nats_pub = threading.Thread(target=start_pub) +_t_nats_pub.setDaemon(True) +_t_nats_pub.start() + +_t_nats_sub = threading.Thread(target=start_sub) +_t_nats_sub.setDaemon(True) +_t_nats_sub.start()