1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-24 13:48:49 +08:00

MF-1361 - Add StringValue and DataValue comparison filters (#1714)

* Upgrade InfluxDB from 1.x to 2.x

Signed-off-by: fatih <fatihdurmaz@sabanciuniv.edu>

* go mod tidy vendor

Signed-off-by: aryan <aryangodara03@gmail.com>

* add all changes to single commit

Signed-off-by: aryan <aryangodara03@gmail.com>

* handle unmarshal errors

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix tests

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix primary keys in cassandra writer

Signed-off-by: aryan <aryangodara03@gmail.com>

* update mongodb vs queries

Signed-off-by: aryan <aryangodara03@gmail.com>

* update mongodb queries

Signed-off-by: aryan <aryangodara03@gmail.com>

* update mongodb queries again

Signed-off-by: aryan <aryangodara03@gmail.com>

* update readmes, revert mongo and cassandra

Signed-off-by: aryan <aryangodara03@gmail.com>

* remove unwanted print statement

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix errors in go.sum

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix typos

Signed-off-by: aryan <aryangodara03@gmail.com>

* revert changes to cassandra init

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix errors acc to pr review

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix errors in datavalue testcases

Signed-off-by: aryan <aryangodara03@gmail.com>

---------

Signed-off-by: fatih <fatihdurmaz@sabanciuniv.edu>
Signed-off-by: aryan <aryangodara03@gmail.com>
Co-authored-by: fatih <fatihdurmaz@sabanciuniv.edu>
Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
This commit is contained in:
Aryan Godara 2023-07-28 06:14:15 +05:30 committed by GitHub
parent 2b78902e01
commit 33eb8d8bd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 624 additions and 90 deletions

View File

@ -124,6 +124,11 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
return nil, err
}
vb, err := apiutil.ReadBoolQuery(r, boolValueKey, false)
if err != nil && err != errors.ErrNotFoundParam {
return nil, err
}
from, err := apiutil.ReadFloatQuery(r, fromKey, 0)
if err != nil {
return nil, err
@ -150,19 +155,12 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
Comparator: comparator,
StringValue: vs,
DataValue: vd,
BoolValue: vb,
From: from,
To: to,
},
}
vb, err := apiutil.ReadBoolQuery(r, boolValueKey, false)
if err != nil && err != errors.ErrNotFoundParam {
return nil, err
}
if err == nil {
req.pageMeta.BoolValue = vb
}
return req, nil
}

View File

