// Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 package rabbitmq_test import ( "fmt" "testing" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/rabbitmq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) const ( topic = "topic" chansPrefix = "channels" channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b" subtopic = "engine" clientID = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b" ) var ( msgChan = make(chan messaging.Message) data = []byte("payload") ) func TestPublisher(t *testing.T) { err := pubsub.Subscribe(clientID, fmt.Sprintf("%s.%s", chansPrefix, topic), handler{}) require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) err = pubsub.Subscribe(clientID, fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), handler{}) require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) cases := []struct { desc string channel string subtopic string payload []byte }{ { desc: "publish message with nil payload", payload: nil, }, { desc: "publish message with string payload", payload: data, }, { desc: "publish message with channel", payload: data, channel: channel, }, { desc: "publish message with subtopic", payload: data, subtopic: subtopic, }, { desc: "publish message with channel and subtopic", payload: data, channel: channel, subtopic: subtopic, }, } for _, tc := range cases { expectedMsg := messaging.Message{ Channel: tc.channel, Subtopic: tc.subtopic, Payload: tc.payload, } err := publisher.Publish(topic, expectedMsg) require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) receivedMsg := <-msgChan assert.Equal(t, expectedMsg, receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, expectedMsg, receivedMsg)) } } func TestPubsub(t *testing.T) { // Test Subscribe and Unsubscribe subcases := []struct { desc string topic string clientID string errorMessage error pubsub bool //true for subscribe and false for unsubscribe }{ { desc: "Subscribe to a topic with an ID", topic: fmt.Sprintf("%s.%s", chansPrefix, topic), clientID: "clientid1", errorMessage: nil, pubsub: true, }, { desc: "Subscribe to the same topic with a different ID", topic: fmt.Sprintf("%s.%s", chansPrefix, topic), clientID: "clientid2", errorMessage: nil, pubsub: true, }, { desc: "Subscribe to an already subscribed topic with an ID", topic: fmt.Sprintf("%s.%s", chansPrefix, topic), clientID: "clientid1", errorMessage: rabbitmq.ErrAlreadySubscribed, pubsub: true, }, { desc: "Unsubscribe from a topic with an ID", topic: fmt.Sprintf("%s.%s", chansPrefix, topic), clientID: "clientid1", errorMessage: nil, pubsub: false, }, { desc: "Unsubscribe from a non-existent topic with an ID", topic: "h", clientID: "clientid1", errorMessage: rabbitmq.ErrNotSubscribed, pubsub: false, }, { desc: "Unsubscribe from the same topic with a different ID", topic: fmt.Sprintf("%s.%s", chansPrefix, topic), clientID: "clientidd2", errorMessage: rabbitmq.ErrNotSubscribed, pubsub: false, }, { desc: "Unsubscribe from the same topic with a different ID not subscribed", topic: fmt.Sprintf("%s.%s", chansPrefix, topic), clientID: "clientidd3", errorMessage: rabbitmq.ErrNotSubscribed, pubsub: false, }, { desc: "Unsubscribe from an already unsubscribed topic with an ID", topic: fmt.Sprintf("%s.%s", chansPrefix, topic), clientID: "clientid1", errorMessage: rabbitmq.ErrNotSubscribed, pubsub: false, }, { desc: "Subscribe to a topic with a subtopic with an ID", topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), clientID: "clientidd1", errorMessage: nil, pubsub: true, }, { desc: "Subscribe to an already subscribed topic with a subtopic with an ID", topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), clientID: "clientidd1", errorMessage: rabbitmq.ErrAlreadySubscribed, pubsub: true, }, { desc: "Unsubscribe from a topic with a subtopic with an ID", topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), clientID: "clientidd1", errorMessage: nil, pubsub: false, }, { desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID", topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), clientID: "clientid1", errorMessage: rabbitmq.ErrNotSubscribed, pubsub: false, }, { desc: "Subscribe to an empty topic with an ID", topic: "", clientID: "clientid1", errorMessage: rabbitmq.ErrEmptyTopic, pubsub: true, }, { desc: "Unsubscribe from an empty topic with an ID", topic: "", clientID: "clientid1", errorMessage: rabbitmq.ErrEmptyTopic, pubsub: false, }, { desc: "Subscribe to a topic with empty id", topic: fmt.Sprintf("%s.%s", chansPrefix, topic), clientID: "", errorMessage: rabbitmq.ErrEmptyID, pubsub: true, }, { desc: "Unsubscribe from a topic with empty id", topic: fmt.Sprintf("%s.%s", chansPrefix, topic), clientID: "", errorMessage: rabbitmq.ErrEmptyID, pubsub: false, }, } for _, pc := range subcases { if pc.pubsub == true { err := pubsub.Subscribe(pc.clientID, pc.topic, handler{}) if pc.errorMessage == nil { require.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) } else { assert.Equal(t, err, pc.errorMessage) } } else { err := pubsub.Unsubscribe(pc.clientID, pc.topic) if pc.errorMessage == nil { require.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) } else { assert.Equal(t, err, pc.errorMessage) } } } } type handler struct{} func (h handler) Handle(msg messaging.Message) error { msgChan <- msg return nil } func (h handler) Cancel() error { return nil }