2021-05-26 10:59:33 +08:00

191 lines
5.1 KiB
Python

import asyncio
import json
import os
import queue
import random
import signal
import string
import sys
import threading
import time
from nats.aio.client import Client as NATS
from nats.aio.errors import NatsError
from iotedgedriverlinksdk import _driver_id, getLogger
_logger = getLogger()
def exit_handler(signum, frame):
sys.exit(0)
_nat_publish_queue = queue.Queue()
_nat_client_publish_queue = queue.Queue()
_nat_subscribe_queue = queue.Queue()
class _natsPublish(object):
def __init__(self):
self.url = os.environ.get(
'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222'
self.nc = NATS()
self.loop = asyncio.new_event_loop()
async def _publish(self):
global _nat_client_publish_queue
try:
await self.nc.connect(servers=[self.url], loop=self.loop)
except Exception as e1:
_logger.error(e1)
sys.exit(1)
while True:
try:
msg = _nat_client_publish_queue.get()
bty = msg['payload']
await self.nc.publish(subject=msg['subject'],
payload=bty.encode('utf-8'))
await self.nc.flush()
except NatsError as e:
_logger.error(e)
except Exception as e:
_logger.error(e)
def start(self):
self.loop.run_until_complete(self._publish())
class _natsClientPub(object):
def __init__(self):
self.url = os.environ.get(
'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222'
self.nc = NATS()
self.loop = asyncio.new_event_loop()
async def _publish(self):
global _nat_publish_queue
try:
await self.nc.connect(servers=[self.url], loop=self.loop)
except Exception as e1:
_logger.error(e1)
sys.exit(1)
while True:
try:
msg = _nat_publish_queue.get()
bty = json.dumps(msg['payload'])
await self.nc.publish(subject=msg['subject'],
payload=bty.encode('utf-8'))
await self.nc.flush()
except NatsError as e:
_logger.error(e)
except Exception as e:
_logger.error(e)
def start(self):
self.loop.run_until_complete(self._publish())
# self.loop.run_forever()
class _natsClientSub(object):
def __init__(self):
self.url = os.environ.get(
'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222'
self.nc = NATS()
self.loop = asyncio.new_event_loop()
async def _connect(self):
try:
await self.nc.connect(servers=[self.url], loop=self.loop)
except Exception as e1:
_logger.error(e1)
sys.exit(1)
async def message_handler(msg):
global _nat_subscribe_queue
_nat_subscribe_queue.put(msg)
await self.nc.subscribe("edge.local."+_driver_id, queue=_driver_id, cb=message_handler, is_async=True)
await self.nc.subscribe("edge.local.broadcast", queue=_driver_id, cb=message_handler, is_async=True)
await self.nc.subscribe("edge.state.reply", queue=_driver_id, cb=message_handler, is_async=True)
await self.nc.flush()
def start(self):
self.loop.run_until_complete(self._connect())
self.loop.run_forever()
class _natsSubscribe(object):
def __init__(self, subject: str, queue: str, cb):
self.url = os.environ.get(
'IOTEDGE_NATS_ADDRESS') or 'tcp://127.0.0.1:4222'
self.nc = NATS()
self.loop = asyncio.new_event_loop()
self.cb = cb
self.subject = subject
self.queue = queue
async def _connect(self):
try:
await self.nc.connect(servers=[self.url], loop=self.loop)
except Exception as e1:
_logger.error(e1)
sys.exit(1)
await self.nc.subscribe(self.subject, queue=self.queue, cb=self.cb, is_async=True)
await self.nc.flush()
def start(self):
self.loop.run_until_complete(self._connect())
self.loop.run_forever()
def _publish_nats_msg(msg):
global _nat_publish_queue
data = {
'subject': 'edge.router.'+_driver_id,
'payload': msg
}
_nat_publish_queue.put(data)
def natsPublish(subject: str, payload: bytes):
global _nat_client_publish_queue
data = {
'subject': subject,
'payload': payload
}
_nat_client_publish_queue.put(data)
def natsSubscribe(subject, queue, cb):
def _nats_sub():
_natsSubscribe(subject, queue, cb).start()
t = threading.Thread(target=_nats_sub)
t.setDaemon(True)
t.start()
def _start_pub():
_natsClientPub().start()
def _nats_pub():
_natsPublish().start()
def _start_sub():
_natsClientSub().start()
_t_pub = threading.Thread(target=_nats_pub)
_t_pub.setDaemon(True)
_t_pub.start()
_t_nats_pub = threading.Thread(target=_start_pub)
_t_nats_pub.setDaemon(True)
_t_nats_pub.start()
_t_nats_sub = threading.Thread(target=_start_sub)
_t_nats_sub.setDaemon(True)
_t_nats_sub.start()