1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00

NOISSUE - Adding subtopics filtering in writer services (#1072)

* Add feature of filtering by subtopics in writer

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Fix mistake

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Refactoring writer sevices

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Rename variables related to filter (channels & subtopics)

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Set default value of filtering when configuration file doesn't exist

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Add a blank line at the end of the file

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Refactor loading filters configuration (moving into writer package, merge both loading methods & returning error)

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Remove useless log

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Change type of variables (channels & subtopics) and simplify loading method

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Add logging error when loading filters

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Simplify return configuration in loading method

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Merge both filter files into one file

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Move loading subjects into writer package

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Add subscribe to selected subjects

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Edit README of writer services

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Keep only subscribe loop

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Use full NATS subjects

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Edit comment in subjects files

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
This commit is contained in:
Jonathan Dreyer 2020-03-30 18:24:27 +02:00 committed by GitHub
parent d2153a8846
commit 46aadcfd9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 251 additions and 350 deletions

View File

@ -5,7 +5,6 @@ package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
@ -14,7 +13,6 @@ import (
"strings"
"syscall"
"github.com/BurntSushi/toml"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
@ -31,33 +29,33 @@ const (
svcName = "cassandra-writer"
sep = ","
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defCluster = "127.0.0.1"
defKeyspace = "mainflux"
defDBUsername = ""
defDBPassword = ""
defDBPort = "9042"
defChanCfgPath = "/config/channels.toml"
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defCluster = "127.0.0.1"
defKeyspace = "mainflux"
defDBUsername = ""
defDBPassword = ""
defDBPort = "9042"
defSubjectsCfgPath = "/config/subjects.toml"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL"
envPort = "MF_CASSANDRA_WRITER_PORT"
envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER"
envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE"
envDBUsername = "MF_CASSANDRA_WRITER_DB_USERNAME"
envDBPassword = "MF_CASSANDRA_WRITER_DB_PASSWORD"
envDBPort = "MF_CASSANDRA_WRITER_DB_PORT"
envChanCfgPath = "MF_CASSANDRA_WRITER_CHANNELS_CONFIG"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL"
envPort = "MF_CASSANDRA_WRITER_PORT"
envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER"
envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE"
envDBUsername = "MF_CASSANDRA_WRITER_DB_USERNAME"
envDBPassword = "MF_CASSANDRA_WRITER_DB_PASSWORD"
envDBPort = "MF_CASSANDRA_WRITER_DB_PORT"
envSubjectsCfgPath = "MF_CASSANDRA_WRITER_SUBJECTS_CONFIG"
)
type config struct {
natsURL string
logLevel string
port string
dbCfg cassandra.DBConfig
channels map[string]bool
natsURL string
logLevel string
port string
dbCfg cassandra.DBConfig
subjectsCfgPath string
}
func main() {
@ -76,7 +74,7 @@ func main() {
repo := newService(session, logger)
st := senml.New()
if err := writers.Start(nc, repo, st, svcName, cfg.channels, logger); err != nil {
if err := writers.Start(nc, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
}
@ -108,43 +106,15 @@ func loadConfig() config {
Port: dbPort,
}
chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath)
return config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbCfg: dbCfg,
channels: loadChansConfig(chanCfgPath),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbCfg: dbCfg,
subjectsCfgPath: mainflux.Env(envSubjectsCfgPath, defSubjectsCfgPath),
}
}
type channels struct {
List []string `toml:"filter"`
}
type chanConfig struct {
Channels channels `toml:"channels"`
}
func loadChansConfig(chanConfigPath string) map[string]bool {
data, err := ioutil.ReadFile(chanConfigPath)
if err != nil {
log.Fatal(err)
}
var chanCfg chanConfig
if err := toml.Unmarshal(data, &chanCfg); err != nil {
log.Fatal(err)
}
chans := map[string]bool{}
for _, ch := range chanCfg.Channels.List {
chans[ch] = true
}
return chans
}
func connectToNATS(url string, logger logger.Logger) *nats.Conn {
nc, err := nats.Connect(url)
if err != nil {

View File

@ -5,14 +5,12 @@ package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/BurntSushi/toml"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
@ -28,37 +26,37 @@ import (
const (
svcName = "influxdb-writer"
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defChanCfgPath = "/config/channels.toml"
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defSubjectsCfgPath = "/config/subjects.toml"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
envPort = "MF_INFLUX_WRITER_PORT"
envDBName = "MF_INFLUX_WRITER_DB_NAME"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUX_WRITER_DB_PORT"
envDBUser = "MF_INFLUX_WRITER_DB_USER"
envDBPass = "MF_INFLUX_WRITER_DB_PASS"
envChanCfgPath = "MF_INFLUX_WRITER_CHANNELS_CONFIG"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
envPort = "MF_INFLUX_WRITER_PORT"
envDBName = "MF_INFLUX_WRITER_DB_NAME"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUX_WRITER_DB_PORT"
envDBUser = "MF_INFLUX_WRITER_DB_USER"
envDBPass = "MF_INFLUX_WRITER_DB_PASS"
envSubjectsCfgPath = "MF_INFLUX_WRITER_SUBJECTS_CONFIG"
)
type config struct {
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
dbUser string
dbPass string
channels map[string]bool
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
dbUser string
dbPass string
subjectsCfgPath string
}
func main() {
@ -89,7 +87,7 @@ func main() {
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New()
if err := writers.Start(nc, repo, st, svcName, cfg.channels, logger); err != nil {
if err := writers.Start(nc, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
@ -108,17 +106,16 @@ func main() {
}
func loadConfigs() (config, influxdata.HTTPConfig) {
chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath)
cfg := config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDBName, defDBName),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
channels: loadChansConfig(chanCfgPath),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDBName, defDBName),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
subjectsCfgPath: mainflux.Env(envSubjectsCfgPath, defSubjectsCfgPath),
}
clientCfg := influxdata.HTTPConfig{
@ -130,33 +127,6 @@ func loadConfigs() (config, influxdata.HTTPConfig) {
return cfg, clientCfg
}
type channels struct {
List []string `toml:"filter"`
}
type chanConfig struct {
Channels channels `toml:"channels"`
}
func loadChansConfig(chanConfigPath string) map[string]bool {
data, err := ioutil.ReadFile(chanConfigPath)
if err != nil {
log.Fatal(err)
}
var chanCfg chanConfig
if err := toml.Unmarshal(data, &chanCfg); err != nil {
log.Fatal(err)
}
chans := map[string]bool{}
for _, ch := range chanCfg.Channels.List {
chans[ch] = true
}
return chans
}
func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
counter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "influxdb",

View File

@ -6,14 +6,12 @@ package main
import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/BurntSushi/toml"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
@ -30,31 +28,31 @@ import (
const (
svcName = "mongodb-writer"
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "27017"
defChanCfgPath = "/config/channels.toml"
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "27017"
defSubjectsCfgPath = "/config/subjects.toml"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_MONGO_WRITER_LOG_LEVEL"
envPort = "MF_MONGO_WRITER_PORT"
envDBName = "MF_MONGO_WRITER_DB_NAME"
envDBHost = "MF_MONGO_WRITER_DB_HOST"
envDBPort = "MF_MONGO_WRITER_DB_PORT"
envChanCfgPath = "MF_MONGO_WRITER_CHANNELS_CONFIG"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_MONGO_WRITER_LOG_LEVEL"
envPort = "MF_MONGO_WRITER_PORT"
envDBName = "MF_MONGO_WRITER_DB_NAME"
envDBHost = "MF_MONGO_WRITER_DB_HOST"
envDBPort = "MF_MONGO_WRITER_DB_PORT"
envSubjectsCfgPath = "MF_MONGO_WRITER_SUBJECTS_CONFIG"
)
type config struct {
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
channels map[string]bool
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
subjectsCfgPath string
}
func main() {
@ -86,7 +84,7 @@ func main() {
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New()
if err := writers.Start(nc, repo, st, svcName, cfg.channels, logger); err != nil {
if err := writers.Start(nc, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err))
os.Exit(1)
}
@ -105,45 +103,17 @@ func main() {
}
func loadConfigs() config {
chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath)
return config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDBName, defDBName),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
channels: loadChansConfig(chanCfgPath),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDBName, defDBName),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
subjectsCfgPath: mainflux.Env(envSubjectsCfgPath, defSubjectsCfgPath),
}
}
type channels struct {
List []string `toml:"filter"`
}
type chanConfig struct {
Channels channels `toml:"channels"`
}
func loadChansConfig(chanConfigPath string) map[string]bool {
data, err := ioutil.ReadFile(chanConfigPath)
if err != nil {
log.Fatal(err)
}
var chanCfg chanConfig
if err := toml.Unmarshal(data, &chanCfg); err != nil {
log.Fatal(err)
}
chans := map[string]bool{}
for _, ch := range chanCfg.Channels.List {
chans[ch] = true
}
return chans
}
func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
counter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "mongodb",

View File

@ -5,14 +5,12 @@ package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/BurntSushi/toml"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux"
@ -29,41 +27,41 @@ const (
svcName = "postgres-writer"
sep = ","
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "9104"
defDBHost = "postgres"
defDBPort = "5432"
defDBUser = "mainflux"
defDBPass = "mainflux"
defDBName = "messages"
defDBSSLMode = "disable"
defDBSSLCert = ""
defDBSSLKey = ""
defDBSSLRootCert = ""
defChanCfgPath = "/config/channels.toml"
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "9104"
defDBHost = "postgres"
defDBPort = "5432"
defDBUser = "mainflux"
defDBPass = "mainflux"
defDBName = "messages"
defDBSSLMode = "disable"
defDBSSLCert = ""
defDBSSLKey = ""
defDBSSLRootCert = ""
defSubjectsCfgPath = "/config/subjects.toml"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_POSTGRES_WRITER_LOG_LEVEL"
envPort = "MF_POSTGRES_WRITER_PORT"
envDBHost = "MF_POSTGRES_WRITER_DB_HOST"
envDBPort = "MF_POSTGRES_WRITER_DB_PORT"
envDBUser = "MF_POSTGRES_WRITER_DB_USER"
envDBPass = "MF_POSTGRES_WRITER_DB_PASS"
envDBName = "MF_POSTGRES_WRITER_DB_NAME"
envDBSSLMode = "MF_POSTGRES_WRITER_DB_SSL_MODE"
envDBSSLCert = "MF_POSTGRES_WRITER_DB_SSL_CERT"
envDBSSLKey = "MF_POSTGRES_WRITER_DB_SSL_KEY"
envDBSSLRootCert = "MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT"
envChanCfgPath = "MF_POSTGRES_WRITER_CHANNELS_CONFIG"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_POSTGRES_WRITER_LOG_LEVEL"
envPort = "MF_POSTGRES_WRITER_PORT"
envDBHost = "MF_POSTGRES_WRITER_DB_HOST"
envDBPort = "MF_POSTGRES_WRITER_DB_PORT"
envDBUser = "MF_POSTGRES_WRITER_DB_USER"
envDBPass = "MF_POSTGRES_WRITER_DB_PASS"
envDBName = "MF_POSTGRES_WRITER_DB_NAME"
envDBSSLMode = "MF_POSTGRES_WRITER_DB_SSL_MODE"
envDBSSLCert = "MF_POSTGRES_WRITER_DB_SSL_CERT"
envDBSSLKey = "MF_POSTGRES_WRITER_DB_SSL_KEY"
envDBSSLRootCert = "MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT"
envSubjectsCfgPath = "MF_POSTGRES_WRITER_SUBJECTS_CONFIG"
)
type config struct {
natsURL string
logLevel string
port string
dbConfig postgres.Config
channels map[string]bool
natsURL string
logLevel string
port string
dbConfig postgres.Config
subjectsCfgPath string
}
func main() {
@ -82,7 +80,7 @@ func main() {
repo := newService(db, logger)
st := senml.New()
if err = writers.Start(nc, repo, st, svcName, cfg.channels, logger); err != nil {
if err = writers.Start(nc, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err))
}
@ -101,7 +99,6 @@ func main() {
}
func loadConfig() config {
chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath)
dbConfig := postgres.Config{
Host: mainflux.Env(envDBHost, defDBHost),
Port: mainflux.Env(envDBPort, defDBPort),
@ -115,41 +112,14 @@ func loadConfig() config {
}
return config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbConfig: dbConfig,
channels: loadChansConfig(chanCfgPath),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbConfig: dbConfig,
subjectsCfgPath: mainflux.Env(envSubjectsCfgPath, defSubjectsCfgPath),
}
}
type channels struct {
List []string `toml:"filter"`
}
type chanConfig struct {
Channels channels `toml:"channels"`
}
func loadChansConfig(chanConfigPath string) map[string]bool {
data, err := ioutil.ReadFile(chanConfigPath)
if err != nil {
log.Fatal(err)
}
var chanCfg chanConfig
if err := toml.Unmarshal(data, &chanCfg); err != nil {
log.Fatal(err)
}
chans := map[string]bool{}
for _, ch := range chanCfg.Channels.List {
chans[ch] = true
}
return chans
}
func connectToNATS(url string, logger logger.Logger) *nats.Conn {
nc, err := nats.Connect(url)
if err != nil {

View File

@ -1,4 +0,0 @@
# If you want to listen on all channels, just pass one element ["*"], otherwise
# pass the list of channels.
[channels]
filter = ["*"]

View File

@ -45,4 +45,4 @@ services:
networks:
- docker_mainflux-base-net
volumes:
- ./channels.toml:/config/channels.toml
- ./subjects.toml:/config/subjects.toml

View File

@ -0,0 +1,4 @@
# If you want to listen on all subjects, just pass one element ["channel.>"], otherwise
# pass the list of subjects (e.g ["channel.<channel_id>", "channel.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channel.>"]

View File

@ -1,4 +0,0 @@
# If you want to listen on all channels, just pass one element ["*"], otherwise
# pass the list of channels.
[channels]
filter = ["*"]

View File

@ -54,7 +54,7 @@ services:
networks:
- docker_mainflux-base-net
volumes:
- ./channels.toml:/config/channels.toml
- ./subjects.toml:/config/subjects.toml
grafana:
image: grafana/grafana:5.1.3

View File

@ -0,0 +1,4 @@
# If you want to listen on all subjects, just pass one element ["channel.>"], otherwise
# pass the list of subjects (e.g ["channel.<channel_id>", "channel.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channel.>"]

View File

@ -1,4 +0,0 @@
# If you want to listen on all channels, just pass one element ["*"], otherwise
# pass the list of channels.
[channels]
filter = ["*"]

View File

@ -51,4 +51,4 @@ services:
networks:
- docker_mainflux-base-net
volumes:
- ./channels.toml:/config/channels.toml
- ./subjects.toml:/config/subjects.toml

View File

@ -0,0 +1,4 @@
# If you want to listen on all subjects, just pass one element ["channel.>"], otherwise
# pass the list of subjects (e.g ["channel.<channel_id>", "channel.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channel.>"]

View File

@ -1,4 +0,0 @@
# If you want to listen on all channels, just pass one element ["*"], otherwise
# pass the list of channels.
[channels]
filter = ["*"]

View File

@ -55,4 +55,4 @@ services:
networks:
- docker_mainflux-base-net
volumes:
- ./channels.toml:/config/channels.toml
- ./subjects.toml:/config/subjects.toml

View File

@ -0,0 +1,4 @@
# If you want to listen on all subjects, just pass one element ["channel.>"], otherwise
# pass the list of subjects (e.g ["channel.<channel_id>", "channel.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channel.>"]

View File

@ -8,17 +8,17 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|-------------------------------------|------------------------------------------------------------|-----------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_CASSANDRA_WRITER_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error |
| MF_CASSANDRA_WRITER_PORT | Service HTTP port | 8180 |
| MF_CASSANDRA_WRITER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 |
| MF_CASSANDRA_WRITER_DB_KEYSPACE | Cassandra keyspace name | mainflux |
| MF_CASSANDRA_WRITER_DB_USERNAME | Cassandra DB username | |
| MF_CASSANDRA_WRITER_DB_PASSWORD | Cassandra DB password | |
| MF_CASSANDRA_WRITER_DB_PORT | Cassandra DB port | 9042 |
| MF_CASSANDRA_WRITER_CHANNELS_CONFIG | Configuration file path with channels list | /config/channels.toml |
| Variable | Description | Default |
|--------------------------------------|-------------------------------------------------------------|------------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_CASSANDRA_WRITER_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error |
| MF_CASSANDRA_WRITER_PORT | Service HTTP port | 8180 |
| MF_CASSANDRA_WRITER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 |
| MF_CASSANDRA_WRITER_DB_KEYSPACE | Cassandra keyspace name | mainflux |
| MF_CASSANDRA_WRITER_DB_USERNAME | Cassandra DB username | |
| MF_CASSANDRA_WRITER_DB_PASSWORD | Cassandra DB password | |
| MF_CASSANDRA_WRITER_DB_PORT | Cassandra DB port | 9042 |
| MF_CASSANDRA_WRITER_SUBJECTS_CONFIG | Configuration file path with subjects list | /config/subjects.toml |
## Deployment
```yaml
@ -38,11 +38,11 @@ default values.
MF_CASSANDRA_WRITER_DB_USERNAME: [Cassandra DB username]
MF_CASSANDRA_WRITER_DB_PASSWORD: [Cassandra DB password]
MF_CASSANDRA_WRITER_DB_PORT: [Cassandra DB port]
MF_CASSANDRA_WRITER_CHANNELS_CONFIG: [Configuration file path with channels list]
MF_CASSANDRA_WRITER_SUBJECTS_CONFIG: [Configuration file path with subjects list]
ports:
- [host machine port]:[configured HTTP port]
volume:
- ./channels.yaml:/config/channels.yaml
- ./subjects.yaml:/config/subjects.yaml
```
To start the service, execute the following shell script:
@ -60,8 +60,7 @@ make cassandra-writer
make install
# Set the environment variables and run the service
MF_NATS_URL=[NATS instance URL] MF_CASSANDRA_WRITER_LOG_LEVEL=[Cassandra writer log level] MF_CASSANDRA_WRITER_PORT=[Service HTTP port] MF_CASSANDRA_WRITER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] MF_CASSANDRA_READER_DB_USERNAME=[Cassandra DB username] MF_CASSANDRA_READER_DB_PASSWORD=[Cassandra DB password] MF_CASSANDRA_READER_DB_PORT=[Cassandra DB port] MF_CASSANDRA_WRITER_CHANNELS_CONFIG=[Configuration file path with channels list] $GOBIN/mainflux-cassandra-writer
MF_NATS_URL=[NATS instance URL] MF_CASSANDRA_WRITER_LOG_LEVEL=[Cassandra writer log level] MF_CASSANDRA_WRITER_PORT=[Service HTTP port] MF_CASSANDRA_WRITER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] MF_CASSANDRA_READER_DB_USERNAME=[Cassandra DB username] MF_CASSANDRA_READER_DB_PASSWORD=[Cassandra DB password] MF_CASSANDRA_READER_DB_PORT=[Cassandra DB port] MF_CASSANDRA_WRITER_SUBJECTS_CONFIG=[Configuration file path with subjects list] $GOBIN/mainflux-cassandra-writer
```
### Using docker-compose

View File

@ -8,17 +8,17 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|----------------------------------|-----------------------------------------------------------|-----------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error |
| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 |
| MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux |
| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost |
| MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUX_WRITER_DB_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUX_WRITER_DB_PASS | Default password of InfluxDB user | mainflux |
| MF_INFLUX_WRITER_CHANNELS_CONFIG | Configuration file path with channels list | /config/channels.toml |
| Variable | Description | Default |
|-----------------------------------|-----------------------------------------------------------|------------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error |
| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 |
| MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux |
| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost |
| MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUX_WRITER_DB_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUX_WRITER_DB_PASS | Default password of InfluxDB user | mainflux |
| MF_INFLUX_WRITER_SUBJECTS_CONFIG | Configuration file path with subjects list | /config/subjects.toml |
## Deployment
@ -39,11 +39,11 @@ default values.
MF_INFLUX_WRITER_DB_PORT: [InfluxDB port]
MF_INFLUX_WRITER_DB_USER: [InfluxDB admin user]
MF_INFLUX_WRITER_DB_PASS: [InfluxDB admin password]
MF_INFLUX_WRITER_CHANNELS_CONFIG: [Configuration file path with channels list]
MF_INFLUX_WRITER_SUBJECTS_CONFIG: [Configuration file path with subjects list]
ports:
- [host machine port]:[configured HTTP port]
volume:
- ./channels.yaml:/config/channels.yaml
- ./subjects.yaml:/config/subjects.yaml
```
To start the service, execute the following shell script:
@ -61,8 +61,7 @@ make influxdb
make install
# Set the environment variables and run the service
MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_LOG_LEVEL=[Influx writer log level] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] MF_INFLUX_WRITER_CHANNELS_CONFIG=[Configuration file path with channels list] $GOBIN/mainflux-influxdb
MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_LOG_LEVEL=[Influx writer log level] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] MF_INFLUX_WRITER_SUBJECTS_CONFIG=[Configuration file path with subjects list] $GOBIN/mainflux-influxdb
```
### Using docker-compose

View File

@ -8,15 +8,15 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|---------------------------------|--------------------------------------------|-----------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_MONGO_WRITER_LOG_LEVEL | Log level for MongoDB writer | error |
| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 |
| MF_MONGO_WRITER_DB_NAME | Default MongoDB database name | mainflux |
| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost |
| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 |
| MF_MONGO_WRITER_CHANNELS_CONFIG | Configuration file path with channels list | /config/channels.toml |
| Variable | Description | Default |
|----------------------------------|---------------------------------------------|------------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_MONGO_WRITER_LOG_LEVEL | Log level for MongoDB writer | error |
| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 |
| MF_MONGO_WRITER_DB_NAME | Default MongoDB database name | mainflux |
| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost |
| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 |
| MF_MONGO_WRITER_SUBJECTS_CONFIG | Configuration file path with subjects list | /config/subjects.toml |
## Deployment
@ -38,11 +38,11 @@ default values.
MF_MONGO_WRITER_DB_NAME: [MongoDB name]
MF_MONGO_WRITER_DB_HOST: [MongoDB host]
MF_MONGO_WRITER_DB_PORT: [MongoDB port]
MF_MONGO_WRITER_CHANNELS_CONFIG: [Configuration file path with channels list]
MF_MONGO_WRITER_SUBJETCS_CONFIG: [Configuration file path with subjects list]
ports:
- [host machine port]:[configured HTTP port]
volume:
- ./channels.yaml:/config/channels.yaml
- ./subjects.yaml:/config/subjects.yaml
```
To start the service, execute the following shell script:
@ -60,7 +60,7 @@ make mongodb-writer
make install
# Set the environment variables and run the service
MF_NATS_URL=[NATS instance URL] MF_MONGO_WRITER_LOG_LEVEL=[MongoDB writer log level] MF_MONGO_WRITER_PORT=[Service HTTP port] MF_MONGO_WRITER_DB_NAME=[MongoDB database name] MF_MONGO_WRITER_DB_HOST=[MongoDB database host] MF_MONGO_WRITER_DB_PORT=[MongoDB database port] MF_MONGO_WRITER_CHANNELS_CONFIG=[Configuration file path with channels list] $GOBIN/mainflux-mongodb-writer
MF_NATS_URL=[NATS instance URL] MF_MONGO_WRITER_LOG_LEVEL=[MongoDB writer log level] MF_MONGO_WRITER_PORT=[Service HTTP port] MF_MONGO_WRITER_DB_NAME=[MongoDB database name] MF_MONGO_WRITER_DB_HOST=[MongoDB database host] MF_MONGO_WRITER_DB_PORT=[MongoDB database port] MF_MONGO_WRITER_SUBJETCS_CONFIG=[Configuration file path with subjetcs list] $GOBIN/mainflux-mongodb-writer
```
## Usage

View File

@ -8,21 +8,21 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|-------------------------------------|--------------------------------------------|-----------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_POSTGRES_WRITER_LOG_LEVEL | Service log level | error |
| MF_POSTGRES_WRITER_PORT | Service HTTP port | 9104 |
| MF_POSTGRES_WRITER_DB_HOST | Postgres DB host | postgres |
| MF_POSTGRES_WRITER_DB_PORT | Postgres DB port | 5432 |
| MF_POSTGRES_WRITER_DB_USER | Postgres user | mainflux |
| MF_POSTGRES_WRITER_DB_PASS | Postgres password | mainflux |
| MF_POSTGRES_WRITER_DB_NAME | Postgres database name | messages |
| MF_POSTGRES_WRITER_DB_SSL_MODE | Postgres SSL mode | disabled |
| MF_POSTGRES_WRITER_DB_SSL_CERT | Postgres SSL certificate path | "" |
| MF_POSTGRES_WRITER_DB_SSL_KEY | Postgres SSL key | "" |
| MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" |
| MF_POSTGRES_WRITER_CHANNELS_CONFIG | Configuration file path with channels list | /config/channels.toml |
| Variable | Description | Default |
|--------------------------------------|---------------------------------------------|------------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_POSTGRES_WRITER_LOG_LEVEL | Service log level | error |
| MF_POSTGRES_WRITER_PORT | Service HTTP port | 9104 |
| MF_POSTGRES_WRITER_DB_HOST | Postgres DB host | postgres |
| MF_POSTGRES_WRITER_DB_PORT | Postgres DB port | 5432 |
| MF_POSTGRES_WRITER_DB_USER | Postgres user | mainflux |
| MF_POSTGRES_WRITER_DB_PASS | Postgres password | mainflux |
| MF_POSTGRES_WRITER_DB_NAME | Postgres database name | messages |
| MF_POSTGRES_WRITER_DB_SSL_MODE | Postgres SSL mode | disabled |
| MF_POSTGRES_WRITER_DB_SSL_CERT | Postgres SSL certificate path | "" |
| MF_POSTGRES_WRITER_DB_SSL_KEY | Postgres SSL key | "" |
| MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" |
| MF_POSTGRES_WRITER_SUBJECTS_CONFIG | Configuration file path with subjects list | /config/subjects.toml |
## Deployment
@ -48,13 +48,13 @@ default values.
MF_POSTGRES_WRITER_DB_SSL_CERT: [Postgres SSL cert]
MF_POSTGRES_WRITER_DB_SSL_KEY: [Postgres SSL key]
MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT: [Postgres SSL Root cert]
MF_POSTGRES_WRITER_CHANNELS_CONFIG: [Configuration file path with channels list]
MF_POSTGRES_WRITER_SUBJECTS_CONFIG: [Configuration file path with subjects list]
ports:
- 9104:9104
networks:
- docker_mainflux-base-net
volume:
- ./channels.yaml:/config/channels.yaml
- ./subjects.yaml:/config/subjects.yaml
```
To start the service, execute the following shell script:
@ -72,7 +72,7 @@ make postgres-writer
make install
# Set the environment variables and run the service
MF_NATS_URL=[NATS instance URL] MF_POSTGRES_WRITER_LOG_LEVEL=[Service log level] MF_POSTGRES_WRITER_PORT=[Service HTTP port] MF_POSTGRES_WRITER_DB_HOST=[Postgres host] MF_POSTGRES_WRITER_DB_PORT=[Postgres port] MF_POSTGRES_WRITER_DB_USER=[Postgres user] MF_POSTGRES_WRITER_DB_PASS=[Postgres password] MF_POSTGRES_WRITER_DB_NAME=[Postgres database name] MF_POSTGRES_WRITER_DB_SSL_MODE=[Postgres SSL mode] MF_POSTGRES_WRITER_DB_SSL_CERT=[Postgres SSL cert] MF_POSTGRES_WRITER_DB_SSL_KEY=[Postgres SSL key] MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT=[Postgres SSL Root cert] MF_POSTGRES_WRITER_CHANNELS_CONFIG=[Configuration file path with channels list] $GOBIN/mainflux-postgres-writer
MF_NATS_URL=[NATS instance URL] MF_POSTGRES_WRITER_LOG_LEVEL=[Service log level] MF_POSTGRES_WRITER_PORT=[Service HTTP port] MF_POSTGRES_WRITER_DB_HOST=[Postgres host] MF_POSTGRES_WRITER_DB_PORT=[Postgres port] MF_POSTGRES_WRITER_DB_USER=[Postgres user] MF_POSTGRES_WRITER_DB_PASS=[Postgres password] MF_POSTGRES_WRITER_DB_NAME=[Postgres database name] MF_POSTGRES_WRITER_DB_SSL_MODE=[Postgres SSL mode] MF_POSTGRES_WRITER_DB_SSL_CERT=[Postgres SSL cert] MF_POSTGRES_WRITER_DB_SSL_KEY=[Postgres SSL key] MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT=[Postgres SSL Root cert] MF_POSTGRES_WRITER_SUBJECTS_CONFIG=[Configuration file path with subjects list] $GOBIN/mainflux-postgres-writer
```
## Usage

View File

@ -5,36 +5,52 @@ package writers
import (
"fmt"
"io/ioutil"
"github.com/BurntSushi/toml"
"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/transformers"
"github.com/mainflux/mainflux/transformers/senml"
nats "github.com/nats-io/nats.go"
)
var (
errOpenConfFile = errors.New("Unable to open configuration file")
errParseConfFile = errors.New("Unable to parse configuration file")
)
type consumer struct {
nc *nats.Conn
channels map[string]bool
repo MessageRepository
transformer transformers.Transformer
logger log.Logger
logger logger.Logger
}
// Start method starts consuming messages received from NATS.
// This method transforms messages to SenML format before
// using MessageRepository to store them.
func Start(nc *nats.Conn, repo MessageRepository, transformer transformers.Transformer, queue string, channels map[string]bool, logger log.Logger) error {
func Start(nc *nats.Conn, repo MessageRepository, transformer transformers.Transformer, queue string, subjectsCfgPath string, logger logger.Logger) error {
c := consumer{
nc: nc,
channels: channels,
repo: repo,
transformer: transformer,
logger: logger,
}
_, err := nc.QueueSubscribe(mainflux.InputChannels, queue, c.consume)
subjects, err := LoadSubjectsConfig(subjectsCfgPath)
if err != nil {
logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err))
}
for _, subject := range subjects {
_, err := nc.QueueSubscribe(subject, queue, c.consume)
if err != nil {
return err
}
}
return err
}
@ -50,17 +66,11 @@ func (c *consumer) consume(m *nats.Msg) {
c.logger.Warn(fmt.Sprintf("Failed to tranform received message: %s", err))
return
}
norm, ok := t.([]senml.Message)
msgs, ok := t.([]senml.Message)
if !ok {
c.logger.Warn("Invalid message format from the Transformer output.")
return
}
var msgs []senml.Message
for _, v := range norm {
if c.channelExists(v.Channel) {
msgs = append(msgs, v)
}
}
if err := c.repo.Save(msgs...); err != nil {
c.logger.Warn(fmt.Sprintf("Failed to save message: %s", err))
@ -68,11 +78,24 @@ func (c *consumer) consume(m *nats.Msg) {
}
}
func (c *consumer) channelExists(channel string) bool {
if _, ok := c.channels["*"]; ok {
return true
type filterConfig struct {
List []string `toml:"filter"`
}
type subjectsConfig struct {
Subjects filterConfig `toml:"subjects"`
}
func LoadSubjectsConfig(subjectsConfigPath string) ([]string, error) {
data, err := ioutil.ReadFile(subjectsConfigPath)
if err != nil {
return []string{mainflux.InputChannels}, errors.Wrap(errOpenConfFile, err)
}
_, found := c.channels[channel]
return found
var subjectsCfg subjectsConfig
if err := toml.Unmarshal(data, &subjectsCfg); err != nil {
return []string{mainflux.InputChannels}, errors.Wrap(errParseConfFile, err)
}
return subjectsCfg.Subjects.List, err
}