mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-24 13:48:49 +08:00
MF-1314 - Add value comparison filters for readers (#1353)
* MF-1314 - Add value comparison filters for readers Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Check if comparison parameter is valid Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use eq, lt, lte, gt, gte as comparison operator keys Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use consts for comparison operators Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use comparator naming instead of comparison Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix openapi.yml Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
parent
24b902d049
commit
f0f60e2d2a
@ -295,12 +295,68 @@ func TestReadAll(t *testing.T) {
|
||||
Messages: valueMsgs[0:10],
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "read page with value and equal comparator",
|
||||
url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v, readers.EqualKey),
|
||||
token: token,
|
||||
status: http.StatusOK,
|
||||
res: pageRes{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: valueMsgs[0:10],
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "read page with value and lower-than comparator",
|
||||
url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v+1, readers.LowerThanKey),
|
||||
token: token,
|
||||
status: http.StatusOK,
|
||||
res: pageRes{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: valueMsgs[0:10],
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "read page with value and lower-than-or-equal comparator",
|
||||
url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v+1, readers.LowerThanEqualKey),
|
||||
token: token,
|
||||
status: http.StatusOK,
|
||||
res: pageRes{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: valueMsgs[0:10],
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "read page with value and greater-than comparator",
|
||||
url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v-1, readers.GreaterThanKey),
|
||||
token: token,
|
||||
status: http.StatusOK,
|
||||
res: pageRes{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: valueMsgs[0:10],
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "read page with value and greater-than-or-equal comparator",
|
||||
url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v-1, readers.GreaterThanEqualKey),
|
||||
token: token,
|
||||
status: http.StatusOK,
|
||||
res: pageRes{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: valueMsgs[0:10],
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "read page with non-float value",
|
||||
url: fmt.Sprintf("%s/channels/%s/messages?v=ab01", ts.URL, chanID),
|
||||
token: token,
|
||||
status: http.StatusBadRequest,
|
||||
},
|
||||
{
|
||||
desc: "read page with value and wrong comparator",
|
||||
url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=wrong", ts.URL, chanID, v-1),
|
||||
token: token,
|
||||
status: http.StatusBadRequest,
|
||||
},
|
||||
{
|
||||
desc: "read page with boolean value",
|
||||
url: fmt.Sprintf("%s/channels/%s/messages?vb=true", ts.URL, chanID),
|
||||
@ -337,7 +393,6 @@ func TestReadAll(t *testing.T) {
|
||||
Messages: dataMsgs[0:10],
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
desc: "read page with non-float from",
|
||||
url: fmt.Sprintf("%s/channels/%s/messages?from=ABCD", ts.URL, chanID),
|
||||
|
@ -15,7 +15,15 @@ type listMessagesReq struct {
|
||||
}
|
||||
|
||||
func (req listMessagesReq) validate() error {
|
||||
if req.pageMeta.Limit < 1 {
|
||||
if req.pageMeta.Limit < 1 || req.pageMeta.Offset < 0 {
|
||||
return errInvalidRequest
|
||||
}
|
||||
if req.pageMeta.Comparator != "" &&
|
||||
req.pageMeta.Comparator != readers.EqualKey &&
|
||||
req.pageMeta.Comparator != readers.LowerThanKey &&
|
||||
req.pageMeta.Comparator != readers.LowerThanEqualKey &&
|
||||
req.pageMeta.Comparator != readers.GreaterThanKey &&
|
||||
req.pageMeta.Comparator != readers.GreaterThanEqualKey {
|
||||
return errInvalidRequest
|
||||
}
|
||||
|
||||
|
@ -110,6 +110,11 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
comparator, err := readStringQuery(r, "comparator")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vs, err := readStringQuery(r, "vs")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -141,6 +146,7 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
|
||||
Protocol: protocol,
|
||||
Name: name,
|
||||
Value: v,
|
||||
Comparator: comparator,
|
||||
StringValue: vs,
|
||||
DataValue: vd,
|
||||
From: from,
|
||||
|
@ -36,8 +36,9 @@ func New(session *gocql.Session) readers.MessageRepository {
|
||||
}
|
||||
|
||||
func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) {
|
||||
if rpm.Format == "" {
|
||||
rpm.Format = defTable
|
||||
format := defTable
|
||||
if rpm.Format != "" {
|
||||
format = rpm.Format
|
||||
}
|
||||
|
||||
q, vals := buildQuery(chanID, rpm)
|
||||
@ -46,12 +47,12 @@ func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
|
||||
value, string_value, bool_value, data_value, sum, time,
|
||||
update_time FROM messages WHERE channel = ? %s LIMIT ?
|
||||
ALLOW FILTERING`, q)
|
||||
countCQL := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, rpm.Format, q)
|
||||
countCQL := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, format, q)
|
||||
|
||||
if rpm.Format != defTable {
|
||||
if format != defTable {
|
||||
selectCQL = fmt.Sprintf(`SELECT channel, subtopic, publisher, protocol, created, payload FROM %s WHERE channel = ? %s LIMIT ?
|
||||
ALLOW FILTERING`, rpm.Format, q)
|
||||
countCQL = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, rpm.Format, q)
|
||||
ALLOW FILTERING`, format, q)
|
||||
countCQL = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, format, q)
|
||||
}
|
||||
|
||||
iter := cr.session.Query(selectCQL, vals...).Iter()
|
||||
@ -70,7 +71,7 @@ func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
|
||||
Messages: []readers.Message{},
|
||||
}
|
||||
|
||||
switch rpm.Format {
|
||||
switch format {
|
||||
case defTable:
|
||||
for scanner.Next() {
|
||||
var msg senml.Message
|
||||
@ -128,7 +129,8 @@ func buildQuery(chanID string, rpm readers.PageMetadata) (string, []interface{})
|
||||
condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name)
|
||||
case "v":
|
||||
vals = append(vals, val)
|
||||
condCQL = fmt.Sprintf(`%s AND value = ?`, condCQL)
|
||||
comparator := readers.ParseValueComparator(query)
|
||||
condCQL = fmt.Sprintf(`%s AND value %s ?`, condCQL, comparator)
|
||||
case "vb":
|
||||
vals = append(vals, val)
|
||||
condCQL = fmt.Sprintf(`%s AND bool_value = ?`, condCQL)
|
||||
|
@ -215,7 +215,72 @@ func TestReadSenml(t *testing.T) {
|
||||
Value: v,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(queryMsgs)),
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v,
|
||||
Comparator: readers.EqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and lower-than comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v + 1,
|
||||
Comparator: readers.LowerThanKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and lower-than-or-equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v + 1,
|
||||
Comparator: readers.LowerThanEqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and greater-than comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v - 1,
|
||||
Comparator: readers.GreaterThanKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and greater-than-or-equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v - 1,
|
||||
Comparator: readers.GreaterThanEqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
|
||||
const (
|
||||
countCol = "count_protocol"
|
||||
format = "format"
|
||||
// Measurement for SenML messages
|
||||
defMeasurement = "messages"
|
||||
)
|
||||
@ -41,13 +40,14 @@ func New(client influxdata.Client, database string) readers.MessageRepository {
|
||||
}
|
||||
|
||||
func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) {
|
||||
if rpm.Format == "" {
|
||||
rpm.Format = defMeasurement
|
||||
format := defMeasurement
|
||||
if rpm.Format != "" {
|
||||
format = rpm.Format
|
||||
}
|
||||
|
||||
condition := fmtCondition(chanID, rpm)
|
||||
|
||||
cmd := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, rpm.Format, condition, rpm.Limit, rpm.Offset)
|
||||
cmd := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, format, condition, rpm.Limit, rpm.Offset)
|
||||
q := influxdata.Query{
|
||||
Command: cmd,
|
||||
Database: repo.database,
|
||||
@ -69,10 +69,10 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
|
||||
|
||||
result := resp.Results[0].Series[0]
|
||||
for _, v := range result.Values {
|
||||
ret = append(ret, parseMessage(rpm.Format, result.Columns, v))
|
||||
ret = append(ret, parseMessage(format, result.Columns, v))
|
||||
}
|
||||
|
||||
total, err := repo.count(rpm.Format, condition)
|
||||
total, err := repo.count(format, condition)
|
||||
if err != nil {
|
||||
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
|
||||
}
|
||||
@ -148,7 +148,8 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string {
|
||||
"protocol":
|
||||
condition = fmt.Sprintf(`%s AND "%s"='%s'`, condition, name, value)
|
||||
case "v":
|
||||
condition = fmt.Sprintf(`%s AND value = %f`, condition, value)
|
||||
comparator := readers.ParseValueComparator(query)
|
||||
condition = fmt.Sprintf(`%s AND value %s %f`, condition, comparator, value)
|
||||
case "vb":
|
||||
condition = fmt.Sprintf(`%s AND boolValue = %t`, condition, value)
|
||||
case "vs":
|
||||
|
@ -29,7 +29,7 @@ const (
|
||||
|
||||
var (
|
||||
v float64 = 5
|
||||
vs = "value"
|
||||
vs = "a"
|
||||
vb = true
|
||||
vd = "dataValue"
|
||||
sum float64 = 42
|
||||
@ -213,6 +213,71 @@ func TestReadAll(t *testing.T) {
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v,
|
||||
Comparator: readers.EqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and lower-than comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v + 1,
|
||||
Comparator: readers.LowerThanKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and lower-than-or-equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v + 1,
|
||||
Comparator: readers.LowerThanEqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and greater-than comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v - 1,
|
||||
Comparator: readers.GreaterThanKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and greater-than-or-equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v - 1,
|
||||
Comparator: readers.GreaterThanEqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with boolean value": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
|
@ -5,6 +5,19 @@ package readers
|
||||
|
||||
import "errors"
|
||||
|
||||
const (
|
||||
// EqualKey represents the equal comparison operator key.
|
||||
EqualKey = "eq"
|
||||
// LowerThanKey represents the lower-than comparison operator key.
|
||||
LowerThanKey = "lt"
|
||||
// LowerThanEqualKey represents the lower-than-or-equal comparison operator key.
|
||||
LowerThanEqualKey = "le"
|
||||
// GreaterThanKey represents the greater-than-or-equal comparison operator key.
|
||||
GreaterThanKey = "gt"
|
||||
// GreaterThanEqualKey represents the greater-than-or-equal comparison operator key.
|
||||
GreaterThanEqualKey = "ge"
|
||||
)
|
||||
|
||||
// ErrNotFound indicates that requested entity doesn't exist.
|
||||
var ErrNotFound = errors.New("entity not found")
|
||||
|
||||
@ -28,13 +41,14 @@ type MessagesPage struct {
|
||||
|
||||
// PageMetadata represents the parameters used to create database queries
|
||||
type PageMetadata struct {
|
||||
Offset uint64 `json:"offset,omitempty"`
|
||||
Limit uint64 `json:"limit,omitempty"`
|
||||
Offset uint64 `json:"offset"`
|
||||
Limit uint64 `json:"limit"`
|
||||
Subtopic string `json:"subtopic,omitempty"`
|
||||
Publisher string `json:"publisher,omitempty"`
|
||||
Protocol string `json:"protocol,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Value float64 `json:"v,omitempty"`
|
||||
Comparator string `json:"comparator,omitempty"`
|
||||
BoolValue bool `json:"vb,omitempty"`
|
||||
StringValue string `json:"vs,omitempty"`
|
||||
DataValue string `json:"vd,omitempty"`
|
||||
@ -42,3 +56,25 @@ type PageMetadata struct {
|
||||
To float64 `json:"to,omitempty"`
|
||||
Format string `json:"format,omitempty"`
|
||||
}
|
||||
|
||||
// ParseValueComparator convert comparison operator keys into mathematic anotation
|
||||
func ParseValueComparator(query map[string]interface{}) string {
|
||||
comparator := "="
|
||||
val, ok := query["comparator"]
|
||||
if ok {
|
||||
switch val.(string) {
|
||||
case EqualKey:
|
||||
comparator = "="
|
||||
case LowerThanKey:
|
||||
comparator = "<"
|
||||
case LowerThanEqualKey:
|
||||
comparator = "<="
|
||||
case GreaterThanKey:
|
||||
comparator = ">"
|
||||
case GreaterThanEqualKey:
|
||||
comparator = ">="
|
||||
}
|
||||
}
|
||||
|
||||
return comparator
|
||||
}
|
||||
|
@ -67,11 +67,41 @@ func (repo *messageRepositoryMock) ReadAll(chanID string, rpm readers.PageMetada
|
||||
ok = false
|
||||
}
|
||||
case "v":
|
||||
if senml.Value == nil ||
|
||||
(senml.Value != nil &&
|
||||
*senml.Value != rpm.Value) {
|
||||
if senml.Value == nil {
|
||||
ok = false
|
||||
}
|
||||
|
||||
val, okQuery := query["comparator"]
|
||||
if okQuery {
|
||||
switch val.(string) {
|
||||
case readers.LowerThanKey:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value >= rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
case readers.LowerThanEqualKey:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value > rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
case readers.GreaterThanKey:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value <= rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
case readers.GreaterThanEqualKey:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value < rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
case readers.EqualKey:
|
||||
default:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value != rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
}
|
||||
case "vb":
|
||||
if senml.BoolValue == nil ||
|
||||
(senml.BoolValue != nil &&
|
||||
|
@ -38,11 +38,12 @@ func New(db *mongo.Database) readers.MessageRepository {
|
||||
}
|
||||
|
||||
func (repo mongoRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) {
|
||||
if rpm.Format == "" {
|
||||
rpm.Format = defCollection
|
||||
format := defCollection
|
||||
if rpm.Format != "" {
|
||||
format = rpm.Format
|
||||
}
|
||||
|
||||
col := repo.db.Collection(rpm.Format)
|
||||
col := repo.db.Collection(format)
|
||||
|
||||
sortMap := map[string]interface{}{
|
||||
"time": -1,
|
||||
@ -56,7 +57,7 @@ func (repo mongoRepository) ReadAll(chanID string, rpm readers.PageMetadata) (re
|
||||
defer cursor.Close(context.Background())
|
||||
|
||||
var messages []readers.Message
|
||||
switch rpm.Format {
|
||||
switch format {
|
||||
case defCollection:
|
||||
for cursor.Next(context.Background()) {
|
||||
var m senml.Message
|
||||
@ -117,7 +118,23 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) bson.D {
|
||||
"protocol":
|
||||
filter = append(filter, bson.E{Key: name, Value: value})
|
||||
case "v":
|
||||
filter = append(filter, bson.E{Key: "value", Value: value})
|
||||
bsonFilter := value
|
||||
val, ok := query["comparator"]
|
||||
if ok {
|
||||
switch val.(string) {
|
||||
case readers.EqualKey:
|
||||
bsonFilter = value
|
||||
case readers.LowerThanKey:
|
||||
bsonFilter = bson.M{"$lt": value}
|
||||
case readers.LowerThanEqualKey:
|
||||
bsonFilter = bson.M{"$lte": value}
|
||||
case readers.GreaterThanKey:
|
||||
bsonFilter = bson.M{"$gt": value}
|
||||
case readers.GreaterThanEqualKey:
|
||||
bsonFilter = bson.M{"$gte": value}
|
||||
}
|
||||
}
|
||||
filter = append(filter, bson.E{Key: "value", Value: bsonFilter})
|
||||
case "vb":
|
||||
filter = append(filter, bson.E{Key: "bool_value", Value: value})
|
||||
case "vs":
|
||||
|
@ -217,6 +217,71 @@ func TestReadSenml(t *testing.T) {
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v,
|
||||
Comparator: readers.EqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and lower-than comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v + 1,
|
||||
Comparator: readers.LowerThanKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and lower-than-or-equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v + 1,
|
||||
Comparator: readers.LowerThanEqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and greater-than comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v - 1,
|
||||
Comparator: readers.GreaterThanKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and greater-than-or-equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v - 1,
|
||||
Comparator: readers.GreaterThanEqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with boolean value": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
|
@ -174,6 +174,20 @@ components:
|
||||
schema:
|
||||
type: string
|
||||
required: false
|
||||
Comparator:
|
||||
name: comparator
|
||||
description: Value comparison operator.
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
default: eq
|
||||
enum:
|
||||
- eq
|
||||
- lt
|
||||
- le
|
||||
- gt
|
||||
- ge
|
||||
required: false
|
||||
From:
|
||||
name: from
|
||||
description: SenML message time in nanoseconds (integer part represents seconds).
|
||||
|
@ -38,15 +38,17 @@ func New(db *sqlx.DB) readers.MessageRepository {
|
||||
}
|
||||
|
||||
func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) {
|
||||
order := "created"
|
||||
if rpm.Format == "" {
|
||||
order = "time"
|
||||
rpm.Format = defTable
|
||||
order := "time"
|
||||
format := defTable
|
||||
|
||||
if rpm.Format != "" {
|
||||
order = "created"
|
||||
format = rpm.Format
|
||||
}
|
||||
|
||||
q := fmt.Sprintf(`SELECT * FROM %s
|
||||
WHERE %s ORDER BY %s DESC
|
||||
LIMIT :limit OFFSET :offset;`, rpm.Format, fmtCondition(chanID, rpm), order)
|
||||
LIMIT :limit OFFSET :offset;`, format, fmtCondition(chanID, rpm), order)
|
||||
|
||||
params := map[string]interface{}{
|
||||
"channel": chanID,
|
||||
@ -74,7 +76,7 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r
|
||||
PageMetadata: rpm,
|
||||
Messages: []readers.Message{},
|
||||
}
|
||||
switch rpm.Format {
|
||||
switch format {
|
||||
case defTable:
|
||||
for rows.Next() {
|
||||
msg := dbMessage{Message: senml.Message{}}
|
||||
@ -100,7 +102,7 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r
|
||||
|
||||
}
|
||||
|
||||
q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, rpm.Format, fmtCondition(chanID, rpm))
|
||||
q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, fmtCondition(chanID, rpm))
|
||||
rows, err = tr.db.NamedQuery(q, params)
|
||||
if err != nil {
|
||||
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
|
||||
@ -137,7 +139,8 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string {
|
||||
"protocol":
|
||||
condition = fmt.Sprintf(`%s AND %s = :%s`, condition, name, name)
|
||||
case "v":
|
||||
condition = fmt.Sprintf(`%s AND value = :value`, condition)
|
||||
comparator := readers.ParseValueComparator(query)
|
||||
condition = fmt.Sprintf(`%s AND value %s :value`, condition, comparator)
|
||||
case "vb":
|
||||
condition = fmt.Sprintf(`%s AND bool_value = :bool_value`, condition)
|
||||
case "vs":
|
||||
|
@ -206,7 +206,72 @@ func TestReadSenml(t *testing.T) {
|
||||
Value: v,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(queryMsgs)),
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v,
|
||||
Comparator: readers.EqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and lower-than comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v + 1,
|
||||
Comparator: readers.LowerThanKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and lower-than-or-equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v + 1,
|
||||
Comparator: readers.LowerThanEqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and greater-than comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v - 1,
|
||||
Comparator: readers.GreaterThanKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
"read message with value and greater-than-or-equal comparator": {
|
||||
chanID: chanID,
|
||||
pageMeta: readers.PageMetadata{
|
||||
Offset: 0,
|
||||
Limit: limit,
|
||||
Value: v - 1,
|
||||
Comparator: readers.GreaterThanEqualKey,
|
||||
},
|
||||
page: readers.MessagesPage{
|
||||
Total: uint64(len(valueMsgs)),
|
||||
Messages: fromSenml(valueMsgs[0:limit]),
|
||||
},
|
||||
},
|
||||
|
Loading…
x
Reference in New Issue
Block a user