1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-28 13:48:49 +08:00
Dušan Borovčanin 8be2516321 MF-780 - Use Normalizer as a lib (#915)
* Use Normalizer as a lib

To normalize messages on the consumer side, Normalizer is moved
to the internal pkgs. Writers being message consumers are modified to
do message normalization instead of subscribing to normalized messages
subject.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix logging middleware for readers and writers

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove normalizer interface

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Use Normalizer in writers

As we agreed on #919, we'll use normalizer as an interface and provide
the default SenML implementation. Because of that, Normalizer is removed
from `internal` and we'll use the project structure proposed in the
aforementioned issue.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix tests

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove unused batch settings from influxDB reader

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update docs

Move Normalizer service to `addons`.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Rename channels input topic

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update Normalizer docs

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove commented code

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update readers logging

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update addons docker-compose files

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update topics explanations

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2019-10-31 14:04:47 +01:00

195 lines
4.8 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"github.com/BurntSushi/toml"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/normalizer"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/cassandra"
nats "github.com/nats-io/go-nats"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const (
// svcName is handed to writers.Start and to api.MakeHandler.
svcName = "cassandra-writer"
// sep separates the individual hosts in the cluster setting.
sep = ","
// def* values are the fallbacks passed to mainflux.Env in loadConfig.
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defCluster = "127.0.0.1"
defKeyspace = "mainflux"
defDBUsername = ""
defDBPassword = ""
defDBPort = "9042"
defChanCfgPath = "/config/channels.toml"
// env* values are the names of the environment variables read in loadConfig.
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL"
envPort = "MF_CASSANDRA_WRITER_PORT"
envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER"
envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE"
envDBUsername = "MF_CASSANDRA_WRITER_DB_USERNAME"
envDBPassword = "MF_CASSANDRA_WRITER_DB_PASSWORD"
envDBPort = "MF_CASSANDRA_WRITER_DB_PORT"
envChanCfgPath = "MF_CASSANDRA_WRITER_CHANNELS_CONFIG"
)
// config bundles the settings assembled by loadConfig and consumed by main.
type config struct {
// natsURL is the address passed to connectToNATS.
natsURL string
// logLevel is the level string passed to logger.New.
logLevel string
// port is the port the HTTP server listens on.
port string
// dbCfg holds the Cassandra connection settings.
dbCfg cassandra.DBConfig
// channels is the set built by loadChansConfig from the channels TOML
// file; it is passed to writers.Start.
channels map[string]bool
}
// main loads the configuration, connects to NATS and Cassandra, starts the
// writer and the HTTP server, and then blocks until either the HTTP server
// returns an error or SIGINT is received.
func main() {
	cfg := loadConfig()

	logger, err := logger.New(os.Stdout, cfg.logLevel)
	if err != nil {
		// Print the error as a value rather than using its text as a format
		// string, so a '%' in the message is not interpreted as a verb.
		log.Fatal(err)
	}

	nc := connectToNATS(cfg.natsURL, logger)
	defer nc.Close()

	session := connectToCassandra(cfg.dbCfg, logger)
	defer session.Close()

	repo := newService(session, logger)
	norm := normalizer.New()

	// A failure here is only logged; the HTTP server is still started below.
	if err := writers.Start(nc, repo, norm, svcName, cfg.channels, logger); err != nil {
		logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
	}

	// Buffered for both producers (HTTP server and signal handler) so that
	// neither goroutine blocks on send after main has taken the first error.
	errs := make(chan error, 2)

	go startHTTPServer(cfg.port, errs, logger)

	go func() {
		// signal.Notify never blocks when delivering, so the channel needs a
		// buffer; with an unbuffered channel a signal arriving before the
		// receive below is ready would be dropped.
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT)
		errs <- fmt.Errorf("%s", <-c)
	}()

	err = <-errs
	logger.Error(fmt.Sprintf("Cassandra writer service terminated: %s", err))
}
// loadConfig assembles the service configuration from environment variables,
// using the def* constants as fallbacks. The process is terminated via
// log.Fatal when the database port is not a valid integer.
func loadConfig() config {
	rawDBPort := mainflux.Env(envDBPort, defDBPort)
	port, convErr := strconv.Atoi(rawDBPort)
	if convErr != nil {
		log.Fatal(convErr)
	}

	// The cluster setting is a sep-separated list of hosts.
	hosts := strings.Split(mainflux.Env(envCluster, defCluster), sep)

	db := cassandra.DBConfig{
		Hosts:    hosts,
		Keyspace: mainflux.Env(envKeyspace, defKeyspace),
		Username: mainflux.Env(envDBUsername, defDBUsername),
		Password: mainflux.Env(envDBPassword, defDBPassword),
		Port:     port,
	}

	chansPath := mainflux.Env(envChanCfgPath, defChanCfgPath)

	var cfg config
	cfg.natsURL = mainflux.Env(envNatsURL, defNatsURL)
	cfg.logLevel = mainflux.Env(envLogLevel, defLogLevel)
	cfg.port = mainflux.Env(envPort, defPort)
	cfg.dbCfg = db
	cfg.channels = loadChansConfig(chansPath)
	return cfg
}
// channels is the decoded form of the channels section of the TOML file read
// by loadChansConfig.
type channels struct {
// List is populated from the "filter" key.
List []string `toml:"filter"`
}
// chanConfig is the top-level structure the channels TOML file is
// unmarshalled into by loadChansConfig.
type chanConfig struct {
// Channels is populated from the "channels" key.
Channels channels `toml:"channels"`
}
// loadChansConfig reads the TOML file at chanConfigPath and returns a map
// with a true entry for every value listed under channels.filter. The process
// is terminated via log.Fatal when the file cannot be read or decoded.
func loadChansConfig(chanConfigPath string) map[string]bool {
	raw, readErr := ioutil.ReadFile(chanConfigPath)
	if readErr != nil {
		log.Fatal(readErr)
	}

	var parsed chanConfig
	parseErr := toml.Unmarshal(raw, &parsed)
	if parseErr != nil {
		log.Fatal(parseErr)
	}

	filter := parsed.Channels.List
	set := make(map[string]bool, len(filter))
	for i := range filter {
		set[filter[i]] = true
	}
	return set
}
// connectToNATS opens a NATS connection to url. On failure it logs the error
// and exits the process with status 1.
func connectToNATS(url string, logger logger.Logger) *nats.Conn {
	conn, connErr := nats.Connect(url)
	if connErr == nil {
		return conn
	}

	logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", connErr))
	os.Exit(1)
	return nil // not reached: os.Exit does not return
}
// connectToCassandra opens a Cassandra session using dbCfg. On failure it logs
// the error and exits the process with status 1.
func connectToCassandra(dbCfg cassandra.DBConfig, logger logger.Logger) *gocql.Session {
	sess, connErr := cassandra.Connect(dbCfg)
	if connErr == nil {
		return sess
	}

	logger.Error(fmt.Sprintf("Failed to connect to Cassandra cluster: %s", connErr))
	os.Exit(1)
	return nil // not reached: os.Exit does not return
}
// newService builds the Cassandra message repository on top of session and
// wraps it first with api.LoggingMiddleware and then with
// api.MetricsMiddleware, the latter fed by a Prometheus counter and summary
// labelled by "method".
func newService(session *gocql.Session, logger logger.Logger) writers.MessageRepository {
	logged := api.LoggingMiddleware(cassandra.New(session), logger)

	requestCount := kitprometheus.NewCounterFrom(
		stdprometheus.CounterOpts{
			Namespace: "cassandra",
			Subsystem: "message_writer",
			Name:      "request_count",
			Help:      "Number of requests received.",
		},
		[]string{"method"},
	)

	requestLatency := kitprometheus.NewSummaryFrom(
		stdprometheus.SummaryOpts{
			Namespace: "cassandra",
			Subsystem: "message_writer",
			Name:      "request_latency_microseconds",
			Help:      "Total duration of requests in microseconds.",
		},
		[]string{"method"},
	)

	return api.MetricsMiddleware(logged, requestCount, requestLatency)
}
// startHTTPServer logs a start-up message, serves api.MakeHandler(svcName) on
// the given port, and sends the error returned by http.ListenAndServe on errs.
func startHTTPServer(port string, errs chan error, logger logger.Logger) {
	addr := ":" + port
	startMsg := fmt.Sprintf("Cassandra writer service started, exposed port %s", port)
	logger.Info(startMsg)

	handler := api.MakeHandler(svcName)
	errs <- http.ListenAndServe(addr, handler)
}