1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-24 13:48:49 +08:00

MF-1197 - Add MQTT adapter tests (#1622)

* MF-1197 - add mqtt tests

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - add test cases

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - refactor tests

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Fix test names.Add const

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Add mocks. Add publish test

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Add buffer to logger

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Add redis mock.Change logger error

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Add new test cases

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Fix session fields

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Fix bad logs. Fix tests

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Add new test case.Fix names.

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Fix test name

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Refactoring tests

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Wrap errors

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Wrap errors. Fix logs level

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Wrap errors.

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Fix var names

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-888 - Add clientID err check.Refactor tests

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Fix info and err names

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Remove redundant log and channel parts check

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Revert channel parts check

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Add authorize test case

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Remove unused var

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Change var name

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 -Add missing test case.Change var name

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 -Add err case.

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 -Change var name

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Use logMsg for all logs

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Use logs instead of errors

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Use redis mock

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Return interface in constructor

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Add missing test cases. Create consts

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Add global vars

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Refactor code

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Remove unused var

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Remove unnecessary var

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Use created session client

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Refactor vars

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

* MF-1197 - Separate constant

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>

Signed-off-by: zzokki81 <zoran.rebic@outlook.com>
This commit is contained in:
Zoran Rebic 2022-11-21 14:40:19 +01:00 committed by GitHub
parent 677f3c70b0
commit 933b5dedce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 530 additions and 39 deletions

View File

@ -5,6 +5,7 @@ package mqtt
import (
"context"
"fmt"
"net/url"
"regexp"
"strings"
@ -22,15 +23,32 @@ var _ session.Handler = (*handler)(nil)
const protocol = "mqtt"
const (
LogInfoSubscribed = "subscribed with client_id %s to topics %s"
LogInfoUnsubscribed = "unsubscribed client_id %s from topics %s"
LogInfoConnected = "connected with client_id %s"
LogInfoDisconnected = "disconnected client_id %s and username %s"
LogInfoPublished = "published with client_id %s to the topic %s"
LogErrFailedConnect = "failed to connect: "
LogErrFailedSubscribe = "failed to subscribe: "
LogErrFailedUnsubscribe = "failed to unsubscribe: "
LogErrFailedPublish = "failed to publish: "
LogErrFailedDisconnect = "failed to disconnect: "
LogErrFailedPublishDisconnectEvent = "failed to publish disconnect event: "
logErrFailedParseSubtopic = "failed to parse subtopic: "
LogErrFailedPublishConnectEvent = "failed to publish connect event: "
LogErrFailedPublishToMsgBroker = "failed to publish to mainflux message broker: "
)
var (
channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
errMalformedTopic = errors.New("malformed topic")
errMalformedData = errors.New("malformed request data")
errMalformedSubtopic = errors.New("malformed subtopic")
errNilClient = errors.New("using nil client")
errInvalidConnect = errors.New("CONNECT request with invalid username or client ID")
errNilTopicPub = errors.New("PUBLISH to nil topic")
errNilTopicSub = errors.New("SUB to nil topic")
channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
ErrMalformedSubtopic = errors.New("malformed subtopic")
ErrClientNotInitialized = errors.New("client is not initialized")
ErrMalformedTopic = errors.New("malformed topic")
ErrMissingClientID = errors.New("client_id not found")
ErrMissingTopicPub = errors.New("failed to publish due to missing topic")
ErrMissingTopicSub = errors.New("failed to subscribe due to missing topic")
ErrAuthentication = errors.New("failed to perform authentication over the entity")
)
// Event implements events.Event interface
@ -56,7 +74,11 @@ func NewHandler(publishers []messaging.Publisher, es redis.EventStore,
// prior forwarding to the MQTT broker
func (h *handler) AuthConnect(c *session.Client) error {
if c == nil {
return errInvalidConnect
return ErrClientNotInitialized
}
if c.ID == "" {
return ErrMissingClientID
}
thid, err := h.auth.Identify(context.Background(), string(c.Password))
@ -69,7 +91,7 @@ func (h *handler) AuthConnect(c *session.Client) error {
}
if err := h.es.Connect(c.Username); err != nil {
h.logger.Warn("Failed to publish connect event: " + err.Error())
h.logger.Error(LogErrFailedPublishConnectEvent + err.Error())
}
return nil
@ -79,10 +101,10 @@ func (h *handler) AuthConnect(c *session.Client) error {
// prior forwarding to the MQTT broker
func (h *handler) AuthPublish(c *session.Client, topic *string, payload *[]byte) error {
if c == nil {
return errNilClient
return ErrClientNotInitialized
}
if topic == nil {
return errNilTopicPub
return ErrMissingTopicPub
}
return h.authAccess(c.Username, *topic)
@ -92,10 +114,10 @@ func (h *handler) AuthPublish(c *session.Client, topic *string, payload *[]byte)
// prior forwarding to the MQTT broker
func (h *handler) AuthSubscribe(c *session.Client, topics *[]string) error {
if c == nil {
return errNilClient
return ErrClientNotInitialized
}
if topics == nil || *topics == nil {
return errNilTopicSub
return ErrMissingTopicSub
}
for _, v := range *topics {
@ -111,25 +133,25 @@ func (h *handler) AuthSubscribe(c *session.Client, topics *[]string) error {
// Connect - after client successfully connected
func (h *handler) Connect(c *session.Client) {
if c == nil {
h.logger.Error("Nil client connect")
h.logger.Error(LogErrFailedConnect + (ErrClientNotInitialized).Error())
return
}
h.logger.Info("Connect - client with ID: " + c.ID)
h.logger.Info(fmt.Sprintf(LogInfoConnected, c.ID))
}
// Publish - after client successfully published
func (h *handler) Publish(c *session.Client, topic *string, payload *[]byte) {
if c == nil {
h.logger.Error("Nil client publish")
h.logger.Error(LogErrFailedPublish + ErrClientNotInitialized.Error())
return
}
h.logger.Info("Publish - client ID " + c.ID + " to the topic: " + *topic)
h.logger.Info(fmt.Sprintf(LogInfoPublished, c.ID, *topic))
// Topics are in the format:
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
channelParts := channelRegExp.FindStringSubmatch(*topic)
if len(channelParts) < 1 {
h.logger.Info("Error in mqtt publish %s" + errMalformedData.Error())
if len(channelParts) < 2 {
h.logger.Error(LogErrFailedPublish + (ErrMalformedTopic).Error())
return
}
@ -138,7 +160,7 @@ func (h *handler) Publish(c *session.Client, topic *string, payload *[]byte) {
subtopic, err := parseSubtopic(subtopic)
if err != nil {
h.logger.Info("Error parsing subtopic: " + err.Error())
h.logger.Error(logErrFailedParseSubtopic + err.Error())
return
}
@ -153,7 +175,7 @@ func (h *handler) Publish(c *session.Client, topic *string, payload *[]byte) {
for _, pub := range h.publishers {
if err := pub.Publish(msg.Channel, msg); err != nil {
h.logger.Info("Error publishing to Mainflux " + err.Error())
h.logger.Error(LogErrFailedPublishToMsgBroker + err.Error())
}
}
}
@ -161,30 +183,30 @@ func (h *handler) Publish(c *session.Client, topic *string, payload *[]byte) {
// Subscribe - after client successfully subscribed
func (h *handler) Subscribe(c *session.Client, topics *[]string) {
if c == nil {
h.logger.Error("Nil client subscribe")
h.logger.Error(LogErrFailedSubscribe + (ErrClientNotInitialized).Error())
return
}
h.logger.Info("Subscribe - client ID: " + c.ID + ", to topics: " + strings.Join(*topics, ","))
h.logger.Info(fmt.Sprintf(LogInfoSubscribed, c.ID, strings.Join(*topics, ",")))
}
// Unsubscribe - after client unsubscribed
func (h *handler) Unsubscribe(c *session.Client, topics *[]string) {
if c == nil {
h.logger.Error("Nil client unsubscribe")
h.logger.Error(LogErrFailedUnsubscribe + (ErrClientNotInitialized).Error())
return
}
h.logger.Info("Unsubscribe - client ID: " + c.ID + ", form topics: " + strings.Join(*topics, ","))
h.logger.Info(fmt.Sprintf(LogInfoUnsubscribed, c.ID, strings.Join(*topics, ",")))
}
// Disconnect - connection with broker or client lost
func (h *handler) Disconnect(c *session.Client) {
if c == nil {
h.logger.Error("Nil client disconnect")
h.logger.Error(LogErrFailedDisconnect + (ErrClientNotInitialized).Error())
return
}
h.logger.Info("Disconnect - Client with ID: " + c.ID + " and username " + c.Username + " disconnected")
h.logger.Error(fmt.Sprintf(LogInfoDisconnected, c.ID, c.Username))
if err := h.es.Disconnect(c.Username); err != nil {
h.logger.Warn("Failed to publish disconnect event: " + err.Error())
h.logger.Error(LogErrFailedPublishDisconnectEvent + err.Error())
}
}
@ -192,13 +214,12 @@ func (h *handler) authAccess(username string, topic string) error {
// Topics are in the format:
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
if !channelRegExp.Match([]byte(topic)) {
h.logger.Info("Malformed topic: " + topic)
return errMalformedTopic
return ErrMalformedTopic
}
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 1 {
return errMalformedData
return ErrMalformedTopic
}
chanID := channelParts[1]
@ -212,7 +233,7 @@ func parseSubtopic(subtopic string) (string, error) {
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
return "", ErrMalformedSubtopic
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
@ -224,7 +245,7 @@ func parseSubtopic(subtopic string) (string, error) {
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
return "", ErrMalformedSubtopic
}
filteredElems = append(filteredElems, elem)

396
mqtt/handler_test.go Normal file
View File

@ -0,0 +1,396 @@
package mqtt_test
import (
"bytes"
"fmt"
"log"
"testing"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/mqtt"
"github.com/mainflux/mainflux/mqtt/mocks"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mproxy/pkg/session"
"github.com/stretchr/testify/assert"
)
const (
thingID = "513d02d2-16c1-4f23-98be-9e12f8fee898"
chanID = "123e4567-e89b-12d3-a456-000000000001"
invalidID = "invalidID"
clientID = "clientID"
password = "password"
subtopic = "testSubtopic"
invalidChannelIDTopic = "channels/**/messages"
)
var (
topicMsg = "channels/%s/messages"
topic = fmt.Sprintf(topicMsg, chanID)
invalidTopic = "invalidTopic"
payload = []byte("[{'n':'test-name', 'v': 1.2}]")
topics = []string{topic}
invalidTopics = []string{invalidTopic}
invalidChanIDTopics = []string{fmt.Sprintf(topicMsg, invalidTopic)}
//Test log messages for cases the handler does not provide a return value.
logBuffer = bytes.Buffer{}
sessionClient = session.Client{
ID: clientID,
Username: thingID,
Password: []byte(password),
}
invalidThingSessionClient = session.Client{
ID: clientID,
Username: invalidID,
Password: []byte(password),
}
)
func TestAuthConnect(t *testing.T) {
handler := newHandler()
cases := []struct {
desc string
err error
session *session.Client
}{
{
desc: "connect without active session",
err: mqtt.ErrClientNotInitialized,
session: nil,
},
{
desc: "connect without clientID",
err: mqtt.ErrMissingClientID,
session: &session.Client{
ID: "",
Username: thingID,
Password: []byte(password),
},
},
{
desc: "connect with invalid password",
err: errors.ErrAuthentication,
session: &session.Client{
ID: clientID,
Username: thingID,
Password: []byte(""),
},
},
{
desc: "connect with valid password and invalid username",
err: errors.ErrAuthentication,
session: &invalidThingSessionClient,
},
{
desc: "connect with valid username and password",
err: nil,
session: &sessionClient,
},
}
for _, tc := range cases {
err := handler.AuthConnect(tc.session)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestAuthPublish(t *testing.T) {
handler := newHandler()
cases := []struct {
desc string
client *session.Client
err error
topic *string
payload []byte
}{
{
desc: "publish with inactive client",
client: nil,
err: mqtt.ErrClientNotInitialized,
topic: &topic,
payload: payload,
},
{
desc: "publish without topic",
client: &sessionClient,
err: mqtt.ErrMissingTopicPub,
topic: nil,
payload: payload,
},
{
desc: "publish with malformed topic",
client: &sessionClient,
err: mqtt.ErrMalformedTopic,
topic: &invalidTopic,
payload: payload,
},
{
desc: "publish successfully",
client: &sessionClient,
err: nil,
topic: &topic,
payload: payload,
},
}
for _, tc := range cases {
err := handler.AuthPublish(tc.client, tc.topic, &tc.payload)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestAuthSubscribe(t *testing.T) {
handler := newHandler()
cases := []struct {
desc string
client *session.Client
err error
topic *[]string
}{
{
desc: "subscribe without active session",
client: nil,
err: mqtt.ErrClientNotInitialized,
topic: &topics,
},
{
desc: "subscribe without topics",
client: &sessionClient,
err: mqtt.ErrMissingTopicSub,
topic: nil,
},
{
desc: "subscribe with invalid topics",
client: &sessionClient,
err: mqtt.ErrMalformedTopic,
topic: &invalidTopics,
},
{
desc: "subscribe with invalid channel ID",
client: &sessionClient,
err: mqtt.ErrAuthentication,
topic: &invalidChanIDTopics,
},
{
desc: "subscribe with invalid thing ID",
client: &invalidThingSessionClient,
err: mqtt.ErrAuthentication,
topic: &topics,
},
{
desc: "subscribe with active session and valid topics",
client: &sessionClient,
err: nil,
topic: &topics,
},
}
for _, tc := range cases {
err := handler.AuthSubscribe(tc.client, tc.topic)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
func TestConnect(t *testing.T) {
handler := newHandler()
logBuffer.Reset()
cases := []struct {
desc string
client *session.Client
logMsg string
}{
{
desc: "connect without active session",
client: nil,
logMsg: mqtt.LogErrFailedConnect + mqtt.ErrClientNotInitialized.Error(),
},
{
desc: "connect with active session",
client: &sessionClient,
logMsg: fmt.Sprintf(mqtt.LogInfoConnected, clientID),
},
}
for _, tc := range cases {
handler.Connect(tc.client)
assert.Contains(t, logBuffer.String(), tc.logMsg)
}
}
func TestPublish(t *testing.T) {
handler := newHandler()
logBuffer.Reset()
malformedSubtopics := topic + "/" + subtopic + "%"
wrongCharSubtopics := topic + "/" + subtopic + ">"
validSubtopic := topic + "/" + subtopic
cases := []struct {
desc string
client *session.Client
topic string
payload []byte
logMsg string
}{
{
desc: "publish without active session",
client: nil,
topic: topic,
payload: payload,
logMsg: mqtt.ErrClientNotInitialized.Error(),
},
{
desc: "publish with invalid topic",
client: &sessionClient,
topic: invalidTopic,
payload: payload,
logMsg: fmt.Sprintf(mqtt.LogInfoPublished, clientID, invalidTopic),
},
{
desc: "publish with invalid channel ID",
client: &sessionClient,
topic: invalidChannelIDTopic,
payload: payload,
logMsg: mqtt.LogErrFailedPublish + mqtt.ErrMalformedTopic.Error(),
},
{
desc: "publish with malformed subtopic",
client: &sessionClient,
topic: malformedSubtopics,
payload: payload,
logMsg: mqtt.ErrMalformedSubtopic.Error(),
},
{
desc: "publish with subtopic containing wrong character",
client: &sessionClient,
topic: wrongCharSubtopics,
payload: payload,
logMsg: mqtt.ErrMalformedSubtopic.Error(),
},
{
desc: "publish with subtopic",
client: &sessionClient,
topic: validSubtopic,
payload: payload,
logMsg: subtopic,
},
{
desc: "publish without subtopic",
client: &sessionClient,
topic: topic,
payload: payload,
logMsg: "",
},
}
for _, tc := range cases {
handler.Publish(tc.client, &tc.topic, &tc.payload)
assert.Contains(t, logBuffer.String(), tc.logMsg)
}
}
func TestSubscribe(t *testing.T) {
handler := newHandler()
logBuffer.Reset()
cases := []struct {
desc string
client *session.Client
topic []string
logMsg string
}{
{
desc: "subscribe without active session",
client: nil,
topic: topics,
logMsg: mqtt.LogErrFailedSubscribe + mqtt.ErrClientNotInitialized.Error(),
},
{
desc: "subscribe with valid session and topics",
client: &sessionClient,
topic: topics,
logMsg: fmt.Sprintf(mqtt.LogInfoSubscribed, clientID, topics[0]),
},
}
for _, tc := range cases {
handler.Subscribe(tc.client, &tc.topic)
assert.Contains(t, logBuffer.String(), tc.logMsg)
}
}
func TestUnsubscribe(t *testing.T) {
handler := newHandler()
logBuffer.Reset()
cases := []struct {
desc string
client *session.Client
topic []string
logMsg string
}{
{
desc: "unsubscribe without active session",
client: nil,
topic: topics,
logMsg: mqtt.LogErrFailedUnsubscribe + mqtt.ErrClientNotInitialized.Error(),
},
{
desc: "unsubscribe with valid session and topics",
client: &sessionClient,
topic: topics,
logMsg: fmt.Sprintf(mqtt.LogInfoUnsubscribed, clientID, topics[0]),
},
}
for _, tc := range cases {
handler.Unsubscribe(tc.client, &tc.topic)
assert.Contains(t, logBuffer.String(), tc.logMsg)
}
}
func TestDisconnect(t *testing.T) {
handler := newHandler()
logBuffer.Reset()
cases := []struct {
desc string
client *session.Client
topic []string
logMsg string
}{
{
desc: "disconnect without active session",
client: nil,
topic: topics,
logMsg: mqtt.LogErrFailedDisconnect + mqtt.ErrClientNotInitialized.Error(),
},
{
desc: "disconnect with valid session",
client: &sessionClient,
topic: topics,
logMsg: fmt.Sprintf(mqtt.LogInfoDisconnected, clientID, thingID),
},
}
for _, tc := range cases {
handler.Disconnect(tc.client)
assert.Contains(t, logBuffer.String(), tc.logMsg)
}
}
func newHandler() session.Handler {
logger, err := logger.New(&logBuffer, "debug")
if err != nil {
log.Fatalf("failed to create logger: %s", err)
}
authClient := mocks.NewClient(map[string]string{password: thingID}, map[string]interface{}{chanID: thingID})
eventStore := mocks.NewEventStore()
return mqtt.NewHandler([]messaging.Publisher{mocks.NewPublisher()}, eventStore, logger, authClient)
}

32
mqtt/mocks/auth.go Normal file
View File

@ -0,0 +1,32 @@
package mocks
import (
"context"
"github.com/mainflux/mainflux/pkg/auth"
"github.com/mainflux/mainflux/pkg/errors"
)
type MockClient struct {
key map[string]string
conns map[string]interface{}
}
func NewClient(key map[string]string, conns map[string]interface{}) auth.Client {
return MockClient{key: key, conns: conns}
}
func (cli MockClient) Authorize(ctx context.Context, chanID, thingID string) error {
for k, v := range cli.conns {
if k == chanID && v == thingID {
return nil
}
}
return errors.ErrAuthentication
}
func (cli MockClient) Identify(ctx context.Context, thingKey string) (string, error) {
if id, ok := cli.key[thingKey]; ok {
return id, nil
}
return "", errors.ErrAuthentication
}

18
mqtt/mocks/publisher.go Normal file
View File

@ -0,0 +1,18 @@
package mocks
import "github.com/mainflux/mainflux/pkg/messaging"
type MockPublisher struct{}
// NewPublisher returns mock message publisher.
func NewPublisher() messaging.Publisher {
return MockPublisher{}
}
func (pub MockPublisher) Publish(topic string, msg messaging.Message) error {
return nil
}
func (pub MockPublisher) Close() error {
return nil
}

19
mqtt/mocks/redis.go Normal file
View File

@ -0,0 +1,19 @@
package mocks
import (
"github.com/mainflux/mainflux/mqtt/redis"
)
type MockEventStore struct{}
func NewEventStore() redis.EventStore {
return MockEventStore{}
}
func (es MockEventStore) Connect(clientID string) error {
return nil
}
func (es MockEventStore) Disconnect(clientID string) error {
return nil
}

View File

@ -16,8 +16,13 @@ const (
streamLen = 1000
)
type EventStore interface {
Connect(clientID string) error
Disconnect(clientID string) error
}
// EventStore is a struct used to store event streams in Redis
type EventStore struct {
type eventStore struct {
client *redis.Client
instance string
}
@ -25,13 +30,13 @@ type EventStore struct {
// NewEventStore returns wrapper around mProxy service that sends
// events to event store.
func NewEventStore(client *redis.Client, instance string) EventStore {
return EventStore{
return eventStore{
client: client,
instance: instance,
}
}
func (es EventStore) storeEvent(clientID, eventType string) error {
func (es eventStore) storeEvent(clientID, eventType string) error {
timestamp := strconv.FormatInt(time.Now().Unix(), 10)
event := mqttEvent{
@ -55,11 +60,11 @@ func (es EventStore) storeEvent(clientID, eventType string) error {
}
// Connect issues event on MQTT CONNECT
func (es EventStore) Connect(clientID string) error {
func (es eventStore) Connect(clientID string) error {
return es.storeEvent(clientID, "connect")
}
// Disconnect issues event on MQTT CONNECT
func (es EventStore) Disconnect(clientID string) error {
func (es eventStore) Disconnect(clientID string) error {
return es.storeEvent(clientID, "disconnect")
}