1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-28 13:48:49 +08:00
Dušan Borovčanin e326494166
MF-1264 - Add support for JSON readers (#1295)
* MF-1254 - Create universal JSON writer (#1260)

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add JSON support to Readers

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix Influx Reader tests

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix messages format query

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix Postgres reader

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix Cassandra Readers and writers

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix Mongo reader

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Extract utility method to the JSON transformer

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix Influx and Postgres count

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Update JSON transformer

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix Influxdb Reader total count

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Refactor init.go for Cassandra writer

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Create a Payload type

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add comments for defaults

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix variable declarations

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Replace interface{} with a new type

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Don't set channel just to overwrite it later

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix range search

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Rename Messages field

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

Co-authored-by: Manuel Imperiale <manuel.imperiale@gmail.com>
2020-12-30 15:43:04 +01:00

162 lines
3.9 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package postgres
import (
"encoding/json"
"fmt"
"github.com/jmoiron/sqlx" // required for DB access
"github.com/mainflux/mainflux/pkg/errors"
jsont "github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/readers"
)
const errInvalid = "invalid_text_representation"
const (
format = "format"
// Table for SenML messages
defTable = "messages"
)
var errReadMessages = errors.New("failed to read messages from postgres database")
var _ readers.MessageRepository = (*postgresRepository)(nil)
type postgresRepository struct {
db *sqlx.DB
}
// New returns new PostgreSQL writer.
func New(db *sqlx.DB) readers.MessageRepository {
return &postgresRepository{
db: db,
}
}
func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) {
table, ok := query[format]
order := "created"
if !ok {
table = defTable
order = "time"
}
// Remove format filter and format the rest properly.
delete(query, format)
q := fmt.Sprintf(`SELECT * FROM %s
WHERE %s ORDER BY %s DESC
LIMIT :limit OFFSET :offset;`, table, fmtCondition(chanID, query), order)
params := map[string]interface{}{
"channel": chanID,
"limit": limit,
"offset": offset,
"subtopic": query["subtopic"],
"publisher": query["publisher"],
"name": query["name"],
"protocol": query["protocol"],
}
rows, err := tr.db.NamedQuery(q, params)
if err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
defer rows.Close()
page := readers.MessagesPage{
Offset: offset,
Limit: limit,
Messages: []readers.Message{},
}
switch table {
case defTable:
for rows.Next() {
msg := dbMessage{Message: senml.Message{}}
if err := rows.StructScan(&msg); err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
page.Messages = append(page.Messages, msg.Message)
}
default:
for rows.Next() {
msg := jsonMessage{}
if err := rows.StructScan(&msg); err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
m, err := msg.toMap()
if err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
m["payload"] = jsont.ParseFlat(m["payload"])
page.Messages = append(page.Messages, m)
}
}
q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = $1;`, table)
qParams := []interface{}{chanID}
if query["subtopic"] != "" {
q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE channel = $1 AND subtopic = $2;`, table)
qParams = append(qParams, query["subtopic"])
}
if err := tr.db.QueryRow(q, qParams...).Scan(&page.Total); err != nil {
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
return page, nil
}
func fmtCondition(chanID string, query map[string]string) string {
condition := `channel = :channel`
for name := range query {
switch name {
case
"subtopic",
"publisher",
"name",
"protocol":
condition = fmt.Sprintf(`%s AND %s = :%s`, condition, name, name)
}
}
return condition
}
type dbMessage struct {
ID string `db:"id"`
senml.Message
}
type jsonMessage struct {
ID string `db:"id"`
Channel string `db:"channel"`
Created int64 `db:"created"`
Subtopic string `db:"subtopic"`
Publisher string `db:"publisher"`
Protocol string `db:"protocol"`
Payload []byte `db:"payload"`
}
func (msg jsonMessage) toMap() (map[string]interface{}, error) {
ret := map[string]interface{}{
"id": msg.ID,
"channel": msg.Channel,
"created": msg.Created,
"subtopic": msg.Subtopic,
"publisher": msg.Publisher,
"protocol": msg.Protocol,
"payload": map[string]interface{}{},
}
pld := make(map[string]interface{})
if err := json.Unmarshal(msg.Payload, &pld); err != nil {
return nil, err
}
ret["payload"] = pld
return ret, nil
}