1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-05-06 19:29:15 +08:00

MF-454 - Use message Time field as a time for InfluxDB points (#455)

* Use message time as Point time in InfluxDB

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Use actual message time

Update all Reader and Writer services to use time from the message
instead of time given from the corrseponding Writer service.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Remove message time check

Messages time less than 2**28 represent time relative to the current time so Writer used to convert this to the correct value, i.e. msg.Time += time.Now(). However, this step is optional and should really be a part of the app on top of Mainflux or could be introduced with minor changes in Normalizer, Reader or Writer services, so there is no need for this to be supported out of box.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Use channel and publisher as tag keys

Move all the other Message fields to the field keys.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>
This commit is contained in:
Dušan Borovčanin 2018-11-18 16:42:39 +01:00 committed by Drasko DRASKOVIC
parent bf299a0143
commit 0379eb6c0f
8 changed files with 59 additions and 30 deletions

View File

@ -10,6 +10,7 @@ package cassandra_test
import ( import (
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/mainflux/mainflux" "github.com/mainflux/mainflux"
readers "github.com/mainflux/mainflux/readers/cassandra" readers "github.com/mainflux/mainflux/readers/cassandra"
@ -41,6 +42,7 @@ func TestReadAll(t *testing.T) {
writer := writers.New(session) writer := writers.New(session)
messages := []mainflux.Message{} messages := []mainflux.Message{}
now := time.Now().Unix()
for i := 0; i < msgsNum; i++ { for i := 0; i < msgsNum; i++ {
// Mix possible values as well as value sum. // Mix possible values as well as value sum.
count := i % valueFields count := i % valueFields
@ -58,6 +60,7 @@ func TestReadAll(t *testing.T) {
case 5: case 5:
msg.ValueSum = &mainflux.SumValue{Value: 45} msg.ValueSum = &mainflux.SumValue{Value: 45}
} }
msg.Time = float64(now + int64(i))
err := writer.Save(msg) err := writer.Save(msg)
require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err))

View File

@ -6,6 +6,7 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/readers"
@ -22,9 +23,6 @@ type influxRepository struct {
client influxdata.Client client influxdata.Client
} }
type fields map[string]interface{}
type tags map[string]string
// New returns new InfluxDB reader. // New returns new InfluxDB reader.
func New(client influxdata.Client, database string) (readers.MessageRepository, error) { func New(client influxdata.Client, database string) (readers.MessageRepository, error) {
return &influxRepository{database, client}, nil return &influxRepository{database, client}, nil
@ -35,7 +33,7 @@ func (repo *influxRepository) ReadAll(chanID, offset, limit uint64) []mainflux.M
limit = maxLimit limit = maxLimit
} }
cmd := fmt.Sprintf(`SELECT * from messages WHERE Channel='%d' LIMIT %d OFFSET %d`, chanID, limit, offset) cmd := fmt.Sprintf(`SELECT * from messages WHERE channel='%d' LIMIT %d OFFSET %d`, chanID, limit, offset)
q := influxdata.Query{ q := influxdata.Query{
Command: cmd, Command: cmd,
Database: repo.database, Database: repo.database,
@ -51,6 +49,7 @@ func (repo *influxRepository) ReadAll(chanID, offset, limit uint64) []mainflux.M
if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 { if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 {
return ret return ret
} }
result := resp.Results[0].Series[0] result := resp.Results[0].Series[0]
for _, v := range result.Values { for _, v := range result.Values {
ret = append(ret, parseMessage(result.Columns, v)) ret = append(ret, parseMessage(result.Columns, v))
@ -63,17 +62,19 @@ func (repo *influxRepository) ReadAll(chanID, offset, limit uint64) []mainflux.M
// results in form of rows and columns, this obscure message conversion is needed // results in form of rows and columns, this obscure message conversion is needed
// to return actual []mainflux.Message from the query result. // to return actual []mainflux.Message from the query result.
func parseValues(value interface{}, name string, msg *mainflux.Message) { func parseValues(value interface{}, name string, msg *mainflux.Message) {
if name == "ValueSum" && value != nil { if name == "valueSum" && value != nil {
if sum, ok := value.(json.Number); ok { if sum, ok := value.(json.Number); ok {
valSum, err := sum.Float64() valSum, err := sum.Float64()
if err != nil { if err != nil {
return return
} }
msg.ValueSum = &mainflux.SumValue{Value: valSum} msg.ValueSum = &mainflux.SumValue{Value: valSum}
} }
return return
} }
if strings.HasSuffix(name, "Value") {
if strings.HasSuffix(strings.ToLower(name), "value") {
switch value.(type) { switch value.(type) {
case bool: case bool:
msg.Value = &mainflux.Message_BoolValue{value.(bool)} msg.Value = &mainflux.Message_BoolValue{value.(bool)}
@ -85,12 +86,12 @@ func parseValues(value interface{}, name string, msg *mainflux.Message) {
msg.Value = &mainflux.Message_FloatValue{num} msg.Value = &mainflux.Message_FloatValue{num}
case string: case string:
if strings.HasPrefix(name, "String") { if strings.HasPrefix(name, "string") {
msg.Value = &mainflux.Message_StringValue{value.(string)} msg.Value = &mainflux.Message_StringValue{value.(string)}
return return
} }
if strings.HasPrefix(name, "Data") { if strings.HasPrefix(name, "data") {
msg.Value = &mainflux.Message_DataValue{value.(string)} msg.Value = &mainflux.Message_DataValue{value.(string)}
} }
} }
@ -102,7 +103,7 @@ func parseMessage(names []string, fields []interface{}) mainflux.Message {
v := reflect.ValueOf(&m).Elem() v := reflect.ValueOf(&m).Elem()
for i, name := range names { for i, name := range names {
parseValues(fields[i], name, &m) parseValues(fields[i], name, &m)
msgField := v.FieldByName(name) msgField := v.FieldByName(strings.Title(name))
if !msgField.IsValid() { if !msgField.IsValid() {
continue continue
} }
@ -117,6 +118,17 @@ func parseMessage(names []string, fields []interface{}) mainflux.Message {
u, _ := strconv.ParseUint(fields[i].(string), 10, 64) u, _ := strconv.ParseUint(fields[i].(string), 10, 64)
msgField.SetUint(u) msgField.SetUint(u)
case float64: case float64:
if name == "time" {
t, err := time.Parse(time.RFC3339, fields[i].(string))
if err != nil {
continue
}
v := float64(t.Unix())
msgField.SetFloat(v)
continue
}
val, _ := strconv.ParseFloat(fields[i].(string), 64) val, _ := strconv.ParseFloat(fields[i].(string), 64)
msgField.SetFloat(val) msgField.SetFloat(val)
} }

View File

@ -42,7 +42,7 @@ var (
Value: &mainflux.Message_FloatValue{5}, Value: &mainflux.Message_FloatValue{5},
ValueSum: &mainflux.SumValue{Value: 45}, ValueSum: &mainflux.SumValue{Value: 45},
Time: 123456, Time: 123456,
UpdateTime: 1234567, UpdateTime: 1234,
Link: "link", Link: "link",
} }
) )
@ -55,6 +55,7 @@ func TestReadAll(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB writer expected to succeed: %s.\n", err)) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB writer expected to succeed: %s.\n", err))
messages := []mainflux.Message{} messages := []mainflux.Message{}
now := time.Now().Unix()
for i := 0; i < msgsNum; i++ { for i := 0; i < msgsNum; i++ {
// Mix possible values as well as value sum. // Mix possible values as well as value sum.
count := i % valueFields count := i % valueFields
@ -72,6 +73,7 @@ func TestReadAll(t *testing.T) {
case 5: case 5:
msg.ValueSum = &mainflux.SumValue{Value: 45} msg.ValueSum = &mainflux.SumValue{Value: 45}
} }
msg.Time = float64(now + int64(i))
err := writer.Save(msg) err := writer.Save(msg)
require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err)) require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err))
@ -115,6 +117,6 @@ func TestReadAll(t *testing.T) {
for desc, tc := range cases { for desc, tc := range cases {
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit) result := reader.ReadAll(tc.chanID, tc.offset, tc.limit)
assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected %v got %v", desc, tc.messages, result)) assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected: %v \n-------------\n got: %v", desc, tc.messages, result))
} }
} }

View File

@ -12,6 +12,7 @@ import (
"fmt" "fmt"
"os" "os"
"testing" "testing"
"time"
readers "github.com/mainflux/mainflux/readers/mongodb" readers "github.com/mainflux/mainflux/readers/mongodb"
writers "github.com/mainflux/mainflux/writers/mongodb" writers "github.com/mainflux/mainflux/writers/mongodb"
@ -48,10 +49,10 @@ func TestReadAll(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err))
db := client.Database(testDB) db := client.Database(testDB)
writer := writers.New(db) writer := writers.New(db)
messages := []mainflux.Message{} messages := []mainflux.Message{}
now := time.Now().Unix()
for i := 0; i < msgsNum; i++ { for i := 0; i < msgsNum; i++ {
// Mix possible values as well as value sum. // Mix possible values as well as value sum.
count := i % valueFields count := i % valueFields
@ -69,6 +70,7 @@ func TestReadAll(t *testing.T) {
case 5: case 5:
msg.ValueSum = &mainflux.SumValue{Value: 45} msg.ValueSum = &mainflux.SumValue{Value: 45}
} }
msg.Time = float64(now + int64(i))
err := writer.Save(msg) err := writer.Save(msg)
require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err))

View File

@ -10,6 +10,7 @@ package cassandra_test
import ( import (
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/mainflux/mainflux" "github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/writers/cassandra" "github.com/mainflux/mainflux/writers/cassandra"
@ -35,6 +36,7 @@ func TestSave(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err)) require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err))
repo := cassandra.New(session) repo := cassandra.New(session)
now := time.Now().Unix()
for i := 0; i < msgsNum; i++ { for i := 0; i < msgsNum; i++ {
// Mix possible values as well as value sum. // Mix possible values as well as value sum.
count := i % valueFields count := i % valueFields
@ -52,6 +54,7 @@ func TestSave(t *testing.T) {
case 5: case 5:
msg.ValueSum = &mainflux.SumValue{Value: 45} msg.ValueSum = &mainflux.SumValue{Value: 45}
} }
msg.Time = float64(now + int64(i))
err = repo.Save(msg) err = repo.Save(msg)
assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err)) assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err))

