// Mirror of https://github.com/mainflux/mainflux.git (synced 2025-05-02 22:17:10 +08:00).

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/BurntSushi/toml"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
// Blame note: MF-919 - Mainflux message updates (#924), 2019-11-05 11:57:16 +01:00.
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/influxdb"
// Blame note: MF-415 - Merge mProxy support (#1045), 2020-02-26 17:14:16 +01:00.
nats "github.com/nats-io/nats.go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const (
svcName = "influxdb-writer"
MF-780 - Use Normalizer as a lib (#915) * Use Normalizer as a lib To normalize messages on the consumer side, Normalizer is moved to the internal pkgs. Writers being message consumers are modified to do message normalization instead of subscribing to normalized messages subject. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix logging middleware for readers and writers Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove normalizer interface Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use Normalizer in writers As we agreed on #919, we'll use normalizer as an interface and provide the default SenML implementation. Because of that, Normalizer is removed from `internal` and we'll use the project structure proposed in the aforementioned issue. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix tests Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove unused batch settings from influxDB reader Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update docs Move Normalizer service to `addons`. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename channels input topic Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update Noramlizer docs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove commented code Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update readers logging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update addons docker-compose files Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update topcis explanations Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2019-10-31 14:04:47 +01:00
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defChanCfgPath = "/config/channels.toml"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
envPort = "MF_INFLUX_WRITER_PORT"
envDBName = "MF_INFLUX_WRITER_DB_NAME"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUX_WRITER_DB_PORT"
envDBUser = "MF_INFLUX_WRITER_DB_USER"
envDBPass = "MF_INFLUX_WRITER_DB_PASS"
envChanCfgPath = "MF_INFLUX_WRITER_CHANNELS_CONFIG"
)
// config holds the writer's runtime settings as assembled by loadConfigs.
type config struct {
	natsURL  string          // URL passed to nats.Connect
	logLevel string          // level passed to logger.New
	port     string          // port the HTTP service listens on
	dbName   string          // database name passed to influxdb.New
	dbHost   string          // InfluxDB host, used to build the client address
	dbPort   string          // InfluxDB port, used to build the client address
	dbUser   string          // InfluxDB client username
	dbPass   string          // InfluxDB client password
	channels map[string]bool // channel filter set passed to writers.Start
}
func main() {
cfg, clientCfg := loadConfigs()
logger, err := logger.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}
nc, err := nats.Connect(cfg.natsURL)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer nc.Close()
client, err := influxdata.NewHTTPClient(clientCfg)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create InfluxDB client: %s", err))
os.Exit(1)
}
defer client.Close()
MF-780 - Use Normalizer as a lib (#915) * Use Normalizer as a lib To normalize messages on the consumer side, Normalizer is moved to the internal pkgs. Writers being message consumers are modified to do message normalization instead of subscribing to normalized messages subject. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix logging middleware for readers and writers Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove normalizer interface Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use Normalizer in writers As we agreed on #919, we'll use normalizer as an interface and provide the default SenML implementation. Because of that, Normalizer is removed from `internal` and we'll use the project structure proposed in the aforementioned issue. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix tests Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove unused batch settings from influxDB reader Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update docs Move Normalizer service to `addons`. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename channels input topic Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update Noramlizer docs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove commented code Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update readers logging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update addons docker-compose files Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update topcis explanations Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2019-10-31 14:04:47 +01:00
repo := influxdb.New(client, cfg.dbName)
counter, latency := makeMetrics()
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
MF-919 - Mainflux message updates (#924) * Remove RawMessage Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove Normalizer Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update tests Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Replace normalizer with senml-transformer Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename Transformer interface and package Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update docs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove SenML transformer service Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove SenML Protobuf support Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix readers Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix writers tests Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Refactor tests and remove normalizer Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update docs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Reanme Service interface to Transformer Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use msg instead of rawmsg Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove rawMsg from Aedes code Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix VerneMQ files Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove RawMessage code Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix missing subtopic return Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove remaining RawMessage reference Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix formatting Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix readers and writers tests Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename SenML transformer variables 
Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix readers and writers tests constants Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2019-11-05 11:57:16 +01:00
st := senml.New()
if err := writers.Start(nc, repo, st, svcName, cfg.channels, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
errs := make(chan error, 2)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
go startHTTPService(cfg.port, logger, errs)
err = <-errs
logger.Error(fmt.Sprintf("InfluxDB writer service terminated: %s", err))
}
func loadConfigs() (config, influxdata.HTTPConfig) {
chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath)
cfg := config{
MF-780 - Use Normalizer as a lib (#915) * Use Normalizer as a lib To normalize messages on the consumer side, Normalizer is moved to the internal pkgs. Writers being message consumers are modified to do message normalization instead of subscribing to normalized messages subject. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix logging middleware for readers and writers Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove normalizer interface Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use Normalizer in writers As we agreed on #919, we'll use normalizer as an interface and provide the default SenML implementation. Because of that, Normalizer is removed from `internal` and we'll use the project structure proposed in the aforementioned issue. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix tests Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove unused batch settings from influxDB reader Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update docs Move Normalizer service to `addons`. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename channels input topic Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update Noramlizer docs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove commented code Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update readers logging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update addons docker-compose files Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update topcis explanations Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2019-10-31 14:04:47 +01:00
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDBName, defDBName),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
channels: loadChansConfig(chanCfgPath),
}
clientCfg := influxdata.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%s", cfg.dbHost, cfg.dbPort),
Username: cfg.dbUser,
Password: cfg.dbPass,
}
return cfg, clientCfg
}
// TOML mapping types for the channels config file decoded by loadChansConfig.
type (
	// channels maps the "channels" table; List receives its "filter" key.
	channels struct {
		List []string `toml:"filter"`
	}

	// chanConfig is the top-level document; Channels receives the
	// "channels" table.
	chanConfig struct {
		Channels channels `toml:"channels"`
	}
)
// loadChansConfig reads the TOML file at chanConfigPath and returns the
// entries of its channels.filter list as a set (every listed entry maps to
// true). A read or decode failure ends the process through log.Fatal.
func loadChansConfig(chanConfigPath string) map[string]bool {
	raw, readErr := ioutil.ReadFile(chanConfigPath)
	if readErr != nil {
		log.Fatal(readErr)
	}

	parsed := chanConfig{}
	if decodeErr := toml.Unmarshal(raw, &parsed); decodeErr != nil {
		log.Fatal(decodeErr)
	}

	// Turn the list into a lookup set.
	filter := parsed.Channels.List
	set := make(map[string]bool, len(filter))
	for i := range filter {
		set[filter[i]] = true
	}
	return set
}
// makeMetrics creates the request counter and the latency summary handed to
// api.MetricsMiddleware. Both live under influxdb/message_writer and carry a
// single "method" label.
func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
	const (
		namespace = "influxdb"
		subsystem = "message_writer"
	)

	counterOpts := stdprometheus.CounterOpts{
		Namespace: namespace,
		Subsystem: subsystem,
		Name:      "request_count",
		Help:      "Number of database inserts.",
	}
	latencyOpts := stdprometheus.SummaryOpts{
		Namespace: namespace,
		Subsystem: subsystem,
		Name:      "request_latency_microseconds",
		Help:      "Total duration of inserts in microseconds.",
	}

	// Calls are evaluated left to right, so the counter is created first.
	return kitprometheus.NewCounterFrom(counterOpts, []string{"method"}),
		kitprometheus.NewSummaryFrom(latencyOpts, []string{"method"})
}
// startHTTPService logs the listen address, serves the handler built by
// api.MakeHandler on the given port, and sends the error returned by
// http.ListenAndServe to errs. It blocks for as long as the server runs.
func startHTTPService(port string, logger logger.Logger, errs chan error) {
	addr := ":" + port
	logger.Info("InfluxDB writer service started, exposed port " + addr)

	handler := api.MakeHandler(svcName)
	errs <- http.ListenAndServe(addr, handler)
}