mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-27 13:48:49 +08:00
NOISSUE - Fix Readers logs (#735)
* NOISSUE - Fix Readers logs Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix reviews Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
parent
9f1151b8dc
commit
138dae9277
@ -22,7 +22,10 @@ func listMessagesEndpoint(svc readers.MessageRepository) endpoint.Endpoint {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
page := svc.ReadAll(req.chanID, req.offset, req.limit, req.query)
|
||||
page, err := svc.ReadAll(req.chanID, req.offset, req.limit, req.query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pageRes{
|
||||
Total: page.Total,
|
||||
|
@ -32,7 +32,7 @@ func LoggingMiddleware(svc readers.MessageRepository, logger logger.Logger) read
|
||||
}
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage {
|
||||
func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Info(fmt.Sprintf(`Method read_all for offset %d and limit %d took %s to complete without errors.`, offset, limit, time.Since(begin)))
|
||||
}(time.Now())
|
||||
|
@ -34,7 +34,7 @@ func MetricsMiddleware(svc readers.MessageRepository, counter metrics.Counter, l
|
||||
}
|
||||
}
|
||||
|
||||
func (mm *metricsMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage {
|
||||
func (mm *metricsMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) {
|
||||
defer func(begin time.Time) {
|
||||
mm.counter.With("method", "read_all").Add(1)
|
||||
mm.latency.With("method", "read_all").Observe(time.Since(begin).Seconds())
|
||||
|
@ -23,10 +23,12 @@ type cassandraRepository struct {
|
||||
|
||||
// New instantiates Cassandra message repository.
|
||||
func New(session *gocql.Session) readers.MessageRepository {
|
||||
return cassandraRepository{session: session}
|
||||
return cassandraRepository{
|
||||
session: session,
|
||||
}
|
||||
}
|
||||
|
||||
func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage {
|
||||
func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) {
|
||||
names := []string{}
|
||||
vals := []interface{}{chanID}
|
||||
for name, val := range query {
|
||||
@ -39,6 +41,7 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query
|
||||
countCQL := buildCountQuery(chanID, names)
|
||||
|
||||
iter := cr.session.Query(selectCQL, vals...).Iter()
|
||||
defer iter.Close()
|
||||
scanner := iter.Scanner()
|
||||
|
||||
// skip first OFFSET rows
|
||||
@ -63,7 +66,7 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query
|
||||
&msg.Name, &msg.Unit, &floatVal, &strVal, &boolVal,
|
||||
&dataVal, &valueSum, &msg.Time, &msg.UpdateTime, &msg.Link)
|
||||
if err != nil {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
|
||||
switch {
|
||||
@ -84,22 +87,18 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query
|
||||
page.Messages = append(page.Messages, msg)
|
||||
}
|
||||
|
||||
if err := iter.Close(); err != nil {
|
||||
return readers.MessagesPage{}
|
||||
}
|
||||
|
||||
if err := cr.session.Query(countCQL, vals[:len(vals)-1]...).Scan(&page.Total); err != nil {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
|
||||
return page
|
||||
return page, nil
|
||||
}
|
||||
|
||||
func buildSelectQuery(chanID string, offset, limit uint64, names []string) string {
|
||||
var condCQL string
|
||||
cql := `SELECT channel, subtopic, publisher, protocol, name, unit,
|
||||
value, string_value, bool_value, data_value, value_sum, time,
|
||||
update_time, link FROM messages WHERE channel = ? %s LIMIT ?
|
||||
update_time, link FROM messages WHERE channel = ? %s LIMIT ?
|
||||
ALLOW FILTERING`
|
||||
|
||||
for _, name := range names {
|
||||
|
@ -147,7 +147,8 @@ func TestReadAll(t *testing.T) {
|
||||
}
|
||||
|
||||
for desc, tc := range cases {
|
||||
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query)
|
||||
result, err := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query)
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
|
||||
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages))
|
||||
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
|
||||
}
|
||||
|
@ -27,11 +27,14 @@ type influxRepository struct {
|
||||
}
|
||||
|
||||
// New returns new InfluxDB reader.
|
||||
func New(client influxdata.Client, database string) (readers.MessageRepository, error) {
|
||||
return &influxRepository{database, client}, nil
|
||||
func New(client influxdata.Client, database string) readers.MessageRepository {
|
||||
return &influxRepository{
|
||||
database,
|
||||
client,
|
||||
}
|
||||
}
|
||||
|
||||
func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage {
|
||||
func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) {
|
||||
if limit > maxLimit {
|
||||
limit = maxLimit
|
||||
}
|
||||
@ -46,12 +49,15 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query
|
||||
ret := []mainflux.Message{}
|
||||
|
||||
resp, err := repo.client.Query(q)
|
||||
if err != nil || resp.Error() != nil {
|
||||
return readers.MessagesPage{}
|
||||
if err != nil {
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
if resp.Error() != nil {
|
||||
return readers.MessagesPage{}, resp.Error()
|
||||
}
|
||||
|
||||
if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, nil
|
||||
}
|
||||
|
||||
result := resp.Results[0].Series[0]
|
||||
@ -61,7 +67,7 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query
|
||||
|
||||
total, err := repo.count(condition)
|
||||
if err != nil {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
|
||||
return readers.MessagesPage{
|
||||
@ -69,7 +75,7 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query
|
||||
Offset: offset,
|
||||
Limit: limit,
|
||||
Messages: ret,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (repo *influxRepository) count(condition string) (uint64, error) {
|
||||
|
@ -85,7 +85,7 @@ func TestReadAll(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
reader, err := reader.New(client, testDB)
|
||||
reader := reader.New(client, testDB)
|
||||
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB reader expected to succeed: %s.\n", err))
|
||||
|
||||
cases := map[string]struct {
|
||||
@ -166,7 +166,8 @@ func TestReadAll(t *testing.T) {
|
||||
}
|
||||
|
||||
for desc, tc := range cases {
|
||||
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query)
|
||||
result, err := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query)
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
|
||||
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected: %v \n-------------\n got: %v", desc, tc.page.Messages, result.Messages))
|
||||
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %d got %d", desc, tc.page.Total, result.Total))
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ var ErrNotFound = errors.New("entity not found")
|
||||
type MessageRepository interface {
|
||||
// ReadAll skips given number of messages for given channel and returns next
|
||||
// limited number of messages.
|
||||
ReadAll(string, uint64, uint64, map[string]string) MessagesPage
|
||||
ReadAll(string, uint64, uint64, map[string]string) (MessagesPage, error)
|
||||
}
|
||||
|
||||
// MessagesPage contains page related metadata as well as list of messages that
|
||||
|
@ -29,7 +29,7 @@ func NewMessageRepository(messages map[string][]mainflux.Message) readers.Messag
|
||||
}
|
||||
}
|
||||
|
||||
func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage {
|
||||
func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) {
|
||||
repo.mutex.Lock()
|
||||
defer repo.mutex.Unlock()
|
||||
|
||||
@ -37,11 +37,11 @@ func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64,
|
||||
|
||||
numOfMessages := uint64(len(repo.messages[chanID]))
|
||||
if offset < 0 || offset >= numOfMessages {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, nil
|
||||
}
|
||||
|
||||
if limit < 1 {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, nil
|
||||
}
|
||||
|
||||
if offset+limit > numOfMessages {
|
||||
@ -53,5 +53,5 @@ func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64,
|
||||
Limit: limit,
|
||||
Offset: offset,
|
||||
Messages: repo.messages[chanID][offset:end],
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
@ -45,10 +45,12 @@ type message struct {
|
||||
|
||||
// New returns new MongoDB reader.
|
||||
func New(db *mongo.Database) readers.MessageRepository {
|
||||
return mongoRepository{db: db}
|
||||
return mongoRepository{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage {
|
||||
func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) {
|
||||
col := repo.db.Collection(collection)
|
||||
sortMap := map[string]interface{}{
|
||||
"time": -1,
|
||||
@ -57,7 +59,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m
|
||||
filter := fmtCondition(chanID, query)
|
||||
cursor, err := col.Find(context.Background(), filter, options.Find().SetSort(sortMap).SetLimit(int64(limit)).SetSkip(int64(offset)))
|
||||
if err != nil {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
defer cursor.Close(context.Background())
|
||||
|
||||
@ -65,7 +67,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m
|
||||
for cursor.Next(context.Background()) {
|
||||
var m message
|
||||
if err := cursor.Decode(&m); err != nil {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
|
||||
msg := mainflux.Message{
|
||||
@ -100,10 +102,10 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m
|
||||
|
||||
total, err := col.CountDocuments(context.Background(), filter)
|
||||
if err != nil {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, err
|
||||
}
|
||||
if total < 0 {
|
||||
return readers.MessagesPage{}
|
||||
return readers.MessagesPage{}, nil
|
||||
}
|
||||
|
||||
return readers.MessagesPage{
|
||||
@ -111,7 +113,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m
|
||||
Offset: offset,
|
||||
Limit: limit,
|
||||
Messages: messages,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func fmtCondition(chanID string, query map[string]string) *bson.D {
|
||||
|
@ -155,7 +155,8 @@ func TestReadAll(t *testing.T) {
|
||||
}
|
||||
|
||||
for desc, tc := range cases {
|
||||
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query)
|
||||
result, err := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query)
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err))
|
||||
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages))
|
||||
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user