1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-24 13:48:49 +08:00

MF-1510 - Add Event Sourcing To Users Service (#1835)

* Add Event Sourcing on Users Service

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Modify Events Sourced From Things Service

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Rename Cache

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Remove SharedBy From Events

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Use Combined Publisher

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Format Docker Compose File

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Rename Events

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

---------

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
This commit is contained in:
b1ackd0t 2023-08-02 13:09:56 +03:00 committed by GitHub
parent 518d5827d0
commit 23a665b54a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1451 additions and 63 deletions

View File

@ -31,20 +31,20 @@ import (
"github.com/mainflux/mainflux/things/clients"
capi "github.com/mainflux/mainflux/things/clients/api"
cpostgres "github.com/mainflux/mainflux/things/clients/postgres"
redisthcache "github.com/mainflux/mainflux/things/clients/redis"
thcache "github.com/mainflux/mainflux/things/clients/redis"
localusers "github.com/mainflux/mainflux/things/clients/standalone"
ctracing "github.com/mainflux/mainflux/things/clients/tracing"
"github.com/mainflux/mainflux/things/groups"
gapi "github.com/mainflux/mainflux/things/groups/api"
gpostgres "github.com/mainflux/mainflux/things/groups/postgres"
redischcache "github.com/mainflux/mainflux/things/groups/redis"
chcache "github.com/mainflux/mainflux/things/groups/redis"
gtracing "github.com/mainflux/mainflux/things/groups/tracing"
tpolicies "github.com/mainflux/mainflux/things/policies"
papi "github.com/mainflux/mainflux/things/policies/api"
grpcapi "github.com/mainflux/mainflux/things/policies/api/grpc"
httpapi "github.com/mainflux/mainflux/things/policies/api/http"
ppostgres "github.com/mainflux/mainflux/things/policies/postgres"
redispcache "github.com/mainflux/mainflux/things/policies/redis"
pcache "github.com/mainflux/mainflux/things/policies/redis"
ppracing "github.com/mainflux/mainflux/things/policies/tracing"
thingsPg "github.com/mainflux/mainflux/things/postgres"
upolicies "github.com/mainflux/mainflux/users/policies"
@ -224,16 +224,16 @@ func newService(ctx context.Context, db *sqlx.DB, dbConfig pgClient.Config, auth
logger.Error(fmt.Sprintf("failed to parse cache key duration: %s", err.Error()))
}
policyCache := redispcache.NewCache(cacheClient, kDuration)
thingCache := redisthcache.NewCache(cacheClient, kDuration)
policyCache := pcache.NewCache(cacheClient, kDuration)
thingCache := thcache.NewCache(cacheClient, kDuration)
psvc := tpolicies.NewService(auth, pRepo, policyCache, idp)
csvc := clients.NewService(auth, psvc, cRepo, gRepo, thingCache, idp)
gsvc := groups.NewService(auth, psvc, gRepo, idp)
csvc = redisthcache.NewEventStoreMiddleware(ctx, csvc, esClient)
gsvc = redischcache.NewEventStoreMiddleware(ctx, gsvc, esClient)
psvc = redispcache.NewEventStoreMiddleware(ctx, psvc, esClient)
csvc = thcache.NewEventStoreMiddleware(ctx, csvc, esClient)
gsvc = chcache.NewEventStoreMiddleware(ctx, gsvc, esClient)
psvc = pcache.NewEventStoreMiddleware(ctx, psvc, esClient)
csvc = ctracing.New(csvc, tracer)
csvc = capi.LoggingMiddleware(csvc, logger)

View File

@ -12,6 +12,7 @@ import (
"regexp"
"time"
"github.com/go-redis/redis/v8"
"github.com/go-zoo/bone"
"github.com/jmoiron/sqlx"
chclient "github.com/mainflux/callhome/pkg/client"
@ -19,6 +20,7 @@ import (
"github.com/mainflux/mainflux/internal"
jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger"
pgClient "github.com/mainflux/mainflux/internal/clients/postgres"
redisClient "github.com/mainflux/mainflux/internal/clients/redis"
"github.com/mainflux/mainflux/internal/email"
"github.com/mainflux/mainflux/internal/env"
"github.com/mainflux/mainflux/internal/postgres"
@ -32,10 +34,12 @@ import (
capi "github.com/mainflux/mainflux/users/clients/api"
"github.com/mainflux/mainflux/users/clients/emailer"
cpostgres "github.com/mainflux/mainflux/users/clients/postgres"
ucache "github.com/mainflux/mainflux/users/clients/redis"
ctracing "github.com/mainflux/mainflux/users/clients/tracing"
"github.com/mainflux/mainflux/users/groups"
gapi "github.com/mainflux/mainflux/users/groups/api"
gpostgres "github.com/mainflux/mainflux/users/groups/postgres"
gcache "github.com/mainflux/mainflux/users/groups/redis"
gtracing "github.com/mainflux/mainflux/users/groups/tracing"
"github.com/mainflux/mainflux/users/hasher"
"github.com/mainflux/mainflux/users/jwt"
@ -44,6 +48,7 @@ import (
grpcapi "github.com/mainflux/mainflux/users/policies/api/grpc"
httpapi "github.com/mainflux/mainflux/users/policies/api/http"
ppostgres "github.com/mainflux/mainflux/users/policies/postgres"
pcache "github.com/mainflux/mainflux/users/policies/redis"
ptracing "github.com/mainflux/mainflux/users/policies/tracing"
clientsPg "github.com/mainflux/mainflux/users/postgres"
"go.opentelemetry.io/otel/trace"
@ -55,6 +60,7 @@ import (
const (
svcName = "users"
envPrefixDB = "MF_USERS_DB_"
envPrefixES = "MF_USERS_ES_"
envPrefixHTTP = "MF_USERS_HTTP_"
envPrefixGrpc = "MF_USERS_GRPC_"
defDB = "users"
@ -139,7 +145,14 @@ func main() {
}()
tracer := tp.Tracer(svcName)
csvc, gsvc, psvc := newService(ctx, db, dbConfig, tracer, cfg, ec, logger)
// Setup new redis event store client
esClient, err := redisClient.Setup(envPrefixES)
if err != nil {
logger.Fatal(err.Error())
}
defer esClient.Close()
csvc, gsvc, psvc := newService(ctx, db, dbConfig, esClient, tracer, cfg, ec, logger)
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
@ -185,7 +198,7 @@ func main() {
}
}
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgClient.Config, tracer trace.Tracer, c config, ec email.Config, logger mflog.Logger) (clients.Service, groups.Service, policies.Service) {
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgClient.Config, esClient *redis.Client, tracer trace.Tracer, c config, ec email.Config, logger mflog.Logger) (clients.Service, groups.Service, policies.Service) {
database := postgres.NewDatabase(db, dbConfig, tracer)
cRepo := cpostgres.NewRepository(database)
gRepo := gpostgres.NewRepository(database)
@ -212,6 +225,10 @@ func newService(ctx context.Context, db *sqlx.DB, dbConfig pgClient.Config, trac
gsvc := groups.NewService(gRepo, pRepo, tokenizer, idp)
psvc := policies.NewService(pRepo, tokenizer, idp)
csvc = ucache.NewEventStoreMiddleware(ctx, csvc, esClient)
gsvc = gcache.NewEventStoreMiddleware(ctx, gsvc, esClient)
psvc = pcache.NewEventStoreMiddleware(ctx, psvc, esClient)
csvc = ctracing.New(csvc, tracer)
csvc = capi.LoggingMiddleware(csvc, logger)
counter, latency := internal.MakeMetrics(svcName, "api")

View File

@ -64,6 +64,9 @@ MF_USERS_DB_SSL_MODE=disable
MF_USERS_DB_SSL_CERT=
MF_USERS_DB_SSL_KEY=
MF_USERS_DB_SSL_ROOT_CERT=
MF_USERS_ES_URL=es-redis:${MF_REDIS_TCP_PORT}
MF_USERS_ES_PASS=
MF_USERS_ES_DB=
MF_USERS_RESET_PWD_TEMPLATE=users.tmpl
MF_USERS_INSTANCE_ID=

View File

@ -14,6 +14,7 @@ volumes:
mainflux-es-redis-volume:
mainflux-mqtt-broker-volume:
services:
nginx:
image: nginx:1.23.3-alpine
@ -176,6 +177,9 @@ services:
MF_EMAIL_FROM_ADDRESS: ${MF_EMAIL_FROM_ADDRESS}
MF_EMAIL_FROM_NAME: ${MF_EMAIL_FROM_NAME}
MF_EMAIL_TEMPLATE: ${MF_EMAIL_TEMPLATE}
MF_USERS_ES_URL: ${MF_USERS_ES_URL}
MF_USERS_ES_PASS: ${MF_USERS_ES_PASS}
MF_USERS_ES_DB: ${MF_USERS_ES_DB}
MF_JAEGER_URL: ${MF_JAEGER_URL}
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
ports:

View File

@ -2,5 +2,5 @@
// SPDX-License-Identifier: Apache-2.0
// Package redis contains cache implementations using Redis as
// the underlying cache.
// the underlying cache and event store.
package redis

View File

@ -78,10 +78,13 @@ type updateClientEvent struct {
func (uce updateClientEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientUpdate + "." + uce.operation,
"operation": clientUpdate,
"updated_at": uce.UpdatedAt,
"updated_by": uce.UpdatedBy,
}
if uce.operation != "" {
val["operation"] = clientUpdate + "_" + uce.operation
}
if uce.ID != "" {
val["id"] = uce.ID
@ -93,6 +96,9 @@ func (uce updateClientEvent) Encode() (map[string]interface{}, error) {
tags := fmt.Sprintf("[%s]", strings.Join(uce.Tags, ","))
val["tags"] = tags
}
if uce.Owner != "" {
val["owner"] = uce.Owner
}
if uce.Credentials.Identity != "" {
val["identity"] = uce.Credentials.Identity
}
@ -225,6 +231,10 @@ func (lce listClientEvent) Encode() (map[string]interface{}, error) {
if lce.Subject != "" {
val["subject"] = lce.Subject
}
if len(lce.IDs) > 0 {
ids := fmt.Sprintf("[%s]", strings.Join(lce.IDs, ","))
val["ids"] = ids
}
if lce.Identity != "" {
val["identity"] = lce.Identity
}

View File

@ -53,13 +53,14 @@ func (es *eventStore) CreateThings(ctx context.Context, token string, thing ...m
return sths, err
}
}
return sths, nil
}
func (es *eventStore) UpdateClient(ctx context.Context, token string, thing mfclients.Client) (mfclients.Client, error) {
cli, err := es.svc.UpdateClient(ctx, token, thing)
if err != nil {
return mfclients.Client{}, err
return cli, err
}
return es.update(ctx, "", cli)
@ -68,7 +69,7 @@ func (es *eventStore) UpdateClient(ctx context.Context, token string, thing mfcl
func (es *eventStore) UpdateClientOwner(ctx context.Context, token string, thing mfclients.Client) (mfclients.Client, error) {
cli, err := es.svc.UpdateClientOwner(ctx, token, thing)
if err != nil {
return mfclients.Client{}, err
return cli, err
}
return es.update(ctx, "owner", cli)
@ -77,7 +78,7 @@ func (es *eventStore) UpdateClientOwner(ctx context.Context, token string, thing
func (es *eventStore) UpdateClientTags(ctx context.Context, token string, thing mfclients.Client) (mfclients.Client, error) {
cli, err := es.svc.UpdateClientTags(ctx, token, thing)
if err != nil {
return mfclients.Client{}, err
return cli, err
}
return es.update(ctx, "tags", cli)
@ -86,28 +87,30 @@ func (es *eventStore) UpdateClientTags(ctx context.Context, token string, thing
func (es *eventStore) UpdateClientSecret(ctx context.Context, token, id, key string) (mfclients.Client, error) {
cli, err := es.svc.UpdateClientSecret(ctx, token, id, key)
if err != nil {
return mfclients.Client{}, err
return cli, err
}
return es.update(ctx, "secret", cli)
}
func (es *eventStore) update(ctx context.Context, operation string, cli mfclients.Client) (mfclients.Client, error) {
func (es *eventStore) update(ctx context.Context, operation string, thing mfclients.Client) (mfclients.Client, error) {
event := updateClientEvent{
cli, operation,
}
if err := es.Publish(ctx, event); err != nil {
return cli, err
thing, operation,
}
return cli, nil
if err := es.Publish(ctx, event); err != nil {
return thing, err
}
return thing, nil
}
func (es *eventStore) ViewClient(ctx context.Context, token, id string) (mfclients.Client, error) {
cli, err := es.svc.ViewClient(ctx, token, id)
if err != nil {
return mfclients.Client{}, err
return cli, err
}
event := viewClientEvent{
cli,
}
@ -121,7 +124,7 @@ func (es *eventStore) ViewClient(ctx context.Context, token, id string) (mfclien
func (es *eventStore) ListClients(ctx context.Context, token string, pm mfclients.Page) (mfclients.ClientsPage, error) {
cp, err := es.svc.ListClients(ctx, token, pm)
if err != nil {
return mfclients.ClientsPage{}, err
return cp, err
}
event := listClientEvent{
pm,
@ -134,24 +137,24 @@ func (es *eventStore) ListClients(ctx context.Context, token string, pm mfclient
}
func (es *eventStore) ListClientsByGroup(ctx context.Context, token, chID string, pm mfclients.Page) (mfclients.MembersPage, error) {
cp, err := es.svc.ListClientsByGroup(ctx, token, chID, pm)
mp, err := es.svc.ListClientsByGroup(ctx, token, chID, pm)
if err != nil {
return mfclients.MembersPage{}, err
return mp, err
}
event := listClientByGroupEvent{
pm, chID,
}
if err := es.Publish(ctx, event); err != nil {
return cp, err
return mp, err
}
return cp, nil
return mp, nil
}
func (es *eventStore) EnableClient(ctx context.Context, token, id string) (mfclients.Client, error) {
cli, err := es.svc.EnableClient(ctx, token, id)
if err != nil {
return mfclients.Client{}, err
return cli, err
}
return es.delete(ctx, cli)
@ -160,7 +163,7 @@ func (es *eventStore) EnableClient(ctx context.Context, token, id string) (mfcli
func (es *eventStore) DisableClient(ctx context.Context, token, id string) (mfclients.Client, error) {
cli, err := es.svc.DisableClient(ctx, token, id)
if err != nil {
return mfclients.Client{}, err
return cli, err
}
return es.delete(ctx, cli)
@ -183,7 +186,7 @@ func (es *eventStore) delete(ctx context.Context, cli mfclients.Client) (mfclien
func (es *eventStore) Identify(ctx context.Context, key string) (string, error) {
thingID, err := es.svc.Identify(ctx, key)
if err != nil {
return "", err
return thingID, err
}
event := identifyClientEvent{
thingID: thingID,

View File

@ -108,9 +108,6 @@ func (uce updateGroupEvent) Encode() (map[string]interface{}, error) {
if !uce.CreatedAt.IsZero() {
val["created_at"] = uce.CreatedAt
}
if !uce.UpdatedAt.IsZero() {
val["updated_at"] = uce.UpdatedAt
}
if uce.Status.String() != "" {
val["status"] = uce.Status.String()
}

View File

@ -40,26 +40,26 @@ func NewEventStoreMiddleware(ctx context.Context, svc groups.Service, client *re
}
func (es *eventStore) CreateGroups(ctx context.Context, token string, groups ...mfgroups.Group) ([]mfgroups.Group, error) {
gs, err := es.svc.CreateGroups(ctx, token, groups...)
grps, err := es.svc.CreateGroups(ctx, token, groups...)
if err != nil {
return gs, err
return grps, err
}
for _, group := range gs {
for _, group := range grps {
event := createGroupEvent{
group,
}
if err := es.Publish(ctx, event); err != nil {
return gs, err
return grps, err
}
}
return gs, nil
return grps, nil
}
func (es *eventStore) UpdateGroup(ctx context.Context, token string, group mfgroups.Group) (mfgroups.Group, error) {
group, err := es.svc.UpdateGroup(ctx, token, group)
if err != nil {
return mfgroups.Group{}, err
return group, err
}
event := updateGroupEvent{
@ -75,7 +75,7 @@ func (es *eventStore) UpdateGroup(ctx context.Context, token string, group mfgro
func (es *eventStore) ViewGroup(ctx context.Context, token, id string) (mfgroups.Group, error) {
group, err := es.svc.ViewGroup(ctx, token, id)
if err != nil {
return mfgroups.Group{}, err
return group, err
}
event := viewGroupEvent{
group,
@ -90,7 +90,7 @@ func (es *eventStore) ViewGroup(ctx context.Context, token, id string) (mfgroups
func (es *eventStore) ListGroups(ctx context.Context, token string, pm mfgroups.GroupsPage) (mfgroups.GroupsPage, error) {
gp, err := es.svc.ListGroups(ctx, token, pm)
if err != nil {
return mfgroups.GroupsPage{}, err
return gp, err
}
event := listGroupEvent{
pm,
@ -105,7 +105,7 @@ func (es *eventStore) ListGroups(ctx context.Context, token string, pm mfgroups.
func (es *eventStore) ListMemberships(ctx context.Context, token, clientID string, pm mfgroups.GroupsPage) (mfgroups.MembershipsPage, error) {
mp, err := es.svc.ListMemberships(ctx, token, clientID, pm)
if err != nil {
return mfgroups.MembershipsPage{}, err
return mp, err
}
event := listGroupMembershipEvent{
pm, clientID,
@ -118,21 +118,21 @@ func (es *eventStore) ListMemberships(ctx context.Context, token, clientID strin
}
func (es *eventStore) EnableGroup(ctx context.Context, token, id string) (mfgroups.Group, error) {
cli, err := es.svc.EnableGroup(ctx, token, id)
group, err := es.svc.EnableGroup(ctx, token, id)
if err != nil {
return mfgroups.Group{}, err
return group, err
}
return es.delete(ctx, cli)
return es.delete(ctx, group)
}
func (es *eventStore) DisableGroup(ctx context.Context, token, id string) (mfgroups.Group, error) {
cli, err := es.svc.DisableGroup(ctx, token, id)
group, err := es.svc.DisableGroup(ctx, token, id)
if err != nil {
return mfgroups.Group{}, err
return group, err
}
return es.delete(ctx, cli)
return es.delete(ctx, group)
}
func (es *eventStore) delete(ctx context.Context, group mfgroups.Group) (mfgroups.Group, error) {

View File

@ -2,5 +2,5 @@
// SPDX-License-Identifier: Apache-2.0
// Package redis contains cache implementations using Redis as
// the underlying cache.
// the underlying cache and event store.
package redis

View File

@ -62,14 +62,13 @@ func (pe policyEvent) Encode() (map[string]interface{}, error) {
type authorizeEvent struct {
policies.AccessRequest
entityType string
}
func (ae authorizeEvent) Encode() (map[string]interface{}, error) {
// We don't want to send the key over the stream, so we don't send the subject.
val := map[string]interface{}{
"operation": authorize,
"entity_type": ae.entityType,
"entity_type": ae.Entity,
}
if ae.Object != "" {

View File

@ -39,25 +39,25 @@ func NewEventStoreMiddleware(ctx context.Context, svc policies.Service, client *
}
func (es *eventStore) Authorize(ctx context.Context, ar policies.AccessRequest) (policies.Policy, error) {
id, err := es.svc.Authorize(ctx, ar)
policy, err := es.svc.Authorize(ctx, ar)
if err != nil {
return policies.Policy{}, err
return policy, err
}
event := authorizeEvent{
ar, ar.Entity,
ar,
}
if err := es.Publish(ctx, event); err != nil {
return id, err
return policy, err
}
return id, nil
return policy, nil
}
func (es *eventStore) AddPolicy(ctx context.Context, token string, policy policies.Policy) (policies.Policy, error) {
policy, err := es.svc.AddPolicy(ctx, token, policy)
if err != nil {
return policies.Policy{}, err
return policy, err
}
event := policyEvent{
@ -73,7 +73,7 @@ func (es *eventStore) AddPolicy(ctx context.Context, token string, policy polici
func (es *eventStore) UpdatePolicy(ctx context.Context, token string, policy policies.Policy) (policies.Policy, error) {
policy, err := es.svc.UpdatePolicy(ctx, token, policy)
if err != nil {
return policies.Policy{}, err
return policy, err
}
event := policyEvent{
@ -87,19 +87,19 @@ func (es *eventStore) UpdatePolicy(ctx context.Context, token string, policy pol
}
func (es *eventStore) ListPolicies(ctx context.Context, token string, page policies.Page) (policies.PolicyPage, error) {
policypage, err := es.svc.ListPolicies(ctx, token, page)
pp, err := es.svc.ListPolicies(ctx, token, page)
if err != nil {
return policies.PolicyPage{}, err
return pp, err
}
event := listPoliciesEvent{
page,
}
if err := es.Publish(ctx, event); err != nil {
return policypage, err
return pp, err
}
return policypage, nil
return pp, nil
}
func (es *eventStore) DeletePolicy(ctx context.Context, token string, policy policies.Policy) error {

View File

@ -0,0 +1,5 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package redis contains event source Redis client implementation.
package redis

View File

@ -0,0 +1,417 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis
import (
"encoding/json"
"fmt"
"strings"
"time"
mfredis "github.com/mainflux/mainflux/internal/clients/redis"
mfclients "github.com/mainflux/mainflux/pkg/clients"
)
const (
clientPrefix = "user."
clientCreate = clientPrefix + "create"
clientUpdate = clientPrefix + "update"
clientRemove = clientPrefix + "remove"
clientView = clientPrefix + "view"
profileView = clientPrefix + "view_profile"
clientList = clientPrefix + "list"
clientListByGroup = clientPrefix + "list_by_group"
clientIdentify = clientPrefix + "identify"
generateResetToken = clientPrefix + "generate_reset_token"
issueToken = clientPrefix + "issue_token"
refreshToken = clientPrefix + "refresh_token"
resetSecret = clientPrefix + "reset_secret"
sendPasswordReset = clientPrefix + "send_password_reset"
)
var (
_ mfredis.Event = (*createClientEvent)(nil)
_ mfredis.Event = (*updateClientEvent)(nil)
_ mfredis.Event = (*removeClientEvent)(nil)
_ mfredis.Event = (*viewClientEvent)(nil)
_ mfredis.Event = (*viewProfileEvent)(nil)
_ mfredis.Event = (*listClientEvent)(nil)
_ mfredis.Event = (*listClientByGroupEvent)(nil)
_ mfredis.Event = (*identifyClientEvent)(nil)
_ mfredis.Event = (*generateResetTokenEvent)(nil)
_ mfredis.Event = (*issueTokenEvent)(nil)
_ mfredis.Event = (*refreshTokenEvent)(nil)
_ mfredis.Event = (*resetSecretEvent)(nil)
_ mfredis.Event = (*sendPasswordResetEvent)(nil)
)
type createClientEvent struct {
mfclients.Client
}
func (cce createClientEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientCreate,
"id": cce.ID,
"status": cce.Status.String(),
"created_at": cce.CreatedAt,
}
if cce.Name != "" {
val["name"] = cce.Name
}
if len(cce.Tags) > 0 {
tags := fmt.Sprintf("[%s]", strings.Join(cce.Tags, ","))
val["tags"] = tags
}
if cce.Owner != "" {
val["owner"] = cce.Owner
}
if cce.Metadata != nil {
metadata, err := json.Marshal(cce.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if cce.Credentials.Identity != "" {
val["identity"] = cce.Credentials.Identity
}
return val, nil
}
type updateClientEvent struct {
mfclients.Client
operation string
}
func (uce updateClientEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientUpdate,
"updated_at": uce.UpdatedAt,
"updated_by": uce.UpdatedBy,
}
if uce.operation != "" {
val["operation"] = clientUpdate + "_" + uce.operation
}
if uce.ID != "" {
val["id"] = uce.ID
}
if uce.Name != "" {
val["name"] = uce.Name
}
if len(uce.Tags) > 0 {
tags := fmt.Sprintf("[%s]", strings.Join(uce.Tags, ","))
val["tags"] = tags
}
if uce.Credentials.Identity != "" {
val["identity"] = uce.Credentials.Identity
}
if uce.Metadata != nil {
metadata, err := json.Marshal(uce.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if !uce.CreatedAt.IsZero() {
val["created_at"] = uce.CreatedAt
}
if uce.Status.String() != "" {
val["status"] = uce.Status.String()
}
return val, nil
}
type removeClientEvent struct {
id string
status string
updatedAt time.Time
updatedBy string
}
func (rce removeClientEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": clientRemove,
"id": rce.id,
"status": rce.status,
"updated_at": rce.updatedAt,
"updated_by": rce.updatedBy,
}, nil
}
type viewClientEvent struct {
mfclients.Client
}
func (vce viewClientEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientView,
"id": vce.ID,
}
if vce.Name != "" {
val["name"] = vce.Name
}
if len(vce.Tags) > 0 {
tags := fmt.Sprintf("[%s]", strings.Join(vce.Tags, ","))
val["tags"] = tags
}
if vce.Owner != "" {
val["owner"] = vce.Owner
}
if vce.Credentials.Identity != "" {
val["identity"] = vce.Credentials.Identity
}
if vce.Metadata != nil {
metadata, err := json.Marshal(vce.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if !vce.CreatedAt.IsZero() {
val["created_at"] = vce.CreatedAt
}
if !vce.UpdatedAt.IsZero() {
val["updated_at"] = vce.UpdatedAt
}
if vce.UpdatedBy != "" {
val["updated_by"] = vce.UpdatedBy
}
if vce.Status.String() != "" {
val["status"] = vce.Status.String()
}
return val, nil
}
type viewProfileEvent struct {
mfclients.Client
}
func (vpe viewProfileEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": profileView,
"id": vpe.ID,
}
if vpe.Name != "" {
val["name"] = vpe.Name
}
if len(vpe.Tags) > 0 {
tags := fmt.Sprintf("[%s]", strings.Join(vpe.Tags, ","))
val["tags"] = tags
}
if vpe.Owner != "" {
val["owner"] = vpe.Owner
}
if vpe.Credentials.Identity != "" {
val["identity"] = vpe.Credentials.Identity
}
if vpe.Metadata != nil {
metadata, err := json.Marshal(vpe.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if !vpe.CreatedAt.IsZero() {
val["created_at"] = vpe.CreatedAt
}
if !vpe.UpdatedAt.IsZero() {
val["updated_at"] = vpe.UpdatedAt
}
if vpe.UpdatedBy != "" {
val["updated_by"] = vpe.UpdatedBy
}
if vpe.Status.String() != "" {
val["status"] = vpe.Status.String()
}
return val, nil
}
type listClientEvent struct {
mfclients.Page
}
func (lce listClientEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientList,
"total": lce.Total,
"offset": lce.Offset,
"limit": lce.Limit,
}
if lce.Name != "" {
val["name"] = lce.Name
}
if lce.Order != "" {
val["order"] = lce.Order
}
if lce.Dir != "" {
val["dir"] = lce.Dir
}
if lce.Metadata != nil {
metadata, err := json.Marshal(lce.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if lce.Owner != "" {
val["owner"] = lce.Owner
}
if lce.Tag != "" {
val["tag"] = lce.Tag
}
if lce.SharedBy != "" {
val["sharedBy"] = lce.SharedBy
}
if lce.Status.String() != "" {
val["status"] = lce.Status.String()
}
if lce.Action != "" {
val["action"] = lce.Action
}
if lce.Subject != "" {
val["subject"] = lce.Subject
}
if lce.Identity != "" {
val["identity"] = lce.Identity
}
return val, nil
}
type listClientByGroupEvent struct {
mfclients.Page
channelID string
}
func (lcge listClientByGroupEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientListByGroup,
"total": lcge.Total,
"offset": lcge.Offset,
"limit": lcge.Limit,
"channel_id": lcge.channelID,
}
if lcge.Name != "" {
val["name"] = lcge.Name
}
if lcge.Order != "" {
val["order"] = lcge.Order
}
if lcge.Dir != "" {
val["dir"] = lcge.Dir
}
if lcge.Metadata != nil {
metadata, err := json.Marshal(lcge.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if lcge.Owner != "" {
val["owner"] = lcge.Owner
}
if lcge.Tag != "" {
val["tag"] = lcge.Tag
}
if lcge.SharedBy != "" {
val["sharedBy"] = lcge.SharedBy
}
if lcge.Status.String() != "" {
val["status"] = lcge.Status.String()
}
if lcge.Action != "" {
val["action"] = lcge.Action
}
if lcge.Subject != "" {
val["subject"] = lcge.Subject
}
if lcge.Identity != "" {
val["identity"] = lcge.Identity
}
return val, nil
}
type identifyClientEvent struct {
userID string
}
func (ice identifyClientEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": clientIdentify,
"user_id": ice.userID,
}, nil
}
type generateResetTokenEvent struct {
email string
host string
}
func (grte generateResetTokenEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": generateResetToken,
"email": grte.email,
"host": grte.host,
}, nil
}
type issueTokenEvent struct {
identity string
}
func (ite issueTokenEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": issueToken,
"identity": ite.identity,
}, nil
}
type refreshTokenEvent struct{}
func (rte refreshTokenEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": refreshToken,
}, nil
}
type resetSecretEvent struct{}
func (rse resetSecretEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": resetSecret,
}, nil
}
type sendPasswordResetEvent struct {
host string
email string
user string
}
func (spre sendPasswordResetEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": sendPasswordReset,
"host": spre.host,
"email": spre.email,
"user": spre.user,
}, nil
}

View File

@ -0,0 +1,294 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis
import (
"context"
"github.com/go-redis/redis/v8"
mfredis "github.com/mainflux/mainflux/internal/clients/redis"
mfclients "github.com/mainflux/mainflux/pkg/clients"
"github.com/mainflux/mainflux/users/clients"
"github.com/mainflux/mainflux/users/jwt"
)
const (
streamID = "mainflux.users"
streamLen = 1000
)
var _ clients.Service = (*eventStore)(nil)
type eventStore struct {
mfredis.Publisher
svc clients.Service
client *redis.Client
}
// NewEventStoreMiddleware returns wrapper around users service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc clients.Service, client *redis.Client) clients.Service {
es := eventStore{
svc: svc,
client: client,
Publisher: mfredis.NewEventStore(client, streamID, streamLen),
}
go es.StartPublishingRoutine(ctx)
return es
}
func (es eventStore) RegisterClient(ctx context.Context, token string, user mfclients.Client) (mfclients.Client, error) {
user, err := es.svc.RegisterClient(ctx, token, user)
if err != nil {
return user, err
}
event := createClientEvent{
user,
}
if err := es.Publish(ctx, event); err != nil {
return user, err
}
return user, nil
}
func (es eventStore) UpdateClient(ctx context.Context, token string, user mfclients.Client) (mfclients.Client, error) {
user, err := es.svc.UpdateClient(ctx, token, user)
if err != nil {
return user, err
}
return es.update(ctx, "", user)
}
func (es eventStore) UpdateClientOwner(ctx context.Context, token string, user mfclients.Client) (mfclients.Client, error) {
user, err := es.svc.UpdateClientOwner(ctx, token, user)
if err != nil {
return user, err
}
return es.update(ctx, "owner", user)
}
func (es eventStore) UpdateClientTags(ctx context.Context, token string, user mfclients.Client) (mfclients.Client, error) {
user, err := es.svc.UpdateClientTags(ctx, token, user)
if err != nil {
return user, err
}
return es.update(ctx, "tags", user)
}
func (es eventStore) UpdateClientSecret(ctx context.Context, token, oldSecret, newSecret string) (mfclients.Client, error) {
user, err := es.svc.UpdateClientSecret(ctx, token, oldSecret, newSecret)
if err != nil {
return user, err
}
return es.update(ctx, "secret", user)
}
func (es eventStore) UpdateClientIdentity(ctx context.Context, token, id, identity string) (mfclients.Client, error) {
user, err := es.svc.UpdateClientIdentity(ctx, token, id, identity)
if err != nil {
return user, err
}
return es.update(ctx, "identity", user)
}
func (es eventStore) update(ctx context.Context, operation string, user mfclients.Client) (mfclients.Client, error) {
event := updateClientEvent{
user, operation,
}
if err := es.Publish(ctx, event); err != nil {
return user, err
}
return user, nil
}
func (es eventStore) ViewClient(ctx context.Context, token, id string) (mfclients.Client, error) {
user, err := es.svc.ViewClient(ctx, token, id)
if err != nil {
return user, err
}
event := viewClientEvent{
user,
}
if err := es.Publish(ctx, event); err != nil {
return user, err
}
return user, nil
}
func (es eventStore) ViewProfile(ctx context.Context, token string) (mfclients.Client, error) {
user, err := es.svc.ViewProfile(ctx, token)
if err != nil {
return user, err
}
event := viewProfileEvent{
user,
}
if err := es.Publish(ctx, event); err != nil {
return user, err
}
return user, nil
}
func (es eventStore) ListClients(ctx context.Context, token string, pm mfclients.Page) (mfclients.ClientsPage, error) {
cp, err := es.svc.ListClients(ctx, token, pm)
if err != nil {
return cp, err
}
event := listClientEvent{
pm,
}
if err := es.Publish(ctx, event); err != nil {
return cp, err
}
return cp, nil
}
func (es eventStore) ListMembers(ctx context.Context, token, groupID string, pm mfclients.Page) (mfclients.MembersPage, error) {
mp, err := es.svc.ListMembers(ctx, token, groupID, pm)
if err != nil {
return mp, err
}
event := listClientByGroupEvent{
pm, groupID,
}
if err := es.Publish(ctx, event); err != nil {
return mp, err
}
return mp, nil
}
func (es eventStore) EnableClient(ctx context.Context, token, id string) (mfclients.Client, error) {
user, err := es.svc.EnableClient(ctx, token, id)
if err != nil {
return user, err
}
return es.delete(ctx, user)
}
func (es eventStore) DisableClient(ctx context.Context, token, id string) (mfclients.Client, error) {
user, err := es.svc.DisableClient(ctx, token, id)
if err != nil {
return user, err
}
return es.delete(ctx, user)
}
func (es eventStore) delete(ctx context.Context, user mfclients.Client) (mfclients.Client, error) {
event := removeClientEvent{
id: user.ID,
updatedAt: user.UpdatedAt,
updatedBy: user.UpdatedBy,
status: user.Status.String(),
}
if err := es.Publish(ctx, event); err != nil {
return user, err
}
return user, nil
}
func (es eventStore) Identify(ctx context.Context, token string) (string, error) {
userID, err := es.svc.Identify(ctx, token)
if err != nil {
return userID, err
}
event := identifyClientEvent{
userID: userID,
}
if err := es.Publish(ctx, event); err != nil {
return userID, err
}
return userID, nil
}
func (es eventStore) GenerateResetToken(ctx context.Context, email, host string) error {
if err := es.svc.GenerateResetToken(ctx, email, host); err != nil {
return err
}
event := generateResetTokenEvent{
email: email,
host: host,
}
return es.Publish(ctx, event)
}
func (es eventStore) IssueToken(ctx context.Context, identity, secret string) (jwt.Token, error) {
token, err := es.svc.IssueToken(ctx, identity, secret)
if err != nil {
return token, err
}
event := issueTokenEvent{
identity: identity,
}
if err := es.Publish(ctx, event); err != nil {
return token, err
}
return token, nil
}
func (es eventStore) RefreshToken(ctx context.Context, refreshToken string) (jwt.Token, error) {
token, err := es.svc.RefreshToken(ctx, refreshToken)
if err != nil {
return token, err
}
event := refreshTokenEvent{}
if err := es.Publish(ctx, event); err != nil {
return token, err
}
return token, nil
}
func (es eventStore) ResetSecret(ctx context.Context, resetToken, secret string) error {
if err := es.svc.ResetSecret(ctx, resetToken, secret); err != nil {
return err
}
event := resetSecretEvent{}
return es.Publish(ctx, event)
}
func (es eventStore) SendPasswordReset(ctx context.Context, host, email, user, token string) error {
if err := es.svc.SendPasswordReset(ctx, host, email, user, token); err != nil {
return err
}
event := sendPasswordResetEvent{
host: host,
email: email,
user: user,
}
return es.Publish(ctx, event)
}

View File

@ -0,0 +1,5 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package redis contains event source Redis client implementation.
package redis

View File

@ -0,0 +1,263 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis
import (
"encoding/json"
"time"
mfredis "github.com/mainflux/mainflux/internal/clients/redis"
mfgroups "github.com/mainflux/mainflux/pkg/groups"
)
const (
groupPrefix = "group."
groupCreate = groupPrefix + "create"
groupUpdate = groupPrefix + "update"
groupRemove = groupPrefix + "remove"
groupView = groupPrefix + "view"
groupList = groupPrefix + "list"
groupListMemberships = groupPrefix + "list_by_group"
)
var (
_ mfredis.Event = (*createGroupEvent)(nil)
_ mfredis.Event = (*updateGroupEvent)(nil)
_ mfredis.Event = (*removeGroupEvent)(nil)
_ mfredis.Event = (*viewGroupEvent)(nil)
_ mfredis.Event = (*listGroupEvent)(nil)
_ mfredis.Event = (*listGroupMembershipEvent)(nil)
)
type createGroupEvent struct {
mfgroups.Group
}
func (cge createGroupEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupCreate,
"id": cge.ID,
"status": cge.Status.String(),
"created_at": cge.CreatedAt,
}
if cge.Owner != "" {
val["owner"] = cge.Owner
}
if cge.Parent != "" {
val["parent"] = cge.Parent
}
if cge.Name != "" {
val["name"] = cge.Name
}
if cge.Description != "" {
val["description"] = cge.Description
}
if cge.Metadata != nil {
metadata, err := json.Marshal(cge.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if cge.Status.String() != "" {
val["status"] = cge.Status.String()
}
return val, nil
}
type updateGroupEvent struct {
mfgroups.Group
}
func (uge updateGroupEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupUpdate,
"updated_at": uge.UpdatedAt,
"updated_by": uge.UpdatedBy,
}
if uge.ID != "" {
val["id"] = uge.ID
}
if uge.Owner != "" {
val["owner"] = uge.Owner
}
if uge.Parent != "" {
val["parent"] = uge.Parent
}
if uge.Name != "" {
val["name"] = uge.Name
}
if uge.Description != "" {
val["description"] = uge.Description
}
if uge.Metadata != nil {
metadata, err := json.Marshal(uge.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if !uge.CreatedAt.IsZero() {
val["created_at"] = uge.CreatedAt
}
if uge.Status.String() != "" {
val["status"] = uge.Status.String()
}
return val, nil
}
type removeGroupEvent struct {
id string
status string
updatedAt time.Time
updatedBy string
}
func (rge removeGroupEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": groupRemove,
"id": rge.id,
"status": rge.status,
"updated_at": rge.updatedAt,
"updated_by": rge.updatedBy,
}, nil
}
type viewGroupEvent struct {
mfgroups.Group
}
func (vge viewGroupEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupView,
"id": vge.ID,
}
if vge.Owner != "" {
val["owner"] = vge.Owner
}
if vge.Parent != "" {
val["parent"] = vge.Parent
}
if vge.Name != "" {
val["name"] = vge.Name
}
if vge.Description != "" {
val["description"] = vge.Description
}
if vge.Metadata != nil {
metadata, err := json.Marshal(vge.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if !vge.CreatedAt.IsZero() {
val["created_at"] = vge.CreatedAt
}
if !vge.UpdatedAt.IsZero() {
val["updated_at"] = vge.UpdatedAt
}
if vge.UpdatedBy != "" {
val["updated_by"] = vge.UpdatedBy
}
if vge.Status.String() != "" {
val["status"] = vge.Status.String()
}
return val, nil
}
type listGroupEvent struct {
mfgroups.GroupsPage
}
func (lge listGroupEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupList,
"total": lge.Total,
"offset": lge.Offset,
"limit": lge.Limit,
}
if lge.Name != "" {
val["name"] = lge.Name
}
if lge.OwnerID != "" {
val["owner_id"] = lge.OwnerID
}
if lge.Tag != "" {
val["tag"] = lge.Tag
}
if lge.Metadata != nil {
metadata, err := json.Marshal(lge.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if lge.Status.String() != "" {
val["status"] = lge.Status.String()
}
if lge.Action != "" {
val["action"] = lge.Action
}
if lge.Subject != "" {
val["subject"] = lge.Subject
}
return val, nil
}
type listGroupMembershipEvent struct {
mfgroups.GroupsPage
channelID string
}
func (lgme listGroupMembershipEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupListMemberships,
"total": lgme.Total,
"offset": lgme.Offset,
"limit": lgme.Limit,
"channel_id": lgme.channelID,
}
if lgme.Name != "" {
val["name"] = lgme.Name
}
if lgme.OwnerID != "" {
val["owner_id"] = lgme.OwnerID
}
if lgme.Tag != "" {
val["tag"] = lgme.Tag
}
if lgme.Metadata != nil {
metadata, err := json.Marshal(lgme.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
}
if lgme.Status.String() != "" {
val["status"] = lgme.Status.String()
}
if lgme.Action != "" {
val["action"] = lgme.Action
}
if lgme.Subject != "" {
val["subject"] = lgme.Subject
}
return val, nil
}

View File

@ -0,0 +1,155 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis
import (
"context"
"github.com/go-redis/redis/v8"
mfredis "github.com/mainflux/mainflux/internal/clients/redis"
mfgroups "github.com/mainflux/mainflux/pkg/groups"
"github.com/mainflux/mainflux/users/groups"
)
const (
streamID = "mainflux.users"
streamLen = 1000
)
var _ groups.Service = (*eventStore)(nil)
type eventStore struct {
mfredis.Publisher
svc groups.Service
client *redis.Client
}
// NewEventStoreMiddleware returns wrapper around things service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc groups.Service, client *redis.Client) groups.Service {
es := eventStore{
svc: svc,
client: client,
Publisher: mfredis.NewEventStore(client, streamID, streamLen),
}
go es.StartPublishingRoutine(ctx)
return es
}
func (es eventStore) CreateGroup(ctx context.Context, token string, group mfgroups.Group) (mfgroups.Group, error) {
group, err := es.svc.CreateGroup(ctx, token, group)
if err != nil {
return group, err
}
event := createGroupEvent{
group,
}
if err := es.Publish(ctx, event); err != nil {
return group, err
}
return group, nil
}
func (es eventStore) UpdateGroup(ctx context.Context, token string, group mfgroups.Group) (mfgroups.Group, error) {
group, err := es.svc.UpdateGroup(ctx, token, group)
if err != nil {
return group, err
}
event := updateGroupEvent{
group,
}
if err := es.Publish(ctx, event); err != nil {
return group, err
}
return group, nil
}
func (es eventStore) ViewGroup(ctx context.Context, token, id string) (mfgroups.Group, error) {
group, err := es.svc.ViewGroup(ctx, token, id)
if err != nil {
return group, err
}
event := viewGroupEvent{
group,
}
if err := es.Publish(ctx, event); err != nil {
return group, err
}
return group, nil
}
func (es eventStore) ListGroups(ctx context.Context, token string, pm mfgroups.GroupsPage) (mfgroups.GroupsPage, error) {
gp, err := es.svc.ListGroups(ctx, token, pm)
if err != nil {
return gp, err
}
event := listGroupEvent{
pm,
}
if err := es.Publish(ctx, event); err != nil {
return gp, err
}
return gp, nil
}
func (es eventStore) ListMemberships(ctx context.Context, token, clientID string, pm mfgroups.GroupsPage) (mfgroups.MembershipsPage, error) {
mp, err := es.svc.ListMemberships(ctx, token, clientID, pm)
if err != nil {
return mp, err
}
event := listGroupMembershipEvent{
pm, clientID,
}
if err := es.Publish(ctx, event); err != nil {
return mp, err
}
return mp, nil
}
func (es eventStore) EnableGroup(ctx context.Context, token, id string) (mfgroups.Group, error) {
group, err := es.svc.EnableGroup(ctx, token, id)
if err != nil {
return group, err
}
return es.delete(ctx, group)
}
func (es eventStore) DisableGroup(ctx context.Context, token, id string) (mfgroups.Group, error) {
group, err := es.svc.DisableGroup(ctx, token, id)
if err != nil {
return group, err
}
return es.delete(ctx, group)
}
func (es eventStore) delete(ctx context.Context, group mfgroups.Group) (mfgroups.Group, error) {
event := removeGroupEvent{
id: group.ID,
updatedAt: group.UpdatedAt,
updatedBy: group.UpdatedBy,
status: group.Status.String(),
}
if err := es.Publish(ctx, event); err != nil {
return group, err
}
return group, nil
}

View File

@ -0,0 +1,5 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package redis contains event source Redis client implementation.
package redis

View File

@ -0,0 +1,107 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis
import (
"fmt"
"strings"
mfredis "github.com/mainflux/mainflux/internal/clients/redis"
"github.com/mainflux/mainflux/users/policies"
)
const (
policyPrefix = "policies."
authorize = policyPrefix + "authorize"
policyAdd = policyPrefix + "add"
policyUpdate = policyPrefix + "update"
policyList = policyPrefix + "list"
policyDelete = policyPrefix + "delete"
)
var (
_ mfredis.Event = (*policyEvent)(nil)
_ mfredis.Event = (*authorizeEvent)(nil)
_ mfredis.Event = (*listPoliciesEvent)(nil)
)
type policyEvent struct {
policies.Policy
operation string
}
func (pe policyEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": pe.operation,
}
if pe.OwnerID != "" {
val["owner_id"] = pe.OwnerID
}
if pe.Subject != "" {
val["subject"] = pe.Subject
}
if pe.Object != "" {
val["object"] = pe.Object
}
if len(pe.Actions) > 0 {
actions := fmt.Sprintf("[%s]", strings.Join(pe.Actions, ","))
val["actions"] = actions
}
if !pe.CreatedAt.IsZero() {
val["created_at"] = pe.CreatedAt
}
if !pe.UpdatedAt.IsZero() {
val["updated_at"] = pe.UpdatedAt
}
if pe.UpdatedBy != "" {
val["updated_by"] = pe.UpdatedBy
}
return val, nil
}
type authorizeEvent struct {
policies.AccessRequest
}
func (ae authorizeEvent) Encode() (map[string]interface{}, error) {
// We don't want to send the key over the stream, so we don't send the subject.
val := map[string]interface{}{
"operation": authorize,
"entity_type": ae.Entity,
}
if ae.Object != "" {
val["object"] = ae.Object
}
if ae.Action != "" {
val["action"] = ae.Action
}
return val, nil
}
type listPoliciesEvent struct {
policies.Page
}
func (lpe listPoliciesEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": policyList,
"total": lpe.Total,
"limit": lpe.Limit,
"offset": lpe.Offset,
}
if lpe.OwnerID != "" {
val["owner_id"] = lpe.OwnerID
}
if lpe.Subject != "" {
val["subject"] = lpe.Subject
}
if lpe.Object != "" {
val["object"] = lpe.Object
}
if lpe.Action != "" {
val["action"] = lpe.Action
}
return val, nil
}

View File

@ -0,0 +1,104 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package redis
import (
"context"
"github.com/go-redis/redis/v8"
mfredis "github.com/mainflux/mainflux/internal/clients/redis"
"github.com/mainflux/mainflux/users/policies"
)
const (
streamID = "mainflux.users"
streamLen = 1000
)
var _ policies.Service = (*eventStore)(nil)
type eventStore struct {
mfredis.Publisher
svc policies.Service
client *redis.Client
}
// NewEventStoreMiddleware returns wrapper around policy service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc policies.Service, client *redis.Client) policies.Service {
es := eventStore{
svc: svc,
client: client,
Publisher: mfredis.NewEventStore(client, streamID, streamLen),
}
go es.StartPublishingRoutine(ctx)
return es
}
func (es eventStore) Authorize(ctx context.Context, ar policies.AccessRequest) error {
if err := es.svc.Authorize(ctx, ar); err != nil {
return err
}
event := authorizeEvent{
ar,
}
return es.Publish(ctx, event)
}
func (es eventStore) AddPolicy(ctx context.Context, token string, policy policies.Policy) error {
if err := es.svc.AddPolicy(ctx, token, policy); err != nil {
return err
}
event := policyEvent{
policy, policyAdd,
}
return es.Publish(ctx, event)
}
func (es eventStore) UpdatePolicy(ctx context.Context, token string, policy policies.Policy) error {
if err := es.svc.UpdatePolicy(ctx, token, policy); err != nil {
return err
}
event := policyEvent{
policy, policyUpdate,
}
return es.Publish(ctx, event)
}
func (es eventStore) ListPolicies(ctx context.Context, token string, page policies.Page) (policies.PolicyPage, error) {
pp, err := es.svc.ListPolicies(ctx, token, page)
if err != nil {
return pp, err
}
event := listPoliciesEvent{
page,
}
if err := es.Publish(ctx, event); err != nil {
return pp, err
}
return pp, nil
}
func (es eventStore) DeletePolicy(ctx context.Context, token string, policy policies.Policy) error {
if err := es.svc.DeletePolicy(ctx, token, policy); err != nil {
return err
}
event := policyEvent{
policy, policyDelete,
}
return es.Publish(ctx, event)
}