From 138dae9277d792da5d6b5e54387fae913efbd847 Mon Sep 17 00:00:00 2001 From: Manuel Imperiale Date: Tue, 7 May 2019 15:59:18 +0200 Subject: [PATCH] NOISSUE - Fix Readers logs (#735) * NOISSUE - Fix Readers logs Signed-off-by: Manuel Imperiale * Fix reviews Signed-off-by: Manuel Imperiale --- readers/api/endpoint.go | 5 ++++- readers/api/logging.go | 2 +- readers/api/metrics.go | 2 +- readers/cassandra/messages.go | 19 +++++++++---------- readers/cassandra/messages_test.go | 3 ++- readers/influxdb/messages.go | 22 ++++++++++++++-------- readers/influxdb/messages_test.go | 5 +++-- readers/messages.go | 2 +- readers/mocks/messages.go | 8 ++++---- readers/mongodb/messages.go | 16 +++++++++------- readers/mongodb/messages_test.go | 3 ++- 11 files changed, 50 insertions(+), 37 deletions(-) diff --git a/readers/api/endpoint.go b/readers/api/endpoint.go index 2158fb8a..9433de9b 100644 --- a/readers/api/endpoint.go +++ b/readers/api/endpoint.go @@ -22,7 +22,10 @@ func listMessagesEndpoint(svc readers.MessageRepository) endpoint.Endpoint { return nil, err } - page := svc.ReadAll(req.chanID, req.offset, req.limit, req.query) + page, err := svc.ReadAll(req.chanID, req.offset, req.limit, req.query) + if err != nil { + return nil, err + } return pageRes{ Total: page.Total, diff --git a/readers/api/logging.go b/readers/api/logging.go index 4631b702..2b71c276 100644 --- a/readers/api/logging.go +++ b/readers/api/logging.go @@ -32,7 +32,7 @@ func LoggingMiddleware(svc readers.MessageRepository, logger logger.Logger) read } } -func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { +func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { defer func(begin time.Time) { lm.logger.Info(fmt.Sprintf(`Method read_all for offset %d and limit %d took %s to complete without errors.`, offset, limit, time.Since(begin))) }(time.Now()) diff --git a/readers/api/metrics.go b/readers/api/metrics.go index c7df232a..04e9d80a 100644 --- a/readers/api/metrics.go +++ b/readers/api/metrics.go @@ -34,7 +34,7 @@ func MetricsMiddleware(svc readers.MessageRepository, counter metrics.Counter, l } } -func (mm *metricsMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { +func (mm *metricsMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { defer func(begin time.Time) { mm.counter.With("method", "read_all").Add(1) mm.latency.With("method", "read_all").Observe(time.Since(begin).Seconds()) diff --git a/readers/cassandra/messages.go b/readers/cassandra/messages.go index 9664a4be..63e9976b 100644 --- a/readers/cassandra/messages.go +++ b/readers/cassandra/messages.go @@ -23,10 +23,12 @@ type cassandraRepository struct { // New instantiates Cassandra message repository. func New(session *gocql.Session) readers.MessageRepository { - return cassandraRepository{session: session} + return cassandraRepository{ + session: session, + } } -func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { +func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { names := []string{} vals := []interface{}{chanID} for name, val := range query { @@ -39,6 +41,7 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query countCQL := buildCountQuery(chanID, names) iter := cr.session.Query(selectCQL, vals...).Iter() + defer iter.Close() scanner := iter.Scanner() // skip first OFFSET rows @@ -63,7 +66,7 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query &msg.Name, &msg.Unit, &floatVal, &strVal, &boolVal, &dataVal, &valueSum, &msg.Time, &msg.UpdateTime, &msg.Link) if err != nil { - return readers.MessagesPage{} + return readers.MessagesPage{}, err } switch { @@ -84,22 +87,18 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query page.Messages = append(page.Messages, msg) } - if err := iter.Close(); err != nil { - return readers.MessagesPage{} - } - if err := cr.session.Query(countCQL, vals[:len(vals)-1]...).Scan(&page.Total); err != nil { - return readers.MessagesPage{} + return readers.MessagesPage{}, err } - return page + return page, nil } func buildSelectQuery(chanID string, offset, limit uint64, names []string) string { var condCQL string cql := `SELECT channel, subtopic, publisher, protocol, name, unit, value, string_value, bool_value, data_value, value_sum, time, - update_time, link FROM messages WHERE channel = ? %s LIMIT ? + update_time, link FROM messages WHERE channel = ? %s LIMIT ? ALLOW FILTERING` for _, name := range names { diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index 5b221ce4..f7b70647 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -147,7 +147,8 @@ func TestReadAll(t *testing.T) { } for desc, tc := range cases { - result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) + result, err := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) + assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages)) assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total)) } diff --git a/readers/influxdb/messages.go b/readers/influxdb/messages.go index 0e8b8740..c73beb5f 100644 --- a/readers/influxdb/messages.go +++ b/readers/influxdb/messages.go @@ -27,11 +27,14 @@ type influxRepository struct { } // New returns new InfluxDB reader. -func New(client influxdata.Client, database string) (readers.MessageRepository, error) { - return &influxRepository{database, client}, nil +func New(client influxdata.Client, database string) readers.MessageRepository { + return &influxRepository{ + database, + client, + } } -func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { +func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { if limit > maxLimit { limit = maxLimit } @@ -46,12 +49,15 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query ret := []mainflux.Message{} resp, err := repo.client.Query(q) - if err != nil || resp.Error() != nil { - return readers.MessagesPage{} + if err != nil { + return readers.MessagesPage{}, err + } + if resp.Error() != nil { + return readers.MessagesPage{}, resp.Error() } if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 { - return readers.MessagesPage{} + return readers.MessagesPage{}, nil } result := resp.Results[0].Series[0] @@ -61,7 +67,7 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query total, err := repo.count(condition) if err != nil { - return readers.MessagesPage{} + return readers.MessagesPage{}, err } return readers.MessagesPage{ @@ -69,7 +75,7 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query Offset: offset, Limit: limit, Messages: ret, - } + }, nil } func (repo *influxRepository) count(condition string) (uint64, error) { diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index f2e76a92..ca0aecda 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -85,7 +85,7 @@ func TestReadAll(t *testing.T) { } } - reader, err := reader.New(client, testDB) + reader := reader.New(client, testDB) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB reader expected to succeed: %s.\n", err)) cases := map[string]struct { @@ -166,7 +166,8 @@ func TestReadAll(t *testing.T) { } for desc, tc := range cases { - result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) + result, err := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) + assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected: %v \n-------------\n got: %v", desc, tc.page.Messages, result.Messages)) assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %d got %d", desc, tc.page.Total, result.Total)) } diff --git a/readers/messages.go b/readers/messages.go index cf6f3a1b..5b2240dd 100644 --- a/readers/messages.go +++ b/readers/messages.go @@ -20,7 +20,7 @@ var ErrNotFound = errors.New("entity not found") type MessageRepository interface { // ReadAll skips given number of messages for given channel and returns next // limited number of messages. - ReadAll(string, uint64, uint64, map[string]string) MessagesPage + ReadAll(string, uint64, uint64, map[string]string) (MessagesPage, error) } // MessagesPage contains page related metadata as well as list of messages that diff --git a/readers/mocks/messages.go b/readers/mocks/messages.go index 5fb8524c..1078cead 100644 --- a/readers/mocks/messages.go +++ b/readers/mocks/messages.go @@ -29,7 +29,7 @@ func NewMessageRepository(messages map[string][]mainflux.Message) readers.Messag } } -func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { +func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { repo.mutex.Lock() defer repo.mutex.Unlock() @@ -37,11 +37,11 @@ func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, numOfMessages := uint64(len(repo.messages[chanID])) if offset < 0 || offset >= numOfMessages { - return readers.MessagesPage{} + return readers.MessagesPage{}, nil } if limit < 1 { - return readers.MessagesPage{} + return readers.MessagesPage{}, nil } if offset+limit > numOfMessages { @@ -53,5 +53,5 @@ func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, Limit: limit, Offset: offset, Messages: repo.messages[chanID][offset:end], - } + }, nil } diff --git a/readers/mongodb/messages.go b/readers/mongodb/messages.go index 3c4e0800..5275c253 100644 --- a/readers/mongodb/messages.go +++ b/readers/mongodb/messages.go @@ -45,10 +45,12 @@ type message struct { // New returns new MongoDB reader. func New(db *mongo.Database) readers.MessageRepository { - return mongoRepository{db: db} + return mongoRepository{ + db: db, + } } -func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { +func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { col := repo.db.Collection(collection) sortMap := map[string]interface{}{ "time": -1, @@ -57,7 +59,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m filter := fmtCondition(chanID, query) cursor, err := col.Find(context.Background(), filter, options.Find().SetSort(sortMap).SetLimit(int64(limit)).SetSkip(int64(offset))) if err != nil { - return readers.MessagesPage{} + return readers.MessagesPage{}, err } defer cursor.Close(context.Background()) @@ -65,7 +67,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m for cursor.Next(context.Background()) { var m message if err := cursor.Decode(&m); err != nil { - return readers.MessagesPage{} + return readers.MessagesPage{}, err } msg := mainflux.Message{ @@ -100,10 +102,10 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m total, err := col.CountDocuments(context.Background(), filter) if err != nil { - return readers.MessagesPage{} + return readers.MessagesPage{}, err } if total < 0 { - return readers.MessagesPage{} + return readers.MessagesPage{}, nil } return readers.MessagesPage{ @@ -111,7 +113,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m Offset: offset, Limit: limit, Messages: messages, - } + }, nil } func fmtCondition(chanID string, query map[string]string) *bson.D { diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index 32fdf8d5..89e621e1 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -155,7 +155,8 @@ func TestReadAll(t *testing.T) { } for desc, tc := range cases { - result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) + result, err := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) + assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages)) assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total)) }