2019-10-07 08:14:47 -06:00
|
|
|
// Copyright (c) Mainflux
|
2018-08-26 13:15:48 +02:00
|
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
|
2018-05-21 16:28:52 +02:00
|
|
|
package influxdb
|
|
|
|
|
|
|
|
import (
|
2019-04-04 18:10:01 +02:00
|
|
|
"math"
|
2018-05-21 16:28:52 +02:00
|
|
|
"strconv"
|
|
|
|
"time"
|
|
|
|
|
2019-11-05 11:57:16 +01:00
|
|
|
"github.com/mainflux/mainflux/transformers/senml"
|
2018-05-21 16:28:52 +02:00
|
|
|
"github.com/mainflux/mainflux/writers"
|
|
|
|
|
|
|
|
influxdata "github.com/influxdata/influxdb/client/v2"
|
|
|
|
)
|
|
|
|
|
2018-06-01 11:20:44 +02:00
|
|
|
// pointName is the name given to every point that Save creates.
const pointName = "messages"
|
|
|
|
|
2018-05-21 16:28:52 +02:00
|
|
|
// Compile-time check that *influxRepo satisfies writers.MessageRepository.
var _ writers.MessageRepository = (*influxRepo)(nil)
|
|
|
|
|
|
|
|
type influxRepo struct {
|
2019-10-31 14:04:47 +01:00
|
|
|
client influxdata.Client
|
|
|
|
cfg influxdata.BatchPointsConfig
|
2018-05-21 16:28:52 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// fields holds the field key/value pairs of a single point, as built by fieldsOf.
type fields map[string]interface{}
|
|
|
|
// tags holds the tag key/value pairs of a single point, as built by tagsOf.
type tags map[string]string
|
|
|
|
|
|
|
|
// New returns new InfluxDB writer.
|
2019-10-31 14:04:47 +01:00
|
|
|
func New(client influxdata.Client, database string) writers.MessageRepository {
|
|
|
|
return &influxRepo{
|
2018-09-23 01:53:03 +02:00
|
|
|
client: client,
|
|
|
|
cfg: influxdata.BatchPointsConfig{
|
|
|
|
Database: database,
|
|
|
|
},
|
|
|
|
}
|
2018-05-21 16:28:52 +02:00
|
|
|
}
|
|
|
|
|
2019-11-05 11:57:16 +01:00
|
|
|
func (repo *influxRepo) Save(messages ...senml.Message) error {
|
2019-10-31 14:04:47 +01:00
|
|
|
pts, err := influxdata.NewBatchPoints(repo.cfg)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2018-11-05 19:18:51 +01:00
|
|
|
}
|
2019-11-05 11:57:16 +01:00
|
|
|
|
2019-10-31 14:04:47 +01:00
|
|
|
for _, msg := range messages {
|
|
|
|
tgs, flds := repo.tagsOf(&msg), repo.fieldsOf(&msg)
|
2018-09-23 01:53:03 +02:00
|
|
|
|
2019-10-31 14:04:47 +01:00
|
|
|
sec, dec := math.Modf(msg.Time)
|
|
|
|
t := time.Unix(int64(sec), int64(dec*(1e9)))
|
2018-11-05 19:18:51 +01:00
|
|
|
|
2019-10-31 14:04:47 +01:00
|
|
|
pt, err := influxdata.NewPoint(pointName, tgs, flds, t)
|
2018-11-05 19:18:51 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-10-31 14:04:47 +01:00
|
|
|
pts.AddPoint(pt)
|
2018-09-23 01:53:03 +02:00
|
|
|
}
|
|
|
|
|
2019-10-31 14:04:47 +01:00
|
|
|
return repo.client.Write(pts)
|
2018-05-21 16:28:52 +02:00
|
|
|
}
|
|
|
|
|
2019-11-05 11:57:16 +01:00
|
|
|
func (repo *influxRepo) tagsOf(msg *senml.Message) tags {
|
2018-05-21 16:28:52 +02:00
|
|
|
return tags{
|
2018-12-05 13:09:25 +01:00
|
|
|
"channel": msg.Channel,
|
2019-03-15 18:38:07 +01:00
|
|
|
"subtopic": msg.Subtopic,
|
2018-12-05 13:09:25 +01:00
|
|
|
"publisher": msg.Publisher,
|
2018-12-11 17:32:10 +01:00
|
|
|
"name": msg.Name,
|
2018-05-21 16:28:52 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-05 11:57:16 +01:00
|
|
|
func (repo *influxRepo) fieldsOf(msg *senml.Message) fields {
|
2018-11-18 16:42:39 +01:00
|
|
|
updateTime := strconv.FormatFloat(msg.UpdateTime, 'f', -1, 64)
|
|
|
|
ret := fields{
|
|
|
|
"protocol": msg.Protocol,
|
|
|
|
"unit": msg.Unit,
|
|
|
|
"updateTime": updateTime,
|
|
|
|
}
|
|
|
|
|
2019-11-05 11:57:16 +01:00
|
|
|
switch {
|
|
|
|
case msg.Value != nil:
|
|
|
|
ret["value"] = *msg.Value
|
|
|
|
case msg.StringValue != nil:
|
|
|
|
ret["stringValue"] = *msg.StringValue
|
|
|
|
case msg.DataValue != nil:
|
|
|
|
ret["dataValue"] = *msg.DataValue
|
|
|
|
case msg.BoolValue != nil:
|
|
|
|
ret["boolValue"] = *msg.BoolValue
|
2018-11-05 19:18:51 +01:00
|
|
|
}
|
|
|
|
|
2019-11-05 11:57:16 +01:00
|
|
|
if msg.Sum != nil {
|
|
|
|
ret["sum"] = *msg.Sum
|
2018-05-21 16:28:52 +02:00
|
|
|
}
|
2018-11-05 19:18:51 +01:00
|
|
|
|
|
|
|
return ret
|
2018-05-21 16:28:52 +02:00
|
|
|
}
|