diff --git a/pkg/messaging/nats/pubsub_test.go b/pkg/messaging/nats/pubsub_test.go new file mode 100644 index 00000000..55695129 --- /dev/null +++ b/pkg/messaging/nats/pubsub_test.go @@ -0,0 +1,84 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats_test + +import ( + "fmt" + "testing" + + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + topic = "topic" + chansPrefix = "channels" + channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b" + subtopic = "engine" +) + +var ( + msgChan = make(chan messaging.Message) + data = []byte("payload") +) + +func TestPubsub(t *testing.T) { + err := pubsub.Subscribe(fmt.Sprintf("%s.%s", chansPrefix, topic), handler) + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + err = pubsub.Subscribe(fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), handler) + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + cases := []struct { + desc string + channel string + subtopic string + payload []byte + }{ + { + desc: "publish message with nil payload", + payload: nil, + }, + { + desc: "publish message with string payload", + payload: data, + }, + { + desc: "publish message with channel", + payload: data, + channel: channel, + }, + { + desc: "publish message with subtopic", + payload: data, + subtopic: subtopic, + }, + { + desc: "publish message with channel and subtopic", + payload: data, + channel: channel, + subtopic: subtopic, + }, + } + + for _, tc := range cases { + expectedMsg := messaging.Message{ + Channel: tc.channel, + Subtopic: tc.subtopic, + Payload: tc.payload, + } + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + err = pubsub.Publish(topic, expectedMsg) + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + receivedMsg := <-msgChan + assert.Equal(t, expectedMsg, receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, expectedMsg, receivedMsg)) + } +} + +func handler(msg messaging.Message) error { + msgChan <- msg + return nil +} diff --git a/pkg/messaging/nats/setup_test.go b/pkg/messaging/nats/setup_test.go new file mode 100644 index 00000000..43d1ea2c --- /dev/null +++ b/pkg/messaging/nats/setup_test.go @@ -0,0 +1,74 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats_test + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + "testing" + + "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/mainflux/mainflux/pkg/messaging/nats" + dockertest "github.com/ory/dockertest/v3" +) + +var ( + publisher messaging.Publisher + pubsub messaging.PubSub +) + +func TestMain(m *testing.M) { + pool, err := dockertest.NewPool("") + if err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + container, err := pool.Run("nats", "1.3.0", []string{}) + if err != nil { + log.Fatalf("Could not start container: %s", err) + } + handleInterrupt(pool, container) + + address := fmt.Sprintf("%s:%s", "localhost", container.GetPort("4222/tcp")) + if err := pool.Retry(func() error { + publisher, err = nats.NewPublisher(address) + return err + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + logger, err := logger.New(os.Stdout, "error") + if err != nil { + log.Fatalf(err.Error()) + } + if err := pool.Retry(func() error { + pubsub, err = nats.NewPubSub(address, "", logger) + return err + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + code := m.Run() + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + + os.Exit(code) +} + +func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) { + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + os.Exit(0) + }() +}