From 381ebb1e517789359007e500e038f48afaf5b1c3 Mon Sep 17 00:00:00 2001 From: Darko Draskovic Date: Thu, 9 Jul 2020 12:18:19 +0200 Subject: [PATCH] NOISSUE - Add subtopic wildcard for twin attribute's definition (#1214) * Add wildcard to attribute subtopic Signed-off-by: Darko Draskovic * Add MF_TWINS_SUBTOPIC_WILDCARD env var Signed-off-by: Darko Draskovic * Remove configurable wildcard env var and mqtt notif leftovers Signed-off-by: Darko Draskovic * Add mongodb RetrieveByAttribute tests Signed-off-by: Darko Draskovic * Add redis wildcard subtopic IDs retrieval Signed-off-by: Darko Draskovic * Add tests for wildcard state save Signed-off-by: Darko Draskovic --- twins/README.md | 13 ++++----- twins/mocks/service.go | 12 ++++++++ twins/mocks/states.go | 16 ++++++++--- twins/mocks/twins.go | 16 ++++------- twins/mongodb/twins.go | 10 +++++-- twins/mongodb/twins_test.go | 55 +++++++++++++++++++++++++++++++++++-- twins/redis/twins.go | 5 ++++ twins/redis/twins_test.go | 53 ++++++++++++++++------------------- twins/service.go | 11 ++++---- twins/service_test.go | 10 ++++++- 10 files changed, 137 insertions(+), 64 deletions(-) diff --git a/twins/README.md b/twins/README.md index 7179a436..1065b8bb 100644 --- a/twins/README.md +++ b/twins/README.md @@ -26,11 +26,10 @@ default values. | MF_TWINS_SINGLE_USER_TOKEN | User token for single user mode that should be passed in auth header | | | MF_TWINS_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | | MF_TWINS_CA_CERTS | Path to trusted CAs in PEM format | | -| MF_TWINS_MQTT_URL | Mqtt broker URL for twin CRUD and states update notifications | tcp://localhost:1883 | -| MF_TWINS_CHANNEL_ID | Mqtt notifications topic | | +| MF_TWINS_CHANNEL_ID | NATS notifications channel ID | | | MF_NATS_URL | Mainflux NATS broker URL | nats://localhost:4222 | | MF_AUTHN_GRPC_URL | AuthN service gRPC URL | localhost:8181 | -| MF_AUTHN_GRPC_TIMEOUT | AuthN service gRPC request timeout in seconds | 1s | +| MF_AUTHN_GRPC_TIMEOUT | AuthN service gRPC request timeout in seconds | 1s | | MF_TWINS_CACHE_URL | Cache database URL | localhost:6379 | | MF_TWINS_CACHE_PASS | Cache database password | | | MF_TWINS_CACHE_DB | Cache instance name | 0 | @@ -63,8 +62,7 @@ services: MF_TWINS_SINGLE_USER_TOKEN: [User token for single user mode] MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on] MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format] - MF_TWINS_MQTT_URL: [Mqtt broker URL for twin CRUD and states] - MF_TWINS_CHANNEL_ID: [Mqtt notifications topic] + MF_TWINS_CHANNEL_ID: [NATS notifications channel ID] MF_NATS_URL: [Mainflux NATS broker URL] MF_AUTHN_GRPC_URL: [AuthN service gRPC URL] MF_AUTHN_GRPC_TIMEOUT: [AuthN service gRPC request timeout in seconds] @@ -100,8 +98,7 @@ MF_TWINS_SINGLE_USER_EMAIL: [User email for single user mode] \ MF_TWINS_SINGLE_USER_TOKEN: [User token for single user mode] \ MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on] \ MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format] \ -MF_TWINS_MQTT_URL: [Mqtt broker URL for twin CRUD and states] \ -MF_TWINS_CHANNEL_ID: [Mqtt notifications topic] \ +MF_TWINS_CHANNEL_ID: [NATS notifications channel ID] \ MF_NATS_URL: [Mainflux NATS broker URL] \ MF_AUTHN_GRPC_URL: [AuthN service gRPC URL] \ MF_AUTHN_GRPC_TIMEOUT: [AuthN service gRPC request timeout in seconds] \ @@ -118,7 +115,7 @@ stands for the crud operation done on twin - create, update, delete or retrieve - or state - save state. In order to use twin service notifications, one must inform it - via environment variables - about the Mainflux channel used for notification publishing. You must use an already existing channel, since you -cannot know in advance or set the channel id (Mainflux does it automatically). +cannot know in advance or set the channel ID (Mainflux does it automatically). To set the environment variable, please go to `.env` file and set the following variable: diff --git a/twins/mocks/service.go b/twins/mocks/service.go index b28f8299..9c25afe2 100644 --- a/twins/mocks/service.go +++ b/twins/mocks/service.go @@ -2,6 +2,7 @@ package mocks import ( "encoding/json" + "strconv" "time" "github.com/mainflux/mainflux/pkg/messaging" @@ -12,6 +13,8 @@ import ( const publisher = "twins" +var id = 0 + // NewService use mock dependencies to create real twins service func NewService(tokens map[string]string) twins.Service { auth := NewAuthNServiceClient(tokens) @@ -38,6 +41,15 @@ func CreateDefinition(channels []string, subtopics []string) twins.Definition { return def } +// CreateTwin creates twin +func CreateTwin(channels []string, subtopics []string) twins.Twin { + id++ + return twins.Twin{ + ID: strconv.Itoa(id), + Definitions: []twins.Definition{CreateDefinition(channels, subtopics)}, + } +} + // CreateSenML creates SenML record array func CreateSenML(n int, recs []senml.Record) { for i, rec := range recs { diff --git a/twins/mocks/states.go b/twins/mocks/states.go index 3497741c..6e25298a 100644 --- a/twins/mocks/states.go +++ b/twins/mocks/states.go @@ -55,12 +55,11 @@ func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64, srm.mu.Lock() defer srm.mu.Unlock() - items := make([]twins.State, 0) - if limit <= 0 { return twins.StatesPage{}, nil } + var items []twins.State for k, v := range srm.states { if (uint64)(len(items)) >= limit { break @@ -78,11 +77,10 @@ func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64, return items[i].ID < items[j].ID }) - total := uint64(len(srm.states)) page := twins.StatesPage{ States: items, PageMetadata: twins.PageMetadata{ - Total: total, + Total: srm.total(twinID), Offset: offset, Limit: limit, }, @@ -91,6 +89,16 @@ func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64, return page, nil } +func (srm *stateRepositoryMock) total(twinID string) uint64 { + var total uint64 + for k := range srm.states { + if strings.HasPrefix(k, twinID) { + total++ + } + } + return total +} + // RetrieveLast returns the last state related to twin spec by id func (srm *stateRepositoryMock) RetrieveLast(ctx context.Context, twinID string) (twins.State, error) { srm.mu.Lock() diff --git a/twins/mocks/twins.go b/twins/mocks/twins.go index d365948a..2675a368 100644 --- a/twins/mocks/twins.go +++ b/twins/mocks/twins.go @@ -75,17 +75,13 @@ func (trm *twinRepositoryMock) RetrieveByAttribute(ctx context.Context, channel, for _, twin := range trm.twins { def := twin.Definitions[len(twin.Definitions)-1] for _, attr := range def.Attributes { - if attr.Channel == channel && attr.Subtopic == subtopic { + if attr.Channel == channel && (attr.Subtopic == twins.SubtopicWildcard || attr.Subtopic == subtopic) { ids = append(ids, twin.ID) break } } } - - if len(ids) > 0 { - return ids, nil - } - return ids, twins.ErrNotFound + return ids, nil } func (trm *twinRepositoryMock) RetrieveAll(_ context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.Page, error) { @@ -215,12 +211,10 @@ func (tcm *twinCacheMock) IDs(_ context.Context, channel, subtopic string) ([]st var ids []string - idsMap, ok := tcm.attrIds[channel+subtopic] - if !ok { - return ids, nil + for k := range tcm.attrIds[channel+subtopic] { + ids = append(ids, k) } - - for k := range idsMap { + for k := range tcm.attrIds[channel+twins.SubtopicWildcard] { ids = append(ids, k) } diff --git a/twins/mongodb/twins.go b/twins/mongodb/twins.go index 5d2b4dd1..2b57cb8b 100644 --- a/twins/mongodb/twins.go +++ b/twins/mongodb/twins.go @@ -18,7 +18,8 @@ const ( ) type twinRepository struct { - db *mongo.Database + db *mongo.Database + subtopicWildcard string } var _ twins.TwinRepository = (*twinRepository)(nil) @@ -92,8 +93,11 @@ func (tr *twinRepository) RetrieveByAttribute(ctx context.Context, channel, subt } match := bson.M{ "$match": bson.M{ - "definition.channel": channel, - "definition.subtopic": subtopic, + "definition.channel": channel, + "$or": []interface{}{ + bson.M{"definition.subtopic": subtopic}, + bson.M{"definition.subtopic": twins.SubtopicWildcard}, + }, }, } prj2 := bson.M{ diff --git a/twins/mongodb/twins_test.go b/twins/mongodb/twins_test.go index c4c0d144..3ba90fa0 100644 --- a/twins/mongodb/twins_test.go +++ b/twins/mongodb/twins_test.go @@ -11,9 +11,10 @@ import ( "testing" log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/twins" - "github.com/mainflux/mainflux/twins/mongodb" uuidProvider "github.com/mainflux/mainflux/pkg/uuid" + "github.com/mainflux/mainflux/twins" + "github.com/mainflux/mainflux/twins/mocks" + "github.com/mainflux/mainflux/twins/mongodb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" @@ -28,6 +29,7 @@ const ( collection = "twins" email = "mfx_twin@example.com" validName = "mfx_twin" + subtopic = "engine" ) var ( @@ -185,6 +187,55 @@ func TestTwinsRetrieveByID(t *testing.T) { } } +func TestTwinsRetrieveByAttribute(t *testing.T) { + client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) + require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) + + db := client.Database(testDB) + repo := mongodb.NewTwinRepository(db) + + chID, err := uuid.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + empty := mocks.CreateTwin([]string{chID}, []string{""}) + _, err = repo.Save(context.Background(), empty) + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + wildcard := mocks.CreateTwin([]string{chID}, []string{twins.SubtopicWildcard}) + _, err = repo.Save(context.Background(), wildcard) + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + nonEmpty := mocks.CreateTwin([]string{chID}, []string{subtopic}) + _, err = repo.Save(context.Background(), nonEmpty) + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + cases := []struct { + desc string + subtopic string + ids []string + }{ + { + desc: "retrieve empty subtopic", + subtopic: "", + ids: []string{wildcard.ID, empty.ID}, + }, + { + desc: "retrieve wildcard subtopic", + subtopic: twins.SubtopicWildcard, + ids: []string{wildcard.ID}, + }, + { + desc: "retrieve non-empty subtopic", + subtopic: subtopic, + ids: []string{wildcard.ID, nonEmpty.ID}, + }, + } + + for _, tc := range cases { + ids, err := repo.RetrieveByAttribute(context.Background(), chID, tc.subtopic) + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + assert.ElementsMatch(t, ids, tc.ids, fmt.Sprintf("%s: expected ids %v do not match received ids %v", tc.desc, tc.ids, ids)) + } +} + func TestTwinsRetrieveAll(t *testing.T) { email := "twin-multi-retrieval@example.com" name := "mainflux" diff --git a/twins/redis/twins.go b/twins/redis/twins.go index a080b3f0..931915f0 100644 --- a/twins/redis/twins.go +++ b/twins/redis/twins.go @@ -74,6 +74,11 @@ func (tc *twinCache) IDs(_ context.Context, channel, subtopic string) ([]string, if err != nil { return nil, errors.Wrap(ErrRedisTwinIDs, err) } + idsWildcard, err := tc.client.SMembers(attrKey(channel, twins.SubtopicWildcard)).Result() + if err != nil { + return nil, errors.Wrap(ErrRedisTwinIDs, err) + } + ids = append(ids, idsWildcard...) return ids, nil } diff --git a/twins/redis/twins_test.go b/twins/redis/twins_test.go index bf0a7611..0714a87b 100644 --- a/twins/redis/twins_test.go +++ b/twins/redis/twins_test.go @@ -8,7 +8,6 @@ import ( "fmt" "testing" - "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/twins" "github.com/mainflux/mainflux/twins/mocks" "github.com/mainflux/mainflux/twins/redis" @@ -25,11 +24,8 @@ func TestTwinSave(t *testing.T) { redisClient.FlushAll() twinCache := redis.NewTwinCache(redisClient) - twin1, err := createTwin(channels[0:2], subtopics[0:2]) - require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - - twin2, err := createTwin(channels[1:3], subtopics[1:3]) - require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + twin1 := mocks.CreateTwin(channels[0:2], subtopics[0:2]) + twin2 := mocks.CreateTwin(channels[1:3], subtopics[1:3]) cases := []struct { desc string @@ -133,8 +129,7 @@ func TestTwinUpdate(t *testing.T) { var tws []twins.Twin for i := range channels { - tw, err := createTwin(channels[i:i+1], subtopics[i:i+1]) - require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + tw := mocks.CreateTwin(channels[i:i+1], subtopics[i:i+1]) tws = append(tws, tw) } err := twinCache.Save(ctx, tws[0]) @@ -185,23 +180,27 @@ func TestTwinIDs(t *testing.T) { var tws []twins.Twin for i := 0; i < len(channels); i++ { - tw, err := createTwin(channels[0:1], subtopics[0:1]) - require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - err = twinCache.Save(ctx, tw) + tw := mocks.CreateTwin(channels[0:1], subtopics[0:1]) + err := twinCache.Save(ctx, tw) require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) tws = append(tws, tw) } for i := 0; i < len(channels); i++ { - tw, err := createTwin(channels[1:2], subtopics[1:2]) - require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - err = twinCache.Save(ctx, tw) + tw := mocks.CreateTwin(channels[1:2], subtopics[1:2]) + err := twinCache.Save(ctx, tw) require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) tws = append(tws, tw) } + twEmptySubt := mocks.CreateTwin(channels[0:1], []string{""}) + err := twinCache.Save(ctx, twEmptySubt) + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + twSubtWild := mocks.CreateTwin(channels[0:1], []string{twins.SubtopicWildcard}) + err = twinCache.Save(ctx, twSubtWild) + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) nonExistAttr := twins.Attribute{ Channel: channels[2], - Subtopic: subtopics[0], + Subtopic: subtopics[2], PersistState: true, } @@ -211,9 +210,15 @@ func TestTwinIDs(t *testing.T) { attr twins.Attribute err error }{ + { + desc: "Get twin IDs from cache for empty subtopic attribute", + ids: []string{twEmptySubt.ID, twSubtWild.ID}, + attr: twEmptySubt.Definitions[0].Attributes[0], + err: nil, + }, { desc: "Get twin IDs from cache for subset of ids", - ids: []string{tws[0].ID, tws[1].ID, tws[2].ID}, + ids: []string{tws[0].ID, tws[1].ID, tws[2].ID, twSubtWild.ID}, attr: tws[0].Definitions[0].Attributes[0], err: nil, }, @@ -245,9 +250,8 @@ func TestTwinRemove(t *testing.T) { var tws []twins.Twin for i := range channels { - tw, err := createTwin(channels[i:i+1], subtopics[i:i+1]) - require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - err = twinCache.Save(ctx, tw) + tw := mocks.CreateTwin(channels[i:i+1], subtopics[i:i+1]) + err := twinCache.Save(ctx, tw) require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) tws = append(tws, tw) } @@ -286,14 +290,3 @@ func TestTwinRemove(t *testing.T) { } } } - -func createTwin(channels []string, subtopics []string) (twins.Twin, error) { - id, err := uuid.New().ID() - if err != nil { - return twins.Twin{}, err - } - return twins.Twin{ - ID: id, - Definitions: []twins.Definition{mocks.CreateDefinition(channels, subtopics)}, - }, nil -} diff --git a/twins/service.go b/twins/service.go index c311aa7a..fa4aed86 100644 --- a/twins/service.go +++ b/twins/service.go @@ -70,8 +70,9 @@ const ( noop = iota update save - millisec = 1e6 - nanosec = 1e9 + millisec = 1e6 + nanosec = 1e9 + SubtopicWildcard = ">" ) var crudOp = map[string]string{ @@ -314,7 +315,7 @@ func (ts *twinsService) saveState(msg *messaging.Message, twinID string) error { } for _, rec := range recs { - action := prepareState(&st, &tw, rec, msg) + action := ts.prepareState(&st, &tw, rec, msg) switch action { case noop: return nil @@ -335,7 +336,7 @@ func (ts *twinsService) saveState(msg *messaging.Message, twinID string) error { return nil } -func prepareState(st *State, tw *Twin, rec senml.Record, msg *messaging.Message) int { +func (ts *twinsService) prepareState(st *State, tw *Twin, rec senml.Record, msg *messaging.Message) int { def := tw.Definitions[len(tw.Definitions)-1] st.TwinID = tw.ID st.Definition = def.ID @@ -362,7 +363,7 @@ func prepareState(st *State, tw *Twin, rec senml.Record, msg *messaging.Message) if !attr.PersistState { continue } - if attr.Channel == msg.Channel && attr.Subtopic == msg.Subtopic { + if attr.Channel == msg.Channel && (attr.Subtopic == SubtopicWildcard || attr.Subtopic == msg.Subtopic) { action = update delta := math.Abs(float64(st.Created.UnixNano()) - recNano) if recNano == 0 || delta > float64(def.Delta) { diff --git a/twins/service_test.go b/twins/service_test.go index e354dfff..45750177 100644 --- a/twins/service_test.go +++ b/twins/service_test.go @@ -253,6 +253,10 @@ func TestSaveStates(t *testing.T) { tw, err := svc.AddTwin(context.Background(), token, twin, def) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + defWildcard := mocks.CreateDefinition(channels[0:2], []string{twins.SubtopicWildcard, twins.SubtopicWildcard}) + twWildcard, err := svc.AddTwin(context.Background(), token, twin, defWildcard) + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + var recs = make([]senml.Record, numRecs) mocks.CreateSenML(numRecs, recs) @@ -300,12 +304,16 @@ func TestSaveStates(t *testing.T) { require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) err = svc.SaveStates(message) - assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) ttlAdded += tc.size page, err := svc.ListStates(context.TODO(), token, 0, 10, tw.ID) require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) assert.Equal(t, ttlAdded, page.Total, fmt.Sprintf("%s: expected %d total got %d total\n", tc.desc, ttlAdded, page.Total)) + + page, err = svc.ListStates(context.TODO(), token, 0, 10, twWildcard.ID) + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + assert.Equal(t, ttlAdded, page.Total, fmt.Sprintf("%s: expected %d total got %d total\n", tc.desc, ttlAdded, page.Total)) } }