1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-29 13:49:28 +08:00
Dušan Borovčanin 6b7dc54c8b
NOISSUE - Switch to Consumers interface (#1316)
* Replace Writer with Consumer

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add Notifications package

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Update Consumer Start

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix Readers

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix Consumer naming

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add repo to Notify

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Remove notify

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Rename consumer field in middlewares

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix remarks and add Readme

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
2021-01-11 23:55:34 +01:00

222 lines
5.3 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package postgres
import (
"context"
"encoding/json"
"fmt"
"github.com/gofrs/uuid"
"github.com/jmoiron/sqlx"
"github.com/lib/pq" // required for DB access
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/pkg/errors"
mfjson "github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
)
// PostgreSQL error condition names. They are compared against
// pq.Error.Code.Name() when an insert fails.
const (
// errInvalid is the condition that is reported to callers as
// errInvalidMessage.
errInvalid = "invalid_text_representation"
// errUndefinedTable is the condition that makes insertJSON return errNoTable.
errUndefinedTable = "undefined_table"
)
// Errors produced by this package.
var (
// errInvalidMessage is wrapped into errSaveMessage when PostgreSQL reports
// the errInvalid condition for an insert.
errInvalidMessage = errors.New("invalid message representation")
// errSaveMessage is the outer error used for failures while saving messages.
errSaveMessage = errors.New("failed to save message to postgres database")
// errTransRollback marks a transaction rollback that itself failed.
errTransRollback = errors.New("failed to rollback transaction")
// NOTE(review): errMessageFormat and errNoMessages are not referenced by the
// code visible in this file; confirm whether they are still needed.
errMessageFormat = errors.New("invalid message format")
errNoMessages = errors.New("empty message")
// errNoTable is returned by insertJSON when the target table does not exist;
// saveJSON reacts to it by creating the table and retrying the insert.
errNoTable = errors.New("relation does not exist")
)
// Compile-time check that *postgresRepo satisfies consumers.Consumer.
var _ consumers.Consumer = (*postgresRepo)(nil)
// postgresRepo is the consumers.Consumer implementation of this package. It
// stores consumed messages through an sqlx database handle.
type postgresRepo struct {
// db is the handle used to open transactions and execute statements.
db *sqlx.DB
}
// New returns a consumers.Consumer that stores consumed messages using the
// given database handle.
func New(db *sqlx.DB) consumers.Consumer {
	repo := postgresRepo{db: db}
	return &repo
}
// Consume stores the given message batch. A value of type mfjson.Messages is
// handed to saveJSON; every other value is handed to saveSenml, which
// performs its own type check.
func (pr postgresRepo) Consume(message interface{}) (err error) {
	if jsonMsgs, isJSON := message.(mfjson.Messages); isJSON {
		return pr.saveJSON(jsonMsgs)
	}
	return pr.saveSenml(message)
}
// saveSenml inserts a batch of SenML messages into the messages table inside
// a single transaction.
//
// messages must be a []senml.Message; any other type yields errSaveMessage.
// Each message is stored under a freshly generated UUID. If any step fails
// the transaction is rolled back, otherwise it is committed. Insert, commit
// and rollback failures are all reported to the caller, wrapped in
// errSaveMessage (a rollback failure is additionally wrapped with
// errTransRollback).
func (pr postgresRepo) saveSenml(messages interface{}) (err error) {
	msgs, ok := messages.([]senml.Message)
	if !ok {
		return errSaveMessage
	}
	q := `INSERT INTO messages (id, channel, subtopic, publisher, protocol,
name, unit, value, string_value, bool_value, data_value, sum,
time, update_time)
VALUES (:id, :channel, :subtopic, :publisher, :protocol, :name, :unit,
:value, :string_value, :bool_value, :data_value, :sum,
:time, :update_time);`

	// err here is the named result, not a new local: the deferred handler
	// below relies on it to choose between commit and rollback.
	tx, err := pr.db.BeginTxx(context.Background(), nil)
	if err != nil {
		return errors.Wrap(errSaveMessage, err)
	}
	defer func() {
		if err != nil {
			if txErr := tx.Rollback(); txErr != nil {
				err = errors.Wrap(err, errors.Wrap(errTransRollback, txErr))
			}
			return
		}
		if err = tx.Commit(); err != nil {
			err = errors.Wrap(errSaveMessage, err)
		}
	}()

	for _, msg := range msgs {
		// Loop-local errors use their own names. Every failure leaves through
		// a return statement, which stores the value in the named result
		// before the deferred handler runs.
		id, idErr := uuid.NewV4()
		if idErr != nil {
			return errors.Wrap(errSaveMessage, idErr)
		}
		m := senmlMessage{Message: msg, ID: id.String()}
		if _, execErr := tx.NamedExec(q, m); execErr != nil {
			// An invalid_text_representation condition is reported as an
			// invalid message instead of exposing the driver error.
			if pqErr, isPq := execErr.(*pq.Error); isPq && pqErr.Code.Name() == errInvalid {
				return errors.Wrap(errSaveMessage, errInvalidMessage)
			}
			return errors.Wrap(errSaveMessage, execErr)
		}
	}
	return nil
}
// saveJSON inserts the JSON messages into the table named after msgs.Format.
// When the first insert attempt reports errNoTable, the table is created and
// the insert is attempted once more; any other error is returned unchanged.
func (pr postgresRepo) saveJSON(msgs mfjson.Messages) error {
	switch insertErr := pr.insertJSON(msgs); insertErr {
	case nil:
		return nil
	case errNoTable:
		// The table is missing: create it, then retry the insert once.
		if createErr := pr.createTable(msgs.Format); createErr != nil {
			return createErr
		}
		return pr.insertJSON(msgs)
	default:
		return insertErr
	}
}
// insertJSON inserts every message of msgs.Data into the table named
// msgs.Format inside a single transaction.
//
// If any step fails the transaction is rolled back, otherwise it is
// committed. It returns errNoTable, unwrapped, when PostgreSQL reports that
// the table does not exist. All other failures, including a failed commit,
// are wrapped in errSaveMessage.
//
// NOTE(review): msgs.Format is interpolated into the statement as an
// unquoted identifier. If it can be influenced by message publishers it
// should be validated before it reaches this point - confirm against the
// transformer that produces it.
func (pr postgresRepo) insertJSON(msgs mfjson.Messages) (err error) {
	// err here is the named result, not a new local: the deferred handler
	// below relies on it to choose between commit and rollback, and writes
	// commit/rollback failures back into it so the caller sees them.
	tx, err := pr.db.BeginTxx(context.Background(), nil)
	if err != nil {
		return errors.Wrap(errSaveMessage, err)
	}
	defer func() {
		if err != nil {
			txErr := tx.Rollback()
			// saveJSON compares the result with errNoTable by identity, so
			// that sentinel must reach it unwrapped even if rollback fails.
			if txErr != nil && err != errNoTable {
				err = errors.Wrap(err, errors.Wrap(errTransRollback, txErr))
			}
			return
		}
		if err = tx.Commit(); err != nil {
			err = errors.Wrap(errSaveMessage, err)
		}
	}()

	q := `INSERT INTO %s (id, channel, created, subtopic, publisher, protocol, payload)
VALUES (:id, :channel, :created, :subtopic, :publisher, :protocol, :payload);`
	q = fmt.Sprintf(q, msgs.Format)

	for _, m := range msgs.Data {
		dbmsg, convErr := toJSONMessage(m)
		if convErr != nil {
			return errors.Wrap(errSaveMessage, convErr)
		}
		if _, execErr := tx.NamedExec(q, dbmsg); execErr != nil {
			if pqErr, isPq := execErr.(*pq.Error); isPq {
				switch pqErr.Code.Name() {
				case errInvalid:
					return errors.Wrap(errSaveMessage, errInvalidMessage)
				case errUndefinedTable:
					return errNoTable
				}
			}
			// Wrapped like the equivalent path in saveSenml.
			return errors.Wrap(errSaveMessage, execErr)
		}
	}
	return nil
}
// createTable creates the JSON message table with the given name unless it
// already exists, and returns the error of the executed statement, if any.
//
// NOTE(review): name is interpolated into the statement as an unquoted
// identifier; presumably it is the message format - confirm that callers
// only pass trusted or validated values.
func (pr postgresRepo) createTable(name string) error {
	const ddl = `CREATE TABLE IF NOT EXISTS %s (
id UUID,
created BIGINT,
channel VARCHAR(254),
subtopic VARCHAR(254),
publisher VARCHAR(254),
protocol TEXT,
payload JSONB,
PRIMARY KEY (id)
)`
	_, execErr := pr.db.Exec(fmt.Sprintf(ddl, name))
	return execErr
}
// senmlMessage is the value bound to the named parameters of the INSERT
// statement in saveSenml: a SenML message together with its generated row
// identifier.
type senmlMessage struct {
// The embedded message presumably supplies the remaining named parameters
// through its own db tags - confirm against senml.Message.
senml.Message
// ID is the UUID string generated for the row; bound to :id.
ID string `db:"id"`
}
// jsonMessage is the value bound to the named parameters of the INSERT
// statement in insertJSON. Each field maps to the column named in its db tag.
type jsonMessage struct {
// ID is the UUID string generated for the row by toJSONMessage.
ID string `db:"id"`
Channel string `db:"channel"`
Created int64 `db:"created"`
Subtopic string `db:"subtopic"`
Publisher string `db:"publisher"`
Protocol string `db:"protocol"`
// Payload holds the JSON-encoded message payload, or "{}" when the source
// message has no payload.
Payload []byte `db:"payload"`
}
// toJSONMessage converts a transformer JSON message into the row value that
// insertJSON stores. The row gets a newly generated UUID; the payload is
// JSON-encoded, or set to "{}" when the message carries no payload.
//
// A UUID generation error is returned as is; a payload encoding error is
// wrapped in errSaveMessage. In both cases the returned row is empty.
func toJSONMessage(msg mfjson.Message) (jsonMessage, error) {
	id, err := uuid.NewV4()
	if err != nil {
		return jsonMessage{}, err
	}

	// Start from the empty-object payload and replace it only when the
	// message actually has one.
	row := jsonMessage{
		ID:        id.String(),
		Channel:   msg.Channel,
		Created:   msg.Created,
		Subtopic:  msg.Subtopic,
		Publisher: msg.Publisher,
		Protocol:  msg.Protocol,
		Payload:   []byte("{}"),
	}
	if msg.Payload == nil {
		return row, nil
	}

	encoded, err := json.Marshal(msg.Payload)
	if err != nil {
		return jsonMessage{}, errors.Wrap(errSaveMessage, err)
	}
	row.Payload = encoded
	return row, nil
}