1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-24 13:48:49 +08:00
Mainflux.mainflux/mqtt/forwarder.go
b1ackd0t 4401e79a0d
NOISSUE - Add Subscriber Config (#1896)
* Replace NATS with NATS JetStream for PubSub

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Add Stream Description

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Fix connection leak in NATS publisher

The publisher struct in pkg/messaging/nats/publisher.go was modified to include a new `conn` field of type `*broker.Conn`. This change was made to fix a connection leak issue in the NATS publisher.

The `NewPublisher` function was updated to assign the `conn` parameter to the new `conn` field in the publisher struct.

Additionally, the `Close` method in the publisher struct was modified to close the `conn` connection.

This commit fixes the connection leak issue in the NATS publisher and ensures that connections are properly closed.

Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>

* Setup subscriber config to contain handler topic and ID

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Add delivery policy

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Avoid duplicate messages

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Rename to DeliveryPolicy

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Fix tests

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Do not check the data result set when returning a subset of messages

Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com>

* Remove config on unsubscribe

Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com>

* Fix comment

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

---------

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com>
2023-10-23 15:27:15 +02:00

76 lines
1.7 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package mqtt
import (
"context"
"fmt"
"strings"
mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging"
)
// Forwarder specifies MQTT forwarder interface API.
// It has a single method, Forward, which is given the subscriber to
// receive from and the publisher to send to.
type Forwarder interface {
// Forward subscribes to the Subscriber and
// publishes messages using provided Publisher.
// The implementation in this file uses id as the ID of the subscriber
// config and returns whatever error the Subscribe call produces.
Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error
}
// forwarder is the Forwarder implementation returned by NewForwarder.
type forwarder struct {
// topic is used as the Topic of the subscriber config built in Forward.
topic string
// logger is handed to the message handler, which logs a warning when
// publishing a forwarded message fails.
logger mflog.Logger
}
// NewForwarder creates a Forwarder that subscribes on topic and reports
// forwarding failures through logger.
func NewForwarder(topic string, logger mflog.Logger) Forwarder {
	var fwd forwarder
	fwd.topic = topic
	fwd.logger = logger
	return fwd
}
// Forward registers a handler on sub, under the given id and the forwarder's
// topic, that republishes received messages through pub. The error returned
// is the one produced by sub.Subscribe.
func (f forwarder) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error {
	return sub.Subscribe(ctx, messaging.SubscriberConfig{
		ID:      id,
		Topic:   f.topic,
		Handler: handle(ctx, pub, f.logger),
	})
}
// handle builds the message handler that Forward installs on the subscriber.
//
// The returned handler ignores messages whose Protocol equals the package's
// protocol value (presumably so that messages which already arrived over
// MQTT are not sent back out; confirm against the declaration of protocol).
// Every other message is published on
// "channels/<Channel>/messages[/<Subtopic>]", with the dots of the subtopic
// replaced by slashes. Publishing runs on a separate goroutine, so the
// handler always returns nil; a publish error is only logged as a warning.
func handle(ctx context.Context, pub messaging.Publisher, logger mflog.Logger) handleFunc {
	// publish is started on its own goroutine for each forwarded message.
	publish := func(dest string, m *messaging.Message) {
		err := pub.Publish(ctx, dest, m)
		if err == nil {
			return
		}
		logger.Warn(fmt.Sprintf("Failed to forward message: %s", err))
	}

	return func(msg *messaging.Message) error {
		if msg.Protocol == protocol {
			return nil
		}

		// Plain string concatenation is kept on purpose: it is simpler and
		// cheaper than fmt.Sprintf for these few pieces.
		dest := "channels/" + msg.Channel + "/messages"
		if sub := msg.Subtopic; sub != "" {
			dest += "/" + strings.ReplaceAll(sub, ".", "/")
		}

		go publish(dest, msg)
		return nil
	}
}
// handleFunc adapts a plain function to a type with Handle and Cancel
// methods, so it can be used as the Handler of a subscriber config.
type handleFunc func(msg *messaging.Message) error

// Handle invokes the wrapped function with msg and returns its result.
func (fn handleFunc) Handle(msg *messaging.Message) error {
	err := fn(msg)
	return err
}

// Cancel is a no-op; it always returns nil.
func (handleFunc) Cancel() error {
	return nil
}