1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-28 13:48:49 +08:00
Dušan Borovčanin c3019fffb6
NOISSUE - Refactor messaging (#1141)
* Refactor messaging

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename SubscribeHandler to MessageHandler

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove `Auth` event logs

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update message pubsub API

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix topics handling

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update CoAP adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update Twins service

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update LoRa adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update OPC UA adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove broker package

Package `broker` is conceptually renamed to package `nats`.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update makefile

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Add comment explanation

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix MQTT adapter

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix typo

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Move NATS pub/sub implementation to pubsub pkg

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove an empty line in main methods

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Move messaging-related code to messaging package

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix Twins mocks

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Change Occurred back to Created

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix transformer test

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix message proto commands

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Replace string literal with constant

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove alias from main method

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Change messaging pubsub alias

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename occured to created

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Handle NATS connection in the NATS PubSub

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename n to pub/pubSub

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix typos

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2020-04-28 11:02:35 +02:00

149 lines
3.4 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package nats
import (
"errors"
"fmt"
"sync"
"github.com/gogo/protobuf/proto"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging"
broker "github.com/nats-io/nats.go"
)
const (
	// chansPrefix is the first token of every NATS subject built by this
	// package; topics are appended to it, separated by a dot.
	chansPrefix = "channels"

	// SubjectAllChannels represents subject to subscribe for all the channels.
	SubjectAllChannels = "channels.>"
)
// Sentinel errors returned by the subscription management methods.
var (
	// errEmptyTopic is returned when the caller passes an empty topic.
	errEmptyTopic = errors.New("empty topic")

	// errAlreadySubscribed is returned when a subscription for the
	// topic is already registered.
	errAlreadySubscribed = errors.New("already subscribed to topic")

	// errNotSubscribed is returned when no subscription is registered
	// for the topic being unsubscribed.
	errNotSubscribed = errors.New("not subscribed")
)
// Compile-time assertion that *pubsub implements messaging.PubSub.
var _ messaging.PubSub = (*pubsub)(nil)
// PubSub extends messaging.PubSub with a Close() method
// for the underlying NATS connection.
type PubSub interface {
	messaging.PubSub

	// Close closes the NATS connection used by the publisher/subscriber.
	Close()
}
// pubsub is the NATS-backed publisher/subscriber returned by NewPubSub.
type pubsub struct {
	// conn is the NATS connection used for publishing and subscribing.
	conn *broker.Conn
	// logger receives warnings about messages that fail to decode or handle.
	logger log.Logger
	// queue is the queue name passed to QueueSubscribe; when empty, a plain
	// Subscribe is used instead.
	queue string

	// mu is held by Subscribe and Unsubscribe while they access subscriptions.
	mu sync.Mutex
	// subscriptions holds the active NATS subscriptions keyed by subject.
	subscriptions map[string]*broker.Subscription
}
// NewPubSub connects to the NATS server at url and returns a message
// publisher/subscriber backed by that connection.
//
// The queue parameter selects how Subscribe behaves: when it is a
// non-empty string, Subscribe performs a NATS QueueSubscribe with that
// queue name, which is conceptually different from an ordinary
// subscribe (see
// https://docs.nats.io/developing-with-nats/receiving/queues).
// When queue is empty, an ordinary Subscribe is used.
//
// An error is returned if the connection cannot be established.
func NewPubSub(url, queue string, logger log.Logger) (PubSub, error) {
	nc, err := broker.Connect(url)
	if err != nil {
		return nil, err
	}

	return &pubsub{
		conn:          nc,
		logger:        logger,
		queue:         queue,
		subscriptions: map[string]*broker.Subscription{},
	}, nil
}
// Publish marshals msg with protobuf and publishes it on the NATS subject
// "channels.<topic>", extended with ".<subtopic>" when msg.Subtopic is set.
// It returns the marshalling error or the error reported by the NATS
// connection, if any.
func (ps *pubsub) Publish(topic string, msg messaging.Message) error {
	payload, err := proto.Marshal(&msg)
	if err != nil {
		return err
	}

	subject := chansPrefix + "." + topic
	if msg.Subtopic != "" {
		subject += "." + msg.Subtopic
	}

	return ps.conn.Publish(subject, payload)
}
// Subscribe registers handler for messages published on the given topic.
//
// The topic is prefixed with chansPrefix to form the NATS subject. When the
// pubsub was created with a non-empty queue, the subscription is made with
// QueueSubscribe using that queue; otherwise a plain Subscribe is used.
//
// It returns errEmptyTopic for an empty topic, errAlreadySubscribed when a
// subscription for the same topic is already registered, or the error
// reported by the NATS connection.
func (ps *pubsub) Subscribe(topic string, handler messaging.MessageHandler) error {
	if topic == "" {
		return errEmptyTopic
	}
	// Subscriptions are stored (and looked up by Unsubscribe) under the
	// prefixed subject, so the duplicate check must use the same key.
	// Checking the raw topic would never match, and a repeated Subscribe
	// would overwrite the stored subscription, leaving the earlier one
	// active but impossible to unsubscribe.
	subject := fmt.Sprintf("%s.%s", chansPrefix, topic)

	ps.mu.Lock()
	defer ps.mu.Unlock()

	if _, ok := ps.subscriptions[subject]; ok {
		return errAlreadySubscribed
	}

	nh := ps.natsHandler(handler)

	var (
		sub *broker.Subscription
		err error
	)
	if ps.queue != "" {
		sub, err = ps.conn.QueueSubscribe(subject, ps.queue, nh)
	} else {
		sub, err = ps.conn.Subscribe(subject, nh)
	}
	if err != nil {
		return err
	}

	ps.subscriptions[subject] = sub
	return nil
}
// Unsubscribe cancels the subscription registered for the given topic and
// removes it from the set of tracked subscriptions.
//
// It returns errEmptyTopic for an empty topic, errNotSubscribed when no
// subscription is tracked for the topic, or the error reported by NATS when
// cancelling the subscription (in which case the entry is kept).
func (ps *pubsub) Unsubscribe(topic string) error {
	if topic == "" {
		return errEmptyTopic
	}
	// Subscriptions are keyed by the prefixed NATS subject.
	subject := fmt.Sprintf("%s.%s", chansPrefix, topic)

	ps.mu.Lock()
	defer ps.mu.Unlock()

	current, found := ps.subscriptions[subject]
	if !found {
		return errNotSubscribed
	}

	if err := current.Unsubscribe(); err != nil {
		return err
	}

	delete(ps.subscriptions, subject)
	return nil
}
// Close closes the underlying NATS connection.
// The subscriptions map is left untouched by this method.
func (ps *pubsub) Close() {
ps.conn.Close()
}
// natsHandler adapts a messaging.MessageHandler to a NATS message handler.
//
// The returned handler unmarshals the raw NATS payload into a
// messaging.Message and passes it to h. Both decoding failures and errors
// returned by h are logged as warnings; neither is propagated.
func (ps *pubsub) natsHandler(h messaging.MessageHandler) broker.MsgHandler {
	handle := func(m *broker.Msg) {
		var decoded messaging.Message

		err := proto.Unmarshal(m.Data, &decoded)
		if err != nil {
			// An undecodable payload cannot be handed to h; log and drop it.
			ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err))
			return
		}

		err = h(decoded)
		if err != nil {
			ps.logger.Warn(fmt.Sprintf("Failed to handle Mainflux message: %s", err))
		}
	}

	return handle
}