1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-29 13:49:28 +08:00
b1ackd0t 2f5e313c36
NOISSUE - Propagate Context (#1842)
* Initial Commit: Propagate Context

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Make IssueToken Inline

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

---------

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
2023-07-06 19:57:51 +02:00

62 lines
1.4 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis
import (
"context"
"github.com/go-redis/redis/v8"
)
const (
streamID = "mainflux.mqtt"
streamLen = 1000
)
// EventStore is the interface for recording MQTT client connect and
// disconnect events.
type EventStore interface {
	// Connect records a connect event for the client identified by clientID.
	Connect(ctx context.Context, clientID string) error
	// Disconnect records a disconnect event for the client identified by clientID.
	Disconnect(ctx context.Context, clientID string) error
}
// eventStore is the Redis-backed implementation of EventStore. It writes
// events to a Redis stream through client.
type eventStore struct {
	// client is the Redis client used to issue stream commands.
	client *redis.Client
	// instance is copied into every event this store emits.
	instance string
}
// NewEventStore builds an EventStore that publishes events through the given
// Redis client. The instance value is stamped on every event the returned
// store emits.
func NewEventStore(client *redis.Client, instance string) EventStore {
	store := eventStore{
		client:   client,
		instance: instance,
	}

	return store
}
// storeEvent builds an MQTT event from clientID, eventType and the store's
// instance, and appends its encoded form to the Redis stream named by
// streamID. The stream is trimmed to approximately streamLen entries.
// It returns the error reported by the XADD command, if any.
func (es eventStore) storeEvent(ctx context.Context, clientID, eventType string) error {
	ev := mqttEvent{
		clientID:  clientID,
		eventType: eventType,
		instance:  es.instance,
	}

	args := redis.XAddArgs{
		Stream:       streamID,
		MaxLenApprox: streamLen,
		Values:       ev.Encode(),
	}

	cmd := es.client.XAdd(ctx, &args)

	return cmd.Err()
}
// Connect stores a "connect" event for the given client, as issued on
// MQTT CONNECT.
func (es eventStore) Connect(ctx context.Context, clientID string) error {
	const eventType = "connect"

	return es.storeEvent(ctx, clientID, eventType)
}
// Disconnect stores a "disconnect" event for the given client, as issued on
// MQTT DISCONNECT.
func (es eventStore) Disconnect(ctx context.Context, clientID string) error {
	const eventType = "disconnect"

	return es.storeEvent(ctx, clientID, eventType)
}