1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-27 13:48:49 +08:00
Manuel Imperiale 4619576e94
MF-1061 - Add PageMetadata to readers (#1333)
* MF-1061 - Add PageMetadata to readers

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix merge conflicts

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix typo

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Mv Total to MessagesPage

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix review

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix readers mock and add filters tests

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Add Total check and allow combinations of query parameters

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use slices length as Total

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Simplify readers mock

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Add empty lines

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
2021-01-26 12:23:15 +01:00

264 lines
5.9 KiB
Go

package influxdb
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
"time"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/readers"
influxdata "github.com/influxdata/influxdb/client/v2"
jsont "github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
)
const (
countCol = "count_protocol"
format = "format"
// Measurement for SenML messages
defMeasurement = "messages"
)
var errReadMessages = errors.New("failed to read messages from influxdb database")
var _ readers.MessageRepository = (*influxRepository)(nil)
type influxRepository struct {
database string
client influxdata.Client
}
// New returns new InfluxDB reader.
func New(client influxdata.Client, database string) readers.MessageRepository {
return &influxRepository{
database,
client,
}
}
func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) {
if rpm.Format == "" {
rpm.Format = defMeasurement
}
condition := fmtCondition(chanID, rpm)
cmd := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, rpm.Format, condition, rpm.Limit, rpm.Offset)
q := influxdata.Query{
Command: cmd,
Database: repo.database,
}
var ret []readers.Message
resp, err := repo.client.Query(q)
if err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
if resp.Error() != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, resp.Error())
}
if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 {
return readers.MessagesPage{}, nil
}
result := resp.Results[0].Series[0]
for _, v := range result.Values {
ret = append(ret, parseMessage(rpm.Format, result.Columns, v))
}
total, err := repo.count(rpm.Format, condition)
if err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
page := readers.MessagesPage{
PageMetadata: rpm,
Total: total,
Messages: ret,
}
return page, nil
}
func (repo *influxRepository) count(measurement, condition string) (uint64, error) {
cmd := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s`, measurement, condition)
q := influxdata.Query{
Command: cmd,
Database: repo.database,
}
resp, err := repo.client.Query(q)
if err != nil {
return 0, err
}
if resp.Error() != nil {
return 0, resp.Error()
}
if len(resp.Results) < 1 ||
len(resp.Results[0].Series) < 1 ||
len(resp.Results[0].Series[0].Values) < 1 {
return 0, nil
}
countIndex := 0
for i, col := range resp.Results[0].Series[0].Columns {
if col == countCol {
countIndex = i
break
}
}
result := resp.Results[0].Series[0].Values[0]
if len(result) < countIndex+1 {
return 0, nil
}
count, ok := result[countIndex].(json.Number)
if !ok {
return 0, nil
}
return strconv.ParseUint(count.String(), 10, 64)
}
func fmtCondition(chanID string, rpm readers.PageMetadata) string {
condition := fmt.Sprintf(`channel='%s'`, chanID)
var query map[string]interface{}
meta, err := json.Marshal(rpm)
if err != nil {
return condition
}
json.Unmarshal(meta, &query)
for name, value := range query {
switch name {
case
"channel",
"subtopic",
"publisher",
"name",
"protocol":
condition = fmt.Sprintf(`%s AND "%s"='%s'`, condition, name, value)
case "v":
condition = fmt.Sprintf(`%s AND value = %f`, condition, value)
case "vb":
condition = fmt.Sprintf(`%s AND boolValue = %t`, condition, value)
case "vs":
condition = fmt.Sprintf(`%s AND stringValue = '%s'`, condition, value)
case "vd":
condition = fmt.Sprintf(`%s AND dataValue = '%s'`, condition, value)
case "from":
iVal := int64(value.(float64) * 1e9)
condition = fmt.Sprintf(`%s AND time >= %d`, condition, iVal)
case "to":
iVal := int64(value.(float64) * 1e9)
condition = fmt.Sprintf(`%s AND time < %d`, condition, iVal)
}
}
return condition
}
// ParseMessage and parseValues are util methods. Since InfluxDB client returns
// results in form of rows and columns, this obscure message conversion is needed
// to return actual []broker.Message from the query result.
func parseValues(value interface{}, name string, msg *senml.Message) {
if name == "sum" && value != nil {
if valSum, ok := value.(json.Number); ok {
sum, err := valSum.Float64()
if err != nil {
return
}
msg.Sum = &sum
}
return
}
if strings.HasSuffix(strings.ToLower(name), "value") {
switch value.(type) {
case bool:
v := value.(bool)
msg.BoolValue = &v
case json.Number:
num, err := value.(json.Number).Float64()
if err != nil {
return
}
msg.Value = &num
case string:
if strings.HasPrefix(name, "string") {
v := value.(string)
msg.StringValue = &v
return
}
if strings.HasPrefix(name, "data") {
v := value.(string)
msg.DataValue = &v
}
}
}
}
func parseMessage(measurement string, names []string, fields []interface{}) interface{} {
switch measurement {
case defMeasurement:
return parseSenml(names, fields)
default:
return parseJSON(names, fields)
}
}
func parseSenml(names []string, fields []interface{}) interface{} {
m := senml.Message{}
v := reflect.ValueOf(&m).Elem()
for i, name := range names {
parseValues(fields[i], name, &m)
msgField := v.FieldByName(strings.Title(name))
if !msgField.IsValid() {
continue
}
f := msgField.Interface()
switch f.(type) {
case string:
if s, ok := fields[i].(string); ok {
msgField.SetString(s)
}
case float64:
if name == "time" {
t, err := time.Parse(time.RFC3339Nano, fields[i].(string))
if err != nil {
continue
}
v := float64(t.UnixNano()) / float64(1e9)
msgField.SetFloat(v)
continue
}
val, _ := strconv.ParseFloat(fields[i].(string), 64)
msgField.SetFloat(val)
}
}
return m
}
func parseJSON(names []string, fields []interface{}) interface{} {
ret := make(map[string]interface{})
for i, n := range names {
ret[n] = fields[i]
}
return jsont.ParseFlat(ret)
}