1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-29 13:49:28 +08:00

158 lines
4.5 KiB
Go
Raw Normal View History

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/influxdb"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const (
	// svcName is the service name handed to writers.Start in main.
	svcName = "influxdb-writer"

	// Built-in configuration values. Presumably each one is the fallback for
	// the matching env* variable below; the lookup itself is outside this block.
	defNatsURL         = "nats://localhost:4222"
	defLogLevel        = "error"
	defPort            = "8180"
	defDB              = "mainflux"
	defDBHost          = "localhost"
	defDBPort          = "8086"
	defDBUser          = "mainflux"
	defDBPass          = "mainflux"
	defSubjectsCfgPath = "/config/subjects.toml"
	defContentType     = "application/senml+json"

	// Names of the environment variables that configure the service.
	envNatsURL         = "MF_NATS_URL"
	envLogLevel        = "MF_INFLUX_WRITER_LOG_LEVEL"
	envPort            = "MF_INFLUX_WRITER_PORT"
	envDB              = "MF_INFLUX_WRITER_DB"
	envDBHost          = "MF_INFLUX_WRITER_DB_HOST"
	envDBPort          = "MF_INFLUX_WRITER_DB_PORT"
	envDBUser          = "MF_INFLUX_WRITER_DB_USER"
	envDBPass          = "MF_INFLUX_WRITER_DB_PASS"
	envSubjectsCfgPath = "MF_INFLUX_WRITER_SUBJECTS_CONFIG"
	envContentType     = "MF_INFLUX_WRITER_CONTENT_TYPE"
)
// config holds the writer's runtime settings as plain strings. It is
// produced by loadConfigs and consumed by main.
type config struct {
	// Service-level settings: NATS endpoint, logger level and HTTP port.
	natsURL  string
	logLevel string
	port     string

	// InfluxDB settings. dbName is passed to influxdb.New in main; the
	// host/port/user/pass values are not read in main and are presumably
	// used when building the InfluxDB client configuration.
	dbName string
	dbHost string
	dbPort string
	dbUser string
	dbPass string

	// Message handling: path of the subjects file given to writers.Start
	// and the content type given to senml.New.
	subjectsCfgPath string
	contentType     string
}
func main() {
cfg, clientCfg := loadConfigs()
logger, err := logger.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}
NOISSUE - Refactor messaging (#1141) * Refactor messaging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename SubscribeHandler to MessageHandler Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove `Auth` event logs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update message pubsub APi Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix topics handling Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update CoAP adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update Twins service Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update LoRa adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update OPC UA adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove broker package Package `broker` is conceptually renamed to package `nats`. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update makefile Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Add comment explanation Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix MQTT adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix typo Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Move NATS pub/sub implementation to pubsub pkg Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove an empty line in main methods Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Move messaging-related code to messaging package Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix Twins mocks Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Change Occurred back to Created Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix tranformer test Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix message proto commands Signed-off-by: Dušan Borovčanin 
<dusan.borovcanin@mainflux.com> * Replace string literal with constant Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove alias from main method Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Change messaging pubsub alias Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename occured to created Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Handle NATS connection in the NATS PubSub Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename n to pub/pubSub Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix typos Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2020-04-28 11:02:35 +02:00
pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
NOISSUE - Refactor messaging (#1141) * Refactor messaging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename SubscribeHandler to MessageHandler Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove `Auth` event logs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update message pubsub APi Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix topics handling Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update CoAP adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update Twins service Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update LoRa adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update OPC UA adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove broker package Package `broker` is conceptually renamed to package `nats`. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update makefile Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Add comment explanation Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix MQTT adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix typo Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Move NATS pub/sub implementation to pubsub pkg Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove an empty line in main methods Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Move messaging-related code to messaging package Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix Twins mocks Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Change Occurred back to Created Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix tranformer test Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix message proto commands Signed-off-by: Dušan Borovčanin 
<dusan.borovcanin@mainflux.com> * Replace string literal with constant Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove alias from main method Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Change messaging pubsub alias Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename occured to created Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Handle NATS connection in the NATS PubSub Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename n to pub/pubSub Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix typos Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2020-04-28 11:02:35 +02:00
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
NOISSUE - Refactor messaging (#1141) * Refactor messaging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename SubscribeHandler to MessageHandler Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove `Auth` event logs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update message pubsub APi Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix topics handling Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update CoAP adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update Twins service Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update LoRa adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update OPC UA adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove broker package Package `broker` is conceptually renamed to package `nats`. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update makefile Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Add comment explanation Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix MQTT adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix typo Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Move NATS pub/sub implementation to pubsub pkg Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove an empty line in main methods Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Move messaging-related code to messaging package Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix Twins mocks Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Change Occurred back to Created Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix tranformer test Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix message proto commands Signed-off-by: Dušan Borovčanin 
<dusan.borovcanin@mainflux.com> * Replace string literal with constant Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove alias from main method Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Change messaging pubsub alias Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename occured to created Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Handle NATS connection in the NATS PubSub Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename n to pub/pubSub Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix typos Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2020-04-28 11:02:35 +02:00
defer pubSub.Close()
client, err := influxdata.NewHTTPClient(clientCfg)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create InfluxDB client: %s", err))
os.Exit(1)
}
defer client.Close()
MF-780 - Use Normalizer as a lib (#915) * Use Normalizer as a lib To normalize messages on the consumer side, Normalizer is moved to the internal pkgs. Writers being message consumers are modified to do message normalization instead of subscribing to normalized messages subject. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix logging middleware for readers and writers Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove normalizer interface Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Use Normalizer in writers As we agreed on #919, we'll use normalizer as an interface and provide the default SenML implementation. Because of that, Normalizer is removed from `internal` and we'll use the project structure proposed in the aforementioned issue. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix tests Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove unused batch settings from influxDB reader Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update docs Move Normalizer service to `addons`. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename channels input topic Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update Noramlizer docs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove commented code Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update readers logging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update addons docker-compose files Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update topcis explanations Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2019-10-31 14:04:47 +01:00
repo := influxdb.New(client, cfg.dbName)
counter, latency := makeMetrics()
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New(cfg.contentType)
NOISSUE - Refactor messaging (#1141) * Refactor messaging Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename SubscribeHandler to MessageHandler Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove `Auth` event logs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update message pubsub APi Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix topics handling Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update CoAP adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update Twins service Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update LoRa adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update OPC UA adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove broker package Package `broker` is conceptually renamed to package `nats`. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update makefile Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Add comment explanation Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix MQTT adapter Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix typo Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Move NATS pub/sub implementation to pubsub pkg Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove an empty line in main methods Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Move messaging-related code to messaging package Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix Twins mocks Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Change Occurred back to Created Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix tranformer test Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix message proto commands Signed-off-by: Dušan Borovčanin 
<dusan.borovcanin@mainflux.com> * Replace string literal with constant Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Remove alias from main method Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Change messaging pubsub alias Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename occured to created Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Handle NATS connection in the NATS PubSub Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Rename n to pub/pubSub Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix typos Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2020-04-28 11:02:35 +02:00
if err := writers.Start(pubSub, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
errs := make(chan error, 2)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
go startHTTPService(cfg.port, logger, errs)
err = <-errs
logger.Error(fmt.Sprintf("InfluxDB writer service terminated: %s", err))
}
func loadConfigs() (config, influxdata.HTTPConfig) {
cfg := config{
NOISSUE - Adding subtopics filtering in writer services (#1072) * Add feature of filtering by subtopics in writer Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Fix mistake Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Refactoring writer sevices Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Rename variables related to filter (channels & subtopics) Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Set default value of filtering when configuration file doesn't exist Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Add a blank line at the end of the file Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Refactor loading filters configuration (moving into writer package, merge both loading methods & returning error) Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Remove useless log Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Change type of variables (channels & subtopics) and simplify loading method Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Add logging error when loading filters Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Simplify return configuration in loading method Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Merge both filter files into one file Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Move loading subjects into writer package Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Add subscribe to selected subjects Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Edit README of writer services Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Keep only subscribe loop Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Use full NATS subjects Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Edit comment in subjects files Signed-off-by: 
Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
2020-03-30 18:24:27 +02:00
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
MF-1096 - Fix AuthN and Things Auth ENVARS (#1066) * NOISSUE - Fix AuthN and Things Auth ENVARS Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Add DB envars to env.go Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix envars Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Add DefLogLLevelError Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix DB names Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix DB names Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix DB names and HTTP ports Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix .env Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Revert writers DB names Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Rm unused Twins envars Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix reviews Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix reviews Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Remove definitions from env.go Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Revert HTTP gRPC ports Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use default NATS URL as string Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix reviews Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Revert default ports Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix cassandra ENVARS Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix cassandra reader ENVARS Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix readers and writers envars Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix readers and writers .env Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix reviews 
Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
2020-04-07 12:02:17 +02:00
dbName: mainflux.Env(envDB, defDB),
NOISSUE - Adding subtopics filtering in writer services (#1072) * Add feature of filtering by subtopics in writer Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Fix mistake Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Refactoring writer sevices Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Rename variables related to filter (channels & subtopics) Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Set default value of filtering when configuration file doesn't exist Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Add a blank line at the end of the file Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Refactor loading filters configuration (moving into writer package, merge both loading methods & returning error) Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Remove useless log Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Change type of variables (channels & subtopics) and simplify loading method Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Add logging error when loading filters Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Simplify return configuration in loading method Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Merge both filter files into one file Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Move loading subjects into writer package Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Add subscribe to selected subjects Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Edit README of writer services Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Keep only subscribe loop Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Use full NATS subjects Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> * Edit comment in subjects files Signed-off-by: 
Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch> Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
2020-03-30 18:24:27 +02:00
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
subjectsCfgPath: mainflux.Env(envSubjectsCfgPath, defSubjectsCfgPath),
contentType: mainflux.Env(envContentType, defContentType),
}
clientCfg := influxdata.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%s", cfg.dbHost, cfg.dbPort),
Username: cfg.dbUser,
Password: cfg.dbPass,
}
return cfg, clientCfg
}
// makeMetrics builds the two instruments used to observe the message writer:
// a counter of database inserts and a summary of insert latency. Both live
// under the "influxdb" namespace and "message_writer" subsystem and carry a
// single "method" label.
func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
	const (
		namespace   = "influxdb"
		subsystem   = "message_writer"
		methodLabel = "method"
	)

	countOpts := stdprometheus.CounterOpts{
		Namespace: namespace,
		Subsystem: subsystem,
		Name:      "request_count",
		Help:      "Number of database inserts.",
	}
	latencyOpts := stdprometheus.SummaryOpts{
		Namespace: namespace,
		Subsystem: subsystem,
		Name:      "request_latency_microseconds",
		Help:      "Total duration of inserts in microseconds.",
	}

	// Keep the construction order: counter first, then the latency summary.
	requests := kitprometheus.NewCounterFrom(countOpts, []string{methodLabel})
	durations := kitprometheus.NewSummaryFrom(latencyOpts, []string{methodLabel})

	return requests, durations
}
// startHTTPService serves the writer's HTTP API on the given port.
//
// It logs the listening address, then blocks in http.ListenAndServe with the
// handler produced by api.MakeHandler for this service. Whatever error
// ListenAndServe returns is sent on errs.
func startHTTPService(port string, logger logger.Logger, errs chan error) {
	addr := fmt.Sprintf(":%s", port)

	startedMsg := fmt.Sprintf("InfluxDB writer service started, exposed port %s", addr)
	logger.Info(startedMsg)

	// The handler is created only after the start-up message is logged.
	handler := api.MakeHandler(svcName)
	serveErr := http.ListenAndServe(addr, handler)
	errs <- serveErr
}