1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-24 13:48:49 +08:00

MF-1288 - Add tests for JSON messages in message writers and readers (#1407)

* Add JSON Writer tests

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add Posgres Reader JSON tests

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add ID comment

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add MongoDB Reader tests for JSON

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Rename test message

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add tests for InfluxDB JSON messages

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Use test file for constants block

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Rename MongoDB imports

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add Cassandra reader JSON tests

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Simplify test payload

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Handle wrong format uniformly across Readers

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
This commit is contained in:
Dušan Borovčanin 2021-04-20 21:33:26 +02:00 committed by GitHub
parent e02e9c2387
commit e04d94ecc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 971 additions and 48 deletions

View File

@ -8,7 +8,9 @@ import (
"testing"
"time"
"github.com/gofrs/uuid"
"github.com/mainflux/mainflux/consumers/writers/cassandra"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -31,13 +33,12 @@ var (
sum float64 = 42
)
func TestSave(t *testing.T) {
func TestSaveSenml(t *testing.T) {
session, err := cassandra.Connect(cassandra.DBConfig{
Hosts: []string{addr},
Keyspace: keyspace,
})
require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err))
repo := cassandra.New(session)
now := time.Now().Unix()
msg := senml.Message{
@ -71,3 +72,47 @@ func TestSave(t *testing.T) {
err = repo.Consume(msgs)
assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err))
}
func TestSaveJSON(t *testing.T) {
session, err := cassandra.Connect(cassandra.DBConfig{
Hosts: []string{addr},
Keyspace: keyspace,
})
require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err))
repo := cassandra.New(session)
chid, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
pubid, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
msg := json.Message{
Channel: chid.String(),
Publisher: pubid.String(),
Created: time.Now().Unix(),
Subtopic: "subtopic/format/some_json",
Protocol: "mqtt",
Payload: map[string]interface{}{
"field_1": 123,
"field_2": "value",
"field_3": false,
"field_4": 12.344,
"field_5": map[string]interface{}{
"field_1": "value",
"field_2": 42,
},
},
}
now := time.Now().Unix()
msgs := json.Messages{
Format: "some_json",
}
for i := 0; i < msgsNum; i++ {
msg.Created = now + int64(i)
msgs.Data = append(msgs.Data, msg)
}
err = repo.Consume(msgs)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}

View File

@ -9,9 +9,11 @@ import (
"testing"
"time"
"github.com/gofrs/uuid"
influxdata "github.com/influxdata/influxdb/client/v2"
writer "github.com/mainflux/mainflux/consumers/writers/influxdb"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -63,7 +65,7 @@ func queryDB(cmd string) ([][]interface{}, error) {
return response.Results[0].Series[0].Values, nil
}
func TestSave(t *testing.T) {
func TestSaveSenml(t *testing.T) {
repo := writer.New(client, testDB)
cases := []struct {
@ -130,3 +132,49 @@ func TestSave(t *testing.T) {
assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count))
}
}
func TestSaveJSON(t *testing.T) {
repo := writer.New(client, testDB)
chid, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
pubid, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
msg := json.Message{
Channel: chid.String(),
Publisher: pubid.String(),
Created: time.Now().Unix(),
Subtopic: "subtopic/format/some_json",
Protocol: "mqtt",
Payload: map[string]interface{}{
"field_1": 123,
"field_2": "value",
"field_3": false,
"field_4": 12.344,
"field_5": map[string]interface{}{
"field_1": "value",
"field_2": 42,
},
},
}
now := time.Now().Unix()
msgs := json.Messages{
Format: "some_json",
}
for i := 0; i < streamsSize; i++ {
msg.Created = now + int64(i)
msgs.Data = append(msgs.Data, msg)
}
err = repo.Consume(msgs)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
row, err := queryDB(selectMsgs)
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err))
count := len(row)
assert.Equal(t, streamsSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", streamsSize, count))
}

View File

