1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-05-02 22:17:10 +08:00
b1ackd0t abc1e3d858
MF-703 - Reliably Publish Event Messages to Redis (#1836)
* Reliably Publish Event Messages to Redis

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Make Redis Producer Safe For Concurrent Use

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Combine Redis Publishing

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Add defer statement

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Use Channel Instead of Array

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Adding `occurred_at`

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Fix Check `occurred_at`

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Remove Unused Const

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Check For Non NIL Error on Publishing

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Add More Tests

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Hanndle When Channel Is Full

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Fix Issue After Rebase

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Fix Tests

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

---------

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
2023-07-31 17:03:45 +02:00

197 lines
4.7 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis
import (
"context"
"github.com/go-redis/redis/v8"
mfredis "github.com/mainflux/mainflux/internal/clients/redis"
mfclients "github.com/mainflux/mainflux/pkg/clients"
"github.com/mainflux/mainflux/things/clients"
)
const (
streamID = "mainflux.things"
streamLen = 1000
)
var _ clients.Service = (*eventStore)(nil)
type eventStore struct {
mfredis.Publisher
svc clients.Service
client *redis.Client
}
// NewEventStoreMiddleware returns wrapper around things service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc clients.Service, client *redis.Client) clients.Service {
es := &eventStore{
svc: svc,
client: client,
Publisher: mfredis.NewEventStore(client, streamID, streamLen),
}
go es.StartPublishingRoutine(ctx)
return es
}
func (es *eventStore) CreateThings(ctx context.Context, token string, thing ...mfclients.Client) ([]mfclients.Client, error) {
sths, err := es.svc.CreateThings(ctx, token, thing...)
if err != nil {
return sths, err
}
for _, th := range sths {
event := createClientEvent{
th,
}
if err := es.Publish(ctx, event); err != nil {
return sths, err
}
}
return sths, nil
}
func (es *eventStore) UpdateClient(ctx context.Context, token string, thing mfclients.Client) (mfclients.Client, error) {
cli, err := es.svc.UpdateClient(ctx, token, thing)
if err != nil {
return mfclients.Client{}, err
}
return es.update(ctx, "", cli)
}
func (es *eventStore) UpdateClientOwner(ctx context.Context, token string, thing mfclients.Client) (mfclients.Client, error) {
cli, err := es.svc.UpdateClientOwner(ctx, token, thing)
if err != nil {
return mfclients.Client{}, err
}
return es.update(ctx, "owner", cli)
}
func (es *eventStore) UpdateClientTags(ctx context.Context, token string, thing mfclients.Client) (mfclients.Client, error) {
cli, err := es.svc.UpdateClientTags(ctx, token, thing)
if err != nil {
return mfclients.Client{}, err
}
return es.update(ctx, "tags", cli)
}
func (es *eventStore) UpdateClientSecret(ctx context.Context, token, id, key string) (mfclients.Client, error) {
cli, err := es.svc.UpdateClientSecret(ctx, token, id, key)
if err != nil {
return mfclients.Client{}, err
}
return es.update(ctx, "secret", cli)
}
func (es *eventStore) update(ctx context.Context, operation string, cli mfclients.Client) (mfclients.Client, error) {
event := updateClientEvent{
cli, operation,
}
if err := es.Publish(ctx, event); err != nil {
return cli, err
}
return cli, nil
}
func (es *eventStore) ViewClient(ctx context.Context, token, id string) (mfclients.Client, error) {
cli, err := es.svc.ViewClient(ctx, token, id)
if err != nil {
return mfclients.Client{}, err
}
event := viewClientEvent{
cli,
}
if err := es.Publish(ctx, event); err != nil {
return cli, err
}
return cli, nil
}
func (es *eventStore) ListClients(ctx context.Context, token string, pm mfclients.Page) (mfclients.ClientsPage, error) {
cp, err := es.svc.ListClients(ctx, token, pm)
if err != nil {
return mfclients.ClientsPage{}, err
}
event := listClientEvent{
pm,
}
if err := es.Publish(ctx, event); err != nil {
return cp, err
}
return cp, nil
}
func (es *eventStore) ListClientsByGroup(ctx context.Context, token, chID string, pm mfclients.Page) (mfclients.MembersPage, error) {
cp, err := es.svc.ListClientsByGroup(ctx, token, chID, pm)
if err != nil {
return mfclients.MembersPage{}, err
}
event := listClientByGroupEvent{
pm, chID,
}
if err := es.Publish(ctx, event); err != nil {
return cp, err
}
return cp, nil
}
func (es *eventStore) EnableClient(ctx context.Context, token, id string) (mfclients.Client, error) {
cli, err := es.svc.EnableClient(ctx, token, id)
if err != nil {
return mfclients.Client{}, err
}
return es.delete(ctx, cli)
}
func (es *eventStore) DisableClient(ctx context.Context, token, id string) (mfclients.Client, error) {
cli, err := es.svc.DisableClient(ctx, token, id)
if err != nil {
return mfclients.Client{}, err
}
return es.delete(ctx, cli)
}
func (es *eventStore) delete(ctx context.Context, cli mfclients.Client) (mfclients.Client, error) {
event := removeClientEvent{
id: cli.ID,
updatedAt: cli.UpdatedAt,
updatedBy: cli.UpdatedBy,
status: cli.Status.String(),
}
if err := es.Publish(ctx, event); err != nil {
return cli, err
}
return cli, nil
}
func (es *eventStore) Identify(ctx context.Context, key string) (string, error) {
thingID, err := es.svc.Identify(ctx, key)
if err != nil {
return "", err
}
event := identifyClientEvent{
thingID: thingID,
}
if err := es.Publish(ctx, event); err != nil {
return thingID, err
}
return thingID, nil
}