From 4619576e9468cbb98ea17e636b522a941c2b35bd Mon Sep 17 00:00:00 2001 From: Manuel Imperiale Date: Tue, 26 Jan 2021 12:23:15 +0100 Subject: [PATCH] MF-1061 - Add PageMetadata to readers (#1333) * MF-1061 - Add PageMetadata to readers Signed-off-by: Manuel Imperiale * Fix merge conflicts Signed-off-by: Manuel Imperiale * Fix typo Signed-off-by: Manuel Imperiale * Mv Total to MessagesPage Signed-off-by: Manuel Imperiale * Fix review Signed-off-by: Manuel Imperiale * Fix readers mock and add filters tests Signed-off-by: Manuel Imperiale * Add Total check and allow combinations of query parameters Signed-off-by: Manuel Imperiale * Use slices length as Total Signed-off-by: Manuel Imperiale * Simplify readers mock Signed-off-by: Manuel Imperiale * Add empty lines Signed-off-by: Manuel Imperiale --- readers/api/endpoint.go | 9 +- readers/api/endpoint_test.go | 319 +++++++++++++++++++++++------ readers/api/logging.go | 6 +- readers/api/metrics.go | 4 +- readers/api/requests.go | 10 +- readers/api/responses.go | 3 +- readers/api/transport.go | 156 ++++++++++++-- readers/cassandra/messages.go | 64 +++--- readers/cassandra/messages_test.go | 138 +++++++------ readers/influxdb/messages.go | 57 +++--- readers/influxdb/messages_test.go | 138 +++++++------ readers/messages.go | 22 +- readers/mocks/messages.go | 107 ++++++++-- readers/mongodb/messages.go | 67 +++--- readers/mongodb/messages_test.go | 146 +++++++------ readers/postgres/messages.go | 55 ++--- readers/postgres/messages_test.go | 126 +++++++----- 17 files changed, 950 insertions(+), 477 deletions(-) diff --git a/readers/api/endpoint.go b/readers/api/endpoint.go index 112ffa58..865dddde 100644 --- a/readers/api/endpoint.go +++ b/readers/api/endpoint.go @@ -18,16 +18,15 @@ func listMessagesEndpoint(svc readers.MessageRepository) endpoint.Endpoint { return nil, err } - page, err := svc.ReadAll(req.chanID, req.offset, req.limit, req.query) + page, err := svc.ReadAll(req.chanID, req.pageMeta) if err != nil { return nil, err } return pageRes{ - Total: page.Total, - Offset: page.Offset, - Limit: page.Limit, - Messages: page.Messages, + PageMetadata: page.PageMetadata, + Total: page.Total, + Messages: page.Messages, }, nil } } diff --git a/readers/api/endpoint_test.go b/readers/api/endpoint_test.go index 82b51272..51474a2a 100644 --- a/readers/api/endpoint_test.go +++ b/readers/api/endpoint_test.go @@ -4,26 +4,34 @@ package api_test import ( + "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" "testing" + "time" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/pkg/transformers/senml" + "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/readers/api" "github.com/mainflux/mainflux/readers/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( svcName = "test-service" token = "1" invalid = "invalid" - numOfMessages = 42 - chanID = "1" + numOfMessages = 100 valueFields = 5 + subtopic = "topic" + mqttProt = "mqtt" + httpProt = "http" + msgName = "temperature" ) var ( @@ -32,40 +40,10 @@ var ( vb = true vd = "dataValue" sum float64 = 42 + + idProvider = uuid.New() ) -func newService() readers.MessageRepository { - var messages []readers.Message - for i := 0; i < numOfMessages; i++ { - msg := senml.Message{ - Channel: chanID, - Publisher: "1", - Protocol: "mqtt", - } - // Mix possible values as well as value sum. - count := i % valueFields - - switch count { - case 0: - msg.Value = &v - case 1: - msg.BoolValue = &vb - case 2: - msg.StringValue = &vs - case 3: - msg.DataValue = &vd - case 4: - msg.Sum = &sum - } - - messages = append(messages, msg) - } - - return mocks.NewMessageRepository(map[string][]readers.Message{ - chanID: messages, - }) -} - func newServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient) *httptest.Server { mux := api.MakeHandler(repo, tc, svcName) return httptest.NewServer(mux) @@ -76,6 +54,7 @@ type testRequest struct { method string url string token string + body io.Reader } func (tr testRequest) make() (*http.Response, error) { @@ -91,114 +70,300 @@ func (tr testRequest) make() (*http.Response, error) { } func TestReadAll(t *testing.T) { - svc := newService() - tc := mocks.NewThingsService() - ts := newServer(svc, tc) + chanID, err := idProvider.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + pubID, err := idProvider.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + pubID2, err := idProvider.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + now := time.Now().Unix() + + var messages []senml.Message + var queryMsgs []senml.Message + var valueMsgs []senml.Message + var boolMsgs []senml.Message + var stringMsgs []senml.Message + var dataMsgs []senml.Message + + for i := 0; i < numOfMessages; i++ { + // Mix possible values as well as value sum. + msg := senml.Message{ + Channel: chanID, + Publisher: pubID, + Protocol: mqttProt, + Time: float64(now - int64(i)), + Name: "name", + } + + count := i % valueFields + switch count { + case 0: + msg.Value = &v + valueMsgs = append(valueMsgs, msg) + case 1: + msg.BoolValue = &vb + boolMsgs = append(boolMsgs, msg) + case 2: + msg.StringValue = &vs + stringMsgs = append(stringMsgs, msg) + case 3: + msg.DataValue = &vd + dataMsgs = append(dataMsgs, msg) + case 4: + msg.Sum = &sum + msg.Subtopic = subtopic + msg.Protocol = httpProt + msg.Publisher = pubID2 + msg.Name = msgName + queryMsgs = append(queryMsgs, msg) + } + + messages = append(messages, msg) + } + + svc := mocks.NewThingsService() + repo := mocks.NewMessageRepository(chanID, fromSenml(messages)) + ts := newServer(repo, svc) defer ts.Close() - cases := map[string]struct { + cases := []struct { + desc string + req string url string token string status int + res pageRes }{ - "read page with valid offset and limit": { + { + desc: "read page with valid offset and limit", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, chanID), token: token, status: http.StatusOK, + res: pageRes{ + Total: uint64(len(messages)), + Messages: messages[0:10], + }, }, - "read page with negative offset": { + { + desc: "read page with negative offset", url: fmt.Sprintf("%s/channels/%s/messages?offset=-1&limit=10", ts.URL, chanID), token: token, status: http.StatusBadRequest, }, - "read page with negative limit": { + { + desc: "read page with negative limit", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=-10", ts.URL, chanID), token: token, status: http.StatusBadRequest, }, - "read page with zero limit": { + { + desc: "read page with zero limit", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=0", ts.URL, chanID), token: token, status: http.StatusBadRequest, }, - "read page with non-integer offset": { + { + desc: "read page with non-integer offset", url: fmt.Sprintf("%s/channels/%s/messages?offset=abc&limit=10", ts.URL, chanID), token: token, status: http.StatusBadRequest, }, - "read page with non-integer limit": { + { + desc: "read page with non-integer limit", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=abc", ts.URL, chanID), token: token, status: http.StatusBadRequest, }, - "read page with invalid channel id": { + { + desc: "read page with invalid channel id", url: fmt.Sprintf("%s/channels//messages?offset=0&limit=10", ts.URL), token: token, status: http.StatusBadRequest, }, - "read page with invalid token": { + { + desc: "read page with invalid token", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, chanID), token: invalid, status: http.StatusForbidden, }, - "read page with multiple offset": { + { + desc: "read page with multiple offset", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&offset=1&limit=10", ts.URL, chanID), token: token, status: http.StatusBadRequest, }, - "read page with multiple limit": { + { + desc: "read page with multiple limit", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=20&limit=10", ts.URL, chanID), token: token, status: http.StatusBadRequest, }, - "read page with empty token": { + { + desc: "read page with empty token", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, chanID), token: "", status: http.StatusForbidden, }, - "read page with default offset": { + { + desc: "read page with default offset", url: fmt.Sprintf("%s/channels/%s/messages?limit=10", ts.URL, chanID), token: token, status: http.StatusOK, + res: pageRes{ + Total: uint64(len(messages)), + Messages: messages[0:10], + }, }, - "read page with default limit": { + { + desc: "read page with default limit", url: fmt.Sprintf("%s/channels/%s/messages?offset=0", ts.URL, chanID), token: token, status: http.StatusOK, + res: pageRes{ + Total: uint64(len(messages)), + Messages: messages[0:10], + }, }, - "read page with value": { + { + desc: "read page with senml fornat", + url: fmt.Sprintf("%s/channels/%s/messages?format=messages", ts.URL, chanID), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(messages)), + Messages: messages[0:10], + }, + }, + { + desc: "read page with subtopic", + url: fmt.Sprintf("%s/channels/%s/messages?subtopic=%s&protocol=%s", ts.URL, chanID, subtopic, httpProt), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(queryMsgs)), + Messages: queryMsgs[0:10], + }, + }, + { + desc: "read page with subtopic and protocol", + url: fmt.Sprintf("%s/channels/%s/messages?subtopic=%s&protocol=%s", ts.URL, chanID, subtopic, httpProt), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(queryMsgs)), + Messages: queryMsgs[0:10], + }, + }, + { + desc: "read page with publisher", + url: fmt.Sprintf("%s/channels/%s/messages?publisher=%s", ts.URL, chanID, pubID2), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(queryMsgs)), + Messages: queryMsgs[0:10], + }, + }, + { + desc: "read page with protocol", + url: fmt.Sprintf("%s/channels/%s/messages?protocol=http", ts.URL, chanID), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(queryMsgs)), + Messages: queryMsgs[0:10], + }, + }, + { + desc: "read page with name", + url: fmt.Sprintf("%s/channels/%s/messages?name=%s", ts.URL, chanID, msgName), + token: token, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(queryMsgs)), + Messages: queryMsgs[0:10], + }, + }, + { + desc: "read page with value", url: fmt.Sprintf("%s/channels/%s/messages?v=%f", ts.URL, chanID, v), token: token, status: http.StatusOK, + res: pageRes{ + Total: uint64(len(valueMsgs)), + Messages: valueMsgs[0:10], + }, }, - "read page with boolean value": { - url: fmt.Sprintf("%s/channels/%s/messages?vb=%t", ts.URL, chanID, vb), + { + desc: "read page with non-float value", + url: fmt.Sprintf("%s/channels/%s/messages?v=ab01", ts.URL, chanID), + token: token, + status: http.StatusBadRequest, + }, + { + desc: "read page with boolean value", + url: fmt.Sprintf("%s/channels/%s/messages?vb=true", ts.URL, chanID), token: token, status: http.StatusOK, + res: pageRes{ + Total: uint64(len(boolMsgs)), + Messages: boolMsgs[0:10], + }, }, - "read page with string value": { - url: fmt.Sprintf("%s/channels/%s/messages?vs=%s", ts.URL, chanID, vd), + { + desc: "read page with non-boolean value", + url: fmt.Sprintf("%s/channels/%s/messages?vb=yes", ts.URL, chanID), + token: token, + status: http.StatusBadRequest, + }, + { + desc: "read page with string value", + url: fmt.Sprintf("%s/channels/%s/messages?vs=%s", ts.URL, chanID, vs), token: token, status: http.StatusOK, + res: pageRes{ + Total: uint64(len(stringMsgs)), + Messages: stringMsgs[0:10], + }, }, - "read page with data value": { + { + desc: "read page with data value", url: fmt.Sprintf("%s/channels/%s/messages?vd=%s", ts.URL, chanID, vd), token: token, status: http.StatusOK, + res: pageRes{ + Total: uint64(len(dataMsgs)), + Messages: dataMsgs[0:10], + }, }, - "read page with from": { - url: fmt.Sprintf("%s/channels/%s/messages?from=1608651539.673909", ts.URL, chanID), - token: token, - status: http.StatusOK, - }, - "read page with to": { - url: fmt.Sprintf("%s/channels/%s/messages?to=1508651539.673909", ts.URL, chanID), + + { + desc: "read page with non-float from", + url: fmt.Sprintf("%s/channels/%s/messages?from=ABCD", ts.URL, chanID), + token: token, + status: http.StatusBadRequest, + }, + + { + desc: "read page with non-float to", + url: fmt.Sprintf("%s/channels/%s/messages?to=ABCD", ts.URL, chanID), + token: token, + status: http.StatusBadRequest, + }, + { + desc: "read page with from/to", + url: fmt.Sprintf("%s/channels/%s/messages?from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time), token: token, status: http.StatusOK, + res: pageRes{ + Total: uint64(len(messages[5:20])), + Messages: messages[5:15], + }, }, } - for desc, tc := range cases { + for _, tc := range cases { req := testRequest{ client: ts.Client(), method: http.MethodGet, @@ -206,7 +371,27 @@ func TestReadAll(t *testing.T) { token: tc.token, } res, err := req.make() - assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err)) - assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected %d got %d", desc, tc.status, res.StatusCode)) + assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) + + var page pageRes + json.NewDecoder(res.Body).Decode(&page) + assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) + assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected %d got %d", tc.desc, tc.status, res.StatusCode)) + assert.Equal(t, tc.res.Total, page.Total, fmt.Sprintf("%s: expected %d got %d", tc.desc, tc.res.Total, page.Total)) + assert.ElementsMatch(t, tc.res.Messages, page.Messages, fmt.Sprintf("%s: expected body %v got %v", tc.desc, tc.res.Messages, page.Messages)) } } + +type pageRes struct { + readers.PageMetadata + Total uint64 `json:"total"` + Messages []senml.Message `json:"messages,omitempty"` +} + +func fromSenml(in []senml.Message) []readers.Message { + var ret []readers.Message + for _, m := range in { + ret = append(ret, m) + } + return ret +} diff --git a/readers/api/logging.go b/readers/api/logging.go index 96dfb722..43154686 100644 --- a/readers/api/logging.go +++ b/readers/api/logging.go @@ -28,9 +28,9 @@ func LoggingMiddleware(svc readers.MessageRepository, logger logger.Logger) read } } -func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (page readers.MessagesPage, err error) { +func (lm *loggingMiddleware) ReadAll(chanID string, rpm readers.PageMetadata) (page readers.MessagesPage, err error) { defer func(begin time.Time) { - message := fmt.Sprintf("Method read_all for channel %s with offset %d and limit %d took %s to complete", chanID, offset, limit, time.Since(begin)) + message := fmt.Sprintf("Method read_all for channel %s with query %v took %s to complete", chanID, rpm, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -38,5 +38,5 @@ func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.ReadAll(chanID, offset, limit, query) + return lm.svc.ReadAll(chanID, rpm) } diff --git a/readers/api/metrics.go b/readers/api/metrics.go index 43a6d6f5..1ce4684a 100644 --- a/readers/api/metrics.go +++ b/readers/api/metrics.go @@ -29,11 +29,11 @@ func MetricsMiddleware(svc readers.MessageRepository, counter metrics.Counter, l } } -func (mm *metricsMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { +func (mm *metricsMiddleware) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { defer func(begin time.Time) { mm.counter.With("method", "read_all").Add(1) mm.latency.With("method", "read_all").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.ReadAll(chanID, offset, limit, query) + return mm.svc.ReadAll(chanID, rpm) } diff --git a/readers/api/requests.go b/readers/api/requests.go index 5e058136..b75d9efb 100644 --- a/readers/api/requests.go +++ b/readers/api/requests.go @@ -3,19 +3,19 @@ package api +import "github.com/mainflux/mainflux/readers" + type apiReq interface { validate() error } type listMessagesReq struct { - chanID string - offset uint64 - limit uint64 - query map[string]string + chanID string + pageMeta readers.PageMetadata } func (req listMessagesReq) validate() error { - if req.limit < 1 { + if req.pageMeta.Limit < 1 { return errInvalidRequest } diff --git a/readers/api/responses.go b/readers/api/responses.go index 64da8819..a33a5bcb 100644 --- a/readers/api/responses.go +++ b/readers/api/responses.go @@ -13,9 +13,8 @@ import ( var _ mainflux.Response = (*pageRes)(nil) type pageRes struct { + readers.PageMetadata Total uint64 `json:"total"` - Offset uint64 `json:"offset"` - Limit uint64 `json:"limit"` Messages []readers.Message `json:"messages,omitempty"` } diff --git a/readers/api/transport.go b/readers/api/transport.go index 0caa5a8a..b8b1a94e 100644 --- a/readers/api/transport.go +++ b/readers/api/transport.go @@ -31,8 +31,8 @@ const ( var ( errInvalidRequest = errors.New("received invalid request") errUnauthorizedAccess = errors.New("missing or invalid credentials provided") + errNotInQuery = errors.New("parameter missing in the query") auth mainflux.ThingsServiceClient - queryFields = []string{"format", "subtopic", "publisher", "protocol", "name", "v", "vs", "vb", "vd", "from", "to"} ) // MakeHandler returns a HTTP handler for API endpoints. @@ -67,31 +67,94 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) { return nil, err } - offset, err := getQuery(r, "offset", defOffset) + offset, err := readUintQuery(r, "offset", defOffset) if err != nil { return nil, err } - limit, err := getQuery(r, "limit", defLimit) + limit, err := readUintQuery(r, "limit", defLimit) if err != nil { return nil, err } - query := map[string]string{} - for _, name := range queryFields { - if value := bone.GetQuery(r, name); len(value) == 1 { - query[name] = value[0] - } + format, err := readStringQuery(r, "format") + if err != nil { + return nil, err } - if query[format] == "" { - query[format] = defFormat + if format != "" { + format = defFormat + } + + subtopic, err := readStringQuery(r, "subtopic") + if err != nil { + return nil, err + } + + publisher, err := readStringQuery(r, "publisher") + if err != nil { + return nil, err + } + + protocol, err := readStringQuery(r, "protocol") + if err != nil { + return nil, err + } + + name, err := readStringQuery(r, "name") + if err != nil { + return nil, err + } + + v, err := readFloatQuery(r, "v") + if err != nil { + return nil, err + } + + vs, err := readStringQuery(r, "vs") + if err != nil { + return nil, err + } + + vd, err := readStringQuery(r, "vd") + if err != nil { + return nil, err + } + + from, err := readFloatQuery(r, "from") + if err != nil { + return nil, err + } + + to, err := readFloatQuery(r, "to") + if err != nil { + return nil, err } req := listMessagesReq{ chanID: chanID, - offset: offset, - limit: limit, - query: query, + pageMeta: readers.PageMetadata{ + Offset: offset, + Limit: limit, + Format: format, + Subtopic: subtopic, + Publisher: publisher, + Protocol: protocol, + Name: name, + Value: v, + StringValue: vs, + DataValue: vd, + From: from, + To: to, + }, + } + + vb, err := readBoolQuery(r, "vb") + // Check if vb is in the query + if err != nil && err != errNotInQuery { + return nil, err + } + if err == nil { + req.pageMeta.BoolValue = vb } return req, nil @@ -155,20 +218,71 @@ func authorize(r *http.Request, chanID string) error { return nil } -func getQuery(req *http.Request, name string, fallback uint64) (uint64, error) { - vals := bone.GetQuery(req, name) - if len(vals) == 0 { - return fallback, nil - } - +func readUintQuery(r *http.Request, key string, def uint64) (uint64, error) { + vals := bone.GetQuery(r, key) if len(vals) > 1 { return 0, errInvalidRequest } - val, err := strconv.ParseUint(vals[0], 10, 64) + if len(vals) == 0 { + return def, nil + } + + strval := vals[0] + val, err := strconv.ParseUint(strval, 10, 64) if err != nil { return 0, errInvalidRequest } - return uint64(val), nil + return val, nil +} + +func readFloatQuery(r *http.Request, key string) (float64, error) { + vals := bone.GetQuery(r, key) + if len(vals) > 1 { + return 0, errInvalidRequest + } + + if len(vals) == 0 { + return 0, nil + } + + fval := vals[0] + val, err := strconv.ParseFloat(fval, 64) + if err != nil { + return 0, errInvalidRequest + } + + return val, nil +} + +func readStringQuery(r *http.Request, key string) (string, error) { + vals := bone.GetQuery(r, key) + if len(vals) > 1 { + return "", errInvalidRequest + } + + if len(vals) == 0 { + return "", nil + } + + return vals[0], nil +} + +func readBoolQuery(r *http.Request, key string) (bool, error) { + vals := bone.GetQuery(r, key) + if len(vals) > 1 { + return false, errInvalidRequest + } + + if len(vals) == 0 { + return false, errNotInQuery + } + + b, err := strconv.ParseBool(vals[0]) + if err != nil { + return false, errInvalidRequest + } + + return b, nil } diff --git a/readers/cassandra/messages.go b/readers/cassandra/messages.go index 3d5de261..813ccf0a 100644 --- a/readers/cassandra/messages.go +++ b/readers/cassandra/messages.go @@ -6,7 +6,6 @@ package cassandra import ( "encoding/json" "fmt" - "strconv" "github.com/gocql/gocql" "github.com/mainflux/mainflux/pkg/errors" @@ -36,26 +35,23 @@ func New(session *gocql.Session) readers.MessageRepository { } } -func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { - table, ok := query[format] - if !ok { - table = defTable +func (cr cassandraRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { + if rpm.Format == "" { + rpm.Format = defTable } - // Remove format filter and format the rest properly. - delete(query, format) - q, vals := buildQuery(chanID, offset, limit, query) + q, vals := buildQuery(chanID, rpm) selectCQL := fmt.Sprintf(`SELECT channel, subtopic, publisher, protocol, name, unit, value, string_value, bool_value, data_value, sum, time, update_time FROM messages WHERE channel = ? %s LIMIT ? ALLOW FILTERING`, q) - countCQL := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, defTable, q) + countCQL := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, rpm.Format, q) - if table != defTable { + if rpm.Format != defTable { selectCQL = fmt.Sprintf(`SELECT channel, subtopic, publisher, protocol, created, payload FROM %s WHERE channel = ? %s LIMIT ? - ALLOW FILTERING`, table, q) - countCQL = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, table, q) + ALLOW FILTERING`, rpm.Format, q) + countCQL = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = ? %s ALLOW FILTERING`, rpm.Format, q) } iter := cr.session.Query(selectCQL, vals...).Iter() @@ -63,19 +59,18 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query scanner := iter.Scanner() // skip first OFFSET rows - for i := uint64(0); i < offset; i++ { + for i := uint64(0); i < rpm.Offset; i++ { if !scanner.Next() { break } } page := readers.MessagesPage{ - Offset: offset, - Limit: limit, - Messages: []readers.Message{}, + PageMetadata: rpm, + Messages: []readers.Message{}, } - switch table { + switch rpm.Format { case defTable: for scanner.Next() { var msg senml.Message @@ -110,10 +105,17 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query return page, nil } -func buildQuery(chanID string, offset, limit uint64, query map[string]string) (string, []interface{}) { +func buildQuery(chanID string, rpm readers.PageMetadata) (string, []interface{}) { var condCQL string vals := []interface{}{chanID} + var query map[string]interface{} + meta, err := json.Marshal(rpm) + if err != nil { + return condCQL, vals + } + json.Unmarshal(meta, &query) + for name, val := range query { switch name { case @@ -125,18 +127,10 @@ func buildQuery(chanID string, offset, limit uint64, query map[string]string) (s vals = append(vals, val) condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name) case "v": - fVal, err := strconv.ParseFloat(val, 64) - if err != nil { - continue - } - vals = append(vals, fVal) + vals = append(vals, val) condCQL = fmt.Sprintf(`%s AND value = ?`, condCQL) case "vb": - bVal, err := strconv.ParseBool(val) - if err != nil { - continue - } - vals = append(vals, bVal) + vals = append(vals, val) condCQL = fmt.Sprintf(`%s AND bool_value = ?`, condCQL) case "vs": vals = append(vals, val) @@ -145,22 +139,14 @@ func buildQuery(chanID string, offset, limit uint64, query map[string]string) (s vals = append(vals, val) condCQL = fmt.Sprintf(`%s AND data_value = ?`, condCQL) case "from": - fVal, err := strconv.ParseFloat(val, 64) - if err != nil { - continue - } - vals = append(vals, fVal) + vals = append(vals, val) condCQL = fmt.Sprintf(`%s AND time >= ?`, condCQL) case "to": - fVal, err := strconv.ParseFloat(val, 64) - if err != nil { - continue - } - vals = append(vals, fVal) + vals = append(vals, val) condCQL = fmt.Sprintf(`%s AND time < ?`, condCQL) } } - vals = append(vals, offset+limit) + vals = append(vals, rpm.Offset+rpm.Limit) return condCQL, vals } diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index a1a388ee..4660839a 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -55,6 +55,8 @@ func TestReadSenml(t *testing.T) { require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) pubID2, err := idProvider.ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + wrongID, err := idProvider.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) m := senml.Message{ Channel: chanID, @@ -110,168 +112,190 @@ func TestReadSenml(t *testing.T) { // cases that return subset of messages are only // checking data result set size, but not content. cases := map[string]struct { - chanID string - query map[string]string - page readers.MessagesPage + chanID string + pageMeta readers.PageMetadata + page readers.MessagesPage }{ "read message page for existing channel": { chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: msgsNum, + }, page: readers.MessagesPage{ Total: msgsNum, - Offset: 0, - Limit: msgsNum, Messages: fromSenml(messages), }, }, "read message page for non-existent channel": { - chanID: "2", + chanID: wrongID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: msgsNum, + }, page: readers.MessagesPage{ - Total: 0, - Offset: 0, - Limit: msgsNum, Messages: []readers.Message{}, }, }, "read message last page": { chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: msgsNum - 20, + Limit: msgsNum, + }, page: readers.MessagesPage{ Total: msgsNum, - Offset: 95, - Limit: limit, - Messages: fromSenml(messages[95:msgsNum]), + Messages: fromSenml(messages[msgsNum-20 : msgsNum]), }, }, "read message with non-existent subtopic": { chanID: chanID, - query: map[string]string{"subtopic": "not-present"}, - page: readers.MessagesPage{ - Total: 0, + pageMeta: readers.PageMetadata{ Offset: 0, Limit: msgsNum, + Subtopic: "not-present", + }, + page: readers.MessagesPage{ Messages: []readers.Message{}, }, }, "read message with subtopic": { chanID: chanID, - query: map[string]string{"subtopic": subtopic}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(queryMsgs)), + Subtopic: subtopic, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 5, - Limit: msgsNum, - Messages: fromSenml(queryMsgs[5:]), + Messages: fromSenml(queryMsgs), }, }, "read message with publisher": { chanID: chanID, - query: map[string]string{"publisher": pubID2}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(queryMsgs)), + Publisher: pubID2, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, - Messages: fromSenml(queryMsgs[0:limit]), + Messages: fromSenml(queryMsgs), }, }, "read message with protocol": { chanID: chanID, - query: map[string]string{"protocol": httpProt}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(queryMsgs)), + Protocol: httpProt, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, - Messages: fromSenml(queryMsgs[0:limit]), + Messages: fromSenml(queryMsgs), }, }, "read message with name": { chanID: chanID, - query: map[string]string{"name": msgName}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Name: msgName, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(queryMsgs[0:limit]), }, }, "read message with value": { chanID: chanID, - query: map[string]string{"v": fmt.Sprintf("%f", v)}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v, + }, page: readers.MessagesPage{ - Total: uint64(len(valueMsgs)), - Offset: 0, - Limit: limit, + Total: uint64(len(queryMsgs)), Messages: fromSenml(valueMsgs[0:limit]), }, }, "read message with boolean value": { chanID: chanID, - query: map[string]string{"vb": fmt.Sprintf("%t", vb)}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + BoolValue: vb, + }, page: readers.MessagesPage{ Total: uint64(len(boolMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(boolMsgs[0:limit]), }, }, "read message with string value": { chanID: chanID, - query: map[string]string{"vs": vs}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + StringValue: vs, + }, page: readers.MessagesPage{ Total: uint64(len(stringMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(stringMsgs[0:limit]), }, }, "read message with data value": { chanID: chanID, - query: map[string]string{"vd": vd}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + DataValue: vd, + }, page: readers.MessagesPage{ Total: uint64(len(dataMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(dataMsgs[0:limit]), }, }, "read message with from": { chanID: chanID, - query: map[string]string{ - "from": fmt.Sprintf("%f", messages[20].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(messages[0:21])), + From: messages[20].Time, }, page: readers.MessagesPage{ Total: uint64(len(messages[0:21])), - Offset: 0, - Limit: uint64(len(messages[0:21])), Messages: fromSenml(messages[0:21]), }, }, "read message with to": { chanID: chanID, - query: map[string]string{ - "to": fmt.Sprintf("%f", messages[20].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(messages[21:])), + To: messages[20].Time, }, page: readers.MessagesPage{ Total: uint64(len(messages[21:])), - Offset: 0, - Limit: uint64(len(messages[21:])), Messages: fromSenml(messages[21:]), }, }, "read message with from/to": { chanID: chanID, - query: map[string]string{ - "from": fmt.Sprintf("%f", messages[5].Time), - "to": fmt.Sprintf("%f", messages[0].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + From: messages[5].Time, + To: messages[0].Time, }, page: readers.MessagesPage{ Total: 5, - Offset: 0, - Limit: limit, Messages: fromSenml(messages[1:6]), }, }, } for desc, tc := range cases { - result, err := reader.ReadAll(tc.chanID, tc.page.Offset, tc.page.Limit, tc.query) + result, err := reader.ReadAll(tc.chanID, tc.pageMeta) assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages)) assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total)) diff --git a/readers/influxdb/messages.go b/readers/influxdb/messages.go index d9c05e5b..e243fc1a 100644 --- a/readers/influxdb/messages.go +++ b/readers/influxdb/messages.go @@ -40,16 +40,14 @@ func New(client influxdata.Client, database string) readers.MessageRepository { } } -func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { - measurement, ok := query[format] - if !ok { - measurement = defMeasurement +func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { + if rpm.Format == "" { + rpm.Format = defMeasurement } - // Remove format filter and format the rest properly. - delete(query, format) - condition := fmtCondition(chanID, query) - cmd := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, measurement, condition, limit, offset) + condition := fmtCondition(chanID, rpm) + + cmd := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, rpm.Format, condition, rpm.Limit, rpm.Offset) q := influxdata.Query{ Command: cmd, Database: repo.database, @@ -71,20 +69,21 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query result := resp.Results[0].Series[0] for _, v := range result.Values { - ret = append(ret, parseMessage(measurement, result.Columns, v)) + ret = append(ret, parseMessage(rpm.Format, result.Columns, v)) } - total, err := repo.count(measurement, condition) + total, err := repo.count(rpm.Format, condition) if err != nil { return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } - return readers.MessagesPage{ - Total: total, - Offset: offset, - Limit: limit, - Messages: ret, - }, nil + page := readers.MessagesPage{ + PageMetadata: rpm, + Total: total, + Messages: ret, + } + + return page, nil } func (repo *influxRepository) count(measurement, condition string) (uint64, error) { @@ -129,8 +128,16 @@ func (repo *influxRepository) count(measurement, condition string) (uint64, erro return strconv.ParseUint(count.String(), 10, 64) } -func fmtCondition(chanID string, query map[string]string) string { +func fmtCondition(chanID string, rpm readers.PageMetadata) string { condition := fmt.Sprintf(`channel='%s'`, chanID) + + var query map[string]interface{} + meta, err := json.Marshal(rpm) + if err != nil { + return condition + } + json.Unmarshal(meta, &query) + for name, value := range query { switch name { case @@ -141,26 +148,18 @@ func fmtCondition(chanID string, query map[string]string) string { "protocol": condition = fmt.Sprintf(`%s AND "%s"='%s'`, condition, name, value) case "v": - condition = fmt.Sprintf(`%s AND value = %s`, condition, value) + condition = fmt.Sprintf(`%s AND value = %f`, condition, value) case "vb": - condition = fmt.Sprintf(`%s AND boolValue = %s`, condition, value) + condition = fmt.Sprintf(`%s AND boolValue = %t`, condition, value) case "vs": condition = fmt.Sprintf(`%s AND stringValue = '%s'`, condition, value) case "vd": condition = fmt.Sprintf(`%s AND dataValue = '%s'`, condition, value) case "from": - fVal, err := strconv.ParseFloat(value, 64) - if err != nil { - continue - } - iVal := int64(fVal * 1e9) + iVal := int64(value.(float64) * 1e9) condition = fmt.Sprintf(`%s AND time >= %d`, condition, iVal) case "to": - fVal, err := strconv.ParseFloat(value, 64) - if err != nil { - continue - } - iVal := int64(fVal * 1e9) + iVal := int64(value.(float64) * 1e9) condition = fmt.Sprintf(`%s AND time < %d`, condition, iVal) } } diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index de1ebd63..36d63642 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -47,6 +47,8 @@ func TestReadAll(t *testing.T) { require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) pubID2, err := idProvider.ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + wrongID, err := idProvider.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) m := senml.Message{ Channel: chanID, @@ -104,168 +106,190 @@ func TestReadAll(t *testing.T) { require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB reader expected to succeed: %s.\n", err)) cases := map[string]struct { - chanID string - query map[string]string - page readers.MessagesPage + chanID string + pageMeta readers.PageMetadata + page readers.MessagesPage }{ "read message page for existing channel": { chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: msgsNum, + }, page: readers.MessagesPage{ Total: msgsNum, - Offset: 0, - Limit: limit, - Messages: fromSenml(messages[0:limit]), + Messages: fromSenml(messages), }, }, "read message page for non-existent channel": { - chanID: "wrong", + chanID: wrongID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: msgsNum, + }, page: readers.MessagesPage{ - Total: 0, - Offset: 0, - Limit: limit, Messages: []readers.Message{}, }, }, "read message last page": { chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: msgsNum - 20, + Limit: msgsNum, + }, page: readers.MessagesPage{ Total: msgsNum, - Offset: 95, - Limit: limit, - Messages: fromSenml(messages[95:msgsNum]), + Messages: fromSenml(messages[msgsNum-20 : msgsNum]), }, }, "read message with non-existent subtopic": { chanID: chanID, - query: map[string]string{"subtopic": "not-present"}, - page: readers.MessagesPage{ - Total: 0, + pageMeta: readers.PageMetadata{ Offset: 0, Limit: msgsNum, + Subtopic: "not-present", + }, + page: readers.MessagesPage{ Messages: []readers.Message{}, }, }, "read message with subtopic": { chanID: chanID, - query: map[string]string{"subtopic": subtopic}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(queryMsgs)), + Subtopic: subtopic, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, - Messages: fromSenml(queryMsgs[0:limit]), + Messages: fromSenml(queryMsgs), }, }, "read message with publisher": { chanID: chanID, - query: map[string]string{"publisher": pubID2}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(queryMsgs)), + Publisher: pubID2, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, - Messages: fromSenml(queryMsgs[0:limit]), + Messages: fromSenml(queryMsgs), }, }, "read message with protocol": { chanID: chanID, - query: map[string]string{"protocol": httpProt}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(queryMsgs)), + Protocol: httpProt, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, - Messages: fromSenml(queryMsgs[0:limit]), + Messages: fromSenml(queryMsgs), }, }, "read message with name": { chanID: chanID, - query: map[string]string{"name": msgName}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Name: msgName, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(queryMsgs[0:limit]), }, }, "read message with value": { chanID: chanID, - query: map[string]string{"v": fmt.Sprintf("%f", v)}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v, + }, page: readers.MessagesPage{ Total: uint64(len(valueMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(valueMsgs[0:limit]), }, }, "read message with boolean value": { chanID: chanID, - query: map[string]string{"vb": fmt.Sprintf("%t", vb)}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + BoolValue: vb, + }, page: readers.MessagesPage{ Total: uint64(len(boolMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(boolMsgs[0:limit]), }, }, "read message with string value": { chanID: chanID, - query: map[string]string{"vs": vs}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + StringValue: vs, + }, page: readers.MessagesPage{ Total: uint64(len(stringMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(stringMsgs[0:limit]), }, }, "read message with data value": { chanID: chanID, - query: map[string]string{"vd": vd}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + DataValue: vd, + }, page: readers.MessagesPage{ Total: uint64(len(dataMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(dataMsgs[0:limit]), }, }, "read message with from": { chanID: chanID, - query: map[string]string{ - "from": fmt.Sprintf("%f", messages[20].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(messages[0:21])), + From: messages[20].Time, }, page: readers.MessagesPage{ Total: uint64(len(messages[0:21])), - Offset: 0, - Limit: uint64(len(messages[0:21])), Messages: fromSenml(messages[0:21]), }, }, "read message with to": { chanID: chanID, - query: map[string]string{ - "to": fmt.Sprintf("%f", messages[20].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(messages[21:])), + To: messages[20].Time, }, page: readers.MessagesPage{ Total: uint64(len(messages[21:])), - Offset: 0, - Limit: uint64(len(messages[21:])), Messages: fromSenml(messages[21:]), }, }, "read message with from/to": { chanID: chanID, - query: map[string]string{ - "from": fmt.Sprintf("%f", messages[5].Time), - "to": fmt.Sprintf("%f", messages[0].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + From: messages[5].Time, + To: messages[0].Time, }, page: readers.MessagesPage{ Total: 5, - Offset: 0, - Limit: limit, Messages: fromSenml(messages[1:6]), }, }, } for desc, tc := range cases { - result, err := reader.ReadAll(tc.chanID, tc.page.Offset, tc.page.Limit, tc.query) + result, err := reader.ReadAll(tc.chanID, tc.pageMeta) assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected: %v, got: %v", desc, tc.page.Messages, result.Messages)) diff --git a/readers/messages.go b/readers/messages.go index 5d71fc77..3e184697 100644 --- a/readers/messages.go +++ b/readers/messages.go @@ -12,7 +12,7 @@ var ErrNotFound = errors.New("entity not found") type MessageRepository interface { // ReadAll skips given number of messages for given channel and returns next // limited number of messages. - ReadAll(chanID string, offset, limit uint64, query map[string]string) (MessagesPage, error) + ReadAll(chanID string, pm PageMetadata) (MessagesPage, error) } // Message represents any message format. @@ -21,8 +21,24 @@ type Message interface{} // MessagesPage contains page related metadata as well as list of messages that // belong to this page. type MessagesPage struct { + PageMetadata Total uint64 - Offset uint64 - Limit uint64 Messages []Message } + +// PageMetadata represents the parameters used to create database queries +type PageMetadata struct { + Offset uint64 `json:"offset,omitempty"` + Limit uint64 `json:"limit,omitempty"` + Subtopic string `json:"subtopic,omitempty"` + Publisher string `json:"publisher,omitempty"` + Protocol string `json:"protocol,omitempty"` + Name string `json:"name,omitempty"` + Value float64 `json:"v,omitempty"` + BoolValue bool `json:"vb,omitempty"` + StringValue string `json:"vs,omitempty"` + DataValue string `json:"vd,omitempty"` + From float64 `json:"from,omitempty"` + To float64 `json:"to,omitempty"` + Format string `json:"format,omitempty"` +} diff --git a/readers/mocks/messages.go b/readers/mocks/messages.go index f39cde5e..2fd0e6aa 100644 --- a/readers/mocks/messages.go +++ b/readers/mocks/messages.go @@ -4,8 +4,10 @@ package mocks import ( + "encoding/json" "sync" + "github.com/mainflux/mainflux/pkg/transformers/senml" "github.com/mainflux/mainflux/readers" ) @@ -17,36 +19,115 @@ type messageRepositoryMock struct { } // NewMessageRepository returns mock implementation of message repository. -func NewMessageRepository(messages map[string][]readers.Message) readers.MessageRepository { +func NewMessageRepository(chanID string, messages []readers.Message) readers.MessageRepository { + repo := map[string][]readers.Message{ + chanID: messages, + } + return &messageRepositoryMock{ mutex: sync.Mutex{}, - messages: messages, + messages: repo, } } -func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { +func (repo *messageRepositoryMock) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { repo.mutex.Lock() defer repo.mutex.Unlock() - end := offset + limit - - numOfMessages := uint64(len(repo.messages[chanID])) - if offset < 0 || offset >= numOfMessages { + if rpm.Format != "" && rpm.Format != "messages" { return readers.MessagesPage{}, nil } - if limit < 1 { + var query map[string]interface{} + meta, _ := json.Marshal(rpm) + json.Unmarshal(meta, &query) + + var msgs []readers.Message + for _, m := range repo.messages[chanID] { + senml := m.(senml.Message) + + ok := true + + for name := range query { + switch name { + case "subtopic": + if rpm.Subtopic != senml.Subtopic { + ok = false + } + case "publisher": + if rpm.Publisher != senml.Publisher { + ok = false + } + case "name": + if rpm.Name != senml.Name { + ok = false + } + case "protocol": + if rpm.Protocol != senml.Protocol { + ok = false + } + case "v": + if senml.Value == nil || + (senml.Value != nil && + *senml.Value != rpm.Value) { + ok = false + } + case "vb": + if senml.BoolValue == nil || + (senml.BoolValue != nil && + *senml.BoolValue != rpm.BoolValue) { + ok = false + } + case "vs": + if senml.StringValue == nil || + (senml.StringValue != nil && + *senml.StringValue != rpm.StringValue) { + ok = false + } + case "vd": + if senml.DataValue == nil || + (senml.DataValue != nil && + *senml.DataValue != rpm.DataValue) { + ok = false + } + case "from": + if senml.Time < rpm.From { + ok = false + } + case "to": + if senml.Time >= rpm.To { + ok = false + } + } + + if !ok { + break + } + } + + if ok { + msgs = append(msgs, m) + } + } + + numOfMessages := uint64(len(msgs)) + + if rpm.Offset >= numOfMessages { return readers.MessagesPage{}, nil } - if offset+limit > numOfMessages { + if rpm.Limit < 1 { + return readers.MessagesPage{}, nil + } + + end := rpm.Offset + rpm.Limit + if rpm.Offset+rpm.Limit > numOfMessages { end = numOfMessages } return readers.MessagesPage{ - Total: numOfMessages, - Limit: limit, - Offset: offset, - Messages: repo.messages[chanID][offset:end], + PageMetadata: rpm, + Total: uint64(len(msgs)), + Messages: msgs[rpm.Offset:end], }, nil } diff --git a/readers/mongodb/messages.go b/readers/mongodb/messages.go index 264ea249..035b13e6 100644 --- a/readers/mongodb/messages.go +++ b/readers/mongodb/messages.go @@ -5,7 +5,7 @@ package mongodb import ( "context" - "strconv" + "encoding/json" "github.com/mainflux/mainflux/pkg/errors" jsont "github.com/mainflux/mainflux/pkg/transformers/json" @@ -37,26 +37,26 @@ func New(db *mongo.Database) readers.MessageRepository { } } -func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { - collection, ok := query[format] - if !ok { - collection = defCollection +func (repo mongoRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { + if rpm.Format == "" { + rpm.Format = defCollection } - col := repo.db.Collection(collection) - delete(query, format) + + col := repo.db.Collection(rpm.Format) + sortMap := map[string]interface{}{ "time": -1, } // Remove format filter and format the rest properly. - filter := fmtCondition(chanID, query) - cursor, err := col.Find(context.Background(), filter, options.Find().SetSort(sortMap).SetLimit(int64(limit)).SetSkip(int64(offset))) + filter := fmtCondition(chanID, rpm) + cursor, err := col.Find(context.Background(), filter, options.Find().SetSort(sortMap).SetLimit(int64(rpm.Limit)).SetSkip(int64(rpm.Offset))) if err != nil { return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } defer cursor.Close(context.Background()) var messages []readers.Message - switch collection { + switch rpm.Format { case defCollection: for cursor.Next(context.Background()) { var m senml.Message @@ -83,21 +83,30 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } - return readers.MessagesPage{ - Total: uint64(total), - Offset: offset, - Limit: limit, - Messages: messages, - }, nil + mp := readers.MessagesPage{ + PageMetadata: rpm, + Total: uint64(total), + Messages: messages, + } + + return mp, nil } -func fmtCondition(chanID string, query map[string]string) bson.D { +func fmtCondition(chanID string, rpm readers.PageMetadata) bson.D { filter := bson.D{ bson.E{ Key: "channel", Value: chanID, }, } + + var query map[string]interface{} + meta, err := json.Marshal(rpm) + if err != nil { + return filter + } + json.Unmarshal(meta, &query) + for name, value := range query { switch name { case @@ -108,33 +117,17 @@ func fmtCondition(chanID string, query map[string]string) bson.D { "protocol": filter = append(filter, bson.E{Key: name, Value: value}) case "v": - fVal, err := strconv.ParseFloat(value, 64) - if err != nil { - continue - } - filter = append(filter, bson.E{Key: "value", Value: fVal}) + filter = append(filter, bson.E{Key: "value", Value: value}) case "vb": - bVal, err := strconv.ParseBool(value) - if err != nil { - continue - } - filter = append(filter, bson.E{Key: "bool_value", Value: bVal}) + filter = append(filter, bson.E{Key: "bool_value", Value: value}) case "vs": filter = append(filter, bson.E{Key: "string_value", Value: value}) case "vd": filter = append(filter, bson.E{Key: "data_value", Value: value}) case "from": - fVal, err := strconv.ParseFloat(value, 64) - if err != nil { - continue - } - filter = append(filter, bson.E{Key: "time", Value: bson.M{"$gte": fVal}}) + filter = append(filter, bson.E{Key: "time", Value: bson.M{"$gte": value}}) case "to": - fVal, err := strconv.ParseFloat(value, 64) - if err != nil { - continue - } - filter = append(filter, bson.E{Key: "time", Value: bson.M{"$lt": fVal}}) + filter = append(filter, bson.E{Key: "time", Value: bson.M{"$lt": value}}) } } diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index fd17133f..5878a5b3 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -42,6 +42,8 @@ var ( vb = true vd = "dataValue" sum float64 = 42 + + idProvider = uuid.New() ) func TestReadSenml(t *testing.T) { @@ -51,11 +53,13 @@ func TestReadSenml(t *testing.T) { db := client.Database(testDB) writer := writer.New(db) - chanID, err := uuid.New().ID() + chanID, err := idProvider.ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID, err := uuid.New().ID() + pubID, err := idProvider.ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID2, err := uuid.New().ID() + pubID2, err := idProvider.ID() + require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + wrongID, err := idProvider.ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) m := senml.Message{ @@ -106,168 +110,190 @@ func TestReadSenml(t *testing.T) { reader := reader.New(db) cases := map[string]struct { - chanID string - query map[string]string - page readers.MessagesPage + chanID string + pageMeta readers.PageMetadata + page readers.MessagesPage }{ "read message page for existing channel": { chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: msgsNum, + }, page: readers.MessagesPage{ Total: msgsNum, - Offset: 0, - Limit: 11, - Messages: fromSenml(messages[0:11]), + Messages: fromSenml(messages), }, }, "read message page for non-existent channel": { - chanID: "2", + chanID: wrongID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: msgsNum, + }, page: readers.MessagesPage{ - Total: 0, - Offset: 0, - Limit: 10, Messages: []readers.Message{}, }, }, "read message last page": { chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: msgsNum - 20, + Limit: msgsNum, + }, page: readers.MessagesPage{ Total: msgsNum, - Offset: 95, - Limit: limit, - Messages: fromSenml(messages[95:msgsNum]), + Messages: fromSenml(messages[msgsNum-20 : msgsNum]), }, }, "read message with non-existent subtopic": { chanID: chanID, - query: map[string]string{"subtopic": "not-present"}, - page: readers.MessagesPage{ - Total: 0, + pageMeta: readers.PageMetadata{ Offset: 0, Limit: msgsNum, + Subtopic: "not-present", + }, + page: readers.MessagesPage{ Messages: []readers.Message{}, }, }, "read message with subtopic": { chanID: chanID, - query: map[string]string{"subtopic": subtopic}, - page: readers.MessagesPage{ - Total: uint64(len(queryMsgs)), + pageMeta: readers.PageMetadata{ Offset: 0, Limit: uint64(len(queryMsgs)), + Subtopic: subtopic, + }, + page: readers.MessagesPage{ + Total: uint64(len(queryMsgs)), Messages: fromSenml(queryMsgs), }, }, "read message with publisher": { chanID: chanID, - query: map[string]string{"publisher": pubID2}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(queryMsgs)), + Publisher: pubID2, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, - Messages: fromSenml(queryMsgs[0:limit]), + Messages: fromSenml(queryMsgs), }, }, "read message with protocol": { chanID: chanID, - query: map[string]string{"protocol": httpProt}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(queryMsgs)), + Protocol: httpProt, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, - Messages: fromSenml(queryMsgs[0:limit]), + Messages: fromSenml(queryMsgs), }, }, "read message with name": { chanID: chanID, - query: map[string]string{"name": msgName}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Name: msgName, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(queryMsgs[0:limit]), }, }, "read message with value": { chanID: chanID, - query: map[string]string{"v": fmt.Sprintf("%f", v)}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v, + }, page: readers.MessagesPage{ - Total: uint64(len(valueMsgs)), - Offset: 0, - Limit: limit, + Total: uint64(len(queryMsgs)), Messages: fromSenml(valueMsgs[0:limit]), }, }, "read message with boolean value": { chanID: chanID, - query: map[string]string{"vb": fmt.Sprintf("%t", vb)}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + BoolValue: vb, + }, page: readers.MessagesPage{ Total: uint64(len(boolMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(boolMsgs[0:limit]), }, }, "read message with string value": { chanID: chanID, - query: map[string]string{"vs": vs}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + StringValue: vs, + }, page: readers.MessagesPage{ Total: uint64(len(stringMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(stringMsgs[0:limit]), }, }, "read message with data value": { chanID: chanID, - query: map[string]string{"vd": vd}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + DataValue: vd, + }, page: readers.MessagesPage{ Total: uint64(len(dataMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(dataMsgs[0:limit]), }, }, "read message with from": { chanID: chanID, - query: map[string]string{ - "from": fmt.Sprintf("%f", messages[20].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(messages[0:21])), + From: messages[20].Time, }, page: readers.MessagesPage{ Total: uint64(len(messages[0:21])), - Offset: 0, - Limit: uint64(len(messages[0:21])), Messages: fromSenml(messages[0:21]), }, }, "read message with to": { chanID: chanID, - query: map[string]string{ - "to": fmt.Sprintf("%f", messages[20].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(messages[21:])), + To: messages[20].Time, }, page: readers.MessagesPage{ Total: uint64(len(messages[21:])), - Offset: 0, - Limit: uint64(len(messages[21:])), Messages: fromSenml(messages[21:]), }, }, "read message with from/to": { chanID: chanID, - query: map[string]string{ - "from": fmt.Sprintf("%f", messages[5].Time), - "to": fmt.Sprintf("%f", messages[0].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + From: messages[5].Time, + To: messages[0].Time, }, page: readers.MessagesPage{ Total: 5, - Offset: 0, - Limit: limit, Messages: fromSenml(messages[1:6]), }, }, } for desc, tc := range cases { - result, err := reader.ReadAll(tc.chanID, tc.page.Offset, tc.page.Limit, tc.query) + result, err := reader.ReadAll(tc.chanID, tc.pageMeta) assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages)) diff --git a/readers/postgres/messages.go b/readers/postgres/messages.go index 88634010..911797dc 100644 --- a/readers/postgres/messages.go +++ b/readers/postgres/messages.go @@ -37,33 +37,31 @@ func New(db *sqlx.DB) readers.MessageRepository { } } -func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { - table, ok := query[format] +func (tr postgresRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) { order := "created" - if !ok { - table = defTable + if rpm.Format == "" { order = "time" + rpm.Format = defTable } - // Remove format filter and format the rest properly. - delete(query, format) + q := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY %s DESC - LIMIT :limit OFFSET :offset;`, table, fmtCondition(chanID, query), order) + LIMIT :limit OFFSET :offset;`, rpm.Format, fmtCondition(chanID, rpm), order) params := map[string]interface{}{ "channel": chanID, - "limit": limit, - "offset": offset, - "subtopic": query["subtopic"], - "publisher": query["publisher"], - "name": query["name"], - "protocol": query["protocol"], - "value": query["v"], - "bool_value": query["vb"], - "string_value": query["vs"], - "data_value": query["vd"], - "from": query["from"], - "to": query["to"], + "limit": rpm.Limit, + "offset": rpm.Offset, + "subtopic": rpm.Subtopic, + "publisher": rpm.Publisher, + "name": rpm.Name, + "protocol": rpm.Protocol, + "value": rpm.Value, + "bool_value": rpm.BoolValue, + "string_value": rpm.StringValue, + "data_value": rpm.DataValue, + "from": rpm.From, + "to": rpm.To, } rows, err := tr.db.NamedQuery(q, params) @@ -73,11 +71,10 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query defer rows.Close() page := readers.MessagesPage{ - Offset: offset, - Limit: limit, - Messages: []readers.Message{}, + PageMetadata: rpm, + Messages: []readers.Message{}, } - switch table { + switch rpm.Format { case defTable: for rows.Next() { msg := dbMessage{Message: senml.Message{}} @@ -103,7 +100,7 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query } - q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, table, fmtCondition(chanID, query)) + q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, rpm.Format, fmtCondition(chanID, rpm)) rows, err = tr.db.NamedQuery(q, params) if err != nil { return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) @@ -121,8 +118,16 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query return page, nil } -func fmtCondition(chanID string, query map[string]string) string { +func fmtCondition(chanID string, rpm readers.PageMetadata) string { condition := `channel = :channel` + + var query map[string]interface{} + meta, err := json.Marshal(rpm) + if err != nil { + return condition + } + json.Unmarshal(meta, &query) + for name := range query { switch name { case diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index c09d7943..4c03f610 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -103,168 +103,190 @@ func TestReadSenml(t *testing.T) { // cases that return subset of messages are only // checking data result set size, but not content. cases := map[string]struct { - chanID string - query map[string]string - page readers.MessagesPage + chanID string + pageMeta readers.PageMetadata + page readers.MessagesPage }{ "read message page for existing channel": { chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: msgsNum, + }, page: readers.MessagesPage{ Total: msgsNum, - Offset: 0, - Limit: msgsNum, Messages: fromSenml(messages), }, }, "read message page for non-existent channel": { chanID: wrongID, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: msgsNum, + }, page: readers.MessagesPage{ - Total: 0, - Offset: 0, - Limit: msgsNum, Messages: []readers.Message{}, }, }, "read message last page": { chanID: chanID, + pageMeta: readers.PageMetadata{ + Offset: msgsNum - 20, + Limit: msgsNum, + }, page: readers.MessagesPage{ Total: msgsNum, - Offset: msgsNum - 20, - Limit: msgsNum, Messages: fromSenml(messages[msgsNum-20 : msgsNum]), }, }, "read message with non-existent subtopic": { chanID: chanID, - query: map[string]string{"subtopic": "not-present"}, - page: readers.MessagesPage{ - Total: 0, + pageMeta: readers.PageMetadata{ Offset: 0, Limit: msgsNum, + Subtopic: "not-present", + }, + page: readers.MessagesPage{ Messages: []readers.Message{}, }, }, "read message with subtopic": { chanID: chanID, - query: map[string]string{"subtopic": subtopic}, - page: readers.MessagesPage{ - Total: uint64(len(queryMsgs)), + pageMeta: readers.PageMetadata{ Offset: 0, Limit: uint64(len(queryMsgs)), + Subtopic: subtopic, + }, + page: readers.MessagesPage{ + Total: uint64(len(queryMsgs)), Messages: fromSenml(queryMsgs), }, }, "read message with publisher": { chanID: chanID, - query: map[string]string{"publisher": pubID2}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(queryMsgs)), + Publisher: pubID2, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: uint64(len(queryMsgs)), Messages: fromSenml(queryMsgs), }, }, "read message with protocol": { chanID: chanID, - query: map[string]string{"protocol": httpProt}, - page: readers.MessagesPage{ - Total: uint64(len(queryMsgs)), + pageMeta: readers.PageMetadata{ Offset: 0, Limit: uint64(len(queryMsgs)), + Protocol: httpProt, + }, + page: readers.MessagesPage{ + Total: uint64(len(queryMsgs)), Messages: fromSenml(queryMsgs), }, }, "read message with name": { chanID: chanID, - query: map[string]string{"name": msgName}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Name: msgName, + }, page: readers.MessagesPage{ Total: uint64(len(queryMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(queryMsgs[0:limit]), }, }, "read message with value": { chanID: chanID, - query: map[string]string{"v": fmt.Sprintf("%f", v)}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + Value: v, + }, page: readers.MessagesPage{ - Total: uint64(len(valueMsgs)), - Offset: 0, - Limit: limit, + Total: uint64(len(queryMsgs)), Messages: fromSenml(valueMsgs[0:limit]), }, }, "read message with boolean value": { chanID: chanID, - query: map[string]string{"vb": fmt.Sprintf("%t", vb)}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + BoolValue: vb, + }, page: readers.MessagesPage{ Total: uint64(len(boolMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(boolMsgs[0:limit]), }, }, "read message with string value": { chanID: chanID, - query: map[string]string{"vs": vs}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + StringValue: vs, + }, page: readers.MessagesPage{ Total: uint64(len(stringMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(stringMsgs[0:limit]), }, }, "read message with data value": { chanID: chanID, - query: map[string]string{"vd": vd}, + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + DataValue: vd, + }, page: readers.MessagesPage{ Total: uint64(len(dataMsgs)), - Offset: 0, - Limit: limit, Messages: fromSenml(dataMsgs[0:limit]), }, }, "read message with from": { chanID: chanID, - query: map[string]string{ - "from": fmt.Sprintf("%f", messages[20].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(messages[0:21])), + From: messages[20].Time, }, page: readers.MessagesPage{ Total: uint64(len(messages[0:21])), - Offset: 0, - Limit: uint64(len(messages[0:21])), Messages: fromSenml(messages[0:21]), }, }, "read message with to": { chanID: chanID, - query: map[string]string{ - "to": fmt.Sprintf("%f", messages[20].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: uint64(len(messages[21:])), + To: messages[20].Time, }, page: readers.MessagesPage{ Total: uint64(len(messages[21:])), - Offset: 0, - Limit: uint64(len(messages[21:])), Messages: fromSenml(messages[21:]), }, }, "read message with from/to": { chanID: chanID, - query: map[string]string{ - "from": fmt.Sprintf("%f", messages[5].Time), - "to": fmt.Sprintf("%f", messages[0].Time), + pageMeta: readers.PageMetadata{ + Offset: 0, + Limit: limit, + From: messages[5].Time, + To: messages[0].Time, }, page: readers.MessagesPage{ Total: 5, - Offset: 0, - Limit: limit, Messages: fromSenml(messages[1:6]), }, }, } for desc, tc := range cases { - result, err := reader.ReadAll(tc.chanID, tc.page.Offset, tc.page.Limit, tc.query) + result, err := reader.ReadAll(tc.chanID, tc.pageMeta) assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages)) assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))