@ -10,10 +10,12 @@ import (
"testing"
"time"
"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mainflux/mainflux/consumers/writers/mongodb"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
log "github.com/mainflux/mainflux/logger"
@ -41,7 +43,7 @@ var (
sum float64 = 42
)
func TestSave(t *testing.T) {
func TestSaveSenml(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
@ -88,3 +90,47 @@ func TestSave(t *testing.T) {
assert.Nil(t, err, fmt.Sprintf("Querying database expected to succeed: %s.\n", err))
assert.Equal(t, int64(msgsNum), count, fmt.Sprintf("Expected to have %d value, found %d instead.\n", msgsNum, count))
}
func TestSaveJSON(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
repo := mongodb.New(db)
chid, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
pubid, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
msg := json.Message{
Channel: chid.String(),
Publisher: pubid.String(),
Created: time.Now().Unix(),
Subtopic: "subtopic/format/some_json",
Protocol: "mqtt",
Payload: map[string]interface{}{
"field_1": 123,
"field_2": "value",
"field_3": false,
"field_4": 12.344,
"field_5": map[string]interface{}{
"field_1": "value",
"field_2": 42,
},
},
}
now := time.Now().Unix()
msgs := json.Messages{
Format: "some_json",
}
for i := 0; i < msgsNum; i++ {
msg.Created = now + int64(i)
msgs.Data = append(msgs.Data, msg)
}
err = repo.Consume(msgs)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/mainflux/mainflux/consumers/writers/postgres"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -30,8 +31,8 @@ var (
sum float64 = 42
)
func TestMessageSave(t *testing.T) {
messageRepo := postgres.New(db)
func TestSaveSenml(t *testing.T) {
repo := postgres.New(db)
chid, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
@ -67,6 +68,46 @@ func TestMessageSave(t *testing.T) {
msgs = append(msgs, msg)
}
err = messageRepo.Consume(msgs)
err = repo.Consume(msgs)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}
func TestSaveJSON(t *testing.T) {
repo := postgres.New(db)
chid, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
pubid, err := uuid.NewV4()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
msg := json.Message{
Channel: chid.String(),
Publisher: pubid.String(),
Created: time.Now().Unix(),
Subtopic: "subtopic/format/some_json",
Protocol: "mqtt",
Payload: map[string]interface{}{
"field_1": 123,
"field_2": "value",
"field_3": false,
"field_4": 12.344,
"field_5": map[string]interface{}{
"field_1": "value",
"field_2": 42,
},
},
}
now := time.Now().Unix()
msgs := json.Messages{
Format: "some_json",
}
for i := 0; i < msgsNum; i++ {
msg.Created = now + int64(i)
msgs.Data = append(msgs.Data, msg)
}
err = repo.Consume(msgs)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
}

View File

@ -20,6 +20,9 @@ const (
format = "format"
// Table for SenML messages
defTable = "messages"
// Error code for Undefined table error.
undefinedTableCode = 8704
)
var _ readers.MessageRepository = (*cassandraRepository)(nil)
@ -79,6 +82,11 @@ func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
&msg.Name, &msg.Unit, &msg.Value, &msg.StringValue, &msg.BoolValue,
&msg.DataValue, &msg.Sum, &msg.Time, &msg.UpdateTime)
if err != nil {
if e, ok := err.(gocql.RequestError); ok {
if e.Code() == undefinedTableCode {
return readers.MessagesPage{}, nil
}
}
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
page.Messages = append(page.Messages, msg)
@ -88,6 +96,11 @@ func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
var msg jsonMessage
err := scanner.Scan(&msg.Channel, &msg.Subtopic, &msg.Publisher, &msg.Protocol, &msg.Created, &msg.Payload)
if err != nil {
if e, ok := err.(gocql.RequestError); ok {
if e.Code() == undefinedTableCode {
return readers.MessagesPage{}, nil
}
}
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
m, err := msg.toMap()
@ -100,6 +113,11 @@ func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
}
if err := cr.session.Query(countCQL, vals[:len(vals)-1]...).Scan(&page.Total); err != nil {
if e, ok := err.(gocql.RequestError); ok {
if e.Code() == undefinedTableCode {
return readers.MessagesPage{}, nil
}
}
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}

View File

@ -8,11 +8,12 @@ import (
"testing"
"time"
writer "github.com/mainflux/mainflux/consumers/writers/cassandra"
cwriter "github.com/mainflux/mainflux/consumers/writers/cassandra"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/readers"
reader "github.com/mainflux/mainflux/readers/cassandra"
creader "github.com/mainflux/mainflux/readers/cassandra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -26,28 +27,31 @@ const (
mqttProt = "mqtt"
httpProt = "http"
msgName = "temperature"
format1 = "format_1"
format2 = "format_2"
wrongID = "0"
)
var (
addr = "localhost"
v float64 = 5
vs = "value"
vb = true
vd = "base64"
sum float64 = 42
addr = "localhost"
v float64 = 5
vs = "value"
vb = true
vd = "base64"
sum float64 = 42
idProvider = uuid.New()
)
func TestReadSenml(t *testing.T) {
session, err := reader.Connect(reader.DBConfig{
session, err := creader.Connect(creader.DBConfig{
Hosts: []string{addr},
Keyspace: keyspace,
})
require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err))
defer session.Close()
writer := writer.New(session)
writer := cwriter.New(session)
chanID, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
@ -106,7 +110,7 @@ func TestReadSenml(t *testing.T) {
err = writer.Consume(messages)
require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err))
reader := reader.New(session)
reader := creader.New(session)
// Since messages are not saved in natural order,
// cases that return subset of messages are only
@ -183,6 +187,19 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with wrong format": {
chanID: chanID,
pageMeta: readers.PageMetadata{
Format: "messagess",
Offset: 0,
Limit: uint64(len(queryMsgs)),
Publisher: pubID2,
},
page: readers.MessagesPage{
Total: 0,
Messages: []readers.Message{},
},
},
"read message with protocol": {
chanID: chanID,
pageMeta: readers.PageMetadata{
@ -367,6 +384,155 @@ func TestReadSenml(t *testing.T) {
}
}
func TestReadJSON(t *testing.T) {
session, err := creader.Connect(creader.DBConfig{
Hosts: []string{addr},
Keyspace: keyspace,
})
require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err))
defer session.Close()
writer := cwriter.New(session)
id1, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m := json.Message{
Channel: id1,
Publisher: id1,
Created: time.Now().Unix(),
Subtopic: "subtopic/format/some_json",
Protocol: "coap",
Payload: map[string]interface{}{
"field_2": "value",
"field_3": false,
"field_4": 12.344,
"field_5": map[string]interface{}{
"field_1": "value",
"field_2": 42.0,
},
},
}
messages1 := json.Messages{
Format: format1,
}
msgs1 := []map[string]interface{}{}
for i := 0; i < msgsNum; i++ {
msg := m
messages1.Data = append(messages1.Data, msg)
m := toMap(msg)
msgs1 = append(msgs1, m)
}
err = writer.Consume(messages1)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
id2, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m = json.Message{
Channel: id2,
Publisher: id2,
Created: time.Now().Unix(),
Subtopic: "subtopic/other_format/some_other_json",
Protocol: "udp",
Payload: map[string]interface{}{
"field_pi": 3.14159265,
"false_value": false,
"field_map": map[string]interface{}{
"field_1": "wrong_value",
"field_2": 24.5,
},
},
}
messages2 := json.Messages{
Format: format2,
}
msgs2 := []map[string]interface{}{}
for i := 0; i < msgsNum; i++ {
msg := m
if i%2 == 0 {
msg.Protocol = httpProt
}
messages2.Data = append(messages2.Data, msg)
m := toMap(msg)
msgs2 = append(msgs2, m)
}
err = writer.Consume(messages2)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
httpMsgs := []map[string]interface{}{}
for i := 0; i < msgsNum; i += 2 {
httpMsgs = append(httpMsgs, msgs2[i])
}
reader := creader.New(session)
cases := map[string]struct {
chanID string
pageMeta readers.PageMetadata
page readers.MessagesPage
}{
"read message page for existing channel": {
chanID: id1,
pageMeta: readers.PageMetadata{
Format: messages1.Format,
Offset: 0,
Limit: 10,
},
page: readers.MessagesPage{
Total: 100,
Messages: fromJSON(msgs1[:10]),
},
},
"read message page for non-existent channel": {
chanID: wrongID,
pageMeta: readers.PageMetadata{
Format: messages1.Format,
Offset: 0,
Limit: 10,
},
page: readers.MessagesPage{
Messages: []readers.Message{},
},
},
"read message last page": {
chanID: id2,
pageMeta: readers.PageMetadata{
Format: messages2.Format,
Offset: msgsNum - 20,
Limit: msgsNum,
},
page: readers.MessagesPage{
Total: msgsNum,
Messages: fromJSON(msgs2[msgsNum-20 : msgsNum]),
},
},
"read message with protocol": {
chanID: id2,
pageMeta: readers.PageMetadata{
Format: messages2.Format,
Offset: 0,
Limit: uint64(msgsNum / 2),
Protocol: httpProt,
},
page: readers.MessagesPage{
Total: uint64(msgsNum / 2),
Messages: fromJSON(httpMsgs),
},
},
}
for desc, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
for i := 0; i < len(result.Messages); i++ {
m := result.Messages[i]
// Remove id as it is not sent by the client.
delete(m.(map[string]interface{}), "id")
result.Messages[i] = m
}
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
}
}
func fromSenml(in []senml.Message) []readers.Message {
var ret []readers.Message
for _, m := range in {
@ -374,3 +540,22 @@ func fromSenml(in []senml.Message) []readers.Message {
}
return ret
}
func fromJSON(msg []map[string]interface{}) []readers.Message {
var ret []readers.Message
for _, m := range msg {
ret = append(ret, m)
}
return ret
}
func toMap(msg json.Message) map[string]interface{} {
return map[string]interface{}{
"channel": msg.Channel,
"created": msg.Created,
"subtopic": msg.Subtopic,
"publisher": msg.Publisher,
"protocol": msg.Protocol,
"payload": map[string]interface{}(msg.Payload),
}
}

View File

@ -56,6 +56,7 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
var ret []readers.Message
resp, err := repo.client.Query(q)
if err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
@ -69,7 +70,11 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
result := resp.Results[0].Series[0]
for _, v := range result.Values {
ret = append(ret, parseMessage(format, result.Columns, v))
msg, err := parseMessage(format, result.Columns, v)
if err != nil {
return readers.MessagesPage{}, err
}
ret = append(ret, msg)
}
total, err := repo.count(format, condition)
@ -124,7 +129,6 @@ func (repo *influxRepository) count(measurement, condition string) (uint64, erro
if !ok {
return 0, nil
}
return strconv.ParseUint(count.String(), 10, 64)
}
@ -209,10 +213,10 @@ func parseValues(value interface{}, name string, msg *senml.Message) {
}
}
func parseMessage(measurement string, names []string, fields []interface{}) interface{} {
func parseMessage(measurement string, names []string, fields []interface{}) (interface{}, error) {
switch measurement {
case defMeasurement:
return parseSenml(names, fields)
return parseSenml(names, fields), nil
default:
return parseJSON(names, fields)
}
@ -254,11 +258,25 @@ func parseSenml(names []string, fields []interface{}) interface{} {
return m
}
func parseJSON(names []string, fields []interface{}) interface{} {
func parseJSON(names []string, fields []interface{}) (interface{}, error) {
ret := make(map[string]interface{})
pld := make(map[string]interface{})
for i, n := range names {
ret[n] = fields[i]
switch n {
case "channel", "created", "subtopic", "publisher", "protocol", "time":
ret[n] = fields[i]
default:
v := fields[i]
if val, ok := v.(json.Number); ok {
var err error
v, err = val.Float64()
if err != nil {
return nil, err
}
}
pld[n] = v
}
}
return jsont.ParseFlat(ret)
ret["payload"] = jsont.ParseFlat(pld)
return ret, nil
}

View File

@ -7,6 +7,7 @@ import (
influxdata "github.com/influxdata/influxdb/client/v2"
iwriter "github.com/mainflux/mainflux/consumers/writers/influxdb"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/readers"
@ -25,6 +26,10 @@ const (
mqttProt = "mqtt"
httpProt = "http"
msgName = "temperature"
format1 = "format1"
format2 = "format2"
wrongID = "wrong_id"
)
var (
@ -103,7 +108,6 @@ func TestReadAll(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err))
reader := ireader.New(client, testDB)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB reader expected to succeed: %s.\n", err))
cases := map[string]struct {
chanID string
@ -177,6 +181,19 @@ func TestReadAll(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with wrong format": {
chanID: chanID,
pageMeta: readers.PageMetadata{
Format: "messagess",
Offset: 0,
Limit: uint64(len(queryMsgs)),
Publisher: pubID2,
},
page: readers.MessagesPage{
Total: 0,
Messages: []readers.Message{},
},
},
"read message with protocol": {
chanID: chanID,
pageMeta: readers.PageMetadata{
@ -357,11 +374,144 @@ func TestReadAll(t *testing.T) {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected: %v, got: %v", desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %d got %d", desc, tc.page.Total, result.Total))
}
}
func TestReadJSON(t *testing.T) {
writer := iwriter.New(client, testDB)
id1, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m := json.Message{
Channel: id1,
Publisher: id1,
Created: time.Now().UnixNano(),
Subtopic: "subtopic/format/some_json",
Protocol: "coap",
Payload: map[string]interface{}{
"field_1": 123.0,
"field_2": "value",
"field_3": false,
},
}
messages1 := json.Messages{
Format: format1,
}
msgs1 := []map[string]interface{}{}
for i := 0; i < msgsNum; i++ {
messages1.Data = append(messages1.Data, m)
m := toMap(m)
msgs1 = append(msgs1, m)
}
err = writer.Consume(messages1)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
id2, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m = json.Message{
Channel: id2,
Publisher: id2,
Created: time.Now().UnixNano() + msgsNum,
Subtopic: "subtopic/other_format/some_other_json",
Protocol: "udp",
Payload: map[string]interface{}{
"field_pi": 3.14159265,
},
}
messages2 := json.Messages{
Format: format2,
}
msgs2 := []map[string]interface{}{}
for i := 0; i < msgsNum; i++ {
msg := m
if i%2 == 0 {
msg.Protocol = httpProt
}
messages2.Data = append(messages2.Data, msg)
m := toMap(msg)
msgs2 = append(msgs2, m)
}
err = writer.Consume(messages2)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
httpMsgs := []map[string]interface{}{}
for i := 0; i < msgsNum; i += 2 {
httpMsgs = append(httpMsgs, msgs2[i])
}
reader := ireader.New(client, testDB)
cases := map[string]struct {
chanID string
pageMeta readers.PageMetadata
page readers.MessagesPage
}{
"read message page for existing channel": {
chanID: id1,
pageMeta: readers.PageMetadata{
Format: messages1.Format,
Offset: 0,
Limit: 1,
},
page: readers.MessagesPage{
Total: msgsNum,
Messages: fromJSON(msgs1[:1]),
},
},
"read message page for non-existent channel": {
chanID: wrongID,
pageMeta: readers.PageMetadata{
Format: messages1.Format,
Offset: 0,
Limit: 10,
},
page: readers.MessagesPage{
Messages: []readers.Message{},
},
},
"read message last page": {
chanID: id2,
pageMeta: readers.PageMetadata{
Format: messages2.Format,
Offset: msgsNum - 20,
Limit: msgsNum,
},
page: readers.MessagesPage{
Total: msgsNum,
Messages: fromJSON(msgs2[msgsNum-20 : msgsNum]),
},
},
"read message with protocol": {
chanID: id2,
pageMeta: readers.PageMetadata{
Format: messages2.Format,
Offset: 0,
Limit: uint64(msgsNum / 2),
Protocol: httpProt,
},
page: readers.MessagesPage{
Total: uint64(msgsNum / 2),
Messages: fromJSON(httpMsgs),
},
},
}
for desc, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
for i := 0; i < len(result.Messages); i++ {
m := result.Messages[i]
// Remove time as it is not sent by the client.
delete(m.(map[string]interface{}), "time")
result.Messages[i] = m
}
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected \n%v got \n%v", desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
}
}
func fromSenml(in []senml.Message) []readers.Message {
var ret []readers.Message
for _, m := range in {
@ -369,3 +519,22 @@ func fromSenml(in []senml.Message) []readers.Message {
}
return ret
}
func fromJSON(msg []map[string]interface{}) []readers.Message {
var ret []readers.Message
for _, m := range msg {
ret = append(ret, m)
}
return ret
}
func toMap(msg json.Message) map[string]interface{} {
return map[string]interface{}{
"channel": msg.Channel,
// "created": msg.Created,
"subtopic": msg.Subtopic,
"publisher": msg.Publisher,
"protocol": msg.Protocol,
"payload": map[string]interface{}(msg.Payload),
}
}

View File

@ -39,14 +39,16 @@ func New(db *mongo.Database) readers.MessageRepository {
func (repo mongoRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) {
format := defCollection
if rpm.Format != "" {
order := "time"
if rpm.Format != "" && rpm.Format != defCollection {
order = "created"
format = rpm.Format
}
col := repo.db.Collection(format)
sortMap := map[string]interface{}{
"time": -1,
order: -1,
}
// Remove format filter and format the rest properly.
filter := fmtCondition(chanID, rpm)

View File

@ -9,11 +9,12 @@ import (
"testing"
"time"
writer "github.com/mainflux/mainflux/consumers/writers/mongodb"
mwriter "github.com/mainflux/mainflux/consumers/writers/mongodb"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/readers"
reader "github.com/mainflux/mainflux/readers/mongodb"
mreader "github.com/mainflux/mainflux/readers/mongodb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -31,6 +32,10 @@ const (
mqttProt = "mqtt"
httpProt = "http"
msgName = "temperature"
wrongID = "wrong-id"
format1 = "format_1"
format2 = "format_2"
)
var (
@ -51,7 +56,7 @@ func TestReadSenml(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
writer := writer.New(db)
writer := mwriter.New(db)
chanID, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
@ -107,7 +112,7 @@ func TestReadSenml(t *testing.T) {
}
err = writer.Consume(messages)
require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err))
reader := reader.New(db)
reader := mreader.New(db)
cases := map[string]struct {
chanID string
@ -181,6 +186,18 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with invalid format": {
chanID: chanID,
pageMeta: readers.PageMetadata{
Format: "messagess",
Offset: 0,
Limit: uint64(len(queryMsgs)),
},
page: readers.MessagesPage{
Total: 0,
Messages: []readers.Message{},
},
},
"read message with protocol": {
chanID: chanID,
pageMeta: readers.PageMetadata{
@ -359,7 +376,153 @@ func TestReadSenml(t *testing.T) {
for desc, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
}
}
func TestReadJSON(t *testing.T) {
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB)
writer := mwriter.New(db)
id1, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m := json.Message{
Channel: id1,
Publisher: id1,
Created: time.Now().Unix(),
Subtopic: "subtopic/format/some_json",
Protocol: "coap",
Payload: map[string]interface{}{
"field_2": "value",
"field_3": false,
"field_4": 12.344,
"field_5": map[string]interface{}{
"field_1": "value",
"field_2": 42.0,
},
},
}
messages1 := json.Messages{
Format: format1,
}
msgs1 := []map[string]interface{}{}
for i := 0; i < msgsNum; i++ {
msg := m
messages1.Data = append(messages1.Data, msg)
m := toMap(msg)
msgs1 = append(msgs1, m)
}
err = writer.Consume(messages1)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
id2, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m = json.Message{
Channel: id2,
Publisher: id2,
Created: time.Now().Unix(),
Subtopic: "subtopic/other_format/some_other_json",
Protocol: "udp",
Payload: map[string]interface{}{
"field_2": "other_value",
"field_3": false,
"field_5": map[string]interface{}{
"field_1": "wrong_value",
"field_2": 24.5,
},
},
}
messages2 := json.Messages{
Format: format2,
}
msgs2 := []map[string]interface{}{}
for i := 0; i < msgsNum; i++ {
msg := m
if i%2 == 0 {
msg.Protocol = httpProt
}
messages2.Data = append(messages2.Data, msg)
m := toMap(msg)
msgs2 = append(msgs2, m)
}
err = writer.Consume(messages2)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
httpMsgs := []map[string]interface{}{}
for i := 0; i < msgsNum; i += 2 {
httpMsgs = append(httpMsgs, msgs2[i])
}
reader := mreader.New(db)
cases := map[string]struct {
chanID string
pageMeta readers.PageMetadata
page readers.MessagesPage
}{
"read message page for existing channel": {
chanID: id1,
pageMeta: readers.PageMetadata{
Format: messages1.Format,
Offset: 0,
Limit: 10,
},
page: readers.MessagesPage{
Total: 100,
Messages: fromJSON(msgs1[:10]),
},
},
"read message page for non-existent channel": {
chanID: wrongID,
pageMeta: readers.PageMetadata{
Format: messages1.Format,
Offset: 0,
Limit: 10,
},
page: readers.MessagesPage{
Messages: []readers.Message{},
},
},
"read message last page": {
chanID: id2,
pageMeta: readers.PageMetadata{
Format: messages2.Format,
Offset: msgsNum - 20,
Limit: msgsNum,
},
page: readers.MessagesPage{
Total: msgsNum,
Messages: fromJSON(msgs2[msgsNum-20 : msgsNum]),
},
},
"read message with protocol": {
chanID: id2,
pageMeta: readers.PageMetadata{
Format: messages2.Format,
Offset: 0,
Limit: uint64(msgsNum / 2),
Protocol: httpProt,
},
page: readers.MessagesPage{
Total: uint64(msgsNum / 2),
Messages: fromJSON(httpMsgs),
},
},
}
for desc, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
for i := 0; i < len(result.Messages); i++ {
m := result.Messages[i]
// Remove id as it is not sent by the client.
delete(m.(map[string]interface{}), "_id")
result.Messages[i] = m
}
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
@ -373,3 +536,22 @@ func fromSenml(in []senml.Message) []readers.Message {
}
return ret
}
func fromJSON(msg []map[string]interface{}) []readers.Message {
var ret []readers.Message
for _, m := range msg {
ret = append(ret, m)
}
return ret
}
func toMap(msg json.Message) map[string]interface{} {
return map[string]interface{}{
"channel": msg.Channel,
"created": msg.Created,
"subtopic": msg.Subtopic,
"publisher": msg.Publisher,
"protocol": msg.Protocol,
"payload": map[string]interface{}(msg.Payload),
}
}

View File

@ -8,6 +8,7 @@ import (
"fmt"
"github.com/jmoiron/sqlx" // required for DB access
"github.com/lib/pq"
"github.com/mainflux/mainflux/pkg/errors"
jsont "github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
@ -20,6 +21,9 @@ const (
format = "format"
// Table for SenML messages
defTable = "messages"
// Error code for Undefined table error.
undefinedTableCode = "42P01"
)
var errReadMessages = errors.New("failed to read messages from postgres database")
@ -68,6 +72,11 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r
rows, err := tr.db.NamedQuery(q, params)
if err != nil {
if e, ok := err.(*pq.Error); ok {
if e.Code == undefinedTableCode {
return readers.MessagesPage{}, nil
}
}
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
defer rows.Close()
@ -79,7 +88,7 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r
switch format {
case defTable:
for rows.Next() {
msg := dbMessage{Message: senml.Message{}}
msg := senmlMessage{Message: senml.Message{}}
if err := rows.StructScan(&msg); err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
@ -156,7 +165,7 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string {
return condition
}
type dbMessage struct {
type senmlMessage struct {
ID string `db:"id"`
senml.Message
}

View File

@ -9,6 +9,7 @@ import (
"time"
pwriter "github.com/mainflux/mainflux/consumers/writers/postgres"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/readers"
@ -25,6 +26,10 @@ const (
mqttProt = "mqtt"
httpProt = "http"
msgName = "temperature"
format1 = "format1"
format2 = "format2"
wrongID = "0"
wrongValue = "wrong-value"
)
var (
@ -174,17 +179,17 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with publisher and format": {
"read message with wrong format": {
chanID: chanID,
pageMeta: readers.PageMetadata{
Format: "messages",
Format: "messagess",
Offset: 0,
Limit: uint64(len(queryMsgs)),
Publisher: pubID2,
},
page: readers.MessagesPage{
Total: uint64(len(queryMsgs)),
Messages: fromSenml(queryMsgs),
Total: 0,
Messages: []readers.Message{},
},
},
"read message with protocol": {
@ -371,10 +376,170 @@ func TestReadSenml(t *testing.T) {
}
}
func fromSenml(in []senml.Message) []readers.Message {
func TestReadJSON(t *testing.T) {
writer := pwriter.New(db)
id1, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m := json.Message{
Channel: id1,
Publisher: id1,
Created: time.Now().Unix(),
Subtopic: "subtopic/format/some_json",
Protocol: "coap",
Payload: map[string]interface{}{
"field_1": 123.0,
"field_2": "value",
"field_3": false,
"field_4": 12.344,
"field_5": map[string]interface{}{
"field_1": "value",
"field_2": 42.0,
},
},
}
messages1 := json.Messages{
Format: format1,
}
msgs1 := []map[string]interface{}{}
for i := 0; i < msgsNum; i++ {
msg := m
messages1.Data = append(messages1.Data, msg)
m := toMap(msg)
msgs1 = append(msgs1, m)
}
err = writer.Consume(messages1)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
id2, err := idProvider.ID()
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
m = json.Message{
Channel: id2,
Publisher: id2,
Created: time.Now().Unix(),
Subtopic: "subtopic/other_format/some_other_json",
Protocol: "udp",
Payload: map[string]interface{}{
"field_1": "other_value",
"false_value": false,
"field_pi": 3.14159265,
},
}
messages2 := json.Messages{
Format: format2,
}
msgs2 := []map[string]interface{}{}
for i := 0; i < msgsNum; i++ {
msg := m
if i%2 == 0 {
msg.Protocol = httpProt
}
messages2.Data = append(messages2.Data, msg)
m := toMap(msg)
msgs2 = append(msgs2, m)
}
err = writer.Consume(messages2)
assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err))
httpMsgs := []map[string]interface{}{}
for i := 0; i < msgsNum; i += 2 {
httpMsgs = append(httpMsgs, msgs2[i])
}
reader := preader.New(db)
cases := map[string]struct {
chanID string
pageMeta readers.PageMetadata
page readers.MessagesPage
}{
"read message page for existing channel": {
chanID: id1,
pageMeta: readers.PageMetadata{
Format: messages1.Format,
Offset: 0,
Limit: 10,
},
page: readers.MessagesPage{
Total: 100,
Messages: fromJSON(msgs1[:10]),
},
},
"read message page for non-existent channel": {
chanID: wrongID,
pageMeta: readers.PageMetadata{
Format: messages1.Format,
Offset: 0,
Limit: 10,
},
page: readers.MessagesPage{
Messages: []readers.Message{},
},
},
"read message last page": {
chanID: id2,
pageMeta: readers.PageMetadata{
Format: messages2.Format,
Offset: msgsNum - 20,
Limit: msgsNum,
},
page: readers.MessagesPage{
Total: msgsNum,
Messages: fromJSON(msgs2[msgsNum-20 : msgsNum]),
},
},
"read message with protocol": {
chanID: id2,
pageMeta: readers.PageMetadata{
Format: messages2.Format,
Offset: 0,
Limit: uint64(msgsNum / 2),
Protocol: httpProt,
},
page: readers.MessagesPage{
Total: uint64(msgsNum / 2),
Messages: fromJSON(httpMsgs),
},
},
}
for desc, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
for i := 0; i < len(result.Messages); i++ {
m := result.Messages[i]
// Remove id as it is not sent by the client.
delete(m.(map[string]interface{}), "id")
result.Messages[i] = m
}
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
}
}
func fromSenml(msg []senml.Message) []readers.Message {
var ret []readers.Message
for _, m := range in {
for _, m := range msg {
ret = append(ret, m)
}
return ret
}
func fromJSON(msg []map[string]interface{}) []readers.Message {
var ret []readers.Message
for _, m := range msg {
ret = append(ret, m)
}
return ret
}
func toMap(msg json.Message) map[string]interface{} {
return map[string]interface{}{
"channel": msg.Channel,
"created": msg.Created,
"subtopic": msg.Subtopic,
"publisher": msg.Publisher,
"protocol": msg.Protocol,
"payload": map[string]interface{}(msg.Payload),
}
}

View File

@ -17,11 +17,6 @@ import (
dockertest "github.com/ory/dockertest/v3"
)
const (
wrongID = "0"
wrongValue = "wrong-value"
)
var (
testLog, _ = logger.New(os.Stdout, logger.Info.String())
db *sqlx.DB