import json from iotedgedriverlinksdk import _deviceInfos, _driverInfo from iotedgedriverlinksdk.edge import (add_connect_map, del_connect_map, device_login_async, device_login_sync, device_logout_async, device_logout_sync, register_device, send_message) from iotedgedriverlinksdk.exception import ( EdgeDriverLinkDeviceConfigException, EdgeDriverLinkDeviceOfflineException, EdgeDriverLinkDeviceProductSecretException) class SubDevice(object): def __init__(self, product_sn: str = '', device_sn: str = '', on_msg_callback=None): self.device_sn = device_sn self.product_sn = product_sn self.product_secret = '' self.callback = on_msg_callback self.rrpc = None self.online = False if self.product_sn != '' and self.device_sn != '': self._identity = self.product_sn+'.'+self.device_sn def set_product_sn(self, product_sn: str): self.product_sn = product_sn if self.product_sn != '' and self.device_sn != '': self._identity = self.product_sn+'.'+self.device_sn def set_device_sn(self, device_sn: str): self.device_sn = device_sn if self.product_sn != '' and self.device_sn != '': self._identity = self.product_sn+'.'+self.device_sn def set_product_secret(self, product_secret: str): self.product_secret = product_secret def set_msg_callback(self, msg_callback): self.callback = msg_callback def set_rrpc_callback(self, rrpc_callback): self.rrpc = rrpc_callback def get_device_info(self): return { "productSN": self.product_sn, "deviceSN": self.device_sn } def registerDevice(self, timeout=5): if self.product_sn == '' or self.device_sn == '' or self.product_secret == '': raise EdgeDriverLinkDeviceProductSecretException register_device(self.product_sn, self.device_sn, self.product_secret, timeout=timeout) def logout(self, sync=False, timeout=5): if self.online: if sync: device_logout_sync(product_sn=self.product_sn, device_sn=self.device_sn, timeout=timeout) else: device_logout_async(product_sn=self.product_sn, device_sn=self.device_sn) self.online = False del_connect_map(self._identity) def login(self, sync=False, timeout=5): if self._identity == '': raise EdgeDriverLinkDeviceConfigException add_connect_map(self._identity, self) if sync: device_login_sync(product_sn=self.product_sn, device_sn=self.device_sn, timeout=timeout) else: device_login_async(product_sn=self.product_sn, device_sn=self.device_sn) self.online = True def publish(self, topic: str, payload: b''): if self.online: send_message(topic, payload, is_cached=False, duration=0) else: raise EdgeDriverLinkDeviceOfflineException class Config(object): def __init__(self, config=None): self.config = config def getDriverInfo(self): return _driverInfo def getDeviceInfos(self): return _deviceInfos def getConfig(): if _driverInfo is None: config = {"deviceList": _deviceInfos} else: config = { "config": _driverInfo, "deviceList": _deviceInfos} return json.dumps(config)