View File

@ -112,7 +112,9 @@ func (repo *influxRepo) savePoint(point *influxdata.Point) error {
func (repo *influxRepo) Save(msg mainflux.Message) error { func (repo *influxRepo) Save(msg mainflux.Message) error {
tags, fields := repo.tagsOf(&msg), repo.fieldsOf(&msg) tags, fields := repo.tagsOf(&msg), repo.fieldsOf(&msg)
pt, err := influxdata.NewPoint(pointName, tags, fields, time.Now()) t := time.Unix(int64(msg.Time), 0)
pt, err := influxdata.NewPoint(pointName, tags, fields, t)
if err != nil { if err != nil {
return err return err
} }
@ -121,38 +123,38 @@ func (repo *influxRepo) Save(msg mainflux.Message) error {
} }
func (repo *influxRepo) tagsOf(msg *mainflux.Message) tags { func (repo *influxRepo) tagsOf(msg *mainflux.Message) tags {
time := strconv.FormatFloat(msg.Time, 'f', -1, 64)
update := strconv.FormatFloat(msg.UpdateTime, 'f', -1, 64)
channel := strconv.FormatUint(msg.Channel, 10) channel := strconv.FormatUint(msg.Channel, 10)
publisher := strconv.FormatUint(msg.Publisher, 10) publisher := strconv.FormatUint(msg.Publisher, 10)
return tags{ return tags{
"Channel": channel, "channel": channel,
"Publisher": publisher, "publisher": publisher,
"Protocol": msg.Protocol,
"Name": msg.Name,
"Unit": msg.Unit,
"Link": msg.Link,
"Time": time,
"UpdateTime": update,
} }
} }
func (repo *influxRepo) fieldsOf(msg *mainflux.Message) fields { func (repo *influxRepo) fieldsOf(msg *mainflux.Message) fields {
ret := fields{} updateTime := strconv.FormatFloat(msg.UpdateTime, 'f', -1, 64)
ret := fields{
"protocol": msg.Protocol,
"name": msg.Name,
"unit": msg.Unit,
"link": msg.Link,
"updateTime": updateTime,
}
switch msg.Value.(type) { switch msg.Value.(type) {
case *mainflux.Message_FloatValue: case *mainflux.Message_FloatValue:
ret["Value"] = msg.GetFloatValue() ret["value"] = msg.GetFloatValue()
case *mainflux.Message_StringValue: case *mainflux.Message_StringValue:
ret["StringValue"] = msg.GetStringValue() ret["stringValue"] = msg.GetStringValue()
case *mainflux.Message_DataValue: case *mainflux.Message_DataValue:
ret["DataValue"] = msg.GetDataValue() ret["dataValue"] = msg.GetDataValue()
case *mainflux.Message_BoolValue: case *mainflux.Message_BoolValue:
ret["BoolValue"] = msg.GetBoolValue() ret["boolValue"] = msg.GetBoolValue()
} }
if msg.ValueSum != nil { if msg.ValueSum != nil {
ret["ValueSum"] = msg.GetValueSum().GetValue() ret["valueSum"] = msg.GetValueSum().GetValue()
} }
return ret return ret

View File

@ -47,7 +47,6 @@ var (
Unit: "km", Unit: "km",
Value: &mainflux.Message_FloatValue{24}, Value: &mainflux.Message_FloatValue{24},
ValueSum: &mainflux.SumValue{Value: 22}, ValueSum: &mainflux.SumValue{Value: 22},
Time: 13451312,
UpdateTime: 5456565466, UpdateTime: 5456565466,
Link: "link", Link: "link",
} }
@ -145,6 +144,7 @@ func TestSave(t *testing.T) {
row, err := queryDB(dropMsgs) row, err := queryDB(dropMsgs)
require.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) require.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err))
now := time.Now().Unix()
for i := 0; i < tc.msgsNum; i++ { for i := 0; i < tc.msgsNum; i++ {
// Mix possible values as well as value sum. // Mix possible values as well as value sum.
count := i % valueFields count := i % valueFields
@ -162,6 +162,7 @@ func TestSave(t *testing.T) {
case 5: case 5:
msg.ValueSum = &mainflux.SumValue{Value: 45} msg.ValueSum = &mainflux.SumValue{Value: 45}
} }
msg.Time = float64(now + int64(i))
err := tc.repo.Save(msg) err := tc.repo.Save(msg)
assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))

View File

@ -12,6 +12,7 @@ import (
"fmt" "fmt"
"os" "os"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -53,6 +54,8 @@ func TestSave(t *testing.T) {
db := client.Database(testDB) db := client.Database(testDB)
repo := mongodb.New(db) repo := mongodb.New(db)
now := time.Now().Unix()
for i := 0; i < msgsNum; i++ { for i := 0; i < msgsNum; i++ {
// Mix possible values as well as value sum. // Mix possible values as well as value sum.
count := i % valueFields count := i % valueFields
@ -70,6 +73,7 @@ func TestSave(t *testing.T) {
case 5: case 5:
msg.ValueSum = &mainflux.SumValue{Value: 45} msg.ValueSum = &mainflux.SumValue{Value: 45}
} }
msg.Time = float64(now + int64(i))
err = repo.Save(msg) err = repo.Save(msg)
} }