diff --git a/readers/api/endpoint_test.go b/readers/api/endpoint_test.go index 51474a2a..10e95cb7 100644 --- a/readers/api/endpoint_test.go +++ b/readers/api/endpoint_test.go @@ -295,12 +295,68 @@ func TestReadAll(t *testing.T) { Messages: valueMsgs[0:10], }, }, + { + desc: "read page with value and equal comparator", + url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v, readers.EqualKey), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(valueMsgs)), + Messages: valueMsgs[0:10], + }, + }, + { + desc: "read page with value and lower-than comparator", + url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v+1, readers.LowerThanKey), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(valueMsgs)), + Messages: valueMsgs[0:10], + }, + }, + { + desc: "read page with value and lower-than-or-equal comparator", + url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v+1, readers.LowerThanEqualKey), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(valueMsgs)), + Messages: valueMsgs[0:10], + }, + }, + { + desc: "read page with value and greater-than comparator", + url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v-1, readers.GreaterThanKey), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(valueMsgs)), + Messages: valueMsgs[0:10], + }, + }, + { + desc: "read page with value and greater-than-or-equal comparator", + url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=%s", ts.URL, chanID, v-1, readers.GreaterThanEqualKey), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(valueMsgs)), + Messages: valueMsgs[0:10], + }, + }, { desc: "read page with non-float value", url: fmt.Sprintf("%s/channels/%s/messages?v=ab01", ts.URL, chanID), token: token, status: http.StatusBadRequest, }, + { + desc: "read page with value and wrong comparator", + url: fmt.Sprintf("%s/channels/%s/messages?v=%f&comparator=wrong", ts.URL, chanID, v-1), + token: token, + status: http.StatusBadRequest, + }, { desc: "read page with boolean value", url: fmt.Sprintf("%s/channels/%s/messages?vb=true", ts.URL, chanID), @@ -337,7 +393,6 @@ func TestReadAll(t *testing.T) { Messages: dataMsgs[0:10], }, }, - { desc: "read page with non-float from", url: fmt.Sprintf("%s/channels/%s/messages?from=ABCD", ts.URL, chanID), diff --git a/readers/api/requests.go b/readers/api/requests.go index b75d9efb..186604ba 100644 --- a/readers/api/requests.go +++ b/readers/api/requests.go @@ -15,7 +15,15 @@ type listMessagesReq struct { } func (req listMessagesReq) validate() error { - if req.pageMeta.Limit < 1 { + if req.pageMeta.Limit < 1 || req.pageMeta.Offset < 0 { + return errInvalidRequest + } + if req.pageMeta.Comparator != "" && + req.pageMeta.Comparator != readers.EqualKey && + req.pageMeta.Comparator != readers.LowerThanKey && + req.pageMeta.Comparator != readers.LowerThanEqualKey && + req.pageMeta.Comparator != readers.GreaterThanKey && + req.pageMeta.Comparator != readers.GreaterThanEqualKey { return errInvalidRequest } diff --git a/readers/api/transport.go b/readers/api/transport.go index b8b1a94e..b64e515e 100644 --- a/readers/api/transport.go +++ b/readers/api/transport.go @@ -110,6 +110,11 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) { return nil, err } + comparator, err := readStringQuery(r, "comparator") + if err != nil { + return nil, err + } + vs, err := readStringQuery(r, "vs") if err != nil { return nil, err @@ -141,6 +146,7 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) { Protocol: protocol, Name: name, Value: v, + Comparator: comparator, StringValue: vs, DataValue: vd, From: from, diff --git a/readers/cassandra/messages.go b/readers/cassandra/messages.go index 813ccf0a..ff25da24 100644 --- a/readers/cassandra/messages.go +++ b/readers/cassandra/messages.go @@ -36,8 +36,9 @@ func New(session *gocql.Session) readers.MessageRepository { } func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { - if rpm.Format == "" { - rpm.Format = defTable + format := defTable + if rpm.Format != "" { + format = rpm.Format } q, vals := buildQuery(chanID, rpm) @@ -46,12 +47,12 @@ func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( value, string_value, bool_value, data_value, sum, time, update_time FROM messages WHERE channel = ? %s LIMIT ? ALLOW FILTERING`, q) - countCQL := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, rpm.Format, q) + countCQL := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, format, q) - if rpm.Format != defTable { + if format != defTable { selectCQL = fmt.Sprintf(`SELECT channel, subtopic, publisher, protocol, created, payload FROM %s WHERE channel = ? %s LIMIT ? - ALLOW FILTERING`, rpm.Format, q) - countCQL = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, rpm.Format, q) + ALLOW FILTERING`, format, q) + countCQL = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, format, q) } iter := cr.session.Query(selectCQL, vals...).Iter() @@ -70,7 +71,7 @@ func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( Messages: []readers.Message{}, } - switch rpm.Format { + switch format { case defTable: for scanner.Next() { var msg senml.Message @@ -128,7 +129,8 @@ func buildQuery(chanID string, rpm readers.PageMetadata) (string, []interface{}) condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name) case "v": vals = append(vals, val) - condCQL = fmt.Sprintf(`%s AND value = ?`, condCQL) + comparator := readers.ParseValueComparator(query) + condCQL = fmt.Sprintf(`%s AND value %s ?`, condCQL, comparator) case "vb": vals = append(vals, val) condCQL = fmt.Sprintf(`%s AND bool_value = ?`, condCQL) diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index 4660839a..1bc6744f 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -215,7 +215,72 @@ func TestReadSenml(t *testing.T) { Value: v, }, page: readers.MessagesPage{ - Total: uint64(len(queryMsgs)), + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v, + Comparator: readers.EqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and lower-than comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v + 1, + Comparator: readers.LowerThanKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and lower-than-or-equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v + 1, + Comparator: readers.LowerThanEqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and greater-than comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v - 1, + Comparator: readers.GreaterThanKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and greater-than-or-equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v - 1, + Comparator: readers.GreaterThanEqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), Messages: fromSenml(valueMsgs[0:limit]), }, }, diff --git a/readers/influxdb/messages.go b/readers/influxdb/messages.go index e243fc1a..952c96f0 100644 --- a/readers/influxdb/messages.go +++ b/readers/influxdb/messages.go @@ -18,7 +18,6 @@ import ( const ( countCol = "count_protocol" - format = "format" // Measurement for SenML messages defMeasurement = "messages" ) @@ -41,13 +40,14 @@ func New(client influxdata.Client, database string) readers.MessageRepository { } func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { - if rpm.Format == "" { - rpm.Format = defMeasurement + format := defMeasurement + if rpm.Format != "" { + format = rpm.Format } condition := fmtCondition(chanID, rpm) - cmd := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, rpm.Format, condition, rpm.Limit, rpm.Offset) + cmd := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, format, condition, rpm.Limit, rpm.Offset) q := influxdata.Query{ Command: cmd, Database: repo.database, @@ -69,10 +69,10 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( result := resp.Results[0].Series[0] for _, v := range result.Values { - ret = append(ret, parseMessage(rpm.Format, result.Columns, v)) + ret = append(ret, parseMessage(format, result.Columns, v)) } - total, err := repo.count(rpm.Format, condition) + total, err := repo.count(format, condition) if err != nil { return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } @@ -148,7 +148,8 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string { "protocol": condition = fmt.Sprintf(`%s AND "%s"='%s'`, condition, name, value) case "v": - condition = fmt.Sprintf(`%s AND value = %f`, condition, value) + comparator := readers.ParseValueComparator(query) + condition = fmt.Sprintf(`%s AND value %s %f`, condition, comparator, value) case "vb": condition = fmt.Sprintf(`%s AND boolValue = %t`, condition, value) case "vs": diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index 36d63642..96a8082a 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -29,7 +29,7 @@ const ( var ( v float64 = 5 - vs = "value" + vs = "a" vb = true vd = "dataValue" sum float64 = 42 @@ -213,6 +213,71 @@ func TestReadAll(t *testing.T) { Messages: fromSenml(valueMsgs[0:limit]), }, }, + "read message with value and equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v, + Comparator: readers.EqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and lower-than comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v + 1, + Comparator: readers.LowerThanKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and lower-than-or-equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v + 1, + Comparator: readers.LowerThanEqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and greater-than comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v - 1, + Comparator: readers.GreaterThanKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and greater-than-or-equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v - 1, + Comparator: readers.GreaterThanEqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, "read message with boolean value": { chanID: chanID, pageMeta: readers.PageMetadata{ diff --git a/readers/messages.go b/readers/messages.go index 3e184697..c5170b82 100644 --- a/readers/messages.go +++ b/readers/messages.go @@ -5,6 +5,19 @@ package readers import "errors" +const ( + // EqualKey represents the equal comparison operator key. + EqualKey = "eq" + // LowerThanKey represents the lower-than comparison operator key. + LowerThanKey = "lt" + // LowerThanEqualKey represents the lower-than-or-equal comparison operator key. + LowerThanEqualKey = "le" + // GreaterThanKey represents the greater-than-or-equal comparison operator key. + GreaterThanKey = "gt" + // GreaterThanEqualKey represents the greater-than-or-equal comparison operator key. + GreaterThanEqualKey = "ge" +) + // ErrNotFound indicates that requested entity doesn't exist. var ErrNotFound = errors.New("entity not found") @@ -28,13 +41,14 @@ type MessagesPage struct { // PageMetadata represents the parameters used to create database queries type PageMetadata struct { - Offset uint64 `json:"offset,omitempty"` - Limit uint64 `json:"limit,omitempty"` + Offset uint64 `json:"offset"` + Limit uint64 `json:"limit"` Subtopic string `json:"subtopic,omitempty"` Publisher string `json:"publisher,omitempty"` Protocol string `json:"protocol,omitempty"` Name string `json:"name,omitempty"` Value float64 `json:"v,omitempty"` + Comparator string `json:"comparator,omitempty"` BoolValue bool `json:"vb,omitempty"` StringValue string `json:"vs,omitempty"` DataValue string `json:"vd,omitempty"` @@ -42,3 +56,25 @@ type PageMetadata struct { To float64 `json:"to,omitempty"` Format string `json:"format,omitempty"` } + +// ParseValueComparator convert comparison operator keys into mathematic anotation +func ParseValueComparator(query map[string]interface{}) string { + comparator := "=" + val, ok := query["comparator"] + if ok { + switch val.(string) { + case EqualKey: + comparator = "=" + case LowerThanKey: + comparator = "<" + case LowerThanEqualKey: + comparator = "<=" + case GreaterThanKey: + comparator = ">" + case GreaterThanEqualKey: + comparator = ">=" + } + } + + return comparator +} diff --git a/readers/mocks/messages.go b/readers/mocks/messages.go index 2fd0e6aa..0a55c892 100644 --- a/readers/mocks/messages.go +++ b/readers/mocks/messages.go @@ -67,11 +67,41 @@ func (repo *messageRepositoryMock) ReadAll(chanID string, rpm readers.PageMetada ok = false } case "v": - if senml.Value == nil || - (senml.Value != nil && - *senml.Value != rpm.Value) { + if senml.Value == nil { ok = false } + + val, okQuery := query["comparator"] + if okQuery { + switch val.(string) { + case readers.LowerThanKey: + if senml.Value != nil && + *senml.Value >= rpm.Value { + ok = false + } + case readers.LowerThanEqualKey: + if senml.Value != nil && + *senml.Value > rpm.Value { + ok = false + } + case readers.GreaterThanKey: + if senml.Value != nil && + *senml.Value <= rpm.Value { + ok = false + } + case readers.GreaterThanEqualKey: + if senml.Value != nil && + *senml.Value < rpm.Value { + ok = false + } + case readers.EqualKey: + default: + if senml.Value != nil && + *senml.Value != rpm.Value { + ok = false + } + } + } case "vb": if senml.BoolValue == nil || (senml.BoolValue != nil && diff --git a/readers/mongodb/messages.go b/readers/mongodb/messages.go index 035b13e6..c14004b2 100644 --- a/readers/mongodb/messages.go +++ b/readers/mongodb/messages.go @@ -38,11 +38,12 @@ func New(db *mongo.Database) readers.MessageRepository { } func (repo mongoRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { - if rpm.Format == "" { - rpm.Format = defCollection + format := defCollection + if rpm.Format != "" { + format = rpm.Format } - col := repo.db.Collection(rpm.Format) + col := repo.db.Collection(format) sortMap := map[string]interface{}{ "time": -1, @@ -56,7 +57,7 @@ func (repo mongoRepository) ReadAll(chanID string, rpm readers.PageMetadata) (re defer cursor.Close(context.Background()) var messages []readers.Message - switch rpm.Format { + switch format { case defCollection: for cursor.Next(context.Background()) { var m senml.Message @@ -117,7 +118,23 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) bson.D { "protocol": filter = append(filter, bson.E{Key: name, Value: value}) case "v": - filter = append(filter, bson.E{Key: "value", Value: value}) + bsonFilter := value + val, ok := query["comparator"] + if ok { + switch val.(string) { + case readers.EqualKey: + bsonFilter = value + case readers.LowerThanKey: + bsonFilter = bson.M{"$lt": value} + case readers.LowerThanEqualKey: + bsonFilter = bson.M{"$lte": value} + case readers.GreaterThanKey: + bsonFilter = bson.M{"$gt": value} + case readers.GreaterThanEqualKey: + bsonFilter = bson.M{"$gte": value} + } + } + filter = append(filter, bson.E{Key: "value", Value: bsonFilter}) case "vb": filter = append(filter, bson.E{Key: "bool_value", Value: value}) case "vs": diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index 5878a5b3..9afc755c 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -217,6 +217,71 @@ func TestReadSenml(t *testing.T) { Messages: fromSenml(valueMsgs[0:limit]), }, }, + "read message with value and equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v, + Comparator: readers.EqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and lower-than comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v + 1, + Comparator: readers.LowerThanKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and lower-than-or-equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v + 1, + Comparator: readers.LowerThanEqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and greater-than comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v - 1, + Comparator: readers.GreaterThanKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and greater-than-or-equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v - 1, + Comparator: readers.GreaterThanEqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, "read message with boolean value": { chanID: chanID, pageMeta: readers.PageMetadata{ diff --git a/readers/openapi.yml b/readers/openapi.yml index 43243ad0..8c6cacf1 100644 --- a/readers/openapi.yml +++ b/readers/openapi.yml @@ -174,6 +174,20 @@ components: schema: type: string required: false + Comparator: + name: comparator + description: Value comparison operator. + in: query + schema: + type: string + default: eq + enum: + - eq + - lt + - le + - gt + - ge + required: false From: name: from description: SenML message time in nanoseconds (integer part represents seconds). diff --git a/readers/postgres/messages.go b/readers/postgres/messages.go index 911797dc..1039ecf8 100644 --- a/readers/postgres/messages.go +++ b/readers/postgres/messages.go @@ -38,15 +38,17 @@ func New(db *sqlx.DB) readers.MessageRepository { } func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { - order := "created" - if rpm.Format == "" { - order = "time" - rpm.Format = defTable + order := "time" + format := defTable + + if rpm.Format != "" { + order = "created" + format = rpm.Format } q := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY %s DESC - LIMIT :limit OFFSET :offset;`, rpm.Format, fmtCondition(chanID, rpm), order) + LIMIT :limit OFFSET :offset;`, format, fmtCondition(chanID, rpm), order) params := map[string]interface{}{ "channel": chanID, @@ -74,7 +76,7 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r PageMetadata: rpm, Messages: []readers.Message{}, } - switch rpm.Format { + switch format { case defTable: for rows.Next() { msg := dbMessage{Message: senml.Message{}} @@ -100,7 +102,7 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r } - q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, rpm.Format, fmtCondition(chanID, rpm)) + q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, fmtCondition(chanID, rpm)) rows, err = tr.db.NamedQuery(q, params) if err != nil { return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) @@ -137,7 +139,8 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string { "protocol": condition = fmt.Sprintf(`%s AND %s = :%s`, condition, name, name) case "v": - condition = fmt.Sprintf(`%s AND value = :value`, condition) + comparator := readers.ParseValueComparator(query) + condition = fmt.Sprintf(`%s AND value %s :value`, condition, comparator) case "vb": condition = fmt.Sprintf(`%s AND bool_value = :bool_value`, condition) case "vs": diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index 4c03f610..228931cf 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -206,7 +206,72 @@ func TestReadSenml(t *testing.T) { Value: v, }, page: readers.MessagesPage{ - Total: uint64(len(queryMsgs)), + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v, + Comparator: readers.EqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and lower-than comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v + 1, + Comparator: readers.LowerThanKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and lower-than-or-equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v + 1, + Comparator: readers.LowerThanEqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and greater-than comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v - 1, + Comparator: readers.GreaterThanKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), + Messages: fromSenml(valueMsgs[0:limit]), + }, + }, + "read message with value and greater-than-or-equal comparator": { + chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v - 1, + Comparator: readers.GreaterThanEqualKey, + }, + page: readers.MessagesPage{ + Total: uint64(len(valueMsgs)), Messages: fromSenml(valueMsgs[0:limit]), }, },