1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-29 13:49:28 +08:00

MF-1654 - Fix Topic Parsing in RabbitMQ Broker (#1655)

* Initial commit

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>

* Initial commit

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>

* increase message durability

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>

* inline err check

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>

* Remove quality of service and priority

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>
Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
This commit is contained in:
b1ackd0t 2022-10-21 13:06:39 +03:00 committed by GitHub
parent c212254c2a
commit 2213c5e1af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 7 deletions

View File

@ -15,7 +15,7 @@ import (
) )
// SubjectAllChannels represents subject to subscribe for all the channels. // SubjectAllChannels represents subject to subscribe for all the channels.
const SubjectAllChannels = "channels.>" const SubjectAllChannels = "channels.#"
func init() { func init() {
log.Println("The binary was build using RabbitMQ as the message broker") log.Println("The binary was build using RabbitMQ as the message broker")

View File

@ -5,6 +5,7 @@ package rabbitmq
import ( import (
"fmt" "fmt"
"strings"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging"
@ -29,7 +30,7 @@ func NewPublisher(url string) (messaging.Publisher, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil { if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil {
return nil, err return nil, err
} }
ret := &publisher{ ret := &publisher{
@ -51,6 +52,8 @@ func (pub *publisher) Publish(topic string, msg messaging.Message) error {
if msg.Subtopic != "" { if msg.Subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic) subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
} }
subject = formatTopic(subject)
err = pub.ch.Publish( err = pub.ch.Publish(
exchangeName, exchangeName,
subject, subject,
@ -71,5 +74,12 @@ func (pub *publisher) Publish(topic string, msg messaging.Message) error {
} }
func (pub *publisher) Close() error { func (pub *publisher) Close() error {
if err := pub.ch.Close(); err != nil {
return err
}
return pub.conn.Close() return pub.conn.Close()
} }
func formatTopic(topic string) string {
return strings.Replace(topic, ">", "#", -1)
}

View File

@ -17,7 +17,7 @@ import (
const ( const (
chansPrefix = "channels" chansPrefix = "channels"
// SubjectAllChannels represents subject to subscribe for all the channels. // SubjectAllChannels represents subject to subscribe for all the channels.
SubjectAllChannels = "channels.>" SubjectAllChannels = "channels.#"
exchangeName = "mainflux-exchange" exchangeName = "mainflux-exchange"
) )
@ -51,7 +51,7 @@ func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeDirect, true, false, false, false, nil); err != nil { if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil {
return nil, err return nil, err
} }
ret := &pubsub{ ret := &pubsub{
@ -73,6 +73,8 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
return ErrEmptyTopic return ErrEmptyTopic
} }
ps.mu.Lock() ps.mu.Lock()
topic = formatTopic(topic)
// Check topic // Check topic
s, ok := ps.subscriptions[topic] s, ok := ps.subscriptions[topic]
if ok { if ok {
@ -95,8 +97,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
ps.subscriptions[topic] = s ps.subscriptions[topic] = s
} }
_, err := ps.ch.QueueDeclare(topic, true, true, true, false, nil) if _, err := ps.ch.QueueDeclare(topic, true, false, false, false, nil); err != nil {
if err != nil {
return err return err
} }
if err := ps.ch.QueueBind(topic, topic, exchangeName, false, nil); err != nil { if err := ps.ch.QueueBind(topic, topic, exchangeName, false, nil); err != nil {
@ -130,6 +131,7 @@ func (ps *pubsub) Unsubscribe(id, topic string) error {
ps.mu.Lock() ps.mu.Lock()
defer ps.mu.Unlock() defer ps.mu.Unlock()
topic = formatTopic(topic)
// Check topic // Check topic
s, ok := ps.subscriptions[topic] s, ok := ps.subscriptions[topic]
if !ok { if !ok {