@ -83,4 +83,8 @@ docker-compose -f docker/addons/casandra-reader/docker-compose.yml up -d
Service exposes [HTTP API](https://api.mainflux.io/?urls.primaryName=readers-openapi.yml) for fetching messages.
```
Note: Cassandra Reader doesn't support searching substrings from string_value, due to inefficient searching as the current data model is not suitable for this type of queries.
```
[doc]: https://docs.mainflux.io

View File

@ -38,9 +38,9 @@ const (
var (
addr = "localhost"
v float64 = 5
vs = "value"
vs = "stringValue"
vb = true
vd = "base64"
vd = "dataValue"
sum float64 = 42
idProvider = uuid.New()

View File

@ -8,15 +8,15 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|------------------------------|-----------------------------------------------------|----------------|
| MF_INFLUX_READER_LOG_LEVEL | Service log level | info |
| MF_INFLUX_READER_PORT | Service HTTP port | 9005 |
| MF_INFLUXDB_HOST | InfluxDB host | localhost |
| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux |
| MF_INFLUXDB_DB | InfluxDB database name | mainflux |
| Variable | Description | Default |
|------------------------------|-----------------------------------------------------|-------------------|
| MF_INFLUX_READER_LOG_LEVEL | Service log level | info |
| MF_INFLUX_READER_PORT | Service HTTP port | 9005 |
| MF_INFLUXDB_HOST | InfluxDB host | localhost |
| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux |
| MF_INFLUXDB_DB | InfluxDB database name | mainflux |
| MF_INFLUXDB_HOST | InfluxDB host name | mainflux-influxdb |
| MF_INFLUXDB_PROTOCOL | InfluxDB protocol | http |
| MF_INFLUXDB_TIMEOUT | InfluxDB client connection readiness timeout | 1s |
@ -102,4 +102,13 @@ docker-compose -f docker/addons/influxdb-reader/docker-compose.yml up --env-file
Service exposes [HTTP API](https://api.mainflux.io/?urls.primaryName=readers-openapi.yml) for fetching messages.
Comparator Usage Guide:
| Comparator | Usage | Example |
|----------------------|-----------------------------------------------------------------------------|------------------------------------|
| eq | Return values that are equal to the query | eq["active"] -> "active" |
| ge | Return values that are substrings of the query | ge["tiv"] -> "active" and "tiv" |
| gt | Return values that are substrings of the query and not equal to the query | gt["tiv"] -> "active" |
| le | Return values that are superstrings of the query | le["active"] -> "tiv" |
| lt | Return values that are superstrings of the query and not equal to the query | lt["active"] -> "active" and "tiv" |
Official docs can be found [here](https://docs.mainflux.io).

View File

@ -55,8 +55,11 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
queryAPI := repo.client.QueryAPI(repo.cfg.Org)
condition, timeRange := fmtCondition(chanID, rpm)
query := fmt.Sprintf(`
import "influxdata/influxdb/v1" from(bucket: "%s")
import "influxdata/influxdb/v1"
import "strings"
from(bucket: "%s")
%s
|> v1.fieldsAsCols()
|> group()
@ -108,7 +111,9 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
func (repo *influxRepository) count(measurement, condition string, timeRange string) (uint64, error) {
cmd := fmt.Sprintf(`
import "influxdata/influxdb/v1" from(bucket: "%s")
import "influxdata/influxdb/v1"
import "strings"
from(bucket: "%s")
%s
|> v1.fieldsAsCols()
|> filter(fn: (r) => r._measurement == "%s")
@ -122,8 +127,8 @@ func (repo *influxRepository) count(measurement, condition string, timeRange str
measurement,
condition)
queryAPI := repo.client.QueryAPI(repo.cfg.Org)
resp, err := queryAPI.Query(context.Background(), cmd)
if err != nil {
return 0, err
}
@ -197,11 +202,29 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) (string, string) {
sb.WriteString(`|> filter(fn: (r) => exists r.boolValue)`)
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.boolValue == %v)`, value))
case "vs":
comparator := readers.ParseValueComparator(query)
sb.WriteString(`|> filter(fn: (r) => exists r.stringValue)`)
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.stringValue == "%s")`, value))
switch comparator {
case "=":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.stringValue == "%s")`, value))
case "<":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => strings.containsStr(v: "%s", substr: r.stringValue) == true)`, value))
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.stringValue !="%s")`, value))
case "<=":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => strings.containsStr(v: "%s", substr: r.stringValue) == true)`, value))
case ">":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => strings.containsStr(v: r.stringValue, substr: "%s") == true)`, value))
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.stringValue != "%s")`, value))
case ">=":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => strings.containsStr(v: r.stringValue, substr: "%s") == true)`, value))
}
case "vd":
comparator := readers.ParseValueComparator(query)
if comparator == "=" {
comparator = "=="
}
sb.WriteString(`|> filter(fn: (r) => exists r.dataValue)`)
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.dataValue == "%s")`, value))
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.dataValue%s"%s")`, comparator, value))
}
}

View File

@ -37,7 +37,7 @@ const (
var (
v float64 = 5
vs string = "a"
vs string = "stringValue"
vb bool = true
vd string = "dataValue"
sum float64 = 42
@ -349,6 +349,76 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.EqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: "a stringValues b",
Comparator: readers.LowerThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.LowerThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: "alu",
Comparator: readers.GreaterThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.GreaterThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with data value",
chanID: chanID,
@ -362,6 +432,88 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd + string(rune(1)),
Comparator: readers.LowerThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd + string(rune(1)),
Comparator: readers.LowerThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd[:len(vd)-1] + string(rune(1)),
Comparator: readers.GreaterThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd[:len(vd)-1] + string(rune(1)),
Comparator: readers.GreaterThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with from",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: uint64(len(messages[0:21])),
From: messages[20].Time,
},
page: readers.MessagesPage{
Total: uint64(len(messages[0:21])),
Messages: fromSenml(messages[0:21]),
},
},
{
desc: "read message with to",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: uint64(len(messages[21:])),
To: messages[20].Time,
},
page: readers.MessagesPage{
Total: uint64(len(messages[21:])),
Messages: fromSenml(messages[21:]),
},
},
{
desc: "failing test case : read message with from",
chanID: chanID,
@ -406,9 +558,9 @@ func TestReadSenml(t *testing.T) {
for _, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", tc.desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: got incorrect list of senml Messages from ReadAll()", tc.desc))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %d got %d", tc.desc, tc.page.Total, result.Total))
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected: %v, got: %v\n", tc.desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %d got %d\n", tc.desc, tc.page.Total, result.Total))
}
}

View File

@ -78,4 +78,8 @@ docker-compose -f docker/addons/mongodb-reader/docker-compose.yml up -d
Service exposes [HTTP API](https://api.mainflux.io/?urls.primaryName=readers-openapi.yml) for fetching messages.
```
Note: MongoDB Reader doesn't support searching substrings from string_value, due to inefficient searching as the current data model is not suitable for this type of queries.
```
[doc]: https://docs.mainflux.io

View File

@ -42,7 +42,7 @@ var (
addr string
v float64 = 5
vs = "value"
vs = "stringValue"
vb = true
vd = "dataValue"
sum float64 = 42

View File

@ -72,3 +72,14 @@ $GOBIN/mainflux-postgres-reader
## Usage
Starting service will start consuming normalized messages in SenML format.
Comparator Usage Guide:
| Comparator | Usage | Example |
|----------------------|-----------------------------------------------------------------------------|------------------------------------|
| eq | Return values that are equal to the query | eq["active"] -> "active" |
| ge | Return values that are substrings of the query | ge["tiv"] -> "active" and "tiv" |
| gt | Return values that are substrings of the query and not equal to the query | gt["tiv"] -> "active" |
| le | Return values that are superstrings of the query | le["active"] -> "tiv" |
| lt | Return values that are superstrings of the query and not equal to the query | lt["active"] -> "active" and "tiv" |
Official docs can be found [here](https://docs.mainflux.io).

View File

@ -36,10 +36,11 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r
order = "created"
format = rpm.Format
}
cond := fmtCondition(chanID, rpm)
q := fmt.Sprintf(`SELECT * FROM %s
WHERE %s ORDER BY %s DESC
LIMIT :limit OFFSET :offset;`, format, fmtCondition(chanID, rpm), order)
LIMIT :limit OFFSET :offset;`, format, cond, order)
params := map[string]interface{}{
"channel": chanID,
@ -56,7 +57,6 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r
"from": rpm.From,
"to": rpm.To,
}
rows, err := tr.db.NamedQuery(q, params)
if err != nil {
if pgErr, ok := err.(*pgconn.PgError); ok {
@ -97,7 +97,7 @@ func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (r
}
q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, fmtCondition(chanID, rpm))
q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, cond)
rows, err = tr.db.NamedQuery(q, params)
if err != nil {
return readers.MessagesPage{}, errors.Wrap(readers.ErrReadMessages, err)
@ -141,9 +141,22 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string {
case "vb":
condition = fmt.Sprintf(`%s AND bool_value = :bool_value`, condition)
case "vs":
condition = fmt.Sprintf(`%s AND string_value = :string_value`, condition)
comparator := readers.ParseValueComparator(query)
switch comparator {
case "=":
condition = fmt.Sprintf("%s AND string_value = :string_value ", condition)
case ">":
condition = fmt.Sprintf("%s AND string_value LIKE '%%' || :string_value || '%%' AND string_value <> :string_value", condition)
case ">=":
condition = fmt.Sprintf("%s AND string_value LIKE '%%' || :string_value || '%%'", condition)
case "<=":
condition = fmt.Sprintf("%s AND :string_value LIKE '%%' || string_value || '%%'", condition)
case "<":
condition = fmt.Sprintf("%s AND :string_value LIKE '%%' || string_value || '%%' AND string_value <> :string_value", condition)
}
case "vd":
condition = fmt.Sprintf(`%s AND data_value = :data_value`, condition)
comparator := readers.ParseValueComparator(query)
condition = fmt.Sprintf(`%s AND data_value %s :data_value`, condition, comparator)
case "from":
condition = fmt.Sprintf(`%s AND time >= :from`, condition)
case "to":

View File

@ -35,7 +35,7 @@ const (
var (
v float64 = 5
vs = "value"
vs = "stringValue"
vb = true
vd = "dataValue"
sum float64 = 42
@ -108,12 +108,14 @@ func TestReadSenml(t *testing.T) {
// Since messages are not saved in natural order,
// cases that return subset of messages are only
// checking data result set size, but not content.
cases := map[string]struct {
cases := []struct {
desc string
chanID string
pageMeta readers.PageMetadata
page readers.MessagesPage
}{
"read message page for existing channel": {
{
desc: "read message page for existing channel",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -124,7 +126,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(messages),
},
},
"read message page for non-existent channel": {
{
desc: "read message page for non-existent channel",
chanID: wrongID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -134,7 +137,8 @@ func TestReadSenml(t *testing.T) {
Messages: []readers.Message{},
},
},
"read message last page": {
{
desc: "read message last page",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: msgsNum - 20,
@ -145,7 +149,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(messages[msgsNum-20 : msgsNum]),
},
},
"read message with non-existent subtopic": {
{
desc: "read message with non-existent subtopic",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -156,7 +161,8 @@ func TestReadSenml(t *testing.T) {
Messages: []readers.Message{},
},
},
"read message with subtopic": {
{
desc: "read message with subtopic",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -168,7 +174,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with publisher": {
{
desc: "read message with publisher",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -180,7 +187,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with wrong format": {
{
desc: "read message with wrong format",
chanID: chanID,
pageMeta: readers.PageMetadata{
Format: "messagess",
@ -193,7 +201,8 @@ func TestReadSenml(t *testing.T) {
Messages: []readers.Message{},
},
},
"read message with protocol": {
{
desc: "read message with protocol",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -205,7 +214,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with name": {
{
desc: "read message with name",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -217,7 +227,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs[0:limit]),
},
},
"read message with value": {
{
desc: "read message with value",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -229,7 +240,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and equal comparator": {
{
desc: "read message with value and equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -242,7 +254,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and lower-than comparator": {
{
desc: "read message with value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -255,7 +268,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and lower-than-or-equal comparator": {
{
desc: "read message with value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -268,7 +282,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and greater-than comparator": {
{
desc: "read message with value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -281,7 +296,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and greater-than-or-equal comparator": {
{
desc: "read message with value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -294,7 +310,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with boolean value": {
{
desc: "read message with boolean value",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -306,7 +323,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(boolMsgs[0:limit]),
},
},
"read message with string value": {
{
desc: "read message with string value",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -318,7 +336,78 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(stringMsgs[0:limit]),
},
},
"read message with data value": {
{
desc: "read message with string value and equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.EqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: "a stringValues b",
Comparator: readers.LowerThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.LowerThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: "alu",
Comparator: readers.GreaterThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.GreaterThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with data value",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -330,7 +419,64 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(dataMsgs[0:limit]),
},
},
"read message with from": {
{
desc: "read message with data value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd + string(rune(1)),
Comparator: readers.LowerThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd + string(rune(1)),
Comparator: readers.LowerThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd[:len(vd)-1],
Comparator: readers.GreaterThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd[:len(vd)-1],
Comparator: readers.GreaterThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with from",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -342,7 +488,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(messages[0:21]),
},
},
"read message with to": {
{
desc: "read message with to",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -354,7 +501,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(messages[21:]),
},
},
"read message with from/to": {
{
desc: "read message with from/to",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -369,11 +517,11 @@ func TestReadSenml(t *testing.T) {
},
}
for desc, tc := range cases {
for _, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: got incorrect list of senml Messages from ReadAll()", desc))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", tc.desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: got incorrect list of senml Messages from ReadAll()", tc.desc))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.page.Total, result.Total))
}
}

View File

@ -69,3 +69,14 @@ $GOBIN/mainflux-timescale-reader
## Usage
Starting service will start consuming normalized messages in SenML format.
Comparator Usage Guide:
| Comparator | Usage | Example |
|----------------------|-----------------------------------------------------------------------------|------------------------------------|
| eq | Return values that are equal to the query | eq["active"] -> "active" |
| ge | Return values that are substrings of the query | ge["tiv"] -> "active" and "tiv" |
| gt | Return values that are substrings of the query and not equal to the query | gt["tiv"] -> "active" |
| le | Return values that are superstrings of the query | le["active"] -> "tiv" |
| lt | Return values that are superstrings of the query and not equal to the query | lt["active"] -> "active" and "tiv" |
Official docs can be found [here](https://docs.mainflux.io).

View File

@ -139,9 +139,22 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) string {
case "vb":
condition = fmt.Sprintf(`%s AND bool_value = :bool_value`, condition)
case "vs":
condition = fmt.Sprintf(`%s AND string_value = :string_value`, condition)
comparator := readers.ParseValueComparator(query)
switch comparator {
case "=":
condition = fmt.Sprintf("%s AND string_value = :string_value ", condition)
case ">":
condition = fmt.Sprintf("%s AND string_value LIKE '%%' || :string_value || '%%' AND string_value <> :string_value", condition)
case ">=":
condition = fmt.Sprintf("%s AND string_value LIKE '%%' || :string_value || '%%'", condition)
case "<=":
condition = fmt.Sprintf("%s AND :string_value LIKE '%%' || string_value || '%%'", condition)
case "<":
condition = fmt.Sprintf("%s AND :string_value LIKE '%%' || string_value || '%%' AND string_value <> :string_value", condition)
}
case "vd":
condition = fmt.Sprintf(`%s AND data_value = :data_value`, condition)
comparator := readers.ParseValueComparator(query)
condition = fmt.Sprintf(`%s AND data_value %s :data_value`, condition, comparator)
case "from":
condition = fmt.Sprintf(`%s AND time >= :from`, condition)
case "to":

View File

@ -34,7 +34,7 @@ const (
var (
v float64 = 5
vs = "value"
vs = "stringValue"
vb = true
vd = "dataValue"
sum float64 = 42
@ -107,12 +107,14 @@ func TestReadSenml(t *testing.T) {
// Since messages are not saved in natural order,
// cases that return subset of messages are only
// checking data result set size, but not content.
cases := map[string]struct {
cases := []struct {
desc string
chanID string
pageMeta readers.PageMetadata
page readers.MessagesPage
}{
"read message page for existing channel": {
{
desc: "read message page for existing channel",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -123,7 +125,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(messages),
},
},
"read message page for non-existent channel": {
{
desc: "read message page for non-existent channel",
chanID: wrongID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -133,7 +136,8 @@ func TestReadSenml(t *testing.T) {
Messages: []readers.Message{},
},
},
"read message last page": {
{
desc: "read message last page",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: msgsNum - 20,
@ -144,7 +148,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(messages[msgsNum-20 : msgsNum]),
},
},
"read message with non-existent subtopic": {
{
desc: "read message with non-existent subtopic",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -155,7 +160,8 @@ func TestReadSenml(t *testing.T) {
Messages: []readers.Message{},
},
},
"read message with subtopic": {
{
desc: "read message with subtopic",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -167,7 +173,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with publisher": {
{
desc: "read message with publisher",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -179,7 +186,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with wrong format": {
{
desc: "read message with wrong format",
chanID: chanID,
pageMeta: readers.PageMetadata{
Format: "messagess",
@ -192,7 +200,8 @@ func TestReadSenml(t *testing.T) {
Messages: []readers.Message{},
},
},
"read message with protocol": {
{
desc: "read message with protocol",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -204,7 +213,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs),
},
},
"read message with name": {
{
desc: "read message with name",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -216,7 +226,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(queryMsgs[0:limit]),
},
},
"read message with value": {
{
desc: "read message with value",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -228,7 +239,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and equal comparator": {
{
desc: "read message with value and equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -241,7 +253,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and lower-than comparator": {
{
desc: "read message with value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -254,7 +267,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and lower-than-or-equal comparator": {
{
desc: "read message with value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -267,7 +281,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and greater-than comparator": {
{
desc: "read message with value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -280,7 +295,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with value and greater-than-or-equal comparator": {
{
desc: "read message with value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -293,7 +309,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(valueMsgs[0:limit]),
},
},
"read message with boolean value": {
{
desc: "read message with boolean value",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -305,7 +322,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(boolMsgs[0:limit]),
},
},
"read message with string value": {
{
desc: "read message with string value",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -317,7 +335,78 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(stringMsgs[0:limit]),
},
},
"read message with data value": {
{
desc: "read message with string value and equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.EqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: "a stringValues b",
Comparator: readers.LowerThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.LowerThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: "alu",
Comparator: readers.GreaterThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.GreaterThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with data value",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -329,7 +418,64 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(dataMsgs[0:limit]),
},
},
"read message with from": {
{
desc: "read message with data value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd + string(rune(1)),
Comparator: readers.LowerThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd + string(rune(1)),
Comparator: readers.LowerThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd[:len(vd)-1] + string(rune(1)),
Comparator: readers.GreaterThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd[:len(vd)-1] + string(rune(1)),
Comparator: readers.GreaterThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with from",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -341,7 +487,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(messages[0:21]),
},
},
"read message with to": {
{
desc: "read message with to",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -353,7 +500,8 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(messages[21:]),
},
},
"read message with from/to": {
{
desc: "read message with from/to",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
@ -368,11 +516,11 @@ func TestReadSenml(t *testing.T) {
},
}
for desc, tc := range cases {
for _, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: got incorrect list of senml Messages from ReadAll()", desc))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", tc.desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.page.Total, result.Total))
}
}