From c8cb2655c01a645f5e32189697b13d8e074ffebc Mon Sep 17 00:00:00 2001 From: Parham Alvani Date: Sat, 21 Sep 2019 02:50:09 +0430 Subject: [PATCH] MF-820 - Fetch messages for a particular device (#843) * feat: Add Support for Publisher Query in Postgres Signed-off-by: Parham Alvani * chore: Remove Redundant Case Signed-off-by: Parham Alvani * chore: Add Test for Postgres Query Signed-off-by: Parham Alvani --- readers/postgres/messages.go | 34 ++++++++++++++++++++++--------- readers/postgres/messages_test.go | 12 +++++++++++ 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/readers/postgres/messages.go b/readers/postgres/messages.go index 825f66c3..015de477 100644 --- a/readers/postgres/messages.go +++ b/readers/postgres/messages.go @@ -34,19 +34,18 @@ func New(db *sqlx.DB) readers.MessageRepository { } func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { - subtopicQuery := "" - if query["subtopic"] != "" { - subtopicQuery = `AND subtopic = :subtopic` - } q := fmt.Sprintf(`SELECT * FROM messages - WHERE channel = :channel %s ORDER BY time DESC - LIMIT :limit OFFSET :offset;`, subtopicQuery) + WHERE %s ORDER BY time DESC + LIMIT :limit OFFSET :offset;`, fmtCondition(chanID, query)) params := map[string]interface{}{ - "channel": chanID, - "limit": limit, - "offset": offset, - "subtopic": query["subtopic"], + "channel": chanID, + "limit": limit, + "offset": offset, + "subtopic": query["subtopic"], + "publisher": query["publisher"], + "name": query["name"], + "protocol": query["protocol"], } rows, err := tr.db.NamedQuery(q, params) @@ -89,6 +88,21 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query return page, nil } +func fmtCondition(chanID string, query map[string]string) string { + condition := `channel = :channel` + for name := range query { + switch name { + case + "subtopic", + "publisher", + "name", + "protocol": + condition = fmt.Sprintf(`%s AND %s = :%s`, condition, name, name) + } + } + return condition +} + type dbMessage struct { ID string `db:"id"` Channel string `db:"channel"` diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index b3760021..79bc51e3 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -142,6 +142,18 @@ func TestMessageReadAll(t *testing.T) { Messages: subtopicMsgs, }, }, + "read message with publisher/protocols": { + chanID: chanID.String(), + offset: 0, + limit: msgsNum, + query: map[string]string{"publisher": pubID.String(), "protocol": "mqtt"}, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 0, + Limit: msgsNum, + Messages: messages, + }, + }, } for desc, tc := range cases {