mirror of
https://github.com/mainflux/mainflux.git
synced 2025-05-09 19:29:29 +08:00

* Use message time as Point time in InfluxDB Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Use actual message time Update all Reader and Writer services to use the time from the message instead of the time given by the corresponding Writer service. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Remove message time check Message times less than 2**28 represent time relative to the current time, so the Writer used to convert them to the correct value, i.e. msg.Time += time.Now(). However, this step is optional and should really be a part of the app on top of Mainflux, or could be introduced with minor changes in the Normalizer, Reader or Writer services, so there is no need for this to be supported out of the box. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Use channel and publisher as tag keys Move all the other Message fields to the field keys. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>
162 lines
3.5 KiB
Go
162 lines
3.5 KiB
Go
//
|
|
// Copyright (c) 2018
|
|
// Mainflux
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
|
|
package influxdb
|
|
|
|
import (
|
|
"errors"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mainflux/mainflux/writers"
|
|
|
|
influxdata "github.com/influxdata/influxdb/client/v2"
|
|
"github.com/mainflux/mainflux"
|
|
)
|
|
|
|
const pointName = "messages"
|
|
|
|
var _ writers.MessageRepository = (*influxRepo)(nil)
|
|
|
|
var (
|
|
errZeroValueSize = errors.New("zero value batch size")
|
|
errZeroValueTimeout = errors.New("zero value batch timeout")
|
|
errNilBatch = errors.New("nil batch")
|
|
)
|
|
|
|
type influxRepo struct {
|
|
client influxdata.Client
|
|
batch influxdata.BatchPoints
|
|
batchSize int
|
|
mu sync.Mutex
|
|
tick <-chan time.Time
|
|
cfg influxdata.BatchPointsConfig
|
|
}
|
|
|
|
type (
	// fields is the field set handed to influxdata.NewPoint.
	fields map[string]interface{}

	// tags is the tag set handed to influxdata.NewPoint.
	tags map[string]string
)
|
|
|
|
// New returns new InfluxDB writer.
|
|
func New(client influxdata.Client, database string, batchSize int, batchTimeout time.Duration) (writers.MessageRepository, error) {
|
|
if batchSize <= 0 {
|
|
return &influxRepo{}, errZeroValueSize
|
|
}
|
|
|
|
if batchTimeout <= 0 {
|
|
return &influxRepo{}, errZeroValueTimeout
|
|
}
|
|
|
|
repo := &influxRepo{
|
|
client: client,
|
|
cfg: influxdata.BatchPointsConfig{
|
|
Database: database,
|
|
},
|
|
batchSize: batchSize,
|
|
}
|
|
|
|
var err error
|
|
repo.batch, err = influxdata.NewBatchPoints(repo.cfg)
|
|
if err != nil {
|
|
return &influxRepo{}, err
|
|
}
|
|
|
|
repo.tick = time.NewTicker(batchTimeout).C
|
|
go func() {
|
|
for {
|
|
<-repo.tick
|
|
// Nil point indicates that savePoint method is triggered by the ticker.
|
|
repo.savePoint(nil)
|
|
}
|
|
}()
|
|
|
|
return repo, nil
|
|
}
|
|
|
|
func (repo *influxRepo) savePoint(point *influxdata.Point) error {
|
|
repo.mu.Lock()
|
|
defer repo.mu.Unlock()
|
|
if repo.batch == nil {
|
|
return errNilBatch
|
|
}
|
|
|
|
// Ignore ticker if there is nothing to save.
|
|
if len(repo.batch.Points()) == 0 && point == nil {
|
|
return nil
|
|
}
|
|
|
|
if point != nil {
|
|
repo.batch.AddPoint(point)
|
|
}
|
|
|
|
if len(repo.batch.Points())%repo.batchSize == 0 || point == nil {
|
|
if err := repo.client.Write(repo.batch); err != nil {
|
|
return err
|
|
}
|
|
// It would be nice to reset ticker at this point, which
|
|
// implies creating a new ticker and goroutine. It would
|
|
// introduce unnecessary complexity with no justified benefits.
|
|
var err error
|
|
repo.batch, err = influxdata.NewBatchPoints(repo.cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (repo *influxRepo) Save(msg mainflux.Message) error {
|
|
tags, fields := repo.tagsOf(&msg), repo.fieldsOf(&msg)
|
|
t := time.Unix(int64(msg.Time), 0)
|
|
|
|
pt, err := influxdata.NewPoint(pointName, tags, fields, t)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return repo.savePoint(pt)
|
|
}
|
|
|
|
func (repo *influxRepo) tagsOf(msg *mainflux.Message) tags {
|
|
channel := strconv.FormatUint(msg.Channel, 10)
|
|
publisher := strconv.FormatUint(msg.Publisher, 10)
|
|
|
|
return tags{
|
|
"channel": channel,
|
|
"publisher": publisher,
|
|
}
|
|
}
|
|
|
|
func (repo *influxRepo) fieldsOf(msg *mainflux.Message) fields {
|
|
updateTime := strconv.FormatFloat(msg.UpdateTime, 'f', -1, 64)
|
|
ret := fields{
|
|
"protocol": msg.Protocol,
|
|
"name": msg.Name,
|
|
"unit": msg.Unit,
|
|
"link": msg.Link,
|
|
"updateTime": updateTime,
|
|
}
|
|
|
|
switch msg.Value.(type) {
|
|
case *mainflux.Message_FloatValue:
|
|
ret["value"] = msg.GetFloatValue()
|
|
case *mainflux.Message_StringValue:
|
|
ret["stringValue"] = msg.GetStringValue()
|
|
case *mainflux.Message_DataValue:
|
|
ret["dataValue"] = msg.GetDataValue()
|
|
case *mainflux.Message_BoolValue:
|
|
ret["boolValue"] = msg.GetBoolValue()
|
|
}
|
|
|
|
if msg.ValueSum != nil {
|
|
ret["valueSum"] = msg.GetValueSum().GetValue()
|
|
}
|
|
|
|
return ret
|
|
}
|