mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-29 13:49:28 +08:00

* Use Normalizer as a lib To normalize messages on the consumer side, Normalizer is moved to the internal pkgs. Writers being message consumers are modified to do message normalization instead of subscribing to normalized messages subject. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix logging middleware for readers and writers Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove normalizer interface Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use Normalizer in writers As we agreed on #919, we'll use normalizer as an interface and provide the default SenML implementation. Because of that, Normalizer is removed from `internal` and we'll use the project structure proposed in the aforementioned issue. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix tests Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove unused batch settings from influxDB reader Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update docs Move Normalizer service to `addons`. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename channels input topic Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update Noramlizer docs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove commented code Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update readers logging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update addons docker-compose files Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update topcis explanations Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
199 lines
5.2 KiB
Go
199 lines
5.2 KiB
Go
// Copyright (c) Mainflux
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
|
|
"github.com/BurntSushi/toml"
|
|
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/mainflux/mainflux"
|
|
"github.com/mainflux/mainflux/logger"
|
|
"github.com/mainflux/mainflux/normalizer"
|
|
"github.com/mainflux/mainflux/writers"
|
|
"github.com/mainflux/mainflux/writers/api"
|
|
"github.com/mainflux/mainflux/writers/postgres"
|
|
nats "github.com/nats-io/go-nats"
|
|
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
const (
|
|
svcName = "postgres-writer"
|
|
sep = ","
|
|
|
|
defNatsURL = nats.DefaultURL
|
|
defLogLevel = "error"
|
|
defPort = "9104"
|
|
defDBHost = "postgres"
|
|
defDBPort = "5432"
|
|
defDBUser = "mainflux"
|
|
defDBPass = "mainflux"
|
|
defDBName = "messages"
|
|
defDBSSLMode = "disable"
|
|
defDBSSLCert = ""
|
|
defDBSSLKey = ""
|
|
defDBSSLRootCert = ""
|
|
defChanCfgPath = "/config/channels.toml"
|
|
|
|
envNatsURL = "MF_NATS_URL"
|
|
envLogLevel = "MF_POSTGRES_WRITER_LOG_LEVEL"
|
|
envPort = "MF_POSTGRES_WRITER_PORT"
|
|
envDBHost = "MF_POSTGRES_WRITER_DB_HOST"
|
|
envDBPort = "MF_POSTGRES_WRITER_DB_PORT"
|
|
envDBUser = "MF_POSTGRES_WRITER_DB_USER"
|
|
envDBPass = "MF_POSTGRES_WRITER_DB_PASS"
|
|
envDBName = "MF_POSTGRES_WRITER_DB_NAME"
|
|
envDBSSLMode = "MF_POSTGRES_WRITER_DB_SSL_MODE"
|
|
envDBSSLCert = "MF_POSTGRES_WRITER_DB_SSL_CERT"
|
|
envDBSSLKey = "MF_POSTGRES_WRITER_DB_SSL_KEY"
|
|
envDBSSLRootCert = "MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT"
|
|
envChanCfgPath = "MF_POSTGRES_WRITER_CHANNELS_CONFIG"
|
|
)
|
|
|
|
type config struct {
|
|
natsURL string
|
|
logLevel string
|
|
port string
|
|
dbConfig postgres.Config
|
|
channels map[string]bool
|
|
}
|
|
|
|
func main() {
|
|
cfg := loadConfig()
|
|
|
|
logger, err := logger.New(os.Stdout, cfg.logLevel)
|
|
if err != nil {
|
|
log.Fatalf(err.Error())
|
|
}
|
|
|
|
nc := connectToNATS(cfg.natsURL, logger)
|
|
defer nc.Close()
|
|
|
|
db := connectToDB(cfg.dbConfig, logger)
|
|
defer db.Close()
|
|
|
|
repo := newService(db, logger)
|
|
norm := normalizer.New()
|
|
if err = writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil {
|
|
logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err))
|
|
}
|
|
|
|
errs := make(chan error, 2)
|
|
|
|
go startHTTPServer(cfg.port, errs, logger)
|
|
|
|
go func() {
|
|
c := make(chan os.Signal)
|
|
signal.Notify(c, syscall.SIGINT)
|
|
errs <- fmt.Errorf("%s", <-c)
|
|
}()
|
|
|
|
err = <-errs
|
|
logger.Error(fmt.Sprintf("Postgres writer service terminated: %s", err))
|
|
}
|
|
|
|
func loadConfig() config {
|
|
chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath)
|
|
dbConfig := postgres.Config{
|
|
Host: mainflux.Env(envDBHost, defDBHost),
|
|
Port: mainflux.Env(envDBPort, defDBPort),
|
|
User: mainflux.Env(envDBUser, defDBUser),
|
|
Pass: mainflux.Env(envDBPass, defDBPass),
|
|
Name: mainflux.Env(envDBName, defDBName),
|
|
SSLMode: mainflux.Env(envDBSSLMode, defDBSSLMode),
|
|
SSLCert: mainflux.Env(envDBSSLCert, defDBSSLCert),
|
|
SSLKey: mainflux.Env(envDBSSLKey, defDBSSLKey),
|
|
SSLRootCert: mainflux.Env(envDBSSLRootCert, defDBSSLRootCert),
|
|
}
|
|
|
|
return config{
|
|
natsURL: mainflux.Env(envNatsURL, defNatsURL),
|
|
logLevel: mainflux.Env(envLogLevel, defLogLevel),
|
|
port: mainflux.Env(envPort, defPort),
|
|
dbConfig: dbConfig,
|
|
channels: loadChansConfig(chanCfgPath),
|
|
}
|
|
}
|
|
|
|
type channels struct {
|
|
List []string `toml:"filter"`
|
|
}
|
|
|
|
type chanConfig struct {
|
|
Channels channels `toml:"channels"`
|
|
}
|
|
|
|
func loadChansConfig(chanConfigPath string) map[string]bool {
|
|
data, err := ioutil.ReadFile(chanConfigPath)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
var chanCfg chanConfig
|
|
if err := toml.Unmarshal(data, &chanCfg); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
chans := map[string]bool{}
|
|
for _, ch := range chanCfg.Channels.List {
|
|
chans[ch] = true
|
|
}
|
|
|
|
return chans
|
|
}
|
|
|
|
func connectToNATS(url string, logger logger.Logger) *nats.Conn {
|
|
nc, err := nats.Connect(url)
|
|
if err != nil {
|
|
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
|
|
os.Exit(1)
|
|
}
|
|
|
|
return nc
|
|
}
|
|
|
|
func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB {
|
|
db, err := postgres.Connect(dbConfig)
|
|
if err != nil {
|
|
logger.Error(fmt.Sprintf("Failed to connect to Postgres: %s", err))
|
|
os.Exit(1)
|
|
}
|
|
return db
|
|
}
|
|
|
|
func newService(db *sqlx.DB, logger logger.Logger) writers.MessageRepository {
|
|
svc := postgres.New(db)
|
|
svc = api.LoggingMiddleware(svc, logger)
|
|
svc = api.MetricsMiddleware(
|
|
svc,
|
|
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
|
Namespace: "postgres",
|
|
Subsystem: "message_writer",
|
|
Name: "request_count",
|
|
Help: "Number of requests received.",
|
|
}, []string{"method"}),
|
|
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
|
|
Namespace: "postgres",
|
|
Subsystem: "message_writer",
|
|
Name: "request_latency_microseconds",
|
|
Help: "Total duration of requests in microseconds.",
|
|
}, []string{"method"}),
|
|
)
|
|
|
|
return svc
|
|
}
|
|
|
|
func startHTTPServer(port string, errs chan error, logger logger.Logger) {
|
|
p := fmt.Sprintf(":%s", port)
|
|
logger.Info(fmt.Sprintf("Postgres writer service started, exposed port %s", port))
|
|
errs <- http.ListenAndServe(p, api.MakeHandler(svcName))
|
|
}
|