mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-27 13:48:49 +08:00
MF-447 - Add event sourcing to things service (#460)
* Add event sourcing middleware for things service Add event sourcing for write operations over things and channels. Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com> * Add tests for event sourcing middleware Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com> * Update things in tests to contain metadata field Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>
This commit is contained in:
parent
8ddd78c818
commit
a680fa35d2
@ -209,6 +209,7 @@ func newService(conn *grpc.ClientConn, db *sql.DB, client *redis.Client, logger
|
||||
idp := uuid.New()
|
||||
|
||||
svc := things.New(users, thingsRepo, channelsRepo, chanCache, thingCache, idp)
|
||||
svc = rediscache.NewEventStoreMiddleware(svc, client)
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
svc = api.MetricsMiddleware(
|
||||
svc,
|
||||
|
@ -17,7 +17,7 @@ import (
|
||||
)
|
||||
|
||||
func TestConnect(t *testing.T) {
|
||||
channelCache := redis.NewChannelCache(cacheClient)
|
||||
channelCache := redis.NewChannelCache(redisClient)
|
||||
|
||||
cid := uint64(123)
|
||||
tid := uint64(321)
|
||||
@ -45,7 +45,7 @@ func TestConnect(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHasThing(t *testing.T) {
|
||||
channelCache := redis.NewChannelCache(cacheClient)
|
||||
channelCache := redis.NewChannelCache(redisClient)
|
||||
|
||||
cid := uint64(123)
|
||||
tid := uint64(321)
|
||||
@ -81,7 +81,7 @@ func TestHasThing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
func TestDisconnect(t *testing.T) {
|
||||
channelCache := redis.NewChannelCache(cacheClient)
|
||||
channelCache := redis.NewChannelCache(redisClient)
|
||||
|
||||
cid := uint64(123)
|
||||
tid := uint64(321)
|
||||
@ -119,7 +119,7 @@ func TestDisconnect(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRemove(t *testing.T) {
|
||||
channelCache := redis.NewChannelCache(cacheClient)
|
||||
channelCache := redis.NewChannelCache(redisClient)
|
||||
|
||||
cid := uint64(123)
|
||||
cid2 := uint64(124)
|
||||
|
163
things/redis/events.go
Normal file
163
things/redis/events.go
Normal file
@ -0,0 +1,163 @@
|
||||
package redis
|
||||
|
||||
const (
|
||||
thingPrefix = "thing."
|
||||
thingCreate = thingPrefix + "create"
|
||||
thingUpdate = thingPrefix + "update"
|
||||
thingRemove = thingPrefix + "remove"
|
||||
thingConnect = thingPrefix + "connect"
|
||||
thingDisconnect = thingPrefix + "disconnect"
|
||||
|
||||
channelPrefix = "channel."
|
||||
channelCreate = channelPrefix + "create"
|
||||
channelUpdate = channelPrefix + "update"
|
||||
channelRemove = channelPrefix + "remove"
|
||||
)
|
||||
|
||||
type event interface {
|
||||
Encode() map[string]interface{}
|
||||
}
|
||||
|
||||
var (
|
||||
_ event = (*createThingEvent)(nil)
|
||||
_ event = (*updateThingEvent)(nil)
|
||||
_ event = (*removeThingEvent)(nil)
|
||||
_ event = (*createChannelEvent)(nil)
|
||||
_ event = (*updateChannelEvent)(nil)
|
||||
_ event = (*removeChannelEvent)(nil)
|
||||
_ event = (*connectThingEvent)(nil)
|
||||
_ event = (*disconnectThingEvent)(nil)
|
||||
)
|
||||
|
||||
type createThingEvent struct {
|
||||
id string
|
||||
owner string
|
||||
kind string
|
||||
name string
|
||||
metadata string
|
||||
}
|
||||
|
||||
func (cte createThingEvent) Encode() map[string]interface{} {
|
||||
val := map[string]interface{}{
|
||||
"id": cte.id,
|
||||
"owner": cte.owner,
|
||||
"type": cte.kind,
|
||||
"operation": thingCreate,
|
||||
}
|
||||
|
||||
if cte.name != "" {
|
||||
val["name"] = cte.name
|
||||
}
|
||||
|
||||
if cte.metadata != "" {
|
||||
val["metadata"] = cte.metadata
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
type updateThingEvent struct {
|
||||
id string
|
||||
kind string
|
||||
name string
|
||||
metadata string
|
||||
}
|
||||
|
||||
func (ute updateThingEvent) Encode() map[string]interface{} {
|
||||
val := map[string]interface{}{
|
||||
"id": ute.id,
|
||||
"type": ute.kind,
|
||||
"operation": thingUpdate,
|
||||
}
|
||||
|
||||
if ute.name != "" {
|
||||
val["name"] = ute.name
|
||||
}
|
||||
|
||||
if ute.metadata != "" {
|
||||
val["metadata"] = ute.metadata
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
type removeThingEvent struct {
|
||||
id string
|
||||
}
|
||||
|
||||
func (rte removeThingEvent) Encode() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"id": rte.id,
|
||||
"operation": thingRemove,
|
||||
}
|
||||
}
|
||||
|
||||
type createChannelEvent struct {
|
||||
id string
|
||||
owner string
|
||||
name string
|
||||
}
|
||||
|
||||
func (cce createChannelEvent) Encode() map[string]interface{} {
|
||||
val := map[string]interface{}{
|
||||
"id": cce.id,
|
||||
"owner": cce.owner,
|
||||
"operation": channelCreate,
|
||||
}
|
||||
|
||||
if cce.name != "" {
|
||||
val["name"] = cce.name
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
type updateChannelEvent struct {
|
||||
id string
|
||||
name string
|
||||
}
|
||||
|
||||
func (uce updateChannelEvent) Encode() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"id": uce.id,
|
||||
"name": uce.name,
|
||||
"operation": channelUpdate,
|
||||
}
|
||||
}
|
||||
|
||||
type removeChannelEvent struct {
|
||||
id string
|
||||
}
|
||||
|
||||
func (rce removeChannelEvent) Encode() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"id": rce.id,
|
||||
"operation": channelRemove,
|
||||
}
|
||||
}
|
||||
|
||||
type connectThingEvent struct {
|
||||
chanID string
|
||||
thingID string
|
||||
}
|
||||
|
||||
func (cte connectThingEvent) Encode() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"chan_id": cte.chanID,
|
||||
"thing_id": cte.thingID,
|
||||
"operation": thingConnect,
|
||||
}
|
||||
}
|
||||
|
||||
type disconnectThingEvent struct {
|
||||
chanID string
|
||||
thingID string
|
||||
}
|
||||
|
||||
func (dte disconnectThingEvent) Encode() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"chan_id": dte.chanID,
|
||||
"thing_id": dte.thingID,
|
||||
"operation": thingDisconnect,
|
||||
}
|
||||
}
|
@ -22,7 +22,7 @@ const (
|
||||
wrongValue = "wrong-value"
|
||||
)
|
||||
|
||||
var cacheClient *redis.Client
|
||||
var redisClient *redis.Client
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
pool, err := dockertest.NewPool("")
|
||||
@ -30,27 +30,28 @@ func TestMain(m *testing.M) {
|
||||
log.Fatalf("Could not connect to docker: %s", err)
|
||||
}
|
||||
|
||||
container, err := pool.Run("redis", "4.0.9-alpine", nil)
|
||||
container, err := pool.Run("redis", "5.0-alpine", nil)
|
||||
if err != nil {
|
||||
log.Fatalf("Could not start container: %s", err)
|
||||
}
|
||||
|
||||
// When you're done, kill and remove the container
|
||||
defer pool.Purge(container)
|
||||
|
||||
if err := pool.Retry(func() error {
|
||||
cacheClient = redis.NewClient(&redis.Options{
|
||||
redisClient = redis.NewClient(&redis.Options{
|
||||
Addr: fmt.Sprintf("localhost:%s", container.GetPort("6379/tcp")),
|
||||
Password: "",
|
||||
DB: 0,
|
||||
})
|
||||
|
||||
return cacheClient.Ping().Err()
|
||||
return redisClient.Ping().Err()
|
||||
}); err != nil {
|
||||
log.Fatalf("Could not connect to docker: %s", err)
|
||||
}
|
||||
|
||||
code := m.Run()
|
||||
|
||||
if err := pool.Purge(container); err != nil {
|
||||
log.Fatalf("Could not purge container: %s", err)
|
||||
}
|
||||
|
||||
os.Exit(code)
|
||||
}
|
||||
|
218
things/redis/streams.go
Normal file
218
things/redis/streams.go
Normal file
@ -0,0 +1,218 @@
|
||||
//
|
||||
// Copyright (c) 2018
|
||||
// Mainflux
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/mainflux/mainflux/things"
|
||||
)
|
||||
|
||||
const (
|
||||
streamID = "mainflux.things"
|
||||
streamLen = 1000
|
||||
)
|
||||
|
||||
var _ things.Service = (*eventStore)(nil)
|
||||
|
||||
type eventStore struct {
|
||||
svc things.Service
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
// NewEventStoreMiddleware returns wrapper around things service that sends
|
||||
// events to event store.
|
||||
func NewEventStoreMiddleware(svc things.Service, client *redis.Client) things.Service {
|
||||
return eventStore{
|
||||
svc: svc,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (es eventStore) AddThing(key string, thing things.Thing) (things.Thing, error) {
|
||||
sth, err := es.svc.AddThing(key, thing)
|
||||
if err != nil {
|
||||
return sth, err
|
||||
}
|
||||
|
||||
event := createThingEvent{
|
||||
id: strconv.FormatUint(sth.ID, 10),
|
||||
owner: sth.Owner,
|
||||
kind: sth.Type,
|
||||
name: sth.Name,
|
||||
metadata: sth.Metadata,
|
||||
}
|
||||
record := &redis.XAddArgs{
|
||||
Stream: streamID,
|
||||
MaxLenApprox: streamLen,
|
||||
Values: event.Encode(),
|
||||
}
|
||||
es.client.XAdd(record).Err()
|
||||
|
||||
return sth, err
|
||||
}
|
||||
|
||||
func (es eventStore) UpdateThing(key string, thing things.Thing) error {
|
||||
if err := es.svc.UpdateThing(key, thing); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := updateThingEvent{
|
||||
id: strconv.FormatUint(thing.ID, 10),
|
||||
kind: thing.Type,
|
||||
name: thing.Name,
|
||||
metadata: thing.Metadata,
|
||||
}
|
||||
record := &redis.XAddArgs{
|
||||
Stream: streamID,
|
||||
MaxLenApprox: streamLen,
|
||||
Values: event.Encode(),
|
||||
}
|
||||
es.client.XAdd(record).Err()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es eventStore) ViewThing(key string, id uint64) (things.Thing, error) {
|
||||
return es.svc.ViewThing(key, id)
|
||||
}
|
||||
|
||||
func (es eventStore) ListThings(key string, offset, limit uint64) ([]things.Thing, error) {
|
||||
return es.svc.ListThings(key, offset, limit)
|
||||
}
|
||||
|
||||
func (es eventStore) RemoveThing(key string, id uint64) error {
|
||||
if err := es.svc.RemoveThing(key, id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := removeThingEvent{
|
||||
id: strconv.FormatUint(id, 10),
|
||||
}
|
||||
record := &redis.XAddArgs{
|
||||
Stream: streamID,
|
||||
MaxLenApprox: streamLen,
|
||||
Values: event.Encode(),
|
||||
}
|
||||
es.client.XAdd(record).Err()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es eventStore) CreateChannel(key string, channel things.Channel) (things.Channel, error) {
|
||||
sch, err := es.svc.CreateChannel(key, channel)
|
||||
if err != nil {
|
||||
return sch, err
|
||||
}
|
||||
|
||||
event := createChannelEvent{
|
||||
id: strconv.FormatUint(sch.ID, 10),
|
||||
owner: sch.Owner,
|
||||
name: sch.Name,
|
||||
}
|
||||
record := &redis.XAddArgs{
|
||||
Stream: streamID,
|
||||
MaxLenApprox: streamLen,
|
||||
Values: event.Encode(),
|
||||
}
|
||||
es.client.XAdd(record).Err()
|
||||
|
||||
return sch, err
|
||||
}
|
||||
|
||||
func (es eventStore) UpdateChannel(key string, channel things.Channel) error {
|
||||
if err := es.svc.UpdateChannel(key, channel); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := updateChannelEvent{
|
||||
id: strconv.FormatUint(channel.ID, 10),
|
||||
name: channel.Name,
|
||||
}
|
||||
record := &redis.XAddArgs{
|
||||
Stream: streamID,
|
||||
MaxLenApprox: streamLen,
|
||||
Values: event.Encode(),
|
||||
}
|
||||
es.client.XAdd(record).Err()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es eventStore) ViewChannel(key string, id uint64) (things.Channel, error) {
|
||||
return es.svc.ViewChannel(key, id)
|
||||
}
|
||||
|
||||
func (es eventStore) ListChannels(key string, offset, limit uint64) ([]things.Channel, error) {
|
||||
return es.svc.ListChannels(key, offset, limit)
|
||||
}
|
||||
|
||||
func (es eventStore) RemoveChannel(key string, id uint64) error {
|
||||
if err := es.svc.RemoveChannel(key, id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := removeChannelEvent{
|
||||
id: strconv.FormatUint(id, 10),
|
||||
}
|
||||
record := &redis.XAddArgs{
|
||||
Stream: streamID,
|
||||
MaxLenApprox: streamLen,
|
||||
Values: event.Encode(),
|
||||
}
|
||||
es.client.XAdd(record).Err()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es eventStore) Connect(key string, chanID, thingID uint64) error {
|
||||
if err := es.svc.Connect(key, chanID, thingID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := connectThingEvent{
|
||||
chanID: strconv.FormatUint(chanID, 10),
|
||||
thingID: strconv.FormatUint(thingID, 10),
|
||||
}
|
||||
record := &redis.XAddArgs{
|
||||
Stream: streamID,
|
||||
MaxLenApprox: streamLen,
|
||||
Values: event.Encode(),
|
||||
}
|
||||
es.client.XAdd(record).Err()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es eventStore) Disconnect(key string, chanID, thingID uint64) error {
|
||||
if err := es.svc.Disconnect(key, chanID, thingID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := disconnectThingEvent{
|
||||
chanID: strconv.FormatUint(chanID, 10),
|
||||
thingID: strconv.FormatUint(thingID, 10),
|
||||
}
|
||||
record := &redis.XAddArgs{
|
||||
Stream: streamID,
|
||||
MaxLenApprox: streamLen,
|
||||
Values: event.Encode(),
|
||||
}
|
||||
es.client.XAdd(record).Err()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es eventStore) CanAccess(chanID uint64, key string) (uint64, error) {
|
||||
return es.svc.CanAccess(chanID, key)
|
||||
}
|
||||
|
||||
func (es eventStore) Identify(key string) (uint64, error) {
|
||||
return es.svc.Identify(key)
|
||||
}
|
522
things/redis/streams_test.go
Normal file
522
things/redis/streams_test.go
Normal file
@ -0,0 +1,522 @@
|
||||
//
|
||||
// Copyright (c) 2018
|
||||
// Mainflux
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package redis_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
r "github.com/go-redis/redis"
|
||||
"github.com/mainflux/mainflux/things"
|
||||
"github.com/mainflux/mainflux/things/mocks"
|
||||
"github.com/mainflux/mainflux/things/redis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const (
|
||||
streamID = "mainflux.things"
|
||||
email = "user@example.com"
|
||||
token = "token"
|
||||
thingPrefix = "thing."
|
||||
thingCreate = thingPrefix + "create"
|
||||
thingUpdate = thingPrefix + "update"
|
||||
thingRemove = thingPrefix + "remove"
|
||||
thingConnect = thingPrefix + "connect"
|
||||
thingDisconnect = thingPrefix + "disconnect"
|
||||
|
||||
channelPrefix = "channel."
|
||||
channelCreate = channelPrefix + "create"
|
||||
channelUpdate = channelPrefix + "update"
|
||||
channelRemove = channelPrefix + "remove"
|
||||
)
|
||||
|
||||
func newService(tokens map[string]string) things.Service {
|
||||
users := mocks.NewUsersService(tokens)
|
||||
thingsRepo := mocks.NewThingRepository()
|
||||
channelsRepo := mocks.NewChannelRepository(thingsRepo)
|
||||
chanCache := mocks.NewChannelCache()
|
||||
thingCache := mocks.NewThingCache()
|
||||
idp := mocks.NewIdentityProvider()
|
||||
|
||||
return things.New(users, thingsRepo, channelsRepo, chanCache, thingCache, idp)
|
||||
}
|
||||
|
||||
func TestAddThing(t *testing.T) {
|
||||
redisClient.FlushAll().Err()
|
||||
|
||||
svc := newService(map[string]string{token: email})
|
||||
svc = redis.NewEventStoreMiddleware(svc, redisClient)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
thing things.Thing
|
||||
key string
|
||||
err error
|
||||
event map[string]interface{}
|
||||
}{
|
||||
{
|
||||
desc: "create thing successfully",
|
||||
thing: things.Thing{Type: "app", Name: "a", Metadata: "metadata"},
|
||||
key: token,
|
||||
err: nil,
|
||||
event: map[string]interface{}{
|
||||
"id": "1",
|
||||
"name": "a",
|
||||
"owner": email,
|
||||
"type": "app",
|
||||
"metadata": "metadata",
|
||||
"operation": thingCreate,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "create invalid thing",
|
||||
thing: things.Thing{Type: "a", Name: "a"},
|
||||
key: token,
|
||||
err: things.ErrMalformedEntity,
|
||||
event: nil,
|
||||
},
|
||||
}
|
||||
|
||||
lastID := "0"
|
||||
for _, tc := range cases {
|
||||
_, err := svc.AddThing(tc.key, tc.thing)
|
||||
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
|
||||
|
||||
streams := redisClient.XRead(&r.XReadArgs{
|
||||
Streams: []string{streamID, lastID},
|
||||
Count: 1,
|
||||
}).Val()
|
||||
|
||||
var event map[string]interface{}
|
||||
if len(streams) > 0 && len(streams[0].Messages) > 0 {
|
||||
msg := streams[0].Messages[0]
|
||||
event = msg.Values
|
||||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateThing(t *testing.T) {
|
||||
redisClient.FlushAll().Err()
|
||||
|
||||
svc := newService(map[string]string{token: email})
|
||||
// Create thing without sending event.
|
||||
sth, err := svc.AddThing(token, things.Thing{Type: "app", Name: "a", Metadata: "metadata"})
|
||||
require.Nil(t, err, fmt.Sprintf("unexpected error %s", err))
|
||||
|
||||
svc = redis.NewEventStoreMiddleware(svc, redisClient)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
thing things.Thing
|
||||
key string
|
||||
err error
|
||||
event map[string]interface{}
|
||||
}{
|
||||
{
|
||||
desc: "update existing thing successfully",
|
||||
thing: things.Thing{ID: sth.ID, Type: "app", Name: "a", Metadata: "metadata1"},
|
||||
key: token,
|
||||
err: nil,
|
||||
event: map[string]interface{}{
|
||||
"id": strconv.FormatUint(sth.ID, 10),
|
||||
"name": "a",
|
||||
"type": "app",
|
||||
"metadata": "metadata1",
|
||||
"operation": thingUpdate,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "update invalid thing",
|
||||
thing: things.Thing{ID: math.MaxUint64, Type: "a", Name: "a"},
|
||||
key: token,
|
||||
err: things.ErrMalformedEntity,
|
||||
event: nil,
|
||||
},
|
||||
}
|
||||
|
||||
lastID := "0"
|
||||
for _, tc := range cases {
|
||||
err := svc.UpdateThing(tc.key, tc.thing)
|
||||
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
|
||||
|
||||
streams := redisClient.XRead(&r.XReadArgs{
|
||||
Streams: []string{streamID, lastID},
|
||||
Count: 1,
|
||||
}).Val()
|
||||
|
||||
var event map[string]interface{}
|
||||
if len(streams) > 0 && len(streams[0].Messages) > 0 {
|
||||
msg := streams[0].Messages[0]
|
||||
event = msg.Values
|
||||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveThing(t *testing.T) {
|
||||
redisClient.FlushAll().Err()
|
||||
|
||||
svc := newService(map[string]string{token: email})
|
||||
// Create thing without sending event.
|
||||
sth, err := svc.AddThing(token, things.Thing{Type: "app", Name: "a"})
|
||||
require.Nil(t, err, fmt.Sprintf("unexpected error %s", err))
|
||||
|
||||
svc = redis.NewEventStoreMiddleware(svc, redisClient)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
id uint64
|
||||
key string
|
||||
err error
|
||||
event map[string]interface{}
|
||||
}{
|
||||
{
|
||||
desc: "delete existing thing successfully",
|
||||
id: sth.ID,
|
||||
key: token,
|
||||
err: nil,
|
||||
event: map[string]interface{}{
|
||||
"id": strconv.FormatUint(sth.ID, 10),
|
||||
"operation": thingRemove,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "delete thing with invalid credentials",
|
||||
id: math.MaxUint64,
|
||||
key: "",
|
||||
err: things.ErrUnauthorizedAccess,
|
||||
event: nil,
|
||||
},
|
||||
}
|
||||
|
||||
lastID := "0"
|
||||
for _, tc := range cases {
|
||||
err := svc.RemoveThing(tc.key, tc.id)
|
||||
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
|
||||
|
||||
streams := redisClient.XRead(&r.XReadArgs{
|
||||
Streams: []string{streamID, lastID},
|
||||
Count: 1,
|
||||
}).Val()
|
||||
|
||||
var event map[string]interface{}
|
||||
if len(streams) > 0 && len(streams[0].Messages) > 0 {
|
||||
msg := streams[0].Messages[0]
|
||||
event = msg.Values
|
||||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateChannel(t *testing.T) {
|
||||
redisClient.FlushAll().Err()
|
||||
|
||||
svc := newService(map[string]string{token: email})
|
||||
svc = redis.NewEventStoreMiddleware(svc, redisClient)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
channel things.Channel
|
||||
key string
|
||||
err error
|
||||
event map[string]interface{}
|
||||
}{
|
||||
{
|
||||
desc: "create channel successfully",
|
||||
channel: things.Channel{Name: "a"},
|
||||
key: token,
|
||||
err: nil,
|
||||
event: map[string]interface{}{
|
||||
"id": "1",
|
||||
"name": "a",
|
||||
"owner": email,
|
||||
"operation": channelCreate,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "create channel with invalid credentials",
|
||||
channel: things.Channel{Name: "a"},
|
||||
key: "",
|
||||
err: things.ErrUnauthorizedAccess,
|
||||
event: nil,
|
||||
},
|
||||
}
|
||||
|
||||
lastID := "0"
|
||||
for _, tc := range cases {
|
||||
_, err := svc.CreateChannel(tc.key, tc.channel)
|
||||
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
|
||||
|
||||
streams := redisClient.XRead(&r.XReadArgs{
|
||||
Streams: []string{streamID, lastID},
|
||||
Count: 1,
|
||||
}).Val()
|
||||
|
||||
var event map[string]interface{}
|
||||
if len(streams) > 0 && len(streams[0].Messages) > 0 {
|
||||
msg := streams[0].Messages[0]
|
||||
event = msg.Values
|
||||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateChannel(t *testing.T) {
|
||||
redisClient.FlushAll().Err()
|
||||
|
||||
svc := newService(map[string]string{token: email})
|
||||
// Create channel without sending event.
|
||||
sch, err := svc.CreateChannel(token, things.Channel{Name: "a"})
|
||||
require.Nil(t, err, fmt.Sprintf("unexpected error %s", err))
|
||||
|
||||
svc = redis.NewEventStoreMiddleware(svc, redisClient)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
channel things.Channel
|
||||
key string
|
||||
err error
|
||||
event map[string]interface{}
|
||||
}{
|
||||
{
|
||||
desc: "update channel successfully",
|
||||
channel: things.Channel{ID: sch.ID, Name: "b"},
|
||||
key: token,
|
||||
err: nil,
|
||||
event: map[string]interface{}{
|
||||
"id": strconv.FormatUint(sch.ID, 10),
|
||||
"name": "b",
|
||||
"operation": channelUpdate,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "create non-existent channel",
|
||||
channel: things.Channel{ID: math.MaxUint64, Name: "c"},
|
||||
key: token,
|
||||
err: things.ErrNotFound,
|
||||
event: nil,
|
||||
},
|
||||
}
|
||||
|
||||
lastID := "0"
|
||||
for _, tc := range cases {
|
||||
err := svc.UpdateChannel(tc.key, tc.channel)
|
||||
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
|
||||
|
||||
streams := redisClient.XRead(&r.XReadArgs{
|
||||
Streams: []string{streamID, lastID},
|
||||
Count: 1,
|
||||
}).Val()
|
||||
|
||||
var event map[string]interface{}
|
||||
if len(streams) > 0 && len(streams[0].Messages) > 0 {
|
||||
msg := streams[0].Messages[0]
|
||||
event = msg.Values
|
||||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveChannel(t *testing.T) {
|
||||
redisClient.FlushAll().Err()
|
||||
|
||||
svc := newService(map[string]string{token: email})
|
||||
// Create channel without sending event.
|
||||
sch, err := svc.CreateChannel(token, things.Channel{Name: "a"})
|
||||
require.Nil(t, err, fmt.Sprintf("unexpected error %s", err))
|
||||
|
||||
svc = redis.NewEventStoreMiddleware(svc, redisClient)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
id uint64
|
||||
key string
|
||||
err error
|
||||
event map[string]interface{}
|
||||
}{
|
||||
{
|
||||
desc: "update channel successfully",
|
||||
id: sch.ID,
|
||||
key: token,
|
||||
err: nil,
|
||||
event: map[string]interface{}{
|
||||
"id": strconv.FormatUint(sch.ID, 10),
|
||||
"operation": channelRemove,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "create non-existent channel",
|
||||
id: math.MaxUint64,
|
||||
key: "",
|
||||
err: things.ErrUnauthorizedAccess,
|
||||
event: nil,
|
||||
},
|
||||
}
|
||||
|
||||
lastID := "0"
|
||||
for _, tc := range cases {
|
||||
err := svc.RemoveChannel(tc.key, tc.id)
|
||||
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
|
||||
|
||||
streams := redisClient.XRead(&r.XReadArgs{
|
||||
Streams: []string{streamID, lastID},
|
||||
Count: 1,
|
||||
}).Val()
|
||||
|
||||
var event map[string]interface{}
|
||||
if len(streams) > 0 && len(streams[0].Messages) > 0 {
|
||||
msg := streams[0].Messages[0]
|
||||
event = msg.Values
|
||||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnectEvent(t *testing.T) {
|
||||
redisClient.FlushAll().Err()
|
||||
|
||||
svc := newService(map[string]string{token: email})
|
||||
// Create thing and channel that will be connected.
|
||||
sth, err := svc.AddThing(token, things.Thing{Type: "device", Name: "a"})
|
||||
require.Nil(t, err, fmt.Sprintf("unexpected error %s", err))
|
||||
sch, err := svc.CreateChannel(token, things.Channel{Name: "a"})
|
||||
require.Nil(t, err, fmt.Sprintf("unexpected error %s", err))
|
||||
|
||||
svc = redis.NewEventStoreMiddleware(svc, redisClient)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
thingID uint64
|
||||
chanID uint64
|
||||
key string
|
||||
err error
|
||||
event map[string]interface{}
|
||||
}{
|
||||
{
|
||||
desc: "connect existing thing to existing channel",
|
||||
thingID: sth.ID,
|
||||
chanID: sch.ID,
|
||||
key: token,
|
||||
err: nil,
|
||||
event: map[string]interface{}{
|
||||
"chan_id": strconv.FormatUint(sch.ID, 10),
|
||||
"thing_id": strconv.FormatUint(sth.ID, 10),
|
||||
"operation": thingConnect,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "connect non-existent thing to channel",
|
||||
thingID: math.MaxUint64,
|
||||
chanID: sch.ID,
|
||||
key: token,
|
||||
err: things.ErrNotFound,
|
||||
event: nil,
|
||||
},
|
||||
}
|
||||
|
||||
lastID := "0"
|
||||
for _, tc := range cases {
|
||||
err := svc.Connect(tc.key, tc.chanID, tc.thingID)
|
||||
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
|
||||
|
||||
streams := redisClient.XRead(&r.XReadArgs{
|
||||
Streams: []string{streamID, lastID},
|
||||
Count: 1,
|
||||
}).Val()
|
||||
|
||||
var event map[string]interface{}
|
||||
if len(streams) > 0 && len(streams[0].Messages) > 0 {
|
||||
msg := streams[0].Messages[0]
|
||||
event = msg.Values
|
||||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
}
|
||||
}
|
||||
|
||||
func TestDisconnectEvent(t *testing.T) {
|
||||
redisClient.FlushAll().Err()
|
||||
|
||||
svc := newService(map[string]string{token: email})
|
||||
// Create thing and channel that will be connected.
|
||||
sth, err := svc.AddThing(token, things.Thing{Type: "device", Name: "a"})
|
||||
require.Nil(t, err, fmt.Sprintf("unexpected error %s", err))
|
||||
sch, err := svc.CreateChannel(token, things.Channel{Name: "a"})
|
||||
require.Nil(t, err, fmt.Sprintf("unexpected error %s", err))
|
||||
err = svc.Connect(token, sch.ID, sth.ID)
|
||||
require.Nil(t, err, fmt.Sprintf("unexpected error %s", err))
|
||||
|
||||
svc = redis.NewEventStoreMiddleware(svc, redisClient)
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
thingID uint64
|
||||
chanID uint64
|
||||
key string
|
||||
err error
|
||||
event map[string]interface{}
|
||||
}{
|
||||
{
|
||||
desc: "disconnect thing from channel",
|
||||
thingID: sth.ID,
|
||||
chanID: sch.ID,
|
||||
key: token,
|
||||
err: nil,
|
||||
event: map[string]interface{}{
|
||||
"chan_id": strconv.FormatUint(sch.ID, 10),
|
||||
"thing_id": strconv.FormatUint(sth.ID, 10),
|
||||
"operation": thingDisconnect,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "disconnect non-existent thing from channel",
|
||||
thingID: math.MaxUint64,
|
||||
chanID: sch.ID,
|
||||
key: token,
|
||||
err: things.ErrNotFound,
|
||||
event: nil,
|
||||
},
|
||||
}
|
||||
|
||||
lastID := "0"
|
||||
for _, tc := range cases {
|
||||
err := svc.Disconnect(tc.key, tc.chanID, tc.thingID)
|
||||
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
|
||||
|
||||
streams := redisClient.XRead(&r.XReadArgs{
|
||||
Streams: []string{streamID, lastID},
|
||||
Count: 1,
|
||||
}).Val()
|
||||
|
||||
var event map[string]interface{}
|
||||
if len(streams) > 0 && len(streams[0].Messages) > 0 {
|
||||
msg := streams[0].Messages[0]
|
||||
event = msg.Values
|
||||
lastID = msg.ID
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.event, event, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.event, event))
|
||||
}
|
||||
}
|
@ -19,8 +19,7 @@ import (
|
||||
)
|
||||
|
||||
func TestThingSave(t *testing.T) {
|
||||
|
||||
thingCache := redis.NewThingCache(cacheClient)
|
||||
thingCache := redis.NewThingCache(redisClient)
|
||||
key := uuid.New().ID()
|
||||
id := uint64(123)
|
||||
id2 := uint64(124)
|
||||
@ -56,7 +55,7 @@ func TestThingSave(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestThingID(t *testing.T) {
|
||||
thingCache := redis.NewThingCache(cacheClient)
|
||||
thingCache := redis.NewThingCache(redisClient)
|
||||
|
||||
key := uuid.New().ID()
|
||||
id := uint64(123)
|
||||
@ -88,7 +87,7 @@ func TestThingID(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestThingRemove(t *testing.T) {
|
||||
thingCache := redis.NewThingCache(cacheClient)
|
||||
thingCache := redis.NewThingCache(redisClient)
|
||||
|
||||
key := uuid.New().ID()
|
||||
id := uint64(123)
|
||||
|
Loading…
x
Reference in New Issue
Block a user