1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00

NOISSUE - Add subtopic wildcard for twin attribute's definition (#1214)

* Add wildcard to attribute subtopic

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add MF_TWINS_SUBTOPIC_WILDCARD env var

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Remove configurable wildcard env var and mqtt notif leftovers

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add mongodb RetrieveByAttribute tests

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add redis wildcard subtopic IDs retrieval

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>

* Add tests for wildcard state save

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
This commit is contained in:
Darko Draskovic 2020-07-09 12:18:19 +02:00 committed by GitHub
parent 09d09c6ef5
commit 381ebb1e51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 137 additions and 64 deletions

View File

@ -26,11 +26,10 @@ default values.
| MF_TWINS_SINGLE_USER_TOKEN | User token for single user mode that should be passed in auth header | |
| MF_TWINS_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
| MF_TWINS_CA_CERTS | Path to trusted CAs in PEM format | |
| MF_TWINS_MQTT_URL | Mqtt broker URL for twin CRUD and states update notifications | tcp://localhost:1883 |
| MF_TWINS_CHANNEL_ID | Mqtt notifications topic | |
| MF_TWINS_CHANNEL_ID | NATS notifications channel ID | |
| MF_NATS_URL | Mainflux NATS broker URL | nats://localhost:4222 |
| MF_AUTHN_GRPC_URL | AuthN service gRPC URL | localhost:8181 |
| MF_AUTHN_GRPC_TIMEOUT | AuthN service gRPC request timeout in seconds | 1s |
| MF_AUTHN_GRPC_TIMEOUT | AuthN service gRPC request timeout in seconds | 1s |
| MF_TWINS_CACHE_URL | Cache database URL | localhost:6379 |
| MF_TWINS_CACHE_PASS | Cache database password | |
| MF_TWINS_CACHE_DB | Cache instance name | 0 |
@ -63,8 +62,7 @@ services:
MF_TWINS_SINGLE_USER_TOKEN: [User token for single user mode]
MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on]
MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format]
MF_TWINS_MQTT_URL: [Mqtt broker URL for twin CRUD and states]
MF_TWINS_CHANNEL_ID: [Mqtt notifications topic]
MF_TWINS_CHANNEL_ID: [NATS notifications channel ID]
MF_NATS_URL: [Mainflux NATS broker URL]
MF_AUTHN_GRPC_URL: [AuthN service gRPC URL]
MF_AUTHN_GRPC_TIMEOUT: [AuthN service gRPC request timeout in seconds]
@ -100,8 +98,7 @@ MF_TWINS_SINGLE_USER_EMAIL: [User email for single user mode] \
MF_TWINS_SINGLE_USER_TOKEN: [User token for single user mode] \
MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on] \
MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format] \
MF_TWINS_MQTT_URL: [Mqtt broker URL for twin CRUD and states] \
MF_TWINS_CHANNEL_ID: [Mqtt notifications topic] \
MF_TWINS_CHANNEL_ID: [NATS notifications channel ID] \
MF_NATS_URL: [Mainflux NATS broker URL] \
MF_AUTHN_GRPC_URL: [AuthN service gRPC URL] \
MF_AUTHN_GRPC_TIMEOUT: [AuthN service gRPC request timeout in seconds] \
@ -118,7 +115,7 @@ stands for the crud operation done on twin - create, update, delete or
retrieve - or state - save state. In order to use twin service notifications,
one must inform it - via environment variables - about the Mainflux channel used
for notification publishing. You must use an already existing channel, since you
cannot know in advance or set the channel id (Mainflux does it automatically).
cannot know in advance or set the channel ID (Mainflux does it automatically).
To set the environment variable, please go to `.env` file and set the following
variable:

View File

