From 23a665b54ad69177049afb66dbe39031014f74fc Mon Sep 17 00:00:00 2001 From: b1ackd0t Date: Wed, 2 Aug 2023 13:09:56 +0300 Subject: [PATCH] MF-1510 - Add Event Sourcing To Users Service (#1835) * Add Event Sourcing on Users Service Signed-off-by: rodneyosodo * Modify Events Sourced From Things Service Signed-off-by: rodneyosodo * Rename Cache Signed-off-by: rodneyosodo * Remove SharedBy From Events Signed-off-by: rodneyosodo * Use Combined Publisher Signed-off-by: rodneyosodo * Format Docker Compose File Signed-off-by: rodneyosodo * Rename Events Signed-off-by: rodneyosodo --------- Signed-off-by: rodneyosodo --- cmd/things/main.go | 16 +- cmd/users/main.go | 21 +- docker/.env | 3 + docker/docker-compose.yml | 4 + things/clients/redis/doc.go | 2 +- things/clients/redis/events.go | 12 +- things/clients/redis/streams.go | 41 +-- things/groups/redis/events.go | 3 - things/groups/redis/streams.go | 30 +-- things/policies/redis/doc.go | 2 +- things/policies/redis/events.go | 3 +- things/policies/redis/streams.go | 22 +- users/clients/redis/doc.go | 5 + users/clients/redis/events.go | 417 +++++++++++++++++++++++++++++++ users/clients/redis/streams.go | 294 ++++++++++++++++++++++ users/groups/redis/doc.go | 5 + users/groups/redis/events.go | 263 +++++++++++++++++++ users/groups/redis/streams.go | 155 ++++++++++++ users/policies/redis/doc.go | 5 + users/policies/redis/events.go | 107 ++++++++ users/policies/redis/streams.go | 104 ++++++++ 21 files changed, 1451 insertions(+), 63 deletions(-) create mode 100644 users/clients/redis/doc.go create mode 100644 users/clients/redis/events.go create mode 100644 users/clients/redis/streams.go create mode 100644 users/groups/redis/doc.go create mode 100644 users/groups/redis/events.go create mode 100644 users/groups/redis/streams.go create mode 100644 users/policies/redis/doc.go create mode 100644 users/policies/redis/events.go create mode 100644 users/policies/redis/streams.go diff --git a/cmd/things/main.go b/cmd/things/main.go index 74e74491..de0db41d 100644 --- a/cmd/things/main.go +++ b/cmd/things/main.go @@ -31,20 +31,20 @@ import ( "github.com/mainflux/mainflux/things/clients" capi "github.com/mainflux/mainflux/things/clients/api" cpostgres "github.com/mainflux/mainflux/things/clients/postgres" - redisthcache "github.com/mainflux/mainflux/things/clients/redis" + thcache "github.com/mainflux/mainflux/things/clients/redis" localusers "github.com/mainflux/mainflux/things/clients/standalone" ctracing "github.com/mainflux/mainflux/things/clients/tracing" "github.com/mainflux/mainflux/things/groups" gapi "github.com/mainflux/mainflux/things/groups/api" gpostgres "github.com/mainflux/mainflux/things/groups/postgres" - redischcache "github.com/mainflux/mainflux/things/groups/redis" + chcache "github.com/mainflux/mainflux/things/groups/redis" gtracing "github.com/mainflux/mainflux/things/groups/tracing" tpolicies "github.com/mainflux/mainflux/things/policies" papi "github.com/mainflux/mainflux/things/policies/api" grpcapi "github.com/mainflux/mainflux/things/policies/api/grpc" httpapi "github.com/mainflux/mainflux/things/policies/api/http" ppostgres "github.com/mainflux/mainflux/things/policies/postgres" - redispcache "github.com/mainflux/mainflux/things/policies/redis" + pcache "github.com/mainflux/mainflux/things/policies/redis" ppracing "github.com/mainflux/mainflux/things/policies/tracing" thingsPg "github.com/mainflux/mainflux/things/postgres" upolicies "github.com/mainflux/mainflux/users/policies" @@ -224,16 +224,16 @@ func newService(ctx context.Context, db *sqlx.DB, dbConfig pgClient.Config, auth logger.Error(fmt.Sprintf("failed to parse cache key duration: %s", err.Error())) } - policyCache := redispcache.NewCache(cacheClient, kDuration) - thingCache := redisthcache.NewCache(cacheClient, kDuration) + policyCache := pcache.NewCache(cacheClient, kDuration) + thingCache := thcache.NewCache(cacheClient, kDuration) psvc := tpolicies.NewService(auth, pRepo, policyCache, idp) csvc := clients.NewService(auth, psvc, cRepo, gRepo, thingCache, idp) gsvc := groups.NewService(auth, psvc, gRepo, idp) - csvc = redisthcache.NewEventStoreMiddleware(ctx, csvc, esClient) - gsvc = redischcache.NewEventStoreMiddleware(ctx, gsvc, esClient) - psvc = redispcache.NewEventStoreMiddleware(ctx, psvc, esClient) + csvc = thcache.NewEventStoreMiddleware(ctx, csvc, esClient) + gsvc = chcache.NewEventStoreMiddleware(ctx, gsvc, esClient) + psvc = pcache.NewEventStoreMiddleware(ctx, psvc, esClient) csvc = ctracing.New(csvc, tracer) csvc = capi.LoggingMiddleware(csvc, logger) diff --git a/cmd/users/main.go b/cmd/users/main.go index 15f2eec2..2a6e4f5a 100644 --- a/cmd/users/main.go +++ b/cmd/users/main.go @@ -12,6 +12,7 @@ import ( "regexp" "time" + "github.com/go-redis/redis/v8" "github.com/go-zoo/bone" "github.com/jmoiron/sqlx" chclient "github.com/mainflux/callhome/pkg/client" @@ -19,6 +20,7 @@ import ( "github.com/mainflux/mainflux/internal" jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" pgClient "github.com/mainflux/mainflux/internal/clients/postgres" + redisClient "github.com/mainflux/mainflux/internal/clients/redis" "github.com/mainflux/mainflux/internal/email" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/postgres" @@ -32,10 +34,12 @@ import ( capi "github.com/mainflux/mainflux/users/clients/api" "github.com/mainflux/mainflux/users/clients/emailer" cpostgres "github.com/mainflux/mainflux/users/clients/postgres" + ucache "github.com/mainflux/mainflux/users/clients/redis" ctracing "github.com/mainflux/mainflux/users/clients/tracing" "github.com/mainflux/mainflux/users/groups" gapi "github.com/mainflux/mainflux/users/groups/api" gpostgres "github.com/mainflux/mainflux/users/groups/postgres" + gcache "github.com/mainflux/mainflux/users/groups/redis" gtracing "github.com/mainflux/mainflux/users/groups/tracing" "github.com/mainflux/mainflux/users/hasher" "github.com/mainflux/mainflux/users/jwt" @@ -44,6 +48,7 @@ import ( grpcapi "github.com/mainflux/mainflux/users/policies/api/grpc" httpapi "github.com/mainflux/mainflux/users/policies/api/http" ppostgres "github.com/mainflux/mainflux/users/policies/postgres" + pcache "github.com/mainflux/mainflux/users/policies/redis" ptracing "github.com/mainflux/mainflux/users/policies/tracing" clientsPg "github.com/mainflux/mainflux/users/postgres" "go.opentelemetry.io/otel/trace" @@ -55,6 +60,7 @@ import ( const ( svcName = "users" envPrefixDB = "MF_USERS_DB_" + envPrefixES = "MF_USERS_ES_" envPrefixHTTP = "MF_USERS_HTTP_" envPrefixGrpc = "MF_USERS_GRPC_" defDB = "users" @@ -139,7 +145,14 @@ func main() { }() tracer := tp.Tracer(svcName) - csvc, gsvc, psvc := newService(ctx, db, dbConfig, tracer, cfg, ec, logger) + // Setup new redis event store client + esClient, err := redisClient.Setup(envPrefixES) + if err != nil { + logger.Fatal(err.Error()) + } + defer esClient.Close() + + csvc, gsvc, psvc := newService(ctx, db, dbConfig, esClient, tracer, cfg, ec, logger) httpServerConfig := server.Config{Port: defSvcHTTPPort} if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { @@ -185,7 +198,7 @@ func main() { } } -func newService(ctx context.Context, db *sqlx.DB, dbConfig pgClient.Config, tracer trace.Tracer, c config, ec email.Config, logger mflog.Logger) (clients.Service, groups.Service, policies.Service) { +func newService(ctx context.Context, db *sqlx.DB, dbConfig pgClient.Config, esClient *redis.Client, tracer trace.Tracer, c config, ec email.Config, logger mflog.Logger) (clients.Service, groups.Service, policies.Service) { database := postgres.NewDatabase(db, dbConfig, tracer) cRepo := cpostgres.NewRepository(database) gRepo := gpostgres.NewRepository(database) @@ -212,6 +225,10 @@ func newService(ctx context.Context, db *sqlx.DB, dbConfig pgClient.Config, trac gsvc := groups.NewService(gRepo, pRepo, tokenizer, idp) psvc := policies.NewService(pRepo, tokenizer, idp) + csvc = ucache.NewEventStoreMiddleware(ctx, csvc, esClient) + gsvc = gcache.NewEventStoreMiddleware(ctx, gsvc, esClient) + psvc = pcache.NewEventStoreMiddleware(ctx, psvc, esClient) + csvc = ctracing.New(csvc, tracer) csvc = capi.LoggingMiddleware(csvc, logger) counter, latency := internal.MakeMetrics(svcName, "api") diff --git a/docker/.env b/docker/.env index b19cde81..0e9e5ee2 100644 --- a/docker/.env +++ b/docker/.env @@ -64,6 +64,9 @@ MF_USERS_DB_SSL_MODE=disable MF_USERS_DB_SSL_CERT= MF_USERS_DB_SSL_KEY= MF_USERS_DB_SSL_ROOT_CERT= +MF_USERS_ES_URL=es-redis:${MF_REDIS_TCP_PORT} +MF_USERS_ES_PASS= +MF_USERS_ES_DB= MF_USERS_RESET_PWD_TEMPLATE=users.tmpl MF_USERS_INSTANCE_ID= diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index b0b25fb8..40fd65c5 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -14,6 +14,7 @@ volumes: mainflux-es-redis-volume: mainflux-mqtt-broker-volume: + services: nginx: image: nginx:1.23.3-alpine @@ -176,6 +177,9 @@ services: MF_EMAIL_FROM_ADDRESS: ${MF_EMAIL_FROM_ADDRESS} MF_EMAIL_FROM_NAME: ${MF_EMAIL_FROM_NAME} MF_EMAIL_TEMPLATE: ${MF_EMAIL_TEMPLATE} + MF_USERS_ES_URL: ${MF_USERS_ES_URL} + MF_USERS_ES_PASS: ${MF_USERS_ES_PASS} + MF_USERS_ES_DB: ${MF_USERS_ES_DB} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} ports: diff --git a/things/clients/redis/doc.go b/things/clients/redis/doc.go index 39f6938a..ccaef648 100644 --- a/things/clients/redis/doc.go +++ b/things/clients/redis/doc.go @@ -2,5 +2,5 @@ // SPDX-License-Identifier: Apache-2.0 // Package redis contains cache implementations using Redis as -// the underlying cache. +// the underlying cache and event store. package redis diff --git a/things/clients/redis/events.go b/things/clients/redis/events.go index 077fa81f..21c77e71 100644 --- a/things/clients/redis/events.go +++ b/things/clients/redis/events.go @@ -78,10 +78,13 @@ type updateClientEvent struct { func (uce updateClientEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ - "operation": clientUpdate + "." + uce.operation, + "operation": clientUpdate, "updated_at": uce.UpdatedAt, "updated_by": uce.UpdatedBy, } + if uce.operation != "" { + val["operation"] = clientUpdate + "_" + uce.operation + } if uce.ID != "" { val["id"] = uce.ID @@ -93,6 +96,9 @@ func (uce updateClientEvent) Encode() (map[string]interface{}, error) { tags := fmt.Sprintf("[%s]", strings.Join(uce.Tags, ",")) val["tags"] = tags } + if uce.Owner != "" { + val["owner"] = uce.Owner + } if uce.Credentials.Identity != "" { val["identity"] = uce.Credentials.Identity } @@ -225,6 +231,10 @@ func (lce listClientEvent) Encode() (map[string]interface{}, error) { if lce.Subject != "" { val["subject"] = lce.Subject } + if len(lce.IDs) > 0 { + ids := fmt.Sprintf("[%s]", strings.Join(lce.IDs, ",")) + val["ids"] = ids + } if lce.Identity != "" { val["identity"] = lce.Identity } diff --git a/things/clients/redis/streams.go b/things/clients/redis/streams.go index 573e9860..2fc45b0d 100644 --- a/things/clients/redis/streams.go +++ b/things/clients/redis/streams.go @@ -53,13 +53,14 @@ func (es *eventStore) CreateThings(ctx context.Context, token string, thing ...m return sths, err } } + return sths, nil } func (es *eventStore) UpdateClient(ctx context.Context, token string, thing mfclients.Client) (mfclients.Client, error) { cli, err := es.svc.UpdateClient(ctx, token, thing) if err != nil { - return mfclients.Client{}, err + return cli, err } return es.update(ctx, "", cli) @@ -68,7 +69,7 @@ func (es *eventStore) UpdateClient(ctx context.Context, token string, thing mfcl func (es *eventStore) UpdateClientOwner(ctx context.Context, token string, thing mfclients.Client) (mfclients.Client, error) { cli, err := es.svc.UpdateClientOwner(ctx, token, thing) if err != nil { - return mfclients.Client{}, err + return cli, err } return es.update(ctx, "owner", cli) @@ -77,7 +78,7 @@ func (es *eventStore) UpdateClientOwner(ctx context.Context, token string, thing func (es *eventStore) UpdateClientTags(ctx context.Context, token string, thing mfclients.Client) (mfclients.Client, error) { cli, err := es.svc.UpdateClientTags(ctx, token, thing) if err != nil { - return mfclients.Client{}, err + return cli, err } return es.update(ctx, "tags", cli) @@ -86,28 +87,30 @@ func (es *eventStore) UpdateClientTags(ctx context.Context, token string, thing func (es *eventStore) UpdateClientSecret(ctx context.Context, token, id, key string) (mfclients.Client, error) { cli, err := es.svc.UpdateClientSecret(ctx, token, id, key) if err != nil { - return mfclients.Client{}, err + return cli, err } return es.update(ctx, "secret", cli) } -func (es *eventStore) update(ctx context.Context, operation string, cli mfclients.Client) (mfclients.Client, error) { +func (es *eventStore) update(ctx context.Context, operation string, thing mfclients.Client) (mfclients.Client, error) { event := updateClientEvent{ - cli, operation, - } - if err := es.Publish(ctx, event); err != nil { - return cli, err + thing, operation, } - return cli, nil + if err := es.Publish(ctx, event); err != nil { + return thing, err + } + + return thing, nil } func (es *eventStore) ViewClient(ctx context.Context, token, id string) (mfclients.Client, error) { cli, err := es.svc.ViewClient(ctx, token, id) if err != nil { - return mfclients.Client{}, err + return cli, err } + event := viewClientEvent{ cli, } @@ -121,7 +124,7 @@ func (es *eventStore) ViewClient(ctx context.Context, token, id string) (mfclien func (es *eventStore) ListClients(ctx context.Context, token string, pm mfclients.Page) (mfclients.ClientsPage, error) { cp, err := es.svc.ListClients(ctx, token, pm) if err != nil { - return mfclients.ClientsPage{}, err + return cp, err } event := listClientEvent{ pm, @@ -134,24 +137,24 @@ func (es *eventStore) ListClients(ctx context.Context, token string, pm mfclient } func (es *eventStore) ListClientsByGroup(ctx context.Context, token, chID string, pm mfclients.Page) (mfclients.MembersPage, error) { - cp, err := es.svc.ListClientsByGroup(ctx, token, chID, pm) + mp, err := es.svc.ListClientsByGroup(ctx, token, chID, pm) if err != nil { - return mfclients.MembersPage{}, err + return mp, err } event := listClientByGroupEvent{ pm, chID, } if err := es.Publish(ctx, event); err != nil { - return cp, err + return mp, err } - return cp, nil + return mp, nil } func (es *eventStore) EnableClient(ctx context.Context, token, id string) (mfclients.Client, error) { cli, err := es.svc.EnableClient(ctx, token, id) if err != nil { - return mfclients.Client{}, err + return cli, err } return es.delete(ctx, cli) @@ -160,7 +163,7 @@ func (es *eventStore) EnableClient(ctx context.Context, token, id string) (mfcli func (es *eventStore) DisableClient(ctx context.Context, token, id string) (mfclients.Client, error) { cli, err := es.svc.DisableClient(ctx, token, id) if err != nil { - return mfclients.Client{}, err + return cli, err } return es.delete(ctx, cli) @@ -183,7 +186,7 @@ func (es *eventStore) delete(ctx context.Context, cli mfclients.Client) (mfclien func (es *eventStore) Identify(ctx context.Context, key string) (string, error) { thingID, err := es.svc.Identify(ctx, key) if err != nil { - return "", err + return thingID, err } event := identifyClientEvent{ thingID: thingID, diff --git a/things/groups/redis/events.go b/things/groups/redis/events.go index 7584963d..463f9a0e 100644 --- a/things/groups/redis/events.go +++ b/things/groups/redis/events.go @@ -108,9 +108,6 @@ func (uce updateGroupEvent) Encode() (map[string]interface{}, error) { if !uce.CreatedAt.IsZero() { val["created_at"] = uce.CreatedAt } - if !uce.UpdatedAt.IsZero() { - val["updated_at"] = uce.UpdatedAt - } if uce.Status.String() != "" { val["status"] = uce.Status.String() } diff --git a/things/groups/redis/streams.go b/things/groups/redis/streams.go index 31ff24c4..1a2d2e55 100644 --- a/things/groups/redis/streams.go +++ b/things/groups/redis/streams.go @@ -40,26 +40,26 @@ func NewEventStoreMiddleware(ctx context.Context, svc groups.Service, client *re } func (es *eventStore) CreateGroups(ctx context.Context, token string, groups ...mfgroups.Group) ([]mfgroups.Group, error) { - gs, err := es.svc.CreateGroups(ctx, token, groups...) + grps, err := es.svc.CreateGroups(ctx, token, groups...) if err != nil { - return gs, err + return grps, err } - for _, group := range gs { + for _, group := range grps { event := createGroupEvent{ group, } if err := es.Publish(ctx, event); err != nil { - return gs, err + return grps, err } } - return gs, nil + return grps, nil } func (es *eventStore) UpdateGroup(ctx context.Context, token string, group mfgroups.Group) (mfgroups.Group, error) { group, err := es.svc.UpdateGroup(ctx, token, group) if err != nil { - return mfgroups.Group{}, err + return group, err } event := updateGroupEvent{ @@ -75,7 +75,7 @@ func (es *eventStore) UpdateGroup(ctx context.Context, token string, group mfgro func (es *eventStore) ViewGroup(ctx context.Context, token, id string) (mfgroups.Group, error) { group, err := es.svc.ViewGroup(ctx, token, id) if err != nil { - return mfgroups.Group{}, err + return group, err } event := viewGroupEvent{ group, @@ -90,7 +90,7 @@ func (es *eventStore) ViewGroup(ctx context.Context, token, id string) (mfgroups func (es *eventStore) ListGroups(ctx context.Context, token string, pm mfgroups.GroupsPage) (mfgroups.GroupsPage, error) { gp, err := es.svc.ListGroups(ctx, token, pm) if err != nil { - return mfgroups.GroupsPage{}, err + return gp, err } event := listGroupEvent{ pm, @@ -105,7 +105,7 @@ func (es *eventStore) ListGroups(ctx context.Context, token string, pm mfgroups. func (es *eventStore) ListMemberships(ctx context.Context, token, clientID string, pm mfgroups.GroupsPage) (mfgroups.MembershipsPage, error) { mp, err := es.svc.ListMemberships(ctx, token, clientID, pm) if err != nil { - return mfgroups.MembershipsPage{}, err + return mp, err } event := listGroupMembershipEvent{ pm, clientID, @@ -118,21 +118,21 @@ func (es *eventStore) ListMemberships(ctx context.Context, token, clientID strin } func (es *eventStore) EnableGroup(ctx context.Context, token, id string) (mfgroups.Group, error) { - cli, err := es.svc.EnableGroup(ctx, token, id) + group, err := es.svc.EnableGroup(ctx, token, id) if err != nil { - return mfgroups.Group{}, err + return group, err } - return es.delete(ctx, cli) + return es.delete(ctx, group) } func (es *eventStore) DisableGroup(ctx context.Context, token, id string) (mfgroups.Group, error) { - cli, err := es.svc.DisableGroup(ctx, token, id) + group, err := es.svc.DisableGroup(ctx, token, id) if err != nil { - return mfgroups.Group{}, err + return group, err } - return es.delete(ctx, cli) + return es.delete(ctx, group) } func (es *eventStore) delete(ctx context.Context, group mfgroups.Group) (mfgroups.Group, error) { diff --git a/things/policies/redis/doc.go b/things/policies/redis/doc.go index 39f6938a..ccaef648 100644 --- a/things/policies/redis/doc.go +++ b/things/policies/redis/doc.go @@ -2,5 +2,5 @@ // SPDX-License-Identifier: Apache-2.0 // Package redis contains cache implementations using Redis as -// the underlying cache. +// the underlying cache and event store. package redis diff --git a/things/policies/redis/events.go b/things/policies/redis/events.go index 31b663f6..9fa526bd 100644 --- a/things/policies/redis/events.go +++ b/things/policies/redis/events.go @@ -62,14 +62,13 @@ func (pe policyEvent) Encode() (map[string]interface{}, error) { type authorizeEvent struct { policies.AccessRequest - entityType string } func (ae authorizeEvent) Encode() (map[string]interface{}, error) { // We don't want to send the key over the stream, so we don't send the subject. val := map[string]interface{}{ "operation": authorize, - "entity_type": ae.entityType, + "entity_type": ae.Entity, } if ae.Object != "" { diff --git a/things/policies/redis/streams.go b/things/policies/redis/streams.go index 9db64c92..ad303f58 100644 --- a/things/policies/redis/streams.go +++ b/things/policies/redis/streams.go @@ -39,25 +39,25 @@ func NewEventStoreMiddleware(ctx context.Context, svc policies.Service, client * } func (es *eventStore) Authorize(ctx context.Context, ar policies.AccessRequest) (policies.Policy, error) { - id, err := es.svc.Authorize(ctx, ar) + policy, err := es.svc.Authorize(ctx, ar) if err != nil { - return policies.Policy{}, err + return policy, err } event := authorizeEvent{ - ar, ar.Entity, + ar, } if err := es.Publish(ctx, event); err != nil { - return id, err + return policy, err } - return id, nil + return policy, nil } func (es *eventStore) AddPolicy(ctx context.Context, token string, policy policies.Policy) (policies.Policy, error) { policy, err := es.svc.AddPolicy(ctx, token, policy) if err != nil { - return policies.Policy{}, err + return policy, err } event := policyEvent{ @@ -73,7 +73,7 @@ func (es *eventStore) AddPolicy(ctx context.Context, token string, policy polici func (es *eventStore) UpdatePolicy(ctx context.Context, token string, policy policies.Policy) (policies.Policy, error) { policy, err := es.svc.UpdatePolicy(ctx, token, policy) if err != nil { - return policies.Policy{}, err + return policy, err } event := policyEvent{ @@ -87,19 +87,19 @@ func (es *eventStore) UpdatePolicy(ctx context.Context, token string, policy pol } func (es *eventStore) ListPolicies(ctx context.Context, token string, page policies.Page) (policies.PolicyPage, error) { - policypage, err := es.svc.ListPolicies(ctx, token, page) + pp, err := es.svc.ListPolicies(ctx, token, page) if err != nil { - return policies.PolicyPage{}, err + return pp, err } event := listPoliciesEvent{ page, } if err := es.Publish(ctx, event); err != nil { - return policypage, err + return pp, err } - return policypage, nil + return pp, nil } func (es *eventStore) DeletePolicy(ctx context.Context, token string, policy policies.Policy) error { diff --git a/users/clients/redis/doc.go b/users/clients/redis/doc.go new file mode 100644 index 00000000..161c1a1b --- /dev/null +++ b/users/clients/redis/doc.go @@ -0,0 +1,5 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package redis contains event source Redis client implementation. +package redis diff --git a/users/clients/redis/events.go b/users/clients/redis/events.go new file mode 100644 index 00000000..ea4e2e92 --- /dev/null +++ b/users/clients/redis/events.go @@ -0,0 +1,417 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package redis + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + mfredis "github.com/mainflux/mainflux/internal/clients/redis" + mfclients "github.com/mainflux/mainflux/pkg/clients" +) + +const ( + clientPrefix = "user." + clientCreate = clientPrefix + "create" + clientUpdate = clientPrefix + "update" + clientRemove = clientPrefix + "remove" + clientView = clientPrefix + "view" + profileView = clientPrefix + "view_profile" + clientList = clientPrefix + "list" + clientListByGroup = clientPrefix + "list_by_group" + clientIdentify = clientPrefix + "identify" + generateResetToken = clientPrefix + "generate_reset_token" + issueToken = clientPrefix + "issue_token" + refreshToken = clientPrefix + "refresh_token" + resetSecret = clientPrefix + "reset_secret" + sendPasswordReset = clientPrefix + "send_password_reset" +) + +var ( + _ mfredis.Event = (*createClientEvent)(nil) + _ mfredis.Event = (*updateClientEvent)(nil) + _ mfredis.Event = (*removeClientEvent)(nil) + _ mfredis.Event = (*viewClientEvent)(nil) + _ mfredis.Event = (*viewProfileEvent)(nil) + _ mfredis.Event = (*listClientEvent)(nil) + _ mfredis.Event = (*listClientByGroupEvent)(nil) + _ mfredis.Event = (*identifyClientEvent)(nil) + _ mfredis.Event = (*generateResetTokenEvent)(nil) + _ mfredis.Event = (*issueTokenEvent)(nil) + _ mfredis.Event = (*refreshTokenEvent)(nil) + _ mfredis.Event = (*resetSecretEvent)(nil) + _ mfredis.Event = (*sendPasswordResetEvent)(nil) +) + +type createClientEvent struct { + mfclients.Client +} + +func (cce createClientEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": clientCreate, + "id": cce.ID, + "status": cce.Status.String(), + "created_at": cce.CreatedAt, + } + + if cce.Name != "" { + val["name"] = cce.Name + } + if len(cce.Tags) > 0 { + tags := fmt.Sprintf("[%s]", strings.Join(cce.Tags, ",")) + val["tags"] = tags + } + if cce.Owner != "" { + val["owner"] = cce.Owner + } + if cce.Metadata != nil { + metadata, err := json.Marshal(cce.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if cce.Credentials.Identity != "" { + val["identity"] = cce.Credentials.Identity + } + + return val, nil +} + +type updateClientEvent struct { + mfclients.Client + operation string +} + +func (uce updateClientEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": clientUpdate, + "updated_at": uce.UpdatedAt, + "updated_by": uce.UpdatedBy, + } + if uce.operation != "" { + val["operation"] = clientUpdate + "_" + uce.operation + } + + if uce.ID != "" { + val["id"] = uce.ID + } + if uce.Name != "" { + val["name"] = uce.Name + } + if len(uce.Tags) > 0 { + tags := fmt.Sprintf("[%s]", strings.Join(uce.Tags, ",")) + val["tags"] = tags + } + if uce.Credentials.Identity != "" { + val["identity"] = uce.Credentials.Identity + } + if uce.Metadata != nil { + metadata, err := json.Marshal(uce.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if !uce.CreatedAt.IsZero() { + val["created_at"] = uce.CreatedAt + } + if uce.Status.String() != "" { + val["status"] = uce.Status.String() + } + + return val, nil +} + +type removeClientEvent struct { + id string + status string + updatedAt time.Time + updatedBy string +} + +func (rce removeClientEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "operation": clientRemove, + "id": rce.id, + "status": rce.status, + "updated_at": rce.updatedAt, + "updated_by": rce.updatedBy, + }, nil +} + +type viewClientEvent struct { + mfclients.Client +} + +func (vce viewClientEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": clientView, + "id": vce.ID, + } + + if vce.Name != "" { + val["name"] = vce.Name + } + if len(vce.Tags) > 0 { + tags := fmt.Sprintf("[%s]", strings.Join(vce.Tags, ",")) + val["tags"] = tags + } + if vce.Owner != "" { + val["owner"] = vce.Owner + } + if vce.Credentials.Identity != "" { + val["identity"] = vce.Credentials.Identity + } + if vce.Metadata != nil { + metadata, err := json.Marshal(vce.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if !vce.CreatedAt.IsZero() { + val["created_at"] = vce.CreatedAt + } + if !vce.UpdatedAt.IsZero() { + val["updated_at"] = vce.UpdatedAt + } + if vce.UpdatedBy != "" { + val["updated_by"] = vce.UpdatedBy + } + if vce.Status.String() != "" { + val["status"] = vce.Status.String() + } + + return val, nil +} + +type viewProfileEvent struct { + mfclients.Client +} + +func (vpe viewProfileEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": profileView, + "id": vpe.ID, + } + + if vpe.Name != "" { + val["name"] = vpe.Name + } + if len(vpe.Tags) > 0 { + tags := fmt.Sprintf("[%s]", strings.Join(vpe.Tags, ",")) + val["tags"] = tags + } + if vpe.Owner != "" { + val["owner"] = vpe.Owner + } + if vpe.Credentials.Identity != "" { + val["identity"] = vpe.Credentials.Identity + } + if vpe.Metadata != nil { + metadata, err := json.Marshal(vpe.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if !vpe.CreatedAt.IsZero() { + val["created_at"] = vpe.CreatedAt + } + if !vpe.UpdatedAt.IsZero() { + val["updated_at"] = vpe.UpdatedAt + } + if vpe.UpdatedBy != "" { + val["updated_by"] = vpe.UpdatedBy + } + if vpe.Status.String() != "" { + val["status"] = vpe.Status.String() + } + + return val, nil +} + +type listClientEvent struct { + mfclients.Page +} + +func (lce listClientEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": clientList, + "total": lce.Total, + "offset": lce.Offset, + "limit": lce.Limit, + } + + if lce.Name != "" { + val["name"] = lce.Name + } + if lce.Order != "" { + val["order"] = lce.Order + } + if lce.Dir != "" { + val["dir"] = lce.Dir + } + if lce.Metadata != nil { + metadata, err := json.Marshal(lce.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if lce.Owner != "" { + val["owner"] = lce.Owner + } + if lce.Tag != "" { + val["tag"] = lce.Tag + } + if lce.SharedBy != "" { + val["sharedBy"] = lce.SharedBy + } + if lce.Status.String() != "" { + val["status"] = lce.Status.String() + } + if lce.Action != "" { + val["action"] = lce.Action + } + if lce.Subject != "" { + val["subject"] = lce.Subject + } + if lce.Identity != "" { + val["identity"] = lce.Identity + } + + return val, nil +} + +type listClientByGroupEvent struct { + mfclients.Page + channelID string +} + +func (lcge listClientByGroupEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": clientListByGroup, + "total": lcge.Total, + "offset": lcge.Offset, + "limit": lcge.Limit, + "channel_id": lcge.channelID, + } + + if lcge.Name != "" { + val["name"] = lcge.Name + } + if lcge.Order != "" { + val["order"] = lcge.Order + } + if lcge.Dir != "" { + val["dir"] = lcge.Dir + } + if lcge.Metadata != nil { + metadata, err := json.Marshal(lcge.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if lcge.Owner != "" { + val["owner"] = lcge.Owner + } + if lcge.Tag != "" { + val["tag"] = lcge.Tag + } + if lcge.SharedBy != "" { + val["sharedBy"] = lcge.SharedBy + } + if lcge.Status.String() != "" { + val["status"] = lcge.Status.String() + } + if lcge.Action != "" { + val["action"] = lcge.Action + } + if lcge.Subject != "" { + val["subject"] = lcge.Subject + } + if lcge.Identity != "" { + val["identity"] = lcge.Identity + } + + return val, nil +} + +type identifyClientEvent struct { + userID string +} + +func (ice identifyClientEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "operation": clientIdentify, + "user_id": ice.userID, + }, nil +} + +type generateResetTokenEvent struct { + email string + host string +} + +func (grte generateResetTokenEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "operation": generateResetToken, + "email": grte.email, + "host": grte.host, + }, nil +} + +type issueTokenEvent struct { + identity string +} + +func (ite issueTokenEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "operation": issueToken, + "identity": ite.identity, + }, nil +} + +type refreshTokenEvent struct{} + +func (rte refreshTokenEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "operation": refreshToken, + }, nil +} + +type resetSecretEvent struct{} + +func (rse resetSecretEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "operation": resetSecret, + }, nil +} + +type sendPasswordResetEvent struct { + host string + email string + user string +} + +func (spre sendPasswordResetEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "operation": sendPasswordReset, + "host": spre.host, + "email": spre.email, + "user": spre.user, + }, nil +} diff --git a/users/clients/redis/streams.go b/users/clients/redis/streams.go new file mode 100644 index 00000000..9f8f2356 --- /dev/null +++ b/users/clients/redis/streams.go @@ -0,0 +1,294 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package redis + +import ( + "context" + + "github.com/go-redis/redis/v8" + mfredis "github.com/mainflux/mainflux/internal/clients/redis" + mfclients "github.com/mainflux/mainflux/pkg/clients" + "github.com/mainflux/mainflux/users/clients" + "github.com/mainflux/mainflux/users/jwt" +) + +const ( + streamID = "mainflux.users" + streamLen = 1000 +) + +var _ clients.Service = (*eventStore)(nil) + +type eventStore struct { + mfredis.Publisher + svc clients.Service + client *redis.Client +} + +// NewEventStoreMiddleware returns wrapper around users service that sends +// events to event store. +func NewEventStoreMiddleware(ctx context.Context, svc clients.Service, client *redis.Client) clients.Service { + es := eventStore{ + svc: svc, + client: client, + Publisher: mfredis.NewEventStore(client, streamID, streamLen), + } + + go es.StartPublishingRoutine(ctx) + + return es +} + +func (es eventStore) RegisterClient(ctx context.Context, token string, user mfclients.Client) (mfclients.Client, error) { + user, err := es.svc.RegisterClient(ctx, token, user) + if err != nil { + return user, err + } + + event := createClientEvent{ + user, + } + + if err := es.Publish(ctx, event); err != nil { + return user, err + } + + return user, nil +} + +func (es eventStore) UpdateClient(ctx context.Context, token string, user mfclients.Client) (mfclients.Client, error) { + user, err := es.svc.UpdateClient(ctx, token, user) + if err != nil { + return user, err + } + + return es.update(ctx, "", user) +} + +func (es eventStore) UpdateClientOwner(ctx context.Context, token string, user mfclients.Client) (mfclients.Client, error) { + user, err := es.svc.UpdateClientOwner(ctx, token, user) + if err != nil { + return user, err + } + + return es.update(ctx, "owner", user) +} + +func (es eventStore) UpdateClientTags(ctx context.Context, token string, user mfclients.Client) (mfclients.Client, error) { + user, err := es.svc.UpdateClientTags(ctx, token, user) + if err != nil { + return user, err + } + + return es.update(ctx, "tags", user) +} + +func (es eventStore) UpdateClientSecret(ctx context.Context, token, oldSecret, newSecret string) (mfclients.Client, error) { + user, err := es.svc.UpdateClientSecret(ctx, token, oldSecret, newSecret) + if err != nil { + return user, err + } + + return es.update(ctx, "secret", user) +} + +func (es eventStore) UpdateClientIdentity(ctx context.Context, token, id, identity string) (mfclients.Client, error) { + user, err := es.svc.UpdateClientIdentity(ctx, token, id, identity) + if err != nil { + return user, err + } + + return es.update(ctx, "identity", user) +} + +func (es eventStore) update(ctx context.Context, operation string, user mfclients.Client) (mfclients.Client, error) { + event := updateClientEvent{ + user, operation, + } + + if err := es.Publish(ctx, event); err != nil { + return user, err + } + + return user, nil +} + +func (es eventStore) ViewClient(ctx context.Context, token, id string) (mfclients.Client, error) { + user, err := es.svc.ViewClient(ctx, token, id) + if err != nil { + return user, err + } + + event := viewClientEvent{ + user, + } + + if err := es.Publish(ctx, event); err != nil { + return user, err + } + + return user, nil +} + +func (es eventStore) ViewProfile(ctx context.Context, token string) (mfclients.Client, error) { + user, err := es.svc.ViewProfile(ctx, token) + if err != nil { + return user, err + } + + event := viewProfileEvent{ + user, + } + + if err := es.Publish(ctx, event); err != nil { + return user, err + } + + return user, nil +} + +func (es eventStore) ListClients(ctx context.Context, token string, pm mfclients.Page) (mfclients.ClientsPage, error) { + cp, err := es.svc.ListClients(ctx, token, pm) + if err != nil { + return cp, err + } + event := listClientEvent{ + pm, + } + + if err := es.Publish(ctx, event); err != nil { + return cp, err + } + + return cp, nil +} + +func (es eventStore) ListMembers(ctx context.Context, token, groupID string, pm mfclients.Page) (mfclients.MembersPage, error) { + mp, err := es.svc.ListMembers(ctx, token, groupID, pm) + if err != nil { + return mp, err + } + event := listClientByGroupEvent{ + pm, groupID, + } + + if err := es.Publish(ctx, event); err != nil { + return mp, err + } + + return mp, nil +} + +func (es eventStore) EnableClient(ctx context.Context, token, id string) (mfclients.Client, error) { + user, err := es.svc.EnableClient(ctx, token, id) + if err != nil { + return user, err + } + + return es.delete(ctx, user) +} + +func (es eventStore) DisableClient(ctx context.Context, token, id string) (mfclients.Client, error) { + user, err := es.svc.DisableClient(ctx, token, id) + if err != nil { + return user, err + } + + return es.delete(ctx, user) +} + +func (es eventStore) delete(ctx context.Context, user mfclients.Client) (mfclients.Client, error) { + event := removeClientEvent{ + id: user.ID, + updatedAt: user.UpdatedAt, + updatedBy: user.UpdatedBy, + status: user.Status.String(), + } + + if err := es.Publish(ctx, event); err != nil { + return user, err + } + + return user, nil +} + +func (es eventStore) Identify(ctx context.Context, token string) (string, error) { + userID, err := es.svc.Identify(ctx, token) + if err != nil { + return userID, err + } + event := identifyClientEvent{ + userID: userID, + } + + if err := es.Publish(ctx, event); err != nil { + return userID, err + } + + return userID, nil +} + +func (es eventStore) GenerateResetToken(ctx context.Context, email, host string) error { + if err := es.svc.GenerateResetToken(ctx, email, host); err != nil { + return err + } + event := generateResetTokenEvent{ + email: email, + host: host, + } + + return es.Publish(ctx, event) +} + +func (es eventStore) IssueToken(ctx context.Context, identity, secret string) (jwt.Token, error) { + token, err := es.svc.IssueToken(ctx, identity, secret) + if err != nil { + return token, err + } + event := issueTokenEvent{ + identity: identity, + } + + if err := es.Publish(ctx, event); err != nil { + return token, err + } + + return token, nil +} + +func (es eventStore) RefreshToken(ctx context.Context, refreshToken string) (jwt.Token, error) { + token, err := es.svc.RefreshToken(ctx, refreshToken) + if err != nil { + return token, err + } + event := refreshTokenEvent{} + + if err := es.Publish(ctx, event); err != nil { + return token, err + } + + return token, nil +} + +func (es eventStore) ResetSecret(ctx context.Context, resetToken, secret string) error { + if err := es.svc.ResetSecret(ctx, resetToken, secret); err != nil { + return err + } + event := resetSecretEvent{} + + return es.Publish(ctx, event) +} + +func (es eventStore) SendPasswordReset(ctx context.Context, host, email, user, token string) error { + if err := es.svc.SendPasswordReset(ctx, host, email, user, token); err != nil { + return err + } + event := sendPasswordResetEvent{ + host: host, + email: email, + user: user, + } + + return es.Publish(ctx, event) +} diff --git a/users/groups/redis/doc.go b/users/groups/redis/doc.go new file mode 100644 index 00000000..161c1a1b --- /dev/null +++ b/users/groups/redis/doc.go @@ -0,0 +1,5 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package redis contains event source Redis client implementation. +package redis diff --git a/users/groups/redis/events.go b/users/groups/redis/events.go new file mode 100644 index 00000000..87ebc121 --- /dev/null +++ b/users/groups/redis/events.go @@ -0,0 +1,263 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package redis + +import ( + "encoding/json" + "time" + + mfredis "github.com/mainflux/mainflux/internal/clients/redis" + mfgroups "github.com/mainflux/mainflux/pkg/groups" +) + +const ( + groupPrefix = "group." + groupCreate = groupPrefix + "create" + groupUpdate = groupPrefix + "update" + groupRemove = groupPrefix + "remove" + groupView = groupPrefix + "view" + groupList = groupPrefix + "list" + groupListMemberships = groupPrefix + "list_by_group" +) + +var ( + _ mfredis.Event = (*createGroupEvent)(nil) + _ mfredis.Event = (*updateGroupEvent)(nil) + _ mfredis.Event = (*removeGroupEvent)(nil) + _ mfredis.Event = (*viewGroupEvent)(nil) + _ mfredis.Event = (*listGroupEvent)(nil) + _ mfredis.Event = (*listGroupMembershipEvent)(nil) +) + +type createGroupEvent struct { + mfgroups.Group +} + +func (cge createGroupEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": groupCreate, + "id": cge.ID, + "status": cge.Status.String(), + "created_at": cge.CreatedAt, + } + + if cge.Owner != "" { + val["owner"] = cge.Owner + } + if cge.Parent != "" { + val["parent"] = cge.Parent + } + if cge.Name != "" { + val["name"] = cge.Name + } + if cge.Description != "" { + val["description"] = cge.Description + } + if cge.Metadata != nil { + metadata, err := json.Marshal(cge.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if cge.Status.String() != "" { + val["status"] = cge.Status.String() + } + + return val, nil +} + +type updateGroupEvent struct { + mfgroups.Group +} + +func (uge updateGroupEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": groupUpdate, + "updated_at": uge.UpdatedAt, + "updated_by": uge.UpdatedBy, + } + + if uge.ID != "" { + val["id"] = uge.ID + } + if uge.Owner != "" { + val["owner"] = uge.Owner + } + if uge.Parent != "" { + val["parent"] = uge.Parent + } + if uge.Name != "" { + val["name"] = uge.Name + } + if uge.Description != "" { + val["description"] = uge.Description + } + if uge.Metadata != nil { + metadata, err := json.Marshal(uge.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if !uge.CreatedAt.IsZero() { + val["created_at"] = uge.CreatedAt + } + if uge.Status.String() != "" { + val["status"] = uge.Status.String() + } + + return val, nil +} + +type removeGroupEvent struct { + id string + status string + updatedAt time.Time + updatedBy string +} + +func (rge removeGroupEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "operation": groupRemove, + "id": rge.id, + "status": rge.status, + "updated_at": rge.updatedAt, + "updated_by": rge.updatedBy, + }, nil +} + +type viewGroupEvent struct { + mfgroups.Group +} + +func (vge viewGroupEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": groupView, + "id": vge.ID, + } + + if vge.Owner != "" { + val["owner"] = vge.Owner + } + if vge.Parent != "" { + val["parent"] = vge.Parent + } + if vge.Name != "" { + val["name"] = vge.Name + } + if vge.Description != "" { + val["description"] = vge.Description + } + if vge.Metadata != nil { + metadata, err := json.Marshal(vge.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if !vge.CreatedAt.IsZero() { + val["created_at"] = vge.CreatedAt + } + if !vge.UpdatedAt.IsZero() { + val["updated_at"] = vge.UpdatedAt + } + if vge.UpdatedBy != "" { + val["updated_by"] = vge.UpdatedBy + } + if vge.Status.String() != "" { + val["status"] = vge.Status.String() + } + + return val, nil +} + +type listGroupEvent struct { + mfgroups.GroupsPage +} + +func (lge listGroupEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": groupList, + "total": lge.Total, + "offset": lge.Offset, + "limit": lge.Limit, + } + + if lge.Name != "" { + val["name"] = lge.Name + } + if lge.OwnerID != "" { + val["owner_id"] = lge.OwnerID + } + if lge.Tag != "" { + val["tag"] = lge.Tag + } + if lge.Metadata != nil { + metadata, err := json.Marshal(lge.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if lge.Status.String() != "" { + val["status"] = lge.Status.String() + } + if lge.Action != "" { + val["action"] = lge.Action + } + if lge.Subject != "" { + val["subject"] = lge.Subject + } + + return val, nil +} + +type listGroupMembershipEvent struct { + mfgroups.GroupsPage + channelID string +} + +func (lgme listGroupMembershipEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": groupListMemberships, + "total": lgme.Total, + "offset": lgme.Offset, + "limit": lgme.Limit, + "channel_id": lgme.channelID, + } + + if lgme.Name != "" { + val["name"] = lgme.Name + } + if lgme.OwnerID != "" { + val["owner_id"] = lgme.OwnerID + } + if lgme.Tag != "" { + val["tag"] = lgme.Tag + } + if lgme.Metadata != nil { + metadata, err := json.Marshal(lgme.Metadata) + if err != nil { + return map[string]interface{}{}, err + } + + val["metadata"] = metadata + } + if lgme.Status.String() != "" { + val["status"] = lgme.Status.String() + } + if lgme.Action != "" { + val["action"] = lgme.Action + } + if lgme.Subject != "" { + val["subject"] = lgme.Subject + } + + return val, nil +} diff --git a/users/groups/redis/streams.go b/users/groups/redis/streams.go new file mode 100644 index 00000000..cc12d634 --- /dev/null +++ b/users/groups/redis/streams.go @@ -0,0 +1,155 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package redis + +import ( + "context" + + "github.com/go-redis/redis/v8" + mfredis "github.com/mainflux/mainflux/internal/clients/redis" + mfgroups "github.com/mainflux/mainflux/pkg/groups" + "github.com/mainflux/mainflux/users/groups" +) + +const ( + streamID = "mainflux.users" + streamLen = 1000 +) + +var _ groups.Service = (*eventStore)(nil) + +type eventStore struct { + mfredis.Publisher + svc groups.Service + client *redis.Client +} + +// NewEventStoreMiddleware returns wrapper around things service that sends +// events to event store. +func NewEventStoreMiddleware(ctx context.Context, svc groups.Service, client *redis.Client) groups.Service { + es := eventStore{ + svc: svc, + client: client, + Publisher: mfredis.NewEventStore(client, streamID, streamLen), + } + + go es.StartPublishingRoutine(ctx) + + return es +} + +func (es eventStore) CreateGroup(ctx context.Context, token string, group mfgroups.Group) (mfgroups.Group, error) { + group, err := es.svc.CreateGroup(ctx, token, group) + if err != nil { + return group, err + } + + event := createGroupEvent{ + group, + } + + if err := es.Publish(ctx, event); err != nil { + return group, err + } + + return group, nil +} + +func (es eventStore) UpdateGroup(ctx context.Context, token string, group mfgroups.Group) (mfgroups.Group, error) { + group, err := es.svc.UpdateGroup(ctx, token, group) + if err != nil { + return group, err + } + + event := updateGroupEvent{ + group, + } + + if err := es.Publish(ctx, event); err != nil { + return group, err + } + + return group, nil +} + +func (es eventStore) ViewGroup(ctx context.Context, token, id string) (mfgroups.Group, error) { + group, err := es.svc.ViewGroup(ctx, token, id) + if err != nil { + return group, err + } + event := viewGroupEvent{ + group, + } + + if err := es.Publish(ctx, event); err != nil { + return group, err + } + + return group, nil +} + +func (es eventStore) ListGroups(ctx context.Context, token string, pm mfgroups.GroupsPage) (mfgroups.GroupsPage, error) { + gp, err := es.svc.ListGroups(ctx, token, pm) + if err != nil { + return gp, err + } + event := listGroupEvent{ + pm, + } + + if err := es.Publish(ctx, event); err != nil { + return gp, err + } + + return gp, nil +} + +func (es eventStore) ListMemberships(ctx context.Context, token, clientID string, pm mfgroups.GroupsPage) (mfgroups.MembershipsPage, error) { + mp, err := es.svc.ListMemberships(ctx, token, clientID, pm) + if err != nil { + return mp, err + } + event := listGroupMembershipEvent{ + pm, clientID, + } + + if err := es.Publish(ctx, event); err != nil { + return mp, err + } + + return mp, nil +} + +func (es eventStore) EnableGroup(ctx context.Context, token, id string) (mfgroups.Group, error) { + group, err := es.svc.EnableGroup(ctx, token, id) + if err != nil { + return group, err + } + + return es.delete(ctx, group) +} + +func (es eventStore) DisableGroup(ctx context.Context, token, id string) (mfgroups.Group, error) { + group, err := es.svc.DisableGroup(ctx, token, id) + if err != nil { + return group, err + } + + return es.delete(ctx, group) +} + +func (es eventStore) delete(ctx context.Context, group mfgroups.Group) (mfgroups.Group, error) { + event := removeGroupEvent{ + id: group.ID, + updatedAt: group.UpdatedAt, + updatedBy: group.UpdatedBy, + status: group.Status.String(), + } + + if err := es.Publish(ctx, event); err != nil { + return group, err + } + + return group, nil +} diff --git a/users/policies/redis/doc.go b/users/policies/redis/doc.go new file mode 100644 index 00000000..161c1a1b --- /dev/null +++ b/users/policies/redis/doc.go @@ -0,0 +1,5 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package redis contains event source Redis client implementation. +package redis diff --git a/users/policies/redis/events.go b/users/policies/redis/events.go new file mode 100644 index 00000000..0ace35f4 --- /dev/null +++ b/users/policies/redis/events.go @@ -0,0 +1,107 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package redis + +import ( + "fmt" + "strings" + + mfredis "github.com/mainflux/mainflux/internal/clients/redis" + "github.com/mainflux/mainflux/users/policies" +) + +const ( + policyPrefix = "policies." + authorize = policyPrefix + "authorize" + policyAdd = policyPrefix + "add" + policyUpdate = policyPrefix + "update" + policyList = policyPrefix + "list" + policyDelete = policyPrefix + "delete" +) + +var ( + _ mfredis.Event = (*policyEvent)(nil) + _ mfredis.Event = (*authorizeEvent)(nil) + _ mfredis.Event = (*listPoliciesEvent)(nil) +) + +type policyEvent struct { + policies.Policy + operation string +} + +func (pe policyEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": pe.operation, + } + if pe.OwnerID != "" { + val["owner_id"] = pe.OwnerID + } + if pe.Subject != "" { + val["subject"] = pe.Subject + } + if pe.Object != "" { + val["object"] = pe.Object + } + if len(pe.Actions) > 0 { + actions := fmt.Sprintf("[%s]", strings.Join(pe.Actions, ",")) + val["actions"] = actions + } + if !pe.CreatedAt.IsZero() { + val["created_at"] = pe.CreatedAt + } + if !pe.UpdatedAt.IsZero() { + val["updated_at"] = pe.UpdatedAt + } + if pe.UpdatedBy != "" { + val["updated_by"] = pe.UpdatedBy + } + return val, nil +} + +type authorizeEvent struct { + policies.AccessRequest +} + +func (ae authorizeEvent) Encode() (map[string]interface{}, error) { + // We don't want to send the key over the stream, so we don't send the subject. + val := map[string]interface{}{ + "operation": authorize, + "entity_type": ae.Entity, + } + + if ae.Object != "" { + val["object"] = ae.Object + } + if ae.Action != "" { + val["action"] = ae.Action + } + return val, nil +} + +type listPoliciesEvent struct { + policies.Page +} + +func (lpe listPoliciesEvent) Encode() (map[string]interface{}, error) { + val := map[string]interface{}{ + "operation": policyList, + "total": lpe.Total, + "limit": lpe.Limit, + "offset": lpe.Offset, + } + if lpe.OwnerID != "" { + val["owner_id"] = lpe.OwnerID + } + if lpe.Subject != "" { + val["subject"] = lpe.Subject + } + if lpe.Object != "" { + val["object"] = lpe.Object + } + if lpe.Action != "" { + val["action"] = lpe.Action + } + return val, nil +} diff --git a/users/policies/redis/streams.go b/users/policies/redis/streams.go new file mode 100644 index 00000000..6cb37e77 --- /dev/null +++ b/users/policies/redis/streams.go @@ -0,0 +1,104 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package redis + +import ( + "context" + + "github.com/go-redis/redis/v8" + mfredis "github.com/mainflux/mainflux/internal/clients/redis" + "github.com/mainflux/mainflux/users/policies" +) + +const ( + streamID = "mainflux.users" + streamLen = 1000 +) + +var _ policies.Service = (*eventStore)(nil) + +type eventStore struct { + mfredis.Publisher + svc policies.Service + client *redis.Client +} + +// NewEventStoreMiddleware returns wrapper around policy service that sends +// events to event store. +func NewEventStoreMiddleware(ctx context.Context, svc policies.Service, client *redis.Client) policies.Service { + es := eventStore{ + svc: svc, + client: client, + Publisher: mfredis.NewEventStore(client, streamID, streamLen), + } + + go es.StartPublishingRoutine(ctx) + + return es +} + +func (es eventStore) Authorize(ctx context.Context, ar policies.AccessRequest) error { + if err := es.svc.Authorize(ctx, ar); err != nil { + return err + } + + event := authorizeEvent{ + ar, + } + + return es.Publish(ctx, event) +} + +func (es eventStore) AddPolicy(ctx context.Context, token string, policy policies.Policy) error { + if err := es.svc.AddPolicy(ctx, token, policy); err != nil { + return err + } + + event := policyEvent{ + policy, policyAdd, + } + + return es.Publish(ctx, event) +} + +func (es eventStore) UpdatePolicy(ctx context.Context, token string, policy policies.Policy) error { + if err := es.svc.UpdatePolicy(ctx, token, policy); err != nil { + return err + } + + event := policyEvent{ + policy, policyUpdate, + } + + return es.Publish(ctx, event) +} + +func (es eventStore) ListPolicies(ctx context.Context, token string, page policies.Page) (policies.PolicyPage, error) { + pp, err := es.svc.ListPolicies(ctx, token, page) + if err != nil { + return pp, err + } + + event := listPoliciesEvent{ + page, + } + + if err := es.Publish(ctx, event); err != nil { + return pp, err + } + + return pp, nil +} + +func (es eventStore) DeletePolicy(ctx context.Context, token string, policy policies.Policy) error { + if err := es.svc.DeletePolicy(ctx, token, policy); err != nil { + return err + } + + event := policyEvent{ + policy, policyDelete, + } + + return es.Publish(ctx, event) +}