From a680fa35d2a5a85f9072a3c2eb03ad037d27b0a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Novakovi=C4=87?= Date: Tue, 20 Nov 2018 19:46:33 +0100 Subject: [PATCH] MF-447 - Add event sourcing to things service (#460) * Add event sourcing middleware for things service Add event sourcing for write operations over things and channels. Signed-off-by: Aleksandar Novakovic * Add tests for event sourcing middleware Signed-off-by: Aleksandar Novakovic * Update things in tests to contain metadata field Signed-off-by: Aleksandar Novakovic --- cmd/things/main.go | 1 + things/redis/channels_test.go | 8 +- things/redis/events.go | 163 +++++++++++ things/redis/setup_test.go | 15 +- things/redis/streams.go | 218 ++++++++++++++ things/redis/streams_test.go | 522 ++++++++++++++++++++++++++++++++++ things/redis/things_test.go | 7 +- 7 files changed, 919 insertions(+), 15 deletions(-) create mode 100644 things/redis/events.go create mode 100644 things/redis/streams.go create mode 100644 things/redis/streams_test.go diff --git a/cmd/things/main.go b/cmd/things/main.go index b2002436..fdc0161b 100644 --- a/cmd/things/main.go +++ b/cmd/things/main.go @@ -209,6 +209,7 @@ func newService(conn *grpc.ClientConn, db *sql.DB, client *redis.Client, logger idp := uuid.New() svc := things.New(users, thingsRepo, channelsRepo, chanCache, thingCache, idp) + svc = rediscache.NewEventStoreMiddleware(svc, client) svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( svc, diff --git a/things/redis/channels_test.go b/things/redis/channels_test.go index 1191ebd9..910a3154 100644 --- a/things/redis/channels_test.go +++ b/things/redis/channels_test.go @@ -17,7 +17,7 @@ import ( ) func TestConnect(t *testing.T) { - channelCache := redis.NewChannelCache(cacheClient) + channelCache := redis.NewChannelCache(redisClient) cid := uint64(123) tid := uint64(321) @@ -45,7 +45,7 @@ func TestConnect(t *testing.T) { } func TestHasThing(t *testing.T) { - channelCache := redis.NewChannelCache(cacheClient) + channelCache := redis.NewChannelCache(redisClient) cid := uint64(123) tid := uint64(321) @@ -81,7 +81,7 @@ func TestHasThing(t *testing.T) { } } func TestDisconnect(t *testing.T) { - channelCache := redis.NewChannelCache(cacheClient) + channelCache := redis.NewChannelCache(redisClient) cid := uint64(123) tid := uint64(321) @@ -119,7 +119,7 @@ func TestDisconnect(t *testing.T) { } func TestRemove(t *testing.T) { - channelCache := redis.NewChannelCache(cacheClient) + channelCache := redis.NewChannelCache(redisClient) cid := uint64(123) cid2 := uint64(124) diff --git a/things/redis/events.go b/things/redis/events.go new file mode 100644 index 00000000..f6bd97a9 --- /dev/null +++ b/things/redis/events.go @@ -0,0 +1,163 @@ +package redis + +const ( + thingPrefix = "thing." + thingCreate = thingPrefix + "create" + thingUpdate = thingPrefix + "update" + thingRemove = thingPrefix + "remove" + thingConnect = thingPrefix + "connect" + thingDisconnect = thingPrefix + "disconnect" + + channelPrefix = "channel." + channelCreate = channelPrefix + "create" + channelUpdate = channelPrefix + "update" + channelRemove = channelPrefix + "remove" +) + +type event interface { + Encode() map[string]interface{} +} + +var ( + _ event = (*createThingEvent)(nil) + _ event = (*updateThingEvent)(nil) + _ event = (*removeThingEvent)(nil) + _ event = (*createChannelEvent)(nil) + _ event = (*updateChannelEvent)(nil) + _ event = (*removeChannelEvent)(nil) + _ event = (*connectThingEvent)(nil) + _ event = (*disconnectThingEvent)(nil) +) + +type createThingEvent struct { + id string + owner string + kind string + name string + metadata string +} + +func (cte createThingEvent) Encode() map[string]interface{} { + val := map[string]interface{}{ + "id": cte.id, + "owner": cte.owner, + "type": cte.kind, + "operation": thingCreate, + } + + if cte.name != "" { + val["name"] = cte.name + } + + if cte.metadata != "" { + val["metadata"] = cte.metadata + } + + return val +} + +type updateThingEvent struct { + id string + kind string + name string + metadata string +} + +func (ute updateThingEvent) Encode() map[string]interface{} { + val := map[string]interface{}{ + "id": ute.id, + "type": ute.kind, + "operation": thingUpdate, + } + + if ute.name != "" { + val["name"] = ute.name + } + + if ute.metadata != "" { + val["metadata"] = ute.metadata + } + + return val +} + +type removeThingEvent struct { + id string +} + +func (rte removeThingEvent) Encode() map[string]interface{} { + return map[string]interface{}{ + "id": rte.id, + "operation": thingRemove, + } +} + +type createChannelEvent struct { + id string + owner string + name string +} + +func (cce createChannelEvent) Encode() map[string]interface{} { + val := map[string]interface{}{ + "id": cce.id, + "owner": cce.owner, + "operation": channelCreate, + } + + if cce.name != "" { + val["name"] = cce.name + } + + return val +} + +type updateChannelEvent struct { + id string + name string +} + +func (uce updateChannelEvent) Encode() map[string]interface{} { + return map[string]interface{}{ + "id": uce.id, + "name": uce.name, + "operation": channelUpdate, + } +} + +type removeChannelEvent struct { + id string +} + +func (rce removeChannelEvent) Encode() map[string]interface{} { + return map[string]interface{}{ + "id": rce.id, + "operation": channelRemove, + } +} + +type connectThingEvent struct { + chanID string + thingID string +} + +func (cte connectThingEvent) Encode() map[string]interface{} { + return map[string]interface{}{ + "chan_id": cte.chanID, + "thing_id": cte.thingID, + "operation": thingConnect, + } +} + +type disconnectThingEvent struct { + chanID string + thingID string +} + +func (dte disconnectThingEvent) Encode() map[string]interface{} { + return map[string]interface{}{ + "chan_id": dte.chanID, + "thing_id": dte.thingID, + "operation": thingDisconnect, + } +} diff --git a/things/redis/setup_test.go b/things/redis/setup_test.go index c709fdaf..880d2045 100644 --- a/things/redis/setup_test.go +++ b/things/redis/setup_test.go @@ -22,7 +22,7 @@ const ( wrongValue = "wrong-value" ) -var cacheClient *redis.Client +var redisClient *redis.Client func TestMain(m *testing.M) { pool, err := dockertest.NewPool("") @@ -30,27 +30,28 @@ func TestMain(m *testing.M) { log.Fatalf("Could not connect to docker: %s", err) } - container, err := pool.Run("redis", "4.0.9-alpine", nil) + container, err := pool.Run("redis", "5.0-alpine", nil) if err != nil { log.Fatalf("Could not start container: %s", err) } - // When you're done, kill and remove the container - defer pool.Purge(container) - if err := pool.Retry(func() error { - cacheClient = redis.NewClient(&redis.Options{ + redisClient = redis.NewClient(&redis.Options{ Addr: fmt.Sprintf("localhost:%s", container.GetPort("6379/tcp")), Password: "", DB: 0, }) - return cacheClient.Ping().Err() + return redisClient.Ping().Err() }); err != nil { log.Fatalf("Could not connect to docker: %s", err) } code := m.Run() + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + os.Exit(code) } diff --git a/things/redis/streams.go b/things/redis/streams.go new file mode 100644 index 00000000..230614a3 --- /dev/null +++ b/things/redis/streams.go @@ -0,0 +1,218 @@ +// +// Copyright (c) 2018 +// Mainflux +// +// SPDX-License-Identifier: Apache-2.0 +// + +package redis + +import ( + "strconv" + + "github.com/go-redis/redis" + "github.com/mainflux/mainflux/things" +) + +const ( + streamID = "mainflux.things" + streamLen = 1000 +) + +var _ things.Service = (*eventStore)(nil) + +type eventStore struct { + svc things.Service + client *redis.Client +} + +// NewEventStoreMiddleware returns wrapper around things service that sends +// events to event store. +func NewEventStoreMiddleware(svc things.Service, client *redis.Client) things.Service { + return eventStore{ + svc: svc, + client: client, + } +} + +func (es eventStore) AddThing(key string, thing things.Thing) (things.Thing, error) { + sth, err := es.svc.AddThing(key, thing) + if err != nil { + return sth, err + } + + event := createThingEvent{ + id: strconv.FormatUint(sth.ID, 10), + owner: sth.Owner, + kind: sth.Type, + name: sth.Name, + metadata: sth.Metadata, + } + record := &redis.XAddArgs{ + Stream: streamID, + MaxLenApprox: streamLen, + Values: event.Encode(), + } + es.client.XAdd(record).Err() + + return sth, err +} + +func (es eventStore) UpdateThing(key string, thing things.Thing) error { + if err := es.svc.UpdateThing(key, thing); err != nil { + return err + } + + event := updateThingEvent{ + id: strconv.FormatUint(thing.ID, 10), + kind: thing.Type, + name: thing.Name, + metadata: thing.Metadata, + } + record := &redis.XAddArgs{ + Stream: streamID, + MaxLenApprox: streamLen, + Values: event.Encode(), + } + es.client.XAdd(record).Err() + + return nil +} + +func (es eventStore) ViewThing(key string, id uint64) (things.Thing, error) { + return es.svc.ViewThing(key, id) +} + +func (es eventStore) ListThings(key string, offset, limit uint64) ([]things.Thing, error) { + return es.svc.ListThings(key, offset, limit) +} + +func (es eventStore) RemoveThing(key string, id uint64) error { + if err := es.svc.RemoveThing(key, id); err != nil { + return err + } + + event := removeThingEvent{ + id: strconv.FormatUint(id, 10), + } + record := &redis.XAddArgs{ + Stream: streamID, + MaxLenApprox: streamLen, + Values: event.Encode(), + } + es.client.XAdd(record).Err() + + return nil +} + +func (es eventStore) CreateChannel(key string, channel things.Channel) (things.Channel, error) { + sch, err := es.svc.CreateChannel(key, channel) + if err != nil { + return sch, err + } + + event := createChannelEvent{ + id: strconv.FormatUint(sch.ID, 10), + owner: sch.Owner, + name: sch.Name, + } + record := &redis.XAddArgs{ + Stream: streamID, + MaxLenApprox: streamLen, + Values: event.Encode(), + } + es.client.XAdd(record).Err() + + return sch, err +} + +func (es eventStore) UpdateChannel(key string, channel things.Channel) error { + if err := es.svc.UpdateChannel(key, channel); err != nil { + return err + } + + event := updateChannelEvent{ + id: strconv.FormatUint(channel.ID, 10), + name: channel.Name, + } + record := &redis.XAddArgs{ + Stream: streamID, + MaxLenApprox: streamLen, + Values: event.Encode(), + } + es.client.XAdd(record).Err() + + return nil +} + +func (es eventStore) ViewChannel(key string, id uint64) (things.Channel, error) { + return es.svc.ViewChannel(key, id) +} + +func (es eventStore) ListChannels(key string, offset, limit uint64) ([]things.Channel, error) { + return es.svc.ListChannels(key, offset, limit) +} + +func (es eventStore) RemoveChannel(key string, id uint64) error { + if err := es.svc.RemoveChannel(key, id); err != nil { + return err + } + + event := removeChannelEvent{ + id: strconv.FormatUint(id, 10), + } + record := &redis.XAddArgs{ + Stream: streamID, + MaxLenApprox: streamLen, + Values: event.Encode(), + } + es.client.XAdd(record).Err() + + return nil +} + +func (es eventStore) Connect(key string, chanID, thingID uint64) error { + if err := es.svc.Connect(key, chanID, thingID); err != nil { + return err + } + + event := connectThingEvent{ + chanID: strconv.FormatUint(chanID, 10), + thingID: strconv.FormatUint(thingID, 10), + } + record := &redis.XAddArgs{ + Stream: streamID, + MaxLenApprox: streamLen, + Values: event.Encode(), + } + es.client.XAdd(record).Err() + + return nil +} + +func (es eventStore) Disconnect(key string, chanID, thingID uint64) error { + if err := es.svc.Disconnect(key, chanID, thingID); err != nil { + return err + } + + event := disconnectThingEvent{ + chanID: strconv.FormatUint(chanID, 10), + thingID: strconv.FormatUint(thingID, 10), + } + record := &redis.XAddArgs{ + Stream: streamID, + MaxLenApprox: streamLen, + Values: event.Encode(), + } + es.client.XAdd(record).Err() + + return nil +} + +func (es eventStore) CanAccess(chanID uint64, key string) (uint64, error) { + return es.svc.CanAccess(chanID, key) +} + +func (es eventStore) Identify(key string) (uint64, error) { + return es.svc.Identify(key) +} diff --git a/things/redis/streams_test.go b/things/redis/streams_test.go new file mode 100644 index 00000000..1d53ebf1 --- /dev/null +++ b/things/redis/streams_test.go @@ -0,0 +1,522 @@ +// +// Copyright (c) 2018 +// Mainflux +// +// SPDX-License-Identifier: Apache-2.0 +// + +package redis_test + +import ( + "fmt" + "math" + "strconv" + "testing" + + r "github.com/go-redis/redis" + "github.com/mainflux/mainflux/things" + "github.com/mainflux/mainflux/things/mocks" + "github.com/mainflux/mainflux/things/redis" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + streamID = "mainflux.things" + email = "user@example.com" + token = "token" + thingPrefix = "thing." + thingCreate = thingPrefix + "create" + thingUpdate = thingPrefix + "update" + thingRemove = thingPrefix + "remove" + thingConnect = thingPrefix + "connect" + thingDisconnect = thingPrefix + "disconnect" + + channelPrefix = "channel." + channelCreate = channelPrefix + "create" + channelUpdate = channelPrefix + "update" + channelRemove = channelPrefix + "remove" +) + +func newService(tokens map[string]string) things.Service { + users := mocks.NewUsersService(tokens) + thingsRepo := mocks.NewThingRepository() + channelsRepo := mocks.NewChannelRepository(thingsRepo) + chanCache := mocks.NewChannelCache() + thingCache := mocks.NewThingCache() + idp := mocks.NewIdentityProvider() + + return things.New(users, thingsRepo, channelsRepo, chanCache, thingCache, idp) +} + +func TestAddThing(t *testing.T) { + redisClient.FlushAll().Err() + + svc := newService(map[string]string{token: email}) + svc = redis.NewEventStoreMiddleware(svc, redisClient) + + cases := []struct { + desc string + thing things.Thing + key string + err error + event map[string]interface{} + }{ + { + desc: "create thing successfully", + thing: things.Thing{Type: "app", Name: "a", Metadata: "metadata"}, + key: token, + err: nil, + event: map[string]interface{}{ + "id": "1", + "name": "a", + "owner": email, + "type": "app", + "metadata": "metadata", + "operation": thingCreate, + }, + }, + { + desc: "create invalid thing", + thing: things.Thing{Type: "a", Name: "a"}, + key: token, + err: things.ErrMalformedEntity, + event: nil, + }, + } + + lastID := "0" + for _, tc := range cases { + _, err := svc.AddThing(tc.key, tc.thing) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(&r.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + lastID = msg.ID + } + + assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event)) + } +} + +func TestUpdateThing(t *testing.T) { + redisClient.FlushAll().Err() + + svc := newService(map[string]string{token: email}) + // Create thing without sending event. + sth, err := svc.AddThing(token, things.Thing{Type: "app", Name: "a", Metadata: "metadata"}) + require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) + + svc = redis.NewEventStoreMiddleware(svc, redisClient) + + cases := []struct { + desc string + thing things.Thing + key string + err error + event map[string]interface{} + }{ + { + desc: "update existing thing successfully", + thing: things.Thing{ID: sth.ID, Type: "app", Name: "a", Metadata: "metadata1"}, + key: token, + err: nil, + event: map[string]interface{}{ + "id": strconv.FormatUint(sth.ID, 10), + "name": "a", + "type": "app", + "metadata": "metadata1", + "operation": thingUpdate, + }, + }, + { + desc: "update invalid thing", + thing: things.Thing{ID: math.MaxUint64, Type: "a", Name: "a"}, + key: token, + err: things.ErrMalformedEntity, + event: nil, + }, + } + + lastID := "0" + for _, tc := range cases { + err := svc.UpdateThing(tc.key, tc.thing) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(&r.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + lastID = msg.ID + } + + assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event)) + } +} + +func TestRemoveThing(t *testing.T) { + redisClient.FlushAll().Err() + + svc := newService(map[string]string{token: email}) + // Create thing without sending event. + sth, err := svc.AddThing(token, things.Thing{Type: "app", Name: "a"}) + require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) + + svc = redis.NewEventStoreMiddleware(svc, redisClient) + + cases := []struct { + desc string + id uint64 + key string + err error + event map[string]interface{} + }{ + { + desc: "delete existing thing successfully", + id: sth.ID, + key: token, + err: nil, + event: map[string]interface{}{ + "id": strconv.FormatUint(sth.ID, 10), + "operation": thingRemove, + }, + }, + { + desc: "delete thing with invalid credentials", + id: math.MaxUint64, + key: "", + err: things.ErrUnauthorizedAccess, + event: nil, + }, + } + + lastID := "0" + for _, tc := range cases { + err := svc.RemoveThing(tc.key, tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(&r.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + lastID = msg.ID + } + + assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event)) + } +} + +func TestCreateChannel(t *testing.T) { + redisClient.FlushAll().Err() + + svc := newService(map[string]string{token: email}) + svc = redis.NewEventStoreMiddleware(svc, redisClient) + + cases := []struct { + desc string + channel things.Channel + key string + err error + event map[string]interface{} + }{ + { + desc: "create channel successfully", + channel: things.Channel{Name: "a"}, + key: token, + err: nil, + event: map[string]interface{}{ + "id": "1", + "name": "a", + "owner": email, + "operation": channelCreate, + }, + }, + { + desc: "create channel with invalid credentials", + channel: things.Channel{Name: "a"}, + key: "", + err: things.ErrUnauthorizedAccess, + event: nil, + }, + } + + lastID := "0" + for _, tc := range cases { + _, err := svc.CreateChannel(tc.key, tc.channel) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(&r.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + lastID = msg.ID + } + + assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event)) + } +} + +func TestUpdateChannel(t *testing.T) { + redisClient.FlushAll().Err() + + svc := newService(map[string]string{token: email}) + // Create channel without sending event. + sch, err := svc.CreateChannel(token, things.Channel{Name: "a"}) + require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) + + svc = redis.NewEventStoreMiddleware(svc, redisClient) + + cases := []struct { + desc string + channel things.Channel + key string + err error + event map[string]interface{} + }{ + { + desc: "update channel successfully", + channel: things.Channel{ID: sch.ID, Name: "b"}, + key: token, + err: nil, + event: map[string]interface{}{ + "id": strconv.FormatUint(sch.ID, 10), + "name": "b", + "operation": channelUpdate, + }, + }, + { + desc: "create non-existent channel", + channel: things.Channel{ID: math.MaxUint64, Name: "c"}, + key: token, + err: things.ErrNotFound, + event: nil, + }, + } + + lastID := "0" + for _, tc := range cases { + err := svc.UpdateChannel(tc.key, tc.channel) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(&r.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + lastID = msg.ID + } + + assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event)) + } +} + +func TestRemoveChannel(t *testing.T) { + redisClient.FlushAll().Err() + + svc := newService(map[string]string{token: email}) + // Create channel without sending event. + sch, err := svc.CreateChannel(token, things.Channel{Name: "a"}) + require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) + + svc = redis.NewEventStoreMiddleware(svc, redisClient) + + cases := []struct { + desc string + id uint64 + key string + err error + event map[string]interface{} + }{ + { + desc: "update channel successfully", + id: sch.ID, + key: token, + err: nil, + event: map[string]interface{}{ + "id": strconv.FormatUint(sch.ID, 10), + "operation": channelRemove, + }, + }, + { + desc: "create non-existent channel", + id: math.MaxUint64, + key: "", + err: things.ErrUnauthorizedAccess, + event: nil, + }, + } + + lastID := "0" + for _, tc := range cases { + err := svc.RemoveChannel(tc.key, tc.id) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(&r.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + lastID = msg.ID + } + + assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event)) + } +} + +func TestConnectEvent(t *testing.T) { + redisClient.FlushAll().Err() + + svc := newService(map[string]string{token: email}) + // Create thing and channel that will be connected. + sth, err := svc.AddThing(token, things.Thing{Type: "device", Name: "a"}) + require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) + sch, err := svc.CreateChannel(token, things.Channel{Name: "a"}) + require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) + + svc = redis.NewEventStoreMiddleware(svc, redisClient) + + cases := []struct { + desc string + thingID uint64 + chanID uint64 + key string + err error + event map[string]interface{} + }{ + { + desc: "connect existing thing to existing channel", + thingID: sth.ID, + chanID: sch.ID, + key: token, + err: nil, + event: map[string]interface{}{ + "chan_id": strconv.FormatUint(sch.ID, 10), + "thing_id": strconv.FormatUint(sth.ID, 10), + "operation": thingConnect, + }, + }, + { + desc: "connect non-existent thing to channel", + thingID: math.MaxUint64, + chanID: sch.ID, + key: token, + err: things.ErrNotFound, + event: nil, + }, + } + + lastID := "0" + for _, tc := range cases { + err := svc.Connect(tc.key, tc.chanID, tc.thingID) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(&r.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + lastID = msg.ID + } + + assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event)) + } +} + +func TestDisconnectEvent(t *testing.T) { + redisClient.FlushAll().Err() + + svc := newService(map[string]string{token: email}) + // Create thing and channel that will be connected. + sth, err := svc.AddThing(token, things.Thing{Type: "device", Name: "a"}) + require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) + sch, err := svc.CreateChannel(token, things.Channel{Name: "a"}) + require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) + err = svc.Connect(token, sch.ID, sth.ID) + require.Nil(t, err, fmt.Sprintf("unexpected error %s", err)) + + svc = redis.NewEventStoreMiddleware(svc, redisClient) + + cases := []struct { + desc string + thingID uint64 + chanID uint64 + key string + err error + event map[string]interface{} + }{ + { + desc: "disconnect thing from channel", + thingID: sth.ID, + chanID: sch.ID, + key: token, + err: nil, + event: map[string]interface{}{ + "chan_id": strconv.FormatUint(sch.ID, 10), + "thing_id": strconv.FormatUint(sth.ID, 10), + "operation": thingDisconnect, + }, + }, + { + desc: "disconnect non-existent thing from channel", + thingID: math.MaxUint64, + chanID: sch.ID, + key: token, + err: things.ErrNotFound, + event: nil, + }, + } + + lastID := "0" + for _, tc := range cases { + err := svc.Disconnect(tc.key, tc.chanID, tc.thingID) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(&r.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + lastID = msg.ID + } + + assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event)) + } +} diff --git a/things/redis/things_test.go b/things/redis/things_test.go index 187527df..69fd1395 100644 --- a/things/redis/things_test.go +++ b/things/redis/things_test.go @@ -19,8 +19,7 @@ import ( ) func TestThingSave(t *testing.T) { - - thingCache := redis.NewThingCache(cacheClient) + thingCache := redis.NewThingCache(redisClient) key := uuid.New().ID() id := uint64(123) id2 := uint64(124) @@ -56,7 +55,7 @@ func TestThingSave(t *testing.T) { } func TestThingID(t *testing.T) { - thingCache := redis.NewThingCache(cacheClient) + thingCache := redis.NewThingCache(redisClient) key := uuid.New().ID() id := uint64(123) @@ -88,7 +87,7 @@ func TestThingID(t *testing.T) { } func TestThingRemove(t *testing.T) { - thingCache := redis.NewThingCache(cacheClient) + thingCache := redis.NewThingCache(redisClient) key := uuid.New().ID() id := uint64(123)