@ -2,6 +2,7 @@ package mocks
import (
"encoding/json"
"strconv"
"time"
"github.com/mainflux/mainflux/pkg/messaging"
@ -12,6 +13,8 @@ import (
const publisher = "twins"
var id = 0
// NewService use mock dependencies to create real twins service
func NewService(tokens map[string]string) twins.Service {
auth := NewAuthNServiceClient(tokens)
@ -38,6 +41,15 @@ func CreateDefinition(channels []string, subtopics []string) twins.Definition {
return def
}
// CreateTwin creates twin
func CreateTwin(channels []string, subtopics []string) twins.Twin {
id++
return twins.Twin{
ID: strconv.Itoa(id),
Definitions: []twins.Definition{CreateDefinition(channels, subtopics)},
}
}
// CreateSenML creates SenML record array
func CreateSenML(n int, recs []senml.Record) {
for i, rec := range recs {

View File

@ -55,12 +55,11 @@ func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64,
srm.mu.Lock()
defer srm.mu.Unlock()
items := make([]twins.State, 0)
if limit <= 0 {
return twins.StatesPage{}, nil
}
var items []twins.State
for k, v := range srm.states {
if (uint64)(len(items)) >= limit {
break
@ -78,11 +77,10 @@ func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64,
return items[i].ID < items[j].ID
})
total := uint64(len(srm.states))
page := twins.StatesPage{
States: items,
PageMetadata: twins.PageMetadata{
Total: total,
Total: srm.total(twinID),
Offset: offset,
Limit: limit,
},
@ -91,6 +89,16 @@ func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64,
return page, nil
}
func (srm *stateRepositoryMock) total(twinID string) uint64 {
var total uint64
for k := range srm.states {
if strings.HasPrefix(k, twinID) {
total++
}
}
return total
}
// RetrieveLast returns the last state related to twin spec by id
func (srm *stateRepositoryMock) RetrieveLast(ctx context.Context, twinID string) (twins.State, error) {
srm.mu.Lock()

View File

@ -75,17 +75,13 @@ func (trm *twinRepositoryMock) RetrieveByAttribute(ctx context.Context, channel,
for _, twin := range trm.twins {
def := twin.Definitions[len(twin.Definitions)-1]
for _, attr := range def.Attributes {
if attr.Channel == channel && attr.Subtopic == subtopic {
if attr.Channel == channel && (attr.Subtopic == twins.SubtopicWildcard || attr.Subtopic == subtopic) {
ids = append(ids, twin.ID)
break
}
}
}
if len(ids) > 0 {
return ids, nil
}
return ids, twins.ErrNotFound
return ids, nil
}
func (trm *twinRepositoryMock) RetrieveAll(_ context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.Page, error) {
@ -215,12 +211,10 @@ func (tcm *twinCacheMock) IDs(_ context.Context, channel, subtopic string) ([]st
var ids []string
idsMap, ok := tcm.attrIds[channel+subtopic]
if !ok {
return ids, nil
for k := range tcm.attrIds[channel+subtopic] {
ids = append(ids, k)
}
for k := range idsMap {
for k := range tcm.attrIds[channel+twins.SubtopicWildcard] {
ids = append(ids, k)
}

View File

@ -18,7 +18,8 @@ const (
)
type twinRepository struct {
db *mongo.Database
db *mongo.Database
subtopicWildcard string
}
var _ twins.TwinRepository = (*twinRepository)(nil)
@ -92,8 +93,11 @@ func (tr *twinRepository) RetrieveByAttribute(ctx context.Context, channel, subt
}
match := bson.M{
"$match": bson.M{
"definition.channel": channel,
"definition.subtopic": subtopic,
"definition.channel": channel,
"$or": []interface{}{
bson.M{"definition.subtopic": subtopic},
bson.M{"definition.subtopic": twins.SubtopicWildcard},
},
},
}
prj2 := bson.M{

View File

@ -11,9 +11,10 @@ import (
"testing"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/mongodb"
uuidProvider "github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/mocks"
"github.com/mainflux/mainflux/twins/mongodb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
@ -28,6 +29,7 @@ const (
collection = "twins"
email = "mfx_twin@example.com"
validName = "mfx_twin"
subtopic = "engine"
)
var (
@ -185,6 +187,55 @@ func TestTwinsRetrieveByID(t *testing.T) {
}
}
func TestTwinsRetrieveByAttribute(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
repo := mongodb.NewTwinRepository(db)
chID, err := uuid.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
empty := mocks.CreateTwin([]string{chID}, []string{""})
_, err = repo.Save(context.Background(), empty)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
wildcard := mocks.CreateTwin([]string{chID}, []string{twins.SubtopicWildcard})
_, err = repo.Save(context.Background(), wildcard)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
nonEmpty := mocks.CreateTwin([]string{chID}, []string{subtopic})
_, err = repo.Save(context.Background(), nonEmpty)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
cases := []struct {
desc string
subtopic string
ids []string
}{
{
desc: "retrieve empty subtopic",
subtopic: "",
ids: []string{wildcard.ID, empty.ID},
},
{
desc: "retrieve wildcard subtopic",
subtopic: twins.SubtopicWildcard,
ids: []string{wildcard.ID},
},
{
desc: "retrieve non-empty subtopic",
subtopic: subtopic,
ids: []string{wildcard.ID, nonEmpty.ID},
},
}
for _, tc := range cases {
ids, err := repo.RetrieveByAttribute(context.Background(), chID, tc.subtopic)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
assert.ElementsMatch(t, ids, tc.ids, fmt.Sprintf("%s: expected ids %v do not match received ids %v", tc.desc, tc.ids, ids))
}
}
func TestTwinsRetrieveAll(t *testing.T) {
email := "twin-multi-retrieval@example.com"
name := "mainflux"

View File

@ -74,6 +74,11 @@ func (tc *twinCache) IDs(_ context.Context, channel, subtopic string) ([]string,
if err != nil {
return nil, errors.Wrap(ErrRedisTwinIDs, err)
}
idsWildcard, err := tc.client.SMembers(attrKey(channel, twins.SubtopicWildcard)).Result()
if err != nil {
return nil, errors.Wrap(ErrRedisTwinIDs, err)
}
ids = append(ids, idsWildcard...)
return ids, nil
}

View File

@ -8,7 +8,6 @@ import (
"fmt"
"testing"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/twins"
"github.com/mainflux/mainflux/twins/mocks"
"github.com/mainflux/mainflux/twins/redis"
@ -25,11 +24,8 @@ func TestTwinSave(t *testing.T) {
redisClient.FlushAll()
twinCache := redis.NewTwinCache(redisClient)
twin1, err := createTwin(channels[0:2], subtopics[0:2])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
twin2, err := createTwin(channels[1:3], subtopics[1:3])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
twin1 := mocks.CreateTwin(channels[0:2], subtopics[0:2])
twin2 := mocks.CreateTwin(channels[1:3], subtopics[1:3])
cases := []struct {
desc string
@ -133,8 +129,7 @@ func TestTwinUpdate(t *testing.T) {
var tws []twins.Twin
for i := range channels {
tw, err := createTwin(channels[i:i+1], subtopics[i:i+1])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tw := mocks.CreateTwin(channels[i:i+1], subtopics[i:i+1])
tws = append(tws, tw)
}
err := twinCache.Save(ctx, tws[0])
@ -185,23 +180,27 @@ func TestTwinIDs(t *testing.T) {
var tws []twins.Twin
for i := 0; i < len(channels); i++ {
tw, err := createTwin(channels[0:1], subtopics[0:1])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
err = twinCache.Save(ctx, tw)
tw := mocks.CreateTwin(channels[0:1], subtopics[0:1])
err := twinCache.Save(ctx, tw)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tws = append(tws, tw)
}
for i := 0; i < len(channels); i++ {
tw, err := createTwin(channels[1:2], subtopics[1:2])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
err = twinCache.Save(ctx, tw)
tw := mocks.CreateTwin(channels[1:2], subtopics[1:2])
err := twinCache.Save(ctx, tw)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tws = append(tws, tw)
}
twEmptySubt := mocks.CreateTwin(channels[0:1], []string{""})
err := twinCache.Save(ctx, twEmptySubt)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
twSubtWild := mocks.CreateTwin(channels[0:1], []string{twins.SubtopicWildcard})
err = twinCache.Save(ctx, twSubtWild)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
nonExistAttr := twins.Attribute{
Channel: channels[2],
Subtopic: subtopics[0],
Subtopic: subtopics[2],
PersistState: true,
}
@ -211,9 +210,15 @@ func TestTwinIDs(t *testing.T) {
attr twins.Attribute
err error
}{
{
desc: "Get twin IDs from cache for empty subtopic attribute",
ids: []string{twEmptySubt.ID, twSubtWild.ID},
attr: twEmptySubt.Definitions[0].Attributes[0],
err: nil,
},
{
desc: "Get twin IDs from cache for subset of ids",
ids: []string{tws[0].ID, tws[1].ID, tws[2].ID},
ids: []string{tws[0].ID, tws[1].ID, tws[2].ID, twSubtWild.ID},
attr: tws[0].Definitions[0].Attributes[0],
err: nil,
},
@ -245,9 +250,8 @@ func TestTwinRemove(t *testing.T) {
var tws []twins.Twin
for i := range channels {
tw, err := createTwin(channels[i:i+1], subtopics[i:i+1])
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
err = twinCache.Save(ctx, tw)
tw := mocks.CreateTwin(channels[i:i+1], subtopics[i:i+1])
err := twinCache.Save(ctx, tw)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
tws = append(tws, tw)
}
@ -286,14 +290,3 @@ func TestTwinRemove(t *testing.T) {
}
}
}
func createTwin(channels []string, subtopics []string) (twins.Twin, error) {
id, err := uuid.New().ID()
if err != nil {
return twins.Twin{}, err
}
return twins.Twin{
ID: id,
Definitions: []twins.Definition{mocks.CreateDefinition(channels, subtopics)},
}, nil
}

View File

@ -70,8 +70,9 @@ const (
noop = iota
update
save
millisec = 1e6
nanosec = 1e9
millisec = 1e6
nanosec = 1e9
SubtopicWildcard = ">"
)
var crudOp = map[string]string{
@ -314,7 +315,7 @@ func (ts *twinsService) saveState(msg *messaging.Message, twinID string) error {
}
for _, rec := range recs {
action := prepareState(&st, &tw, rec, msg)
action := ts.prepareState(&st, &tw, rec, msg)
switch action {
case noop:
return nil
@ -335,7 +336,7 @@ func (ts *twinsService) saveState(msg *messaging.Message, twinID string) error {
return nil
}
func prepareState(st *State, tw *Twin, rec senml.Record, msg *messaging.Message) int {
func (ts *twinsService) prepareState(st *State, tw *Twin, rec senml.Record, msg *messaging.Message) int {
def := tw.Definitions[len(tw.Definitions)-1]
st.TwinID = tw.ID
st.Definition = def.ID
@ -362,7 +363,7 @@ func prepareState(st *State, tw *Twin, rec senml.Record, msg *messaging.Message)
if !attr.PersistState {
continue
}
if attr.Channel == msg.Channel && attr.Subtopic == msg.Subtopic {
if attr.Channel == msg.Channel && (attr.Subtopic == SubtopicWildcard || attr.Subtopic == msg.Subtopic) {
action = update
delta := math.Abs(float64(st.Created.UnixNano()) - recNano)
if recNano == 0 || delta > float64(def.Delta) {

View File

@ -253,6 +253,10 @@ func TestSaveStates(t *testing.T) {
tw, err := svc.AddTwin(context.Background(), token, twin, def)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
defWildcard := mocks.CreateDefinition(channels[0:2], []string{twins.SubtopicWildcard, twins.SubtopicWildcard})
twWildcard, err := svc.AddTwin(context.Background(), token, twin, defWildcard)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
var recs = make([]senml.Record, numRecs)
mocks.CreateSenML(numRecs, recs)
@ -300,12 +304,16 @@ func TestSaveStates(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
err = svc.SaveStates(message)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
ttlAdded += tc.size
page, err := svc.ListStates(context.TODO(), token, 0, 10, tw.ID)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
assert.Equal(t, ttlAdded, page.Total, fmt.Sprintf("%s: expected %d total got %d total\n", tc.desc, ttlAdded, page.Total))
page, err = svc.ListStates(context.TODO(), token, 0, 10, twWildcard.ID)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
assert.Equal(t, ttlAdded, page.Total, fmt.Sprintf("%s: expected %d total got %d total\n", tc.desc, ttlAdded, page.Total))
}
}