1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-05-02 22:17:10 +08:00
beres 61b2d6b87b MF-596 - Add subtopic to RawMessage (#642)
* Commit for mainflux-596
Modified and tested:
- cli
- http
- mqtt
- normalizer
- all readers
- sdk messages
- all writers
- ws
Missing:
- coap
- lora

Signed-off-by: ale <ale@metaverso.org>

* - fix subtopic name in, when starting with dot, http/ws/mqtt
- add some test on readers

Signed-off-by: ale <ale@metaverso.org>

* - fix string concatenation
- update http/transport regexp to match subtopic names with only \w-
- update ws/transport regexp like http ones with also the wildcard * and >

Signed-off-by: ale <ale@metaverso.org>

* added subtopic support to coap adapter

Signed-off-by: ale <ale@metaverso.org>

* - update replace functions with replaceall when needed
- renamed getDestChannel to fmtSubject
- update api/transport and ws/transport route to be more readable
- fix mqtt syntax
- renamed func andQuery to query as suggested by @anovakovic01
- have a nice we :)

Signed-off-by: ale <ale@metaverso.org>

* - fix error declaration on ws/nat/publisher
- fix regexp added missing allowed chars - and _ on coap/api/transport
- fix subtopic clean suffix / if present on coap/api/transport
- improve regexp on http and ws /api/transport, now does not accept url that do not strictly match
- add some ws subtopic tests

Signed-off-by: ale <ale@metaverso.org>

* - enabled wildcard chars on coap/api/transport
- allow use special chars on http and ws api/transport

Signed-off-by: ale <ale@metaverso.org>

* - use strings.Replace() insted ReplaceAll()

Signed-off-by: ale <ale@metaverso.org>

* - allow every chars on subtopics
- fix replace error on mqtt

Signed-off-by: ale <ale@metaverso.org>

* fix cassandra test

Signed-off-by: ale <ale@metaverso.org>

* fix ws test with invalid subtopic

Signed-off-by: ale <ale@metaverso.org>

* fix invalid GOCACHE in go1.12, replaced by -count 1, see https://golang.org/doc/go1.10#test

Signed-off-by: ale <ale@metaverso.org>

* - improve regexp on http/ws api/transport
- minor changes

Signed-off-by: ale <ale@metaverso.org>

* - add generic function parseSubtopic on ws/http adapters

Signed-off-by: ale <ale@metaverso.org>

* - add generic function fmtSubtopic on coap adapter

Signed-off-by: ale <ale@metaverso.org>
2019-03-15 18:38:07 +01:00

157 lines
3.6 KiB
Go

package influxdb
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
"time"
"github.com/mainflux/mainflux/readers"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
)
const maxLimit = 100
var _ readers.MessageRepository = (*influxRepository)(nil)
type influxRepository struct {
database string
client influxdata.Client
}
// New returns new InfluxDB reader.
func New(client influxdata.Client, database string) (readers.MessageRepository, error) {
return &influxRepository{database, client}, nil
}
func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message {
if limit > maxLimit {
limit = maxLimit
}
condition := fmtCondition(chanID, query)
cmd := fmt.Sprintf(`SELECT * from messages WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, condition, limit, offset)
q := influxdata.Query{
Command: cmd,
Database: repo.database,
}
ret := []mainflux.Message{}
resp, err := repo.client.Query(q)
if err != nil || resp.Error() != nil {
return ret
}
if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 {
return ret
}
result := resp.Results[0].Series[0]
for _, v := range result.Values {
ret = append(ret, parseMessage(result.Columns, v))
}
return ret
}
func fmtCondition(chanID string, query map[string]string) string {
condition := fmt.Sprintf(`channel='%s'`, chanID)
for name, value := range query {
switch name {
case
"channel",
"subtopic",
"publisher":
condition = fmt.Sprintf(`%s AND %s='%s'`, condition, name,
strings.Replace(value, "'", "\\'", -1))
case
"name",
"protocol":
condition = fmt.Sprintf(`%s AND "%s"='%s'`, condition, name,
strings.Replace(value, "\"", "\\\"", -1))
}
}
return condition
}
// ParseMessage and parseValues are util methods. Since InfluxDB client returns
// results in form of rows and columns, this obscure message conversion is needed
// to return actual []mainflux.Message from the query result.
func parseValues(value interface{}, name string, msg *mainflux.Message) {
if name == "valueSum" && value != nil {
if sum, ok := value.(json.Number); ok {
valSum, err := sum.Float64()
if err != nil {
return
}
msg.ValueSum = &mainflux.SumValue{Value: valSum}
}
return
}
if strings.HasSuffix(strings.ToLower(name), "value") {
switch value.(type) {
case bool:
msg.Value = &mainflux.Message_BoolValue{BoolValue: value.(bool)}
case json.Number:
num, err := value.(json.Number).Float64()
if err != nil {
return
}
msg.Value = &mainflux.Message_FloatValue{FloatValue: num}
case string:
if strings.HasPrefix(name, "string") {
msg.Value = &mainflux.Message_StringValue{StringValue: value.(string)}
return
}
if strings.HasPrefix(name, "data") {
msg.Value = &mainflux.Message_DataValue{DataValue: value.(string)}
}
}
}
}
func parseMessage(names []string, fields []interface{}) mainflux.Message {
m := mainflux.Message{}
v := reflect.ValueOf(&m).Elem()
for i, name := range names {
parseValues(fields[i], name, &m)
msgField := v.FieldByName(strings.Title(name))
if !msgField.IsValid() {
continue
}
f := msgField.Interface()
switch f.(type) {
case string:
if s, ok := fields[i].(string); ok {
msgField.SetString(s)
}
case float64:
if name == "time" {
t, err := time.Parse(time.RFC3339, fields[i].(string))
if err != nil {
continue
}
v := float64(t.Unix())
msgField.SetFloat(v)
continue
}
val, _ := strconv.ParseFloat(fields[i].(string), 64)
msgField.SetFloat(val)
}
}
return m
}