1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-05-06 19:29:15 +08:00
Mainflux.mainflux/mqtt/adapter.go
Manuel Imperiale 2b4cf8a990
NOISSUE - Fix default nats pubsub subject (#1153)
* NOISSUE - Fix default nats publisher subject

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use created timestamp inn transformer

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use created timestamp in transformer

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* rename topic -> subject for nats

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* revert

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* revert

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix subscriber topic

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix timestamp

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use int64 nanoseconds Created timestamp

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Typo

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Add comment to created protobuf field

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Rm gogo from scripts/ci.sh

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix comment

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Return publisher in NewPublisher

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
2020-05-04 13:14:06 +02:00

250 lines
6.2 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mqtt
import (
"context"
"errors"
"net/url"
"regexp"
"strings"
"time"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/messaging"
"github.com/mainflux/mainflux/mqtt/redis"
"github.com/mainflux/mproxy/pkg/session"
opentracing "github.com/opentracing/opentracing-go"
)
var _ session.Handler = (*handler)(nil)
const protocol = "mqtt"
var (
channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
errMalformedTopic = errors.New("malformed topic")
errMalformedData = errors.New("malformed request data")
errMalformedSubtopic = errors.New("malformed subtopic")
errUnauthorizedAccess = errors.New("missing or invalid credentials provided")
errNilClient = errors.New("using nil client")
errInvalidConnect = errors.New("CONNECT request with invalid username or client ID")
errNilTopicPub = errors.New("PUBLISH to nil topic")
errNilTopicSub = errors.New("SUB to nil topic")
)
// Event implements events.Event interface
type handler struct {
publishers []messaging.Publisher
tc mainflux.ThingsServiceClient
tracer opentracing.Tracer
logger logger.Logger
es redis.EventStore
}
// New creates new Event entity
func New(publishers []messaging.Publisher, tc mainflux.ThingsServiceClient, es redis.EventStore,
logger logger.Logger, tracer opentracing.Tracer) session.Handler {
return &handler{
tc: tc,
es: es,
tracer: tracer,
logger: logger,
publishers: publishers,
}
}
// AuthConnect is called on device connection,
// prior forwarding to the MQTT broker
func (h *handler) AuthConnect(c *session.Client) error {
if c == nil {
return errInvalidConnect
}
t := &mainflux.Token{
Value: string(c.Password),
}
thid, err := h.tc.Identify(context.TODO(), t)
if err != nil {
return err
}
if thid.Value != c.Username {
return errUnauthorizedAccess
}
if err := h.es.Connect(c.Username); err != nil {
h.logger.Warn("Failed to publish connect event: " + err.Error())
}
return nil
}
// AuthPublish is called on device publish,
// prior forwarding to the MQTT broker
func (h *handler) AuthPublish(c *session.Client, topic *string, payload *[]byte) error {
if c == nil {
return errNilClient
}
if topic == nil {
return errNilTopicPub
}
return h.authAccess(c.Username, *topic)
}
// AuthSubscribe is called on device publish,
// prior forwarding to the MQTT broker
func (h *handler) AuthSubscribe(c *session.Client, topics *[]string) error {
if c == nil {
return errNilClient
}
if topics == nil || *topics == nil {
return errNilTopicSub
}
for _, v := range *topics {
if err := h.authAccess(c.Username, v); err != nil {
return err
}
}
return nil
}
// Connect - after client successfully connected
func (h *handler) Connect(c *session.Client) {
if c == nil {
h.logger.Error("Nil client connect")
return
}
h.logger.Info("Connect - client with ID: " + c.ID)
}
// Publish - after client successfully published
func (h *handler) Publish(c *session.Client, topic *string, payload *[]byte) {
if c == nil {
h.logger.Error("Nil client publish")
return
}
h.logger.Info("Publish - client ID " + c.ID + " to the topic: " + *topic)
// Topics are in the format:
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
channelParts := channelRegExp.FindStringSubmatch(*topic)
if len(channelParts) < 1 {
h.logger.Info("Error in mqtt publish %s" + errMalformedData.Error())
return
}
chanID := channelParts[1]
subtopic := channelParts[2]
subtopic, err := parseSubtopic(subtopic)
if err != nil {
h.logger.Info("Error parsing subtopic: " + err.Error())
return
}
msg := messaging.Message{
Protocol: protocol,
Channel: chanID,
Subtopic: subtopic,
Publisher: c.Username,
Payload: *payload,
Created: time.Now().UnixNano(),
}
for _, pub := range h.publishers {
if err := pub.Publish(msg.Channel, msg); err != nil {
h.logger.Info("Error publishing to Mainflux " + err.Error())
}
}
}
// Subscribe - after client successfully subscribed
func (h *handler) Subscribe(c *session.Client, topics *[]string) {
if c == nil {
h.logger.Error("Nil client subscribe")
return
}
h.logger.Info("Subscribe - client ID: " + c.ID + ", to topics: " + strings.Join(*topics, ","))
}
// Unsubscribe - after client unsubscribed
func (h *handler) Unsubscribe(c *session.Client, topics *[]string) {
if c == nil {
h.logger.Error("Nil client unsubscribe")
return
}
h.logger.Info("Unsubscribe - client ID: " + c.ID + ", form topics: " + strings.Join(*topics, ","))
}
// Disconnect - connection with broker or client lost
func (h *handler) Disconnect(c *session.Client) {
if c == nil {
h.logger.Error("Nil client disconnect")
return
}
h.logger.Info("Disconnect - Client with ID: " + c.ID + " and username " + c.Username + " disconnected")
if err := h.es.Disconnect(c.Username); err != nil {
h.logger.Warn("Failed to publish disconnect event: " + err.Error())
}
}
func (h *handler) authAccess(username string, topic string) error {
// Topics are in the format:
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
if !channelRegExp.Match([]byte(topic)) {
h.logger.Info("Malformed topic: " + topic)
return errMalformedTopic
}
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 1 {
return errMalformedData
}
chanID := channelParts[1]
ar := &mainflux.AccessByIDReq{
ThingID: username,
ChanID: chanID,
}
_, err := h.tc.CanAccessByID(context.TODO(), ar)
return err
}
func parseSubtopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}