mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-26 13:48:53 +08:00

* Fix empty protobuf values Update Normalizer service and .proto files. Reader and Writer services needs to be updated due to message format change. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update HTTP adapter to use gogo protobuf Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update Reader services API Update API or Reader services to match Message changes due to switching to gogo/proto. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update InfluxDB services Update InfluxDB Reader and Writer services to match new Message format. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update MongoDB services Update MongoDB Reader and Writer services to match new message format. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update Cassandra services Update Cassandra Reader and Writer service to match new Message format. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Fix InfluxDB Reader test Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update Makefile and docs accordingly Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Fix possible data race in InfluxDB writer Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update InfluxDB Writer tests Raise test coverage. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Fix InfluxDB reader Fix wrong ValueSum readings. Upadete tests and raise coverage. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update MongoDB services tests Raise test coverage for MongoDB Reader and Writer services. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update Readers API tests Raise test coverage. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Raise test coverage Update Cassandra Reader and Writer services tests. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Use gogo/protobuf in CoAP adapter Add gogo/protobuf to Gopkg.toml and update dependencies. Update Dockerfile to run `make proto`. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update Cassandra Reader tests Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Refactor code Improve code style and comments to improve readability. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Rename Sum to SumValue Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Rename Values to Value Since message contains only single value (or possibly no value at all), `Values` name could be misleading. Rename simple double value from `Value` to `FloatValue` accordingly. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Fix InfluxDB Reader logging Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Replace exclusive if statements with switch-case Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update Cassandra services tests Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>
127 lines
2.9 KiB
Go
127 lines
2.9 KiB
Go
package influxdb
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/mainflux/mainflux/readers"
|
|
|
|
influxdata "github.com/influxdata/influxdb/client/v2"
|
|
"github.com/mainflux/mainflux"
|
|
)
|
|
|
|
const maxLimit = 100
|
|
|
|
var _ readers.MessageRepository = (*influxRepository)(nil)
|
|
|
|
type influxRepository struct {
|
|
database string
|
|
client influxdata.Client
|
|
}
|
|
|
|
type fields map[string]interface{}
|
|
type tags map[string]string
|
|
|
|
// New returns new InfluxDB reader.
|
|
func New(client influxdata.Client, database string) (readers.MessageRepository, error) {
|
|
return &influxRepository{database, client}, nil
|
|
}
|
|
|
|
func (repo *influxRepository) ReadAll(chanID, offset, limit uint64) []mainflux.Message {
|
|
if limit > maxLimit {
|
|
limit = maxLimit
|
|
}
|
|
|
|
cmd := fmt.Sprintf(`SELECT * from messages WHERE Channel='%d' LIMIT %d OFFSET %d`, chanID, limit, offset)
|
|
q := influxdata.Query{
|
|
Command: cmd,
|
|
Database: repo.database,
|
|
}
|
|
|
|
ret := []mainflux.Message{}
|
|
|
|
resp, err := repo.client.Query(q)
|
|
if err != nil || resp.Error() != nil {
|
|
return ret
|
|
}
|
|
|
|
if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 {
|
|
return ret
|
|
}
|
|
result := resp.Results[0].Series[0]
|
|
for _, v := range result.Values {
|
|
ret = append(ret, parseMessage(result.Columns, v))
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
// ParseMessage and parseValues are util methods. Since InfluxDB client returns
|
|
// results in form of rows and columns, this obscure message conversion is needed
|
|
// to return actual []mainflux.Message from the query result.
|
|
func parseValues(value interface{}, name string, msg *mainflux.Message) {
|
|
if name == "ValueSum" && value != nil {
|
|
if sum, ok := value.(json.Number); ok {
|
|
valSum, err := sum.Float64()
|
|
if err != nil {
|
|
return
|
|
}
|
|
msg.ValueSum = &mainflux.SumValue{Value: valSum}
|
|
}
|
|
return
|
|
}
|
|
if strings.HasSuffix(name, "Value") {
|
|
switch value.(type) {
|
|
case bool:
|
|
msg.Value = &mainflux.Message_BoolValue{value.(bool)}
|
|
case json.Number:
|
|
num, err := value.(json.Number).Float64()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
msg.Value = &mainflux.Message_FloatValue{num}
|
|
case string:
|
|
if strings.HasPrefix(name, "String") {
|
|
msg.Value = &mainflux.Message_StringValue{value.(string)}
|
|
return
|
|
}
|
|
|
|
if strings.HasPrefix(name, "Data") {
|
|
msg.Value = &mainflux.Message_DataValue{value.(string)}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func parseMessage(names []string, fields []interface{}) mainflux.Message {
|
|
m := mainflux.Message{}
|
|
v := reflect.ValueOf(&m).Elem()
|
|
for i, name := range names {
|
|
parseValues(fields[i], name, &m)
|
|
msgField := v.FieldByName(name)
|
|
if !msgField.IsValid() {
|
|
continue
|
|
}
|
|
|
|
f := msgField.Interface()
|
|
switch f.(type) {
|
|
case string:
|
|
if s, ok := fields[i].(string); ok {
|
|
msgField.SetString(s)
|
|
}
|
|
case uint64:
|
|
u, _ := strconv.ParseUint(fields[i].(string), 10, 64)
|
|
msgField.SetUint(u)
|
|
case float64:
|
|
val, _ := strconv.ParseFloat(fields[i].(string), 64)
|
|
msgField.SetFloat(val)
|
|
}
|
|
}
|
|
|
|
return m
|
|
}
|