diff --git a/Makefile b/Makefile index a5143c88..2c372fcb 100644 --- a/Makefile +++ b/Makefile @@ -14,9 +14,15 @@ VERSION ?= $(shell git describe --abbrev=0 --tags) COMMIT ?= $(shell git rev-parse HEAD) TIME ?= $(shell date +%F_%T) +ifneq ($(MF_BROKER_TYPE),) + MF_BROKER_TYPE := $(MF_BROKER_TYPE) +else + MF_BROKER_TYPE=nats +endif + define compile_service CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \ - go build -mod=vendor -ldflags "-s -w \ + go build -mod=vendor -tags $(MF_BROKER_TYPE) -ldflags "-s -w \ -X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \ -X 'github.com/mainflux/mainflux.Version=$(VERSION)' \ -X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \ @@ -111,4 +117,13 @@ rundev: cd scripts && ./run.sh run: +ifeq ("$(MF_BROKER_TYPE)", "rabbitmq") + sed -i "s,file: brokers/.*.yml,file: brokers/rabbitmq.yml," docker/docker-compose.yml + sed -i "s,MF_BROKER_URL: .*,MF_BROKER_URL: $$\{MF_RABBITMQ_URL\}," docker/docker-compose.yml +else ifeq ("$(MF_BROKER_TYPE)", "nats") + sed -i "s,file: brokers/.*.yml,file: brokers/nats.yml," docker/docker-compose.yml + sed -i "s,MF_BROKER_URL: .*,MF_BROKER_URL: $$\{MF_NATS_URL\}," docker/docker-compose.yml +else + echo "Invalid broker type"; exit 1 +endif docker-compose -f docker/docker-compose.yml up diff --git a/api/asyncapi/mqtt.yml b/api/asyncapi/mqtt.yml index 2c42564a..bcb37a41 100644 --- a/api/asyncapi/mqtt.yml +++ b/api/asyncapi/mqtt.yml @@ -7,7 +7,7 @@ info: version: '1.0.0' description: | MQTT adapter provides an MQTT API for sending messages through the platform. MQTT adapter uses [mProxy](https://github.com/mainflux/mproxy) for proxying traffic between client and MQTT broker. - Additionally, the MQTT adapter and the NATS message broker are replicating the traffic between brokers. + Additionally, the MQTT adapter and the message broker are replicating the traffic between brokers. defaultContentType: application/json diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index 15cefeed..3fff6986 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -21,7 +21,7 @@ import ( "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" stdprometheus "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" ) @@ -31,7 +31,7 @@ const ( sep = "," stopWaitTime = 5 * time.Second - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defLogLevel = "error" defPort = "8180" defCluster = "127.0.0.1" @@ -41,7 +41,7 @@ const ( defDBPort = "9042" defConfigPath = "/config.toml" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL" envPort = "MF_CASSANDRA_WRITER_PORT" envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER" @@ -53,7 +53,7 @@ const ( ) type config struct { - natsURL string + brokerURL string logLevel string port string configPath string @@ -70,9 +70,9 @@ func main() { log.Fatalf(err.Error()) } - pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger) + pubSub, err := brokers.NewPubSub(cfg.brokerURL, "", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pubSub.Close() @@ -116,7 +116,7 @@ func loadConfig() config { } return config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), port: mainflux.Env(envPort, defPort), configPath: mainflux.Env(envConfigPath, defConfigPath), diff --git a/cmd/coap/main.go b/cmd/coap/main.go index bf0aae1f..ced471d9 100644 --- a/cmd/coap/main.go +++ b/cmd/coap/main.go @@ -19,8 +19,8 @@ import ( "github.com/mainflux/mainflux/coap" "github.com/mainflux/mainflux/coap/api" logger "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/errors" + "github.com/mainflux/mainflux/pkg/messaging/brokers" thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc" opentracing "github.com/opentracing/opentracing-go" gocoap "github.com/plgd-dev/go-coap/v2" @@ -35,7 +35,7 @@ const ( stopWaitTime = 5 * time.Second defPort = "5683" - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defLogLevel = "error" defClientTLS = "false" defCACerts = "" @@ -44,7 +44,7 @@ const ( defThingsAuthTimeout = "1s" envPort = "MF_COAP_ADAPTER_PORT" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envLogLevel = "MF_COAP_ADAPTER_LOG_LEVEL" envClientTLS = "MF_COAP_ADAPTER_CLIENT_TLS" envCACerts = "MF_COAP_ADAPTER_CA_CERTS" @@ -55,7 +55,7 @@ const ( type config struct { port string - natsURL string + brokerURL string logLevel string clientTLS bool caCerts string @@ -82,9 +82,9 @@ func main() { tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout) - nps, err := nats.NewPubSub(cfg.natsURL, "", logger) + nps, err := brokers.NewPubSub(cfg.brokerURL, "", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer nps.Close() @@ -142,7 +142,7 @@ func loadConfig() config { } return config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), port: mainflux.Env(envPort, defPort), logLevel: mainflux.Env(envLogLevel, defLogLevel), clientTLS: tls, diff --git a/cmd/http/main.go b/cmd/http/main.go index 7396de7c..d058718d 100644 --- a/cmd/http/main.go +++ b/cmd/http/main.go @@ -22,7 +22,7 @@ import ( "github.com/mainflux/mainflux/http/api" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc" "github.com/opentracing/opentracing-go" stdprometheus "github.com/prometheus/client_golang/prometheus" @@ -38,7 +38,7 @@ const ( defClientTLS = "false" defCACerts = "" defPort = "8180" - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defJaegerURL = "" defThingsAuthURL = "localhost:8183" defThingsAuthTimeout = "1s" @@ -47,14 +47,14 @@ const ( envClientTLS = "MF_HTTP_ADAPTER_CLIENT_TLS" envCACerts = "MF_HTTP_ADAPTER_CA_CERTS" envPort = "MF_HTTP_ADAPTER_PORT" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envJaegerURL = "MF_JAEGER_URL" envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL" envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT" ) type config struct { - natsURL string + brokerURL string logLevel string port string clientTLS bool @@ -83,9 +83,9 @@ func main() { thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger) defer thingsCloser.Close() - pub, err := nats.NewPublisher(cfg.natsURL) + pub, err := brokers.NewPublisher(cfg.brokerURL) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pub.Close() @@ -139,7 +139,7 @@ func loadConfig() config { } return config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), port: mainflux.Env(envPort, defPort), clientTLS: tls, diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 5bfea64b..3a31fabc 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -19,7 +19,7 @@ import ( "github.com/mainflux/mainflux/consumers/writers/influxdb" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" stdprometheus "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" ) @@ -28,7 +28,7 @@ const ( svcName = "influxdb-writer" stopWaitTime = 5 * time.Second - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defLogLevel = "error" defPort = "8180" defDB = "mainflux" @@ -38,7 +38,7 @@ const ( defDBPass = "mainflux" defConfigPath = "/config.toml" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL" envPort = "MF_INFLUX_WRITER_PORT" envDB = "MF_INFLUXDB_DB" @@ -50,7 +50,7 @@ const ( ) type config struct { - natsURL string + brokerURL string logLevel string port string dbName string @@ -71,9 +71,9 @@ func main() { log.Fatalf(err.Error()) } - pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger) + pubSub, err := brokers.NewPubSub(cfg.brokerURL, "", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pubSub.Close() @@ -115,7 +115,7 @@ func main() { func loadConfigs() (config, influxdata.HTTPConfig) { cfg := config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), port: mainflux.Env(envPort, defPort), dbName: mainflux.Env(envDB, defDB), diff --git a/cmd/lora/main.go b/cmd/lora/main.go index 57edff27..c01644ef 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -20,7 +20,7 @@ import ( "github.com/mainflux/mainflux/lora/api" "github.com/mainflux/mainflux/lora/mqtt" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" "golang.org/x/sync/errgroup" kitprometheus "github.com/go-kit/kit/metrics/prometheus" @@ -34,11 +34,11 @@ const ( defLogLevel = "error" defHTTPPort = "8180" defLoraMsgURL = "tcp://localhost:1883" + defBrokerURL = "nats://localhost:4222" defLoraMsgTopic = "application/+/device/+/event/up" defLoraMsgUser = "" defLoraMsgPass = "" defLoraMsgTimeout = "30s" - defNatsURL = "nats://localhost:4222" defESURL = "localhost:6379" defESPass = "" defESDB = "0" @@ -49,11 +49,11 @@ const ( envHTTPPort = "MF_LORA_ADAPTER_HTTP_PORT" envLoraMsgURL = "MF_LORA_ADAPTER_MESSAGES_URL" + envBrokerURL = "MF_BROKER_URL" envLoraMsgTopic = "MF_LORA_ADAPTER_MESSAGES_TOPIC" envLoraMsgUser = "MF_LORA_ADAPTER_MESSAGES_USER" envLoraMsgPass = "MF_LORA_ADAPTER_MESSAGES_PASS" envLoraMsgTimeout = "MF_LORA_ADAPTER_MESSAGES_TIMEOUT" - envNatsURL = "MF_NATS_URL" envLogLevel = "MF_LORA_ADAPTER_LOG_LEVEL" envESURL = "MF_THINGS_ES_URL" envESPass = "MF_THINGS_ES_PASS" @@ -71,11 +71,11 @@ const ( type config struct { httpPort string loraMsgURL string + brokerURL string loraMsgUser string loraMsgPass string loraMsgTopic string loraMsgTimeout time.Duration - natsURL string logLevel string esURL string esPass string @@ -102,9 +102,9 @@ func main() { esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger) defer esConn.Close() - pub, err := nats.NewPublisher(cfg.natsURL) + pub, err := brokers.NewPublisher(cfg.brokerURL) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pub.Close() @@ -163,11 +163,11 @@ func loadConfig() config { return config{ httpPort: mainflux.Env(envHTTPPort, defHTTPPort), loraMsgURL: mainflux.Env(envLoraMsgURL, defLoraMsgURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), loraMsgTopic: mainflux.Env(envLoraMsgTopic, defLoraMsgTopic), loraMsgUser: mainflux.Env(envLoraMsgUser, defLoraMsgUser), loraMsgPass: mainflux.Env(envLoraMsgPass, defLoraMsgPass), loraMsgTimeout: mqttTimeout, - natsURL: mainflux.Env(envNatsURL, defNatsURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), esURL: mainflux.Env(envESURL, defESURL), esPass: mainflux.Env(envESPass, defESPass), diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index 9219e0e4..a1f12101 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -18,7 +18,7 @@ import ( "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" stdprometheus "github.com/prometheus/client_golang/prometheus" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -30,14 +30,14 @@ const ( stopWaitTime = 5 * time.Second defLogLevel = "error" - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defPort = "8180" defDB = "mainflux" defDBHost = "localhost" defDBPort = "27017" defConfigPath = "/config.toml" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envLogLevel = "MF_MONGO_WRITER_LOG_LEVEL" envPort = "MF_MONGO_WRITER_PORT" envDB = "MF_MONGO_WRITER_DB" @@ -47,7 +47,7 @@ const ( ) type config struct { - natsURL string + brokerURL string logLevel string port string dbName string @@ -66,9 +66,9 @@ func main() { log.Fatal(err) } - pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger) + pubSub, err := brokers.NewPubSub(cfg.brokerURL, "", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pubSub.Close() @@ -112,7 +112,7 @@ func main() { func loadConfigs() config { return config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), port: mainflux.Env(envPort, defPort), dbName: mainflux.Env(envDB, defDB), diff --git a/cmd/mqtt/main.go b/cmd/mqtt/main.go index 7163bad5..d479a51a 100644 --- a/cmd/mqtt/main.go +++ b/cmd/mqtt/main.go @@ -20,8 +20,8 @@ import ( "github.com/mainflux/mainflux/pkg/auth" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/messaging" + "github.com/mainflux/mainflux/pkg/messaging/brokers" mqttpub "github.com/mainflux/mainflux/pkg/messaging/mqtt" - "github.com/mainflux/mainflux/pkg/messaging/nats" thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc" mp "github.com/mainflux/mproxy/pkg/mqtt" "github.com/mainflux/mproxy/pkg/session" @@ -35,62 +35,54 @@ import ( const ( svcName = "mqtt" - // Logging - defLogLevel = "error" - envLogLevel = "MF_MQTT_ADAPTER_LOG_LEVEL" - // MQTT + + defLogLevel = "error" defMQTTPort = "1883" defMQTTTargetHost = "0.0.0.0" defMQTTTargetPort = "1883" defMQTTForwarderTimeout = "30s" // 30 seconds defMQTTTargetHealthCheck = "" + defHTTPPort = "8080" + defHTTPTargetHost = "localhost" + defHTTPTargetPort = "8080" + defHTTPTargetPath = "/mqtt" + defThingsAuthURL = "localhost:8183" + defThingsAuthTimeout = "1s" + defBrokerURL = "nats://localhost:4222" + defJaegerURL = "" + defClientTLS = "false" + defCACerts = "" + defInstance = "" + defESURL = "localhost:6379" + defESPass = "" + defESDB = "0" + defAuthcacheURL = "localhost:6379" + defAuthCachePass = "" + defAuthCacheDB = "0" + + envLogLevel = "MF_MQTT_ADAPTER_LOG_LEVEL" envMQTTPort = "MF_MQTT_ADAPTER_MQTT_PORT" envMQTTTargetHost = "MF_MQTT_ADAPTER_MQTT_TARGET_HOST" envMQTTTargetPort = "MF_MQTT_ADAPTER_MQTT_TARGET_PORT" envMQTTTargetHealthCheck = "MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envMQTTForwarderTimeout = "MF_MQTT_ADAPTER_FORWARDER_TIMEOUT" - // HTTP - defHTTPPort = "8080" - defHTTPTargetHost = "localhost" - defHTTPTargetPort = "8080" - defHTTPTargetPath = "/mqtt" - envHTTPPort = "MF_MQTT_ADAPTER_WS_PORT" - envHTTPTargetHost = "MF_MQTT_ADAPTER_WS_TARGET_HOST" - envHTTPTargetPort = "MF_MQTT_ADAPTER_WS_TARGET_PORT" - envHTTPTargetPath = "MF_MQTT_ADAPTER_WS_TARGET_PATH" - // Things - defThingsAuthURL = "localhost:8183" - defThingsAuthTimeout = "1s" - envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL" - envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT" - // Nats - defNatsURL = "nats://localhost:4222" - envNatsURL = "MF_NATS_URL" - // Jaeger - defJaegerURL = "" - envJaegerURL = "MF_JAEGER_URL" - // TLS - defClientTLS = "false" - defCACerts = "" - envClientTLS = "MF_MQTT_ADAPTER_CLIENT_TLS" - envCACerts = "MF_MQTT_ADAPTER_CA_CERTS" - // Instance - envInstance = "MF_MQTT_ADAPTER_INSTANCE" - defInstance = "" - // ES - envESURL = "MF_MQTT_ADAPTER_ES_URL" - envESPass = "MF_MQTT_ADAPTER_ES_PASS" - envESDB = "MF_MQTT_ADAPTER_ES_DB" - defESURL = "localhost:6379" - defESPass = "" - defESDB = "0" - // Auth cache - envAuthCacheURL = "MF_AUTH_CACHE_URL" - envAuthCachePass = "MF_AUTH_CACHE_PASS" - envAuthCacheDB = "MF_AUTH_CACHE_DB" - defAuthcacheURL = "localhost:6379" - defAuthCachePass = "" - defAuthCacheDB = "0" + envHTTPPort = "MF_MQTT_ADAPTER_WS_PORT" + envHTTPTargetHost = "MF_MQTT_ADAPTER_WS_TARGET_HOST" + envHTTPTargetPort = "MF_MQTT_ADAPTER_WS_TARGET_PORT" + envHTTPTargetPath = "MF_MQTT_ADAPTER_WS_TARGET_PATH" + envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL" + envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT" + envBrokerURL = "MF_BROKER_URL" + envJaegerURL = "MF_JAEGER_URL" + envClientTLS = "MF_MQTT_ADAPTER_CLIENT_TLS" + envCACerts = "MF_MQTT_ADAPTER_CA_CERTS" + envInstance = "MF_MQTT_ADAPTER_INSTANCE" + envESURL = "MF_MQTT_ADAPTER_ES_URL" + envESPass = "MF_MQTT_ADAPTER_ES_PASS" + envESDB = "MF_MQTT_ADAPTER_ES_DB" + envAuthCacheURL = "MF_AUTH_CACHE_URL" + envAuthCachePass = "MF_AUTH_CACHE_PASS" + envAuthCacheDB = "MF_AUTH_CACHE_DB" ) type config struct { @@ -108,7 +100,7 @@ type config struct { thingsURL string thingsAuthURL string thingsAuthTimeout time.Duration - natsURL string + brokerURL string clientTLS bool caCerts string instance string @@ -148,9 +140,9 @@ func main() { ec := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger) defer ec.Close() - nps, err := nats.NewPubSub(cfg.natsURL, "mqtt", logger) + nps, err := brokers.NewPubSub(cfg.brokerURL, "mqtt", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer nps.Close() @@ -161,15 +153,15 @@ func main() { os.Exit(1) } - fwd := mqtt.NewForwarder(nats.SubjectAllChannels, logger) + fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger) if err := fwd.Forward(svcName, nps, mpub); err != nil { - logger.Error(fmt.Sprintf("Failed to forward NATS messages: %s", err)) + logger.Error(fmt.Sprintf("Failed to forward message broker messages: %s", err)) os.Exit(1) } - np, err := nats.NewPublisher(cfg.natsURL) + np, err := brokers.NewPublisher(cfg.brokerURL) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer np.Close() @@ -241,7 +233,7 @@ func loadConfig() config { thingsAuthURL: mainflux.Env(envThingsAuthURL, defThingsAuthURL), thingsAuthTimeout: authTimeout, thingsURL: mainflux.Env(envThingsAuthURL, defThingsAuthURL), - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), clientTLS: tls, caCerts: mainflux.Env(envCACerts, defCACerts), diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index f61973fd..9999b7d3 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -21,7 +21,7 @@ import ( "github.com/mainflux/mainflux/opcua/gopcua" "github.com/mainflux/mainflux/opcua/redis" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" "golang.org/x/sync/errgroup" kitprometheus "github.com/go-kit/kit/metrics/prometheus" @@ -38,7 +38,7 @@ const ( defOPCMode = "" defOPCCertFile = "" defOPCKeyFile = "" - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defESURL = "localhost:6379" defESPass = "" defESDB = "0" @@ -54,7 +54,7 @@ const ( envOPCMode = "MF_OPCUA_ADAPTER_MODE" envOPCCertFile = "MF_OPCUA_ADAPTER_CERT_FILE" envOPCKeyFile = "MF_OPCUA_ADAPTER_KEY_FILE" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envESURL = "MF_THINGS_ES_URL" envESPass = "MF_THINGS_ES_PASS" envESDB = "MF_THINGS_ES_DB" @@ -71,7 +71,7 @@ const ( type config struct { httpPort string opcuaConfig opcua.Config - natsURL string + brokerURL string logLevel string esURL string esPass string @@ -102,9 +102,9 @@ func main() { esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger) defer esConn.Close() - pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger) + pubSub, err := brokers.NewPubSub(cfg.brokerURL, "", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pubSub.Close() @@ -162,7 +162,7 @@ func loadConfig() config { return config{ httpPort: mainflux.Env(envHTTPPort, defHTTPPort), opcuaConfig: oc, - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), esURL: mainflux.Env(envESURL, defESURL), esPass: mainflux.Env(envESPass, defESPass), diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 935f2592..b2af7f1a 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -19,7 +19,7 @@ import ( "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" stdprometheus "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" ) @@ -29,7 +29,7 @@ const ( stopWaitTime = 5 * time.Second defLogLevel = "error" - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defPort = "8180" defDBHost = "localhost" defDBPort = "5432" @@ -42,7 +42,7 @@ const ( defDBSSLRootCert = "" defConfigPath = "/config.toml" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envLogLevel = "MF_POSTGRES_WRITER_LOG_LEVEL" envPort = "MF_POSTGRES_WRITER_PORT" envDBHost = "MF_POSTGRES_WRITER_DB_HOST" @@ -58,7 +58,7 @@ const ( ) type config struct { - natsURL string + brokerURL string logLevel string port string configPath string @@ -75,9 +75,9 @@ func main() { log.Fatalf(err.Error()) } - pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger) + pubSub, err := brokers.NewPubSub(cfg.brokerURL, "", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pubSub.Close() @@ -123,7 +123,7 @@ func loadConfig() config { } return config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), port: mainflux.Env(envPort, defPort), configPath: mainflux.Env(envConfigPath, defConfigPath), diff --git a/cmd/smpp-notifier/main.go b/cmd/smpp-notifier/main.go index aa598e96..d2b2d451 100644 --- a/cmd/smpp-notifier/main.go +++ b/cmd/smpp-notifier/main.go @@ -28,7 +28,7 @@ import ( "github.com/mainflux/mainflux/consumers/notifiers/tracing" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" "github.com/mainflux/mainflux/pkg/ulid" opentracing "github.com/opentracing/opentracing-go" stdprometheus "github.com/prometheus/client_golang/prometheus" @@ -56,7 +56,7 @@ const ( defServerKey = "" defFrom = "" defJaegerURL = "" - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defSmppAddress = "" defSmppUsername = "" @@ -88,7 +88,7 @@ const ( envServerKey = "MF_SMPP_NOTIFIER_SERVER_KEY" envFrom = "MF_SMPP_NOTIFIER_SOURCE_ADDR" envJaegerURL = "MF_JAEGER_URL" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envSmppAddress = "MF_SMPP_ADDRESS" envSmppUsername = "MF_SMPP_USERNAME" @@ -106,7 +106,7 @@ const ( ) type config struct { - natsURL string + brokerURL string configPath string logLevel string dbConfig postgres.Config @@ -135,9 +135,9 @@ func main() { db := connectToDB(cfg.dbConfig, logger) defer db.Close() - pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger) + pubSub, err := brokers.NewPubSub(cfg.brokerURL, "", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pubSub.Close() @@ -233,7 +233,7 @@ func loadConfig() config { return config{ logLevel: mainflux.Env(envLogLevel, defLogLevel), - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), configPath: mainflux.Env(envConfigPath, defConfigPath), dbConfig: dbConfig, smppConf: smppConf, diff --git a/cmd/smtp-notifier/main.go b/cmd/smtp-notifier/main.go index 808fad00..053cafac 100644 --- a/cmd/smtp-notifier/main.go +++ b/cmd/smtp-notifier/main.go @@ -27,7 +27,7 @@ import ( "github.com/mainflux/mainflux/internal/email" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" "github.com/mainflux/mainflux/pkg/ulid" opentracing "github.com/opentracing/opentracing-go" stdprometheus "github.com/prometheus/client_golang/prometheus" @@ -56,7 +56,7 @@ const ( defServerKey = "" defFrom = "" defJaegerURL = "" - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defEmailHost = "localhost" defEmailPort = "25" @@ -87,7 +87,7 @@ const ( envServerKey = "MF_SMTP_NOTIFIER_SERVER_KEY" envFrom = "MF_SMTP_NOTIFIER_FROM_ADDR" envJaegerURL = "MF_JAEGER_URL" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envEmailHost = "MF_EMAIL_HOST" envEmailPort = "MF_EMAIL_PORT" @@ -104,7 +104,7 @@ const ( ) type config struct { - natsURL string + brokerURL string configPath string logLevel string dbConfig postgres.Config @@ -133,9 +133,9 @@ func main() { db := connectToDB(cfg.dbConfig, logger) defer db.Close() - pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger) + pubSub, err := brokers.NewPubSub(cfg.brokerURL, "", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pubSub.Close() @@ -213,7 +213,7 @@ func loadConfig() config { return config{ logLevel: mainflux.Env(envLogLevel, defLogLevel), - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), configPath: mainflux.Env(envConfigPath, defConfigPath), dbConfig: dbConfig, emailConf: emailConf, diff --git a/cmd/timescale-writer/main.go b/cmd/timescale-writer/main.go index b5a79fba..98d9b552 100644 --- a/cmd/timescale-writer/main.go +++ b/cmd/timescale-writer/main.go @@ -19,7 +19,7 @@ import ( "github.com/mainflux/mainflux/consumers/writers/timescale" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" stdprometheus "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" ) @@ -29,7 +29,7 @@ const ( stopWaitTime = 5 * time.Second defLogLevel = "error" - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defPort = "8180" defDBHost = "localhost" defDBPort = "5432" @@ -42,7 +42,7 @@ const ( defDBSSLRootCert = "" defConfigPath = "/config.toml" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envLogLevel = "MF_TIMESCALE_WRITER_LOG_LEVEL" envPort = "MF_TIMESCALE_WRITER_PORT" envDBHost = "MF_TIMESCALE_WRITER_DB_HOST" @@ -58,7 +58,7 @@ const ( ) type config struct { - natsURL string + brokerURL string logLevel string port string configPath string @@ -75,9 +75,9 @@ func main() { log.Fatalf(err.Error()) } - pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger) + pubSub, err := brokers.NewPubSub(cfg.brokerURL, "", logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pubSub.Close() @@ -122,7 +122,7 @@ func loadConfig() config { } return config{ - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), port: mainflux.Env(envPort, defPort), configPath: mainflux.Env(envConfigPath, defConfigPath), diff --git a/cmd/twins/main.go b/cmd/twins/main.go index be28532b..f5a27755 100644 --- a/cmd/twins/main.go +++ b/cmd/twins/main.go @@ -21,7 +21,7 @@ import ( "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/messaging" - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" "github.com/mainflux/mainflux/pkg/uuid" localusers "github.com/mainflux/mainflux/things/standalone" "github.com/mainflux/mainflux/twins" @@ -59,7 +59,7 @@ const ( defClientTLS = "false" defCACerts = "" defChannelID = "" - defNatsURL = "nats://localhost:4222" + defBrokerURL = "nats://localhost:4222" defAuthURL = "localhost:8181" defAuthTimeout = "1s" @@ -79,7 +79,7 @@ const ( envClientTLS = "MF_TWINS_CLIENT_TLS" envCACerts = "MF_TWINS_CA_CERTS" envChannelID = "MF_TWINS_CHANNEL_ID" - envNatsURL = "MF_NATS_URL" + envBrokerURL = "MF_BROKER_URL" envAuthURL = "MF_AUTH_GRPC_URL" envAuthTimeout = "MF_AUTH_GRPC_TIMEOUT" ) @@ -99,7 +99,7 @@ type config struct { clientTLS bool caCerts string channelID string - natsURL string + brokerURL string authURL string authTimeout time.Duration @@ -131,9 +131,9 @@ func main() { defer authCloser.Close() auth, _ := createAuthClient(cfg, authTracer, logger) - pubSub, err := nats.NewPubSub(cfg.natsURL, queue, logger) + pubSub, err := brokers.NewPubSub(cfg.brokerURL, queue, logger) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err)) os.Exit(1) } defer pubSub.Close() @@ -192,7 +192,7 @@ func loadConfig() config { clientTLS: tls, caCerts: mainflux.Env(envCACerts, defCACerts), channelID: mainflux.Env(envChannelID, defChannelID), - natsURL: mainflux.Env(envNatsURL, defNatsURL), + brokerURL: mainflux.Env(envBrokerURL, defBrokerURL), authURL: mainflux.Env(envAuthURL, defAuthURL), authTimeout: authTimeout, } @@ -298,12 +298,11 @@ func newService(id string, ps messaging.PubSub, chanID string, users mainflux.Au Help: "Total duration of requests in microseconds.", }, []string{"method"}), ) - err := ps.Subscribe(id, nats.SubjectAllChannels, handle(logger, chanID, svc)) + err := ps.Subscribe(id, brokers.SubjectAllChannels, handle(logger, chanID, svc)) if err != nil { logger.Error(err.Error()) os.Exit(1) } - return svc } diff --git a/coap/README.md b/coap/README.md index a92e0934..c0896edc 100644 --- a/coap/README.md +++ b/coap/README.md @@ -12,7 +12,7 @@ default values. | Variable | Description | Default | |--------------------------------|--------------------------------------------------------|-----------------------| | MF_COAP_ADAPTER_PORT | Service listening port | 5683 | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 | | MF_COAP_ADAPTER_LOG_LEVEL | Service log level | error | | MF_COAP_ADAPTER_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | | MF_COAP_ADAPTER_CA_CERTS | Path to trusted CAs in PEM format | | @@ -26,7 +26,7 @@ default values. The service itself is distributed as Docker container. Check the [`coap-adapter`](https://github.com/mainflux/mainflux/blob/master/docker/docker-compose.yml#L273-L291) service section in docker-compose to see how service is deployed. -Running this service outside of container requires working instance of the NATS service. +Running this service outside of container requires working instance of the message broker service. To start the service outside of the container, execute the following shell script: ```bash @@ -42,7 +42,7 @@ make coap make install # set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[Message broker instance URL] \ MF_COAP_ADAPTER_PORT=[Service HTTP port] \ MF_COAP_ADAPTER_LOG_LEVEL=[Service log level] \ MF_COAP_ADAPTER_CLIENT_TLS=[Flag that indicates if TLS should be turned on] \ diff --git a/coap/adapter.go b/coap/adapter.go index d2598772..7aba2893 100644 --- a/coap/adapter.go +++ b/coap/adapter.go @@ -12,7 +12,6 @@ import ( "sync" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/pkg/messaging" @@ -41,12 +40,12 @@ var _ Service = (*adapterService)(nil) // Observers is a map of maps, type adapterService struct { auth mainflux.ThingsServiceClient - pubsub nats.PubSub + pubsub messaging.PubSub obsLock sync.Mutex } // New instantiates the CoAP adapter implementation. -func New(auth mainflux.ThingsServiceClient, pubsub nats.PubSub) Service { +func New(auth mainflux.ThingsServiceClient, pubsub messaging.PubSub) Service { as := &adapterService{ auth: auth, pubsub: pubsub, diff --git a/consumers/messages.go b/consumers/messages.go index 3d34e161..51469230 100644 --- a/consumers/messages.go +++ b/consumers/messages.go @@ -14,7 +14,7 @@ import ( "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/messaging" - pubsub "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/messaging/brokers" "github.com/mainflux/mainflux/pkg/transformers" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" @@ -30,7 +30,7 @@ var ( errParseConfFile = errors.New("unable to parse configuration file") ) -// Start method starts consuming messages received from NATS. +// Start method starts consuming messages received from Message broker. // This method transforms messages to SenML format before // using MessageRepository to store them. func Start(id string, sub messaging.Subscriber, consumer Consumer, configPath string, logger logger.Logger) error { @@ -92,7 +92,7 @@ type config struct { func loadConfig(configPath string) (config, error) { cfg := config{ SubscriberCfg: subscriberConfig{ - Subjects: []string{pubsub.SubjectAllChannels}, + Subjects: []string{brokers.SubjectAllChannels}, }, TransformerCfg: transformerConfig{ Format: defFormat, diff --git a/consumers/notifiers/smpp/README.md b/consumers/notifiers/smpp/README.md index f1d8b007..e7581c6f 100644 --- a/consumers/notifiers/smpp/README.md +++ b/consumers/notifiers/smpp/README.md @@ -25,7 +25,7 @@ default values. | MF_SMPP_NOTIFIER_SERVER_CERT | Path to server cert in pem format | | | MF_SMPP_NOTIFIER_SERVER_KEY | Path to server key in pem format | | | MF_JAEGER_URL | Jaeger server URL | localhost:6831 | -| MF_NATS_URL | NATS broker URL | nats://127.0.0.1:4222 | +| MF_BROKER_URL | Message broker URL | nats://127.0.0.1:4222 | | MF_SMPP_ADDRESS | SMPP address [host:port] | | | MF_SMPP_USERNAME | SMPP Username | | | MF_SMPP_PASSWORD | SMPP Password | | diff --git a/consumers/notifiers/smtp/README.md b/consumers/notifiers/smtp/README.md index e4ee713c..27f05581 100644 --- a/consumers/notifiers/smtp/README.md +++ b/consumers/notifiers/smtp/README.md @@ -16,7 +16,7 @@ default values. | MF_SMTP_NOTIFIER_DB_USER | Database user | mainflux | | MF_SMTP_NOTIFIER_DB_PASS | Database password | mainflux | | MF_SMTP_NOTIFIER_DB | Name of the database used by the service | subscriptions | -| MF_SMTP_NOTIFIER_CONFIG_PATH | Path to the config file with NATS subjects configuration | disable | +| MF_SMTP_NOTIFIER_CONFIG_PATH | Path to the config file with message broker subjects configuration | disable | | MF_SMTP_NOTIFIER_DB_SSL_MODE | Database connection SSL mode (disable, require, verify-ca, verify-full) | | | MF_SMTP_NOTIFIER_DB_SSL_CERT | Path to the PEM encoded cert file | | | MF_SMTP_NOTIFIER_DB_SSL_KEY | Path to the PEM encoded certificate key | | @@ -25,7 +25,7 @@ default values. | MF_SMTP_NOTIFIER_SERVER_CERT | Path to server cert in pem format | | | MF_SMTP_NOTIFIER_SERVER_KEY | Path to server key in pem format | | | MF_JAEGER_URL | Jaeger server URL | localhost:6831 | -| MF_NATS_URL | NATS broker URL | nats://127.0.0.1:4222 | +| MF_BROKER_URL | Message broker URL | nats://127.0.0.1:4222 | | MF_EMAIL_HOST | Mail server host | localhost | | MF_EMAIL_PORT | Mail server port | 25 | | MF_EMAIL_USERNAME | Mail server username | | diff --git a/consumers/writers/cassandra/README.md b/consumers/writers/cassandra/README.md index 3ff9dc7f..c3d56a1b 100644 --- a/consumers/writers/cassandra/README.md +++ b/consumers/writers/cassandra/README.md @@ -10,7 +10,7 @@ default values. | Variable | Description | Default | | -------------------------------- | ----------------------------------------------------------------------- | --------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 | | MF_CASSANDRA_WRITER_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error | | MF_CASSANDRA_WRITER_PORT | Service HTTP port | 8180 | | MF_CASSANDRA_WRITER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 | @@ -38,7 +38,7 @@ make cassandra-writer make install # Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[NATS instance URL] \ MF_CASSANDRA_WRITER_LOG_LEVEL=[Cassandra writer log level] \ MF_CASSANDRA_WRITER_PORT=[Service HTTP port] \ MF_CASSANDRA_WRITER_DB_CLUSTER=[Cassandra cluster comma separated addresses] \ diff --git a/consumers/writers/influxdb/README.md b/consumers/writers/influxdb/README.md index 11df052e..fd94ea5d 100644 --- a/consumers/writers/influxdb/README.md +++ b/consumers/writers/influxdb/README.md @@ -8,17 +8,17 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ----------------------------- | ----------------------------------------------------------------------- | ---------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error | -| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 | -| MF_INFLUXDB_HOST | InfluxDB host | localhost | -| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 | -| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux | -| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux | -| MF_INFLUXDB_DB | InfluxDB database name | mainflux | -| MF_INFLUX_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /configs.toml | +| Variable | Description | Default | +| ----------------------------- | --------------------------------------------------------------------------------- | ---------------------- | +| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 | +| MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error | +| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 | +| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost | +| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 | +| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux | +| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux | +| MF_INFLUXDB_DB | InfluxDB database name | mainflux | +| MF_INFLUX_WRITER_CONFIG_PATH | Config file path with message broker subjects list, payload type and content-type | /configs.toml | ## Deployment @@ -39,7 +39,7 @@ make influxdb make install # Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[Message broker instance URL] \ MF_INFLUX_WRITER_LOG_LEVEL=[Influx writer log level] \ MF_INFLUX_WRITER_PORT=[Service HTTP port] \ MF_INFLUXDB_DB=[InfluxDB database name] \ @@ -47,7 +47,7 @@ MF_INFLUXDB_HOST=[InfluxDB database host] \ MF_INFLUXDB_PORT=[InfluxDB database port] \ MF_INFLUXDB_ADMIN_USER=[InfluxDB admin user] \ MF_INFLUXDB_ADMIN_PASSWORD=[InfluxDB admin password] \ -MF_INFLUX_WRITER_CONFIG_PATH=[Config file path with NATS subjects list, payload type and content-type] \ +MF_INFLUX_WRITER_CONFIG_PATH=[Config file path with Message broker subjects list, payload type and content-type] \ $GOBIN/mainflux-influxdb ``` diff --git a/consumers/writers/mongodb/README.md b/consumers/writers/mongodb/README.md index 6c46eb72..f26ecfb0 100644 --- a/consumers/writers/mongodb/README.md +++ b/consumers/writers/mongodb/README.md @@ -8,15 +8,15 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ---------------------------- | ----------------------------------------------------------------------- | ---------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_MONGO_WRITER_LOG_LEVEL | Log level for MongoDB writer | error | -| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 | -| MF_MONGO_WRITER_DB | Default MongoDB database name | messages | -| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost | -| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 | -| MF_MONGO_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml | +| Variable | Description | Default | +| ---------------------------- | --------------------------------------------------------------------------------- | ---------------------- | +| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 | +| MF_MONGO_WRITER_LOG_LEVEL | Log level for MongoDB writer | error | +| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 | +| MF_MONGO_WRITER_DB | Default MongoDB database name | messages | +| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost | +| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 | +| MF_MONGO_WRITER_CONFIG_PATH | Config file path with Message broker subjects list, payload type and content-type | /config.toml | ## Deployment @@ -37,13 +37,13 @@ make mongodb-writer make install # Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[Message broker instance URL] \ MF_MONGO_WRITER_LOG_LEVEL=[MongoDB writer log level] \ MF_MONGO_WRITER_PORT=[Service HTTP port] \ MF_MONGO_WRITER_DB=[MongoDB database name] \ MF_MONGO_WRITER_DB_HOST=[MongoDB database host] \ MF_MONGO_WRITER_DB_PORT=[MongoDB database port] \ -MF_MONGO_WRITER_CONFIG_PATH=[Configuration file path with NATS subjects list] \ +MF_MONGO_WRITER_CONFIG_PATH=[Configuration file path with Message broker subjects list] \ $GOBIN/mainflux-mongodb-writer ``` diff --git a/consumers/writers/postgres/README.md b/consumers/writers/postgres/README.md index a1f4957d..2b7f37a6 100644 --- a/consumers/writers/postgres/README.md +++ b/consumers/writers/postgres/README.md @@ -8,21 +8,21 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ----------------------------------- | ----------------------------------------------------------------------- | ---------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_POSTGRES_WRITER_LOG_LEVEL | Service log level | error | -| MF_POSTGRES_WRITER_PORT | Service HTTP port | 9104 | -| MF_POSTGRES_WRITER_DB_HOST | Postgres DB host | postgres | -| MF_POSTGRES_WRITER_DB_PORT | Postgres DB port | 5432 | -| MF_POSTGRES_WRITER_DB_USER | Postgres user | mainflux | -| MF_POSTGRES_WRITER_DB_PASS | Postgres password | mainflux | -| MF_POSTGRES_WRITER_DB | Postgres database name | messages | -| MF_POSTGRES_WRITER_DB_SSL_MODE | Postgres SSL mode | disabled | -| MF_POSTGRES_WRITER_DB_SSL_CERT | Postgres SSL certificate path | "" | -| MF_POSTGRES_WRITER_DB_SSL_KEY | Postgres SSL key | "" | -| MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" | -| MF_POSTGRES_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml | +| Variable | Description | Default | +| ----------------------------------- | --------------------------------------------------------------------------------- | ---------------------- | +| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 | +| MF_POSTGRES_WRITER_LOG_LEVEL | Service log level | error | +| MF_POSTGRES_WRITER_PORT | Service HTTP port | 9104 | +| MF_POSTGRES_WRITER_DB_HOST | Postgres DB host | postgres | +| MF_POSTGRES_WRITER_DB_PORT | Postgres DB port | 5432 | +| MF_POSTGRES_WRITER_DB_USER | Postgres user | mainflux | +| MF_POSTGRES_WRITER_DB_PASS | Postgres password | mainflux | +| MF_POSTGRES_WRITER_DB | Postgres database name | messages | +| MF_POSTGRES_WRITER_DB_SSL_MODE | Postgres SSL mode | disabled | +| MF_POSTGRES_WRITER_DB_SSL_CERT | Postgres SSL certificate path | "" | +| MF_POSTGRES_WRITER_DB_SSL_KEY | Postgres SSL key | "" | +| MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" | +| MF_POSTGRES_WRITER_CONFIG_PATH | Config file path with Message broker subjects list, payload type and content-type | /config.toml | ## Deployment @@ -43,7 +43,7 @@ make postgres-writer make install # Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[Message broker instance URL] \ MF_POSTGRES_WRITER_LOG_LEVEL=[Service log level] \ MF_POSTGRES_WRITER_PORT=[Service HTTP port] \ MF_POSTGRES_WRITER_DB_HOST=[Postgres host] \ @@ -55,7 +55,7 @@ MF_POSTGRES_WRITER_DB_SSL_MODE=[Postgres SSL mode] \ MF_POSTGRES_WRITER_DB_SSL_CERT=[Postgres SSL cert] \ MF_POSTGRES_WRITER_DB_SSL_KEY=[Postgres SSL key] \ MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT=[Postgres SSL Root cert] \ -MF_POSTGRES_WRITER_CONFIG_PATH=[Config file path with NATS subjects list, payload type and content-type] \ +MF_POSTGRES_WRITER_CONFIG_PATH=[Config file path with Message broker subjects list, payload type and content-type] \ $GOBIN/mainflux-postgres-writer ``` diff --git a/consumers/writers/timescale/README.md b/consumers/writers/timescale/README.md index adee2fc6..bb749e08 100644 --- a/consumers/writers/timescale/README.md +++ b/consumers/writers/timescale/README.md @@ -8,21 +8,21 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| ----------------------------------- | ----------------------------------------------- | ---------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_TIMESCALE_WRITER_LOG_LEVEL | Service log level | error | -| MF_TIMESCALE_WRITER_PORT | Service HTTP port | 9104 | -| MF_TIMESCALE_WRITER_DB_HOST | Timescale DB host | timescale | -| MF_TIMESCALE_WRITER_DB_PORT | Timescale DB port | 5432 | -| MF_TIMESCALE_WRITER_DB_USER | Timescale user | mainflux | -| MF_TIMESCALE_WRITER_DB_PASS | Timescale password | mainflux | -| MF_TIMESCALE_WRITER_DB | Timescale database name | messages | -| MF_TIMESCALE_WRITER_DB_SSL_MODE | Timescale SSL mode | disabled | -| MF_TIMESCALE_WRITER_DB_SSL_CERT | Timescale SSL certificate path | "" | -| MF_TIMESCALE_WRITER_DB_SSL_KEY | Timescale SSL key | "" | -| MF_TIMESCALE_WRITER_DB_SSL_ROOT_CERT | Timescale SSL root certificate path | "" | -| MF_TIMESCALE_WRITER_CONFIG_PATH | Configuration file path with NATS subjects list | /config.toml | +| Variable | Description | Default | +| ----------------------------------- | --------------------------------------------------------- | ---------------------- | +| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 | +| MF_TIMESCALE_WRITER_LOG_LEVEL | Service log level | error | +| MF_TIMESCALE_WRITER_PORT | Service HTTP port | 9104 | +| MF_TIMESCALE_WRITER_DB_HOST | Timescale DB host | timescale | +| MF_TIMESCALE_WRITER_DB_PORT | Timescale DB port | 5432 | +| MF_TIMESCALE_WRITER_DB_USER | Timescale user | mainflux | +| MF_TIMESCALE_WRITER_DB_PASS | Timescale password | mainflux | +| MF_TIMESCALE_WRITER_DB | Timescale database name | messages | +| MF_TIMESCALE_WRITER_DB_SSL_MODE | Timescale SSL mode | disabled | +| MF_TIMESCALE_WRITER_DB_SSL_CERT | Timescale SSL certificate path | "" | +| MF_TIMESCALE_WRITER_DB_SSL_KEY | Timescale SSL key | "" | +| MF_TIMESCALE_WRITER_DB_SSL_ROOT_CERT | Timescale SSL root certificate path | "" | +| MF_TIMESCALE_WRITER_CONFIG_PATH | Configuration file path with Message broker subjects list | /config.toml | ## Deployment @@ -43,7 +43,7 @@ make timescale-writer make install # Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[Message broker instance URL] \ MF_TIMESCALE_WRITER_LOG_LEVEL=[Service log level] \ MF_TIMESCALE_WRITER_PORT=[Service HTTP port] \ MF_TIMESCALE_WRITER_DB_HOST=[Timescale host] \ @@ -55,7 +55,7 @@ MF_TIMESCALE_WRITER_DB_SSL_MODE=[Timescale SSL mode] \ MF_TIMESCALE_WRITER_DB_SSL_CERT=[Timescale SSL cert] \ MF_TIMESCALE_WRITER_DB_SSL_KEY=[Timescale SSL key] \ MF_TIMESCALE_WRITER_DB_SSL_ROOT_CERT=[Timescale SSL Root cert] \ -MF_TIMESCALE_WRITER_CONFIG_PATH=[Configuration file path with NATS subjects list] \ +MF_TIMESCALE_WRITER_CONFIG_PATH=[Configuration file path with Message broker subjects list] \ MF_TIMESCALE_WRITER_TRANSFORMER=[Message transformer type] \ $GOBIN/mainflux-timescale-writer ``` diff --git a/docker/.env b/docker/.env index 6c306699..449fd43d 100644 --- a/docker/.env +++ b/docker/.env @@ -6,8 +6,21 @@ MF_NGINX_SSL_PORT=443 MF_NGINX_MQTT_PORT=1883 MF_NGINX_MQTTS_PORT=8883 -## NATS -MF_NATS_URL=nats://nats:4222 +# Message Broker +MF_BROKER_TYPE=nats + +## Nats +MF_NATS_PORT=4222 +MF_NATS_URL=nats://broker:${MF_NATS_PORT} + +## RabbitMQ +MF_RABBITMQ_PORT=5672 +MF_RABBITMQ_HTTP_PORT=15672 +MF_RABBITMQ_USER=mainflux +MF_RABBITMQ_PASS=mainflux +MF_RABBITMQ_COOKIE=mainflux +MF_RABBITMQ_VHOST=/ +MF_RABBITMQ_URL=amqp://${MF_RABBITMQ_USER}:${MF_RABBITMQ_PASS}@broker:${MF_RABBITMQ_PORT}${MF_MF_RABBITMQ_VHOST} ## Redis MF_REDIS_TCP_PORT=6379 diff --git a/docker/addons/cassandra-writer/docker-compose.yml b/docker/addons/cassandra-writer/docker-compose.yml index 97b3b589..bb3831fb 100644 --- a/docker/addons/cassandra-writer/docker-compose.yml +++ b/docker/addons/cassandra-writer/docker-compose.yml @@ -35,7 +35,7 @@ services: restart: on-failure environment: MF_CASSANDRA_WRITER_LOG_LEVEL: ${MF_CASSANDRA_WRITER_LOG_LEVEL} - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} MF_CASSANDRA_WRITER_PORT: ${MF_CASSANDRA_WRITER_PORT} MF_CASSANDRA_WRITER_DB_PORT: ${MF_CASSANDRA_WRITER_DB_PORT} MF_CASSANDRA_WRITER_DB_CLUSTER: ${MF_CASSANDRA_WRITER_DB_CLUSTER} diff --git a/docker/addons/influxdb-writer/docker-compose.yml b/docker/addons/influxdb-writer/docker-compose.yml index 1ebb2e0b..b7b69e96 100644 --- a/docker/addons/influxdb-writer/docker-compose.yml +++ b/docker/addons/influxdb-writer/docker-compose.yml @@ -41,7 +41,7 @@ services: restart: on-failure environment: MF_INFLUX_WRITER_LOG_LEVEL: debug - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} MF_INFLUX_WRITER_PORT: ${MF_INFLUX_WRITER_PORT} MF_INFLUX_WRITER_BATCH_SIZE: ${MF_INFLUX_WRITER_BATCH_SIZE} MF_INFLUX_WRITER_BATCH_TIMEOUT: ${MF_INFLUX_WRITER_BATCH_TIMEOUT} diff --git a/docker/addons/lora-adapter/docker-compose.yml b/docker/addons/lora-adapter/docker-compose.yml index 177bfb21..150885eb 100644 --- a/docker/addons/lora-adapter/docker-compose.yml +++ b/docker/addons/lora-adapter/docker-compose.yml @@ -34,7 +34,7 @@ services: MF_LORA_ADAPTER_MESSAGES_PASS: ${MF_LORA_ADAPTER_MESSAGES_PASS} MF_LORA_ADAPTER_MESSAGES_TIMEOUT: ${MF_LORA_ADAPTER_MESSAGES_TIMEOUT} MF_LORA_ADAPTER_HTTP_PORT: ${MF_LORA_ADAPTER_HTTP_PORT} - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} ports: - ${MF_LORA_ADAPTER_HTTP_PORT}:${MF_LORA_ADAPTER_HTTP_PORT} networks: diff --git a/docker/addons/mongodb-writer/docker-compose.yml b/docker/addons/mongodb-writer/docker-compose.yml index 16f293fe..70da6dc3 100644 --- a/docker/addons/mongodb-writer/docker-compose.yml +++ b/docker/addons/mongodb-writer/docker-compose.yml @@ -41,7 +41,7 @@ services: restart: on-failure environment: MF_MONGO_WRITER_LOG_LEVEL: ${MF_MONGO_WRITER_LOG_LEVEL} - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} MF_MONGO_WRITER_PORT: ${MF_MONGO_WRITER_PORT} MF_MONGO_WRITER_DB: ${MF_MONGO_WRITER_DB} MF_MONGO_WRITER_DB_HOST: mongodb diff --git a/docker/addons/opcua-adapter/docker-compose.yml b/docker/addons/opcua-adapter/docker-compose.yml index 823efc5b..70e63664 100644 --- a/docker/addons/opcua-adapter/docker-compose.yml +++ b/docker/addons/opcua-adapter/docker-compose.yml @@ -33,7 +33,7 @@ services: environment: MF_OPCUA_ADAPTER_HTTP_PORT: ${MF_OPCUA_ADAPTER_HTTP_PORT} MF_OPCUA_ADAPTER_LOG_LEVEL: ${MF_OPCUA_ADAPTER_LOG_LEVEL} - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} MF_OPCUA_ADAPTER_POLICY: ${MF_OPCUA_ADAPTER_POLICY} MF_OPCUA_ADAPTER_MODE: ${MF_OPCUA_ADAPTER_MODE} MF_OPCUA_ADAPTER_CERT_FILE: ${MF_OPCUA_ADAPTER_CERT_FILE} diff --git a/docker/addons/postgres-writer/docker-compose.yml b/docker/addons/postgres-writer/docker-compose.yml index e46ba145..4c2cdad1 100644 --- a/docker/addons/postgres-writer/docker-compose.yml +++ b/docker/addons/postgres-writer/docker-compose.yml @@ -38,7 +38,7 @@ services: - postgres restart: on-failure environment: - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} MF_POSTGRES_WRITER_LOG_LEVEL: ${MF_POSTGRES_WRITER_LOG_LEVEL} MF_POSTGRES_WRITER_PORT: ${MF_POSTGRES_WRITER_PORT} MF_POSTGRES_WRITER_DB_HOST: postgres diff --git a/docker/addons/smpp-notifier/docker-compose.yml b/docker/addons/smpp-notifier/docker-compose.yml index bf8af3a3..d46cc2cb 100644 --- a/docker/addons/smpp-notifier/docker-compose.yml +++ b/docker/addons/smpp-notifier/docker-compose.yml @@ -36,7 +36,7 @@ services: - smpp-notifier-db restart: on-failure environment: - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_AUTH_GRPC_URL: ${MF_AUTH_GRPC_URL} MF_AUTH_GRPC_TIMEOUT: ${MF_AUTH_GRPC_TIMEOUT} diff --git a/docker/addons/smtp-notifier/docker-compose.yml b/docker/addons/smtp-notifier/docker-compose.yml index 8da313a2..877e652c 100644 --- a/docker/addons/smtp-notifier/docker-compose.yml +++ b/docker/addons/smtp-notifier/docker-compose.yml @@ -43,7 +43,7 @@ services: MF_SMTP_NOTIFIER_DB_PASS: ${MF_SMTP_NOTIFIER_DB_PASS} MF_SMTP_NOTIFIER_DB: ${MF_SMTP_NOTIFIER_DB} MF_SMTP_NOTIFIER_PORT: ${MF_SMTP_NOTIFIER_PORT} - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_AUTH_GRPC_URL: ${MF_AUTH_GRPC_URL} MF_AUTH_GRPC_TIMEOUT: ${MF_AUTH_GRPC_TIMEOUT} diff --git a/docker/addons/timescale-writer/docker-compose.yml b/docker/addons/timescale-writer/docker-compose.yml index c08a6191..7c8a21a3 100644 --- a/docker/addons/timescale-writer/docker-compose.yml +++ b/docker/addons/timescale-writer/docker-compose.yml @@ -38,7 +38,7 @@ services: - timescale restart: on-failure environment: - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} MF_TIMESCALE_WRITER_LOG_LEVEL: ${MF_TIMESCALE_WRITER_LOG_LEVEL} MF_TIMESCALE_WRITER_PORT: ${MF_TIMESCALE_WRITER_PORT} MF_TIMESCALE_WRITER_DB_HOST: timescale diff --git a/docker/addons/twins/docker-compose.yml b/docker/addons/twins/docker-compose.yml index 3c142a24..675bda22 100644 --- a/docker/addons/twins/docker-compose.yml +++ b/docker/addons/twins/docker-compose.yml @@ -43,7 +43,7 @@ services: MF_TWINS_DB_HOST: ${MF_TWINS_DB_HOST} MF_TWINS_DB_PORT: ${MF_TWINS_DB_PORT} MF_TWINS_CHANNEL_ID: ${MF_TWINS_CHANNEL_ID} - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_BROKER_URL} MF_AUTH_GRPC_URL: ${MF_AUTH_GRPC_URL} MF_AUTH_GRPC_TIMEOUT: ${MF_AUTH_GRPC_TIMEOUT} MF_TWINS_CACHE_URL: ${MF_TWINS_CACHE_URL} diff --git a/docker/brokers/nats.yml b/docker/brokers/nats.yml new file mode 100644 index 00000000..efbbf2a7 --- /dev/null +++ b/docker/brokers/nats.yml @@ -0,0 +1,8 @@ +services: + broker: + image: nats:2.2.4-alpine + command: "-c /etc/nats/nats.conf" + volumes: + - ./../nats/:/etc/nats + ports: + - ${MF_NATS_PORT}:${MF_NATS_PORT} diff --git a/docker/brokers/rabbitmq.yml b/docker/brokers/rabbitmq.yml new file mode 100644 index 00000000..90c81cf9 --- /dev/null +++ b/docker/brokers/rabbitmq.yml @@ -0,0 +1,11 @@ +services: + broker: + image: rabbitmq:3.9.20-management-alpine + environment: + RABBITMQ_ERLANG_COOKIE: ${MF_RABBITMQ_COOKIE} + RABBITMQ_DEFAULT_USER: ${MF_RABBITMQ_USER} + RABBITMQ_DEFAULT_PASS: ${MF_RABBITMQ_PASS} + RABBITMQ_DEFAULT_VHOST: ${MF_RABBITMQ_VHOST} + ports: + - ${MF_RABBITMQ_PORT}:${MF_RABBITMQ_PORT} + - ${MF_RABBITMQ_HTTP_PORT}:${MF_RABBITMQ_HTTP_PORT} diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index a663f058..481ce994 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -95,13 +95,12 @@ services: - mqtt-adapter - http-adapter - nats: - image: nats:2.2.4-alpine - container_name: mainflux-nats - command: "-c /etc/nats/nats.conf" + broker: + extends: + file: brokers/nats.yml + service: broker + container_name: mainflux-broker restart: on-failure - volumes: - - ./nats/:/etc/nats networks: - mainflux-base-net ports: @@ -280,14 +279,14 @@ services: depends_on: - vernemq - things - - nats + - broker restart: on-failure environment: MF_MQTT_ADAPTER_LOG_LEVEL: ${MF_MQTT_ADAPTER_LOG_LEVEL} MF_MQTT_ADAPTER_MQTT_PORT: ${MF_MQTT_ADAPTER_MQTT_PORT} MF_MQTT_ADAPTER_WS_PORT: ${MF_MQTT_ADAPTER_WS_PORT} MF_MQTT_ADAPTER_ES_URL: es-redis:${MF_REDIS_TCP_PORT} - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_NATS_URL} MF_MQTT_ADAPTER_MQTT_TARGET_HOST: vernemq MF_MQTT_ADAPTER_MQTT_TARGET_PORT: ${MF_MQTT_BROKER_PORT} MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK: http://vernemq:8888/health @@ -305,12 +304,12 @@ services: container_name: mainflux-http depends_on: - things - - nats + - broker restart: on-failure environment: MF_HTTP_ADAPTER_LOG_LEVEL: debug MF_HTTP_ADAPTER_PORT: ${MF_HTTP_ADAPTER_PORT} - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_NATS_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} @@ -333,12 +332,12 @@ services: container_name: mainflux-coap depends_on: - things - - nats + - broker restart: on-failure environment: MF_COAP_ADAPTER_LOG_LEVEL: ${MF_COAP_ADAPTER_LOG_LEVEL} MF_COAP_ADAPTER_PORT: ${MF_COAP_ADAPTER_PORT} - MF_NATS_URL: ${MF_NATS_URL} + MF_BROKER_URL: ${MF_NATS_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} diff --git a/http/README.md b/http/README.md index 8ed6a3a0..b3c943c5 100644 --- a/http/README.md +++ b/http/README.md @@ -8,16 +8,16 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -| --------------------------- | --------------------------------------------------- | --------------------- | -| MF_HTTP_ADAPTER_LOG_LEVEL | Log level for the HTTP Adapter | error | -| MF_HTTP_ADAPTER_PORT | Service HTTP port | 8180 | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_HTTP_ADAPTER_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | -| MF_HTTP_ADAPTER_CA_CERTS | Path to trusted CAs in PEM format | | -| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | -| MF_THINGS_AUTH_GRPC_URL | Things service Auth gRPC URL | localhost:8181 | -| MF_THINGS_AUTH_GRPC_TIMEOUT | Things service Auth gRPC request timeout in seconds | 1s | +| Variable | Description | Default | +| --------------------------- | ------------------------------------------------------------- | --------------------- | +| MF_HTTP_ADAPTER_LOG_LEVEL | Log level for the HTTP Adapter | error | +| MF_HTTP_ADAPTER_PORT | Service HTTP port | 8180 | +| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 | +| MF_HTTP_ADAPTER_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | +| MF_HTTP_ADAPTER_CA_CERTS | Path to trusted CAs in PEM format | | +| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | +| MF_THINGS_AUTH_GRPC_URL | Things service Auth gRPC URL | localhost:8181 | +| MF_THINGS_AUTH_GRPC_TIMEOUT | Things service Auth gRPC request timeout in seconds | 1s | ## Deployment @@ -39,7 +39,7 @@ make http make install # set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[Message broker instance URL] \ MF_HTTP_ADAPTER_LOG_LEVEL=[HTTP Adapter Log Level] \ MF_HTTP_ADAPTER_PORT=[Service HTTP port] \ MF_HTTP_ADAPTER_CA_CERTS=[Path to trusted CAs in PEM format] \ diff --git a/http/mocks/publisher.go b/http/mocks/publisher.go index 5f52520b..0d690ed9 100644 --- a/http/mocks/publisher.go +++ b/http/mocks/publisher.go @@ -17,3 +17,7 @@ func NewPublisher() messaging.Publisher { func (pub mockPublisher) Publish(topic string, msg messaging.Message) error { return nil } + +func (pub mockPublisher) Close() error { + return nil +} diff --git a/lora/README.md b/lora/README.md index 2ff68b78..27e5b526 100644 --- a/lora/README.md +++ b/lora/README.md @@ -15,7 +15,7 @@ default values. |----------------------------------|---------------------------------------|---------------------------------| | MF_LORA_ADAPTER_HTTP_PORT | Service HTTP port | 8180 | | MF_LORA_ADAPTER_LOG_LEVEL | Service Log level | error | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 | | MF_LORA_ADAPTER_MESSAGES_URL | LoRa adapter MQTT broker URL | tcp://localhost:1883 | | MF_LORA_ADAPTER_MESSAGES_TOPIC | LoRa adapter MQTT subscriber Topic | application/+/device/+/event/up | | MF_LORA_ADAPTER_MESSAGES_USER | LoRa adapter MQTT subscriber Username | | @@ -50,7 +50,7 @@ make install # set the environment variables and run the service MF_LORA_ADAPTER_LOG_LEVEL=[Lora Adapter Log Level] \ -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[Message broker instance URL] \ MF_LORA_ADAPTER_MESSAGES_URL=[LoRa adapter MQTT broker URL] \ MF_LORA_ADAPTER_MESSAGES_TOPIC=[LoRa adapter MQTT subscriber Topic] \ MF_LORA_ADAPTER_MESSAGES_USER=[LoRa adapter MQTT subscriber Username] \ diff --git a/lora/adapter.go b/lora/adapter.go index 081fd0c2..116f7997 100644 --- a/lora/adapter.go +++ b/lora/adapter.go @@ -54,7 +54,7 @@ type Service interface { // DisconnectThing removes thingID:channelID route-map DisconnectThing(ctx context.Context, chanID, thingID string) error - // Publish forwards messages from the LoRa MQTT broker to Mainflux NATS broker + // Publish forwards messages from the LoRa MQTT broker to Mainflux Message Broker Publish(ctx context.Context, msg Message) error } @@ -77,7 +77,7 @@ func New(publisher messaging.Publisher, thingsRM, channelsRM, connectRM RouteMap } } -// Publish forwards messages from Lora MQTT broker to Mainflux NATS broker +// Publish forwards messages from Lora MQTT broker to Mainflux Message broker func (as *adapterService) Publish(ctx context.Context, m Message) error { // Get route map of lora application thingID, err := as.thingsRM.Get(ctx, m.DevEUI) @@ -113,7 +113,7 @@ func (as *adapterService) Publish(ctx context.Context, m Message) error { payload = []byte(jo) } - // Publish on Mainflux NATS broker + // Publish on Mainflux Message broker msg := messaging.Message{ Publisher: thingID, Protocol: protocol, diff --git a/lora/mocks/publisher.go b/lora/mocks/publisher.go index 5f52520b..0d690ed9 100644 --- a/lora/mocks/publisher.go +++ b/lora/mocks/publisher.go @@ -17,3 +17,7 @@ func NewPublisher() messaging.Publisher { func (pub mockPublisher) Publish(topic string, msg messaging.Message) error { return nil } + +func (pub mockPublisher) Close() error { + return nil +} diff --git a/mqtt/README.md b/mqtt/README.md index 2a77cbdb..5ccb8fa0 100644 --- a/mqtt/README.md +++ b/mqtt/README.md @@ -10,31 +10,31 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -|------------------------------------------|--------------------------------------------------------|-----------------------| -| MF_MQTT_ADAPTER_LOG_LEVEL | mProxy Log level | error | -| MF_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 | -| MF_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 | -| MF_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 | -| MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" | -| MF_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 | -| MF_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost | -| MF_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 | -| MF_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt | -| MF_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s | -| MF_NATS_URL | NATS broker URL | nats://127.0.0.1:4222 | -| MF_THINGS_AUTH_GRPC_URL | Things gRPC endpoint URL | localhost:8181 | -| MF_THINGS_AUTH_GRPC_TIMEOUT | Timeout in seconds for Things service gRPC calls | 1s | -| MF_JAEGER_URL | URL of Jaeger tracing service | "" | -| MF_MQTT_ADAPTER_CLIENT_TLS | gRPC client TLS | false | -| MF_MQTT_ADAPTER_CA_CERTS | CA certs for gRPC client TLS | "" | -| MF_MQTT_ADAPTER_INSTANCE | Instance name for event sourcing | "" | -| MF_MQTT_ADAPTER_ES_URL | Event sourcing URL | localhost:6379 | -| MF_MQTT_ADAPTER_ES_PASS | Event sourcing password | "" | -| MF_MQTT_ADAPTER_ES_DB | Event sourcing database | "0" | -| MF_AUTH_CACHE_URL | Auth cache URL | localhost:6379 | -| MF_AUTH_CACHE_PASS | Auth cache password | "" | -| MF_AUTH_CACHE_DB | Auth cache database | "0" | +| Variable | Description | Default | +|------------------------------------------|------------------------------------------------------------------|-----------------------| +| MF_MQTT_ADAPTER_LOG_LEVEL | mProxy Log level | error | +| MF_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 | +| MF_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 | +| MF_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 | +| MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" | +| MF_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 | +| MF_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost | +| MF_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 | +| MF_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt | +| MF_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s | +| MF_BROKER_URL | Message broker broker URL | nats://127.0.0.1:4222 | +| MF_THINGS_AUTH_GRPC_URL | Things gRPC endpoint URL | localhost:8181 | +| MF_THINGS_AUTH_GRPC_TIMEOUT | Timeout in seconds for Things service gRPC calls | 1s | +| MF_JAEGER_URL | URL of Jaeger tracing service | "" | +| MF_MQTT_ADAPTER_CLIENT_TLS | gRPC client TLS | false | +| MF_MQTT_ADAPTER_CA_CERTS | CA certs for gRPC client TLS | "" | +| MF_MQTT_ADAPTER_INSTANCE | Instance name for event sourcing | "" | +| MF_MQTT_ADAPTER_ES_URL | Event sourcing URL | localhost:6379 | +| MF_MQTT_ADAPTER_ES_PASS | Event sourcing password | "" | +| MF_MQTT_ADAPTER_ES_DB | Event sourcing database | "0" | +| MF_AUTH_CACHE_URL | Auth cache URL | localhost:6379 | +| MF_AUTH_CACHE_PASS | Auth cache password | "" | +| MF_AUTH_CACHE_DB | Auth cache database | "0" | ## Deployment @@ -66,7 +66,7 @@ MF_MQTT_ADAPTER_WS_TARGET_HOST=[MQTT broker for MQTT over WS host] \ MF_MQTT_ADAPTER_WS_TARGET_PORT=[MQTT broker for MQTT over WS port]] \ MF_MQTT_ADAPTER_WS_TARGET_PATH=[MQTT adapter WS path] \ MF_MQTT_ADAPTER_FORWARDER_TIMEOUT=[MQTT forwarder for multiprotocol support timeout] \ -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[Message broker instance URL] \ MF_THINGS_AUTH_GRPC_URL=[Things service Auth gRPC URL] \ MF_THINGS_AUTH_GRPC_TIMEOUT=[Things service Auth gRPC request timeout in seconds] \ MF_JAEGER_URL=[Jaeger service URL] \ diff --git a/opcua/README.md b/opcua/README.md index 4eb42d19..c78399f7 100644 --- a/opcua/README.md +++ b/opcua/README.md @@ -11,23 +11,23 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -|----------------------------------|----------------------------------------|----------------------------| -| MF_OPCUA_ADAPTER_HTTP_PORT | Service HTTP port | 8180 | -| MF_OPCUA_ADAPTER_LOG_LEVEL | Service Log level | error | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_OPCUA_ADAPTER_INTERVAL_MS | OPC-UA Server Interval in milliseconds | 1000 | -| MF_OPCUA_ADAPTER_POLICY | OPC-UA Server Policy | | -| MF_OPCUA_ADAPTER_MODE | OPC-UA Server Mode | | -| MF_OPCUA_ADAPTER_CERT_FILE | OPC-UA Server Certificate file | | -| MF_OPCUA_ADAPTER_KEY_FILE | OPC-UA Server Key file | | -| MF_OPCUA_ADAPTER_ROUTE_MAP_URL | Route-map database URL | localhost:6379 | -| MF_OPCUA_ADAPTER_ROUTE_MAP_PASS | Route-map database password | | -| MF_OPCUA_ADAPTER_ROUTE_MAP_DB | Route-map instance name | 0 | -| MF_THINGS_ES_URL | Things service event source URL | localhost:6379 | -| MF_THINGS_ES_PASS | Things service event source password | | -| MF_THINGS_ES_DB | Things service event source DB | 0 | -| MF_OPCUA_ADAPTER_EVENT_CONSUMER | Service event consumer name | opcua | +| Variable | Description | Default | +|----------------------------------|--------------------------------------------------|----------------------------| +| MF_OPCUA_ADAPTER_HTTP_PORT | Service HTTP port | 8180 | +| MF_OPCUA_ADAPTER_LOG_LEVEL | Service Log level | error | +| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 | +| MF_OPCUA_ADAPTER_INTERVAL_MS | OPC-UA Server Interval in milliseconds | 1000 | +| MF_OPCUA_ADAPTER_POLICY | OPC-UA Server Policy | | +| MF_OPCUA_ADAPTER_MODE | OPC-UA Server Mode | | +| MF_OPCUA_ADAPTER_CERT_FILE | OPC-UA Server Certificate file | | +| MF_OPCUA_ADAPTER_KEY_FILE | OPC-UA Server Key file | | +| MF_OPCUA_ADAPTER_ROUTE_MAP_URL | Route-map database URL | localhost:6379 | +| MF_OPCUA_ADAPTER_ROUTE_MAP_PASS | Route-map database password | | +| MF_OPCUA_ADAPTER_ROUTE_MAP_DB | Route-map instance name | 0 | +| MF_THINGS_ES_URL | Things service event source URL | localhost:6379 | +| MF_THINGS_ES_PASS | Things service event source password | | +| MF_THINGS_ES_DB | Things service event source DB | 0 | +| MF_OPCUA_ADAPTER_EVENT_CONSUMER | Service event consumer name | opcua | ## Deployment @@ -51,7 +51,7 @@ make install # set the environment variables and run the service MF_OPCUA_ADAPTER_HTTP_PORT=[Service HTTP port] \ MF_OPCUA_ADAPTER_LOG_LEVEL=[OPC-UA Adapter Log Level] \ -MF_NATS_URL=[NATS instance URL] \ +MF_BROKER_URL=[Message broker instance URL] \ MF_OPCUA_ADAPTER_INTERVAL_MS: [OPC-UA Server Interval (milliseconds)] \ MF_OPCUA_ADAPTER_POLICY=[OPC-UA Server Policy] \ MF_OPCUA_ADAPTER_MODE=[OPC-UA Server Mode] \ diff --git a/opcua/gopcua/subscribe.go b/opcua/gopcua/subscribe.go index b8646c1e..81201681 100644 --- a/opcua/gopcua/subscribe.go +++ b/opcua/gopcua/subscribe.go @@ -203,7 +203,7 @@ func (c client) runHandler(ctx context.Context, sub *opcuaGopcua.Subscription, u } } -// Publish forwards messages from the OPC-UA Server to Mainflux NATS broker +// Publish forwards messages from the OPC-UA Server to Mainflux Message broker func (c client) publish(ctx context.Context, token string, m message) error { // Get route-map of the OPC-UA ServerURI chanID, err := c.channelsRM.Get(ctx, m.ServerURI) @@ -223,7 +223,7 @@ func (c client) publish(ctx context.Context, token string, m message) error { return fmt.Errorf("%s between channel %s and thing %s", errNotFoundConn, chanID, thingID) } - // Publish on Mainflux NATS broker + // Publish on Mainflux Message broker SenML := fmt.Sprintf(`[{"n":"%s", "t": %d, "%s":%v}]`, m.Type, m.Time, m.DataKey, m.Data) payload := []byte(SenML) diff --git a/pkg/messaging/README.md b/pkg/messaging/README.md index 086eff97..f8b07f8e 100644 --- a/pkg/messaging/README.md +++ b/pkg/messaging/README.md @@ -2,8 +2,8 @@ `messaging` package defines `Publisher`, `Subscriber` and an aggregate `Pubsub` interface. -`Subscriber` interface defines methods used to subscribe to a message broker such as MQTT or NATS. +`Subscriber` interface defines methods used to subscribe to a message broker such as MQTT or NATS or RabbitMQ. -`Publisher` interface defines methods used to publish messages to a message broker such as MQTT or NATS. +`Publisher` interface defines methods used to publish messages to a message broker such as MQTT or NATS or RabbitMQ. `Pubsub` interface is composed of `Publisher` and `Subscriber` interface and can be used to send messages to as well as to receive messages from a message broker. diff --git a/pkg/messaging/brokers/brokers_nats.go b/pkg/messaging/brokers/brokers_nats.go new file mode 100644 index 00000000..f9e8991f --- /dev/null +++ b/pkg/messaging/brokers/brokers_nats.go @@ -0,0 +1,39 @@ +//go:build !rabbitmq +// +build !rabbitmq + +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package brokers + +import ( + "log" + + "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/mainflux/mainflux/pkg/messaging/nats" +) + +// SubjectAllChannels represents subject to subscribe for all the channels. +const SubjectAllChannels = "channels.>" + +func init() { + log.Println("The binary was build using Nats as the message broker") +} + +func NewPublisher(url string) (messaging.Publisher, error) { + pb, err := nats.NewPublisher(url) + if err != nil { + return nil, err + } + return pb, nil + +} + +func NewPubSub(url, queue string, logger logger.Logger) (messaging.PubSub, error) { + pb, err := nats.NewPubSub(url, queue, logger) + if err != nil { + return nil, err + } + return pb, nil +} diff --git a/pkg/messaging/brokers/brokers_rabbitmq.go b/pkg/messaging/brokers/brokers_rabbitmq.go new file mode 100644 index 00000000..378f577e --- /dev/null +++ b/pkg/messaging/brokers/brokers_rabbitmq.go @@ -0,0 +1,38 @@ +//go:build rabbitmq +// +build rabbitmq + +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package brokers + +import ( + "log" + + "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/mainflux/mainflux/pkg/messaging/rabbitmq" +) + +// SubjectAllChannels represents subject to subscribe for all the channels. +const SubjectAllChannels = "channels.>" + +func init() { + log.Println("The binary was build using RabbitMQ as the message broker") +} + +func NewPublisher(url string) (messaging.Publisher, error) { + pb, err := rabbitmq.NewPublisher(url) + if err != nil { + return nil, err + } + return pb, nil +} + +func NewPubSub(url, queue string, logger logger.Logger) (messaging.PubSub, error) { + pb, err := rabbitmq.NewPubSub(url, queue, logger) + if err != nil { + return nil, err + } + return pb, nil +} diff --git a/pkg/messaging/mqtt/publisher.go b/pkg/messaging/mqtt/publisher.go index 4592ae5c..9ab7e536 100644 --- a/pkg/messaging/mqtt/publisher.go +++ b/pkg/messaging/mqtt/publisher.go @@ -23,7 +23,7 @@ type publisher struct { // NewPublisher returns a new MQTT message publisher. func NewPublisher(address string, timeout time.Duration) (messaging.Publisher, error) { - client, err := newClient(address, timeout) + client, err := newClient(address, "mqtt-publisher", timeout) if err != nil { return nil, err } @@ -47,3 +47,8 @@ func (pub publisher) Publish(topic string, msg messaging.Message) error { return token.Error() } + +func (pub publisher) Close() error { + pub.client.Disconnect(uint(pub.timeout)) + return nil +} diff --git a/pkg/messaging/mqtt/pubsub.go b/pkg/messaging/mqtt/pubsub.go index 5d30a333..bc158f4b 100644 --- a/pkg/messaging/mqtt/pubsub.go +++ b/pkg/messaging/mqtt/pubsub.go @@ -5,9 +5,14 @@ package mqtt import ( "errors" + "fmt" + "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gogo/protobuf/proto" + log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/messaging" ) const ( @@ -15,12 +20,142 @@ const ( qos = 2 ) -var errConnect = errors.New("failed to connect to MQTT broker") +var ( + errConnect = errors.New("failed to connect to MQTT broker") + errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached") + errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached") + errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic") + errAlreadySubscribed = errors.New("already subscribed to topic") + errNotSubscribed = errors.New("not subscribed") + errEmptyTopic = errors.New("empty topic") + errEmptyID = errors.New("empty ID") +) -func newClient(address string, timeout time.Duration) (mqtt.Client, error) { - opts := mqtt.NewClientOptions(). - SetUsername(username). - AddBroker(address) +var _ messaging.PubSub = (*pubsub)(nil) + +type subscription struct { + client mqtt.Client + topics []string +} + +type pubsub struct { + publisher + logger log.Logger + mu *sync.RWMutex + address string + timeout time.Duration + subscriptions map[string]subscription +} + +func NewPubSub(url, queue string, timeout time.Duration, logger log.Logger) (messaging.PubSub, error) { + client, err := newClient(url, "mqtt-publisher", timeout) + if err != nil { + return nil, err + } + ret := pubsub{ + publisher: publisher{ + client: client, + timeout: timeout, + }, + address: url, + timeout: timeout, + logger: logger, + subscriptions: make(map[string]subscription), + } + return ret, nil +} + +func (ps pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) error { + if id == "" { + return errEmptyID + } + if topic == "" { + return errEmptyTopic + } + ps.mu.Lock() + defer ps.mu.Unlock() + // Check client ID + s, ok := ps.subscriptions[id] + switch ok { + case true: + // Check topic + if ok = s.contains(topic); ok { + return errAlreadySubscribed + } + s.topics = append(s.topics, topic) + default: + client, err := newClient(ps.address, id, ps.timeout) + if err != nil { + return err + } + s = subscription{ + client: client, + topics: []string{topic}, + } + } + token := s.client.Subscribe(topic, qos, ps.mqttHandler(handler)) + if token.Error() != nil { + return token.Error() + } + if ok := token.WaitTimeout(ps.timeout); !ok { + return errSubscribeTimeout + } + return token.Error() +} + +func (ps pubsub) Unsubscribe(id, topic string) error { + if id == "" { + return errEmptyID + } + if topic == "" { + return errEmptyTopic + } + ps.mu.Lock() + defer ps.mu.Unlock() + // Check client ID + s, ok := ps.subscriptions[id] + switch ok { + case true: + // Check topic + if ok := s.contains(topic); !ok { + return errNotSubscribed + } + default: + return errNotSubscribed + } + token := s.client.Unsubscribe(topic) + if token.Error() != nil { + return token.Error() + } + + ok = token.WaitTimeout(ps.timeout) + if !ok { + return errUnsubscribeTimeout + } + if ok := s.delete(topic); !ok { + return errUnsubscribeDeleteTopic + } + if len(s.topics) == 0 { + delete(ps.subscriptions, id) + } + return token.Error() +} + +func (ps pubsub) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler { + return func(c mqtt.Client, m mqtt.Message) { + var msg messaging.Message + if err := proto.Unmarshal(m.Payload(), &msg); err != nil { + ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err)) + return + } + if err := h.Handle(msg); err != nil { + ps.logger.Warn(fmt.Sprintf("Failed to handle Mainflux message: %s", err)) + } + } +} + +func newClient(address, id string, timeout time.Duration) (mqtt.Client, error) { + opts := mqtt.NewClientOptions().SetUsername(username).AddBroker(address).SetClientID(id) client := mqtt.NewClient(opts) token := client.Connect() if token.Error() != nil { @@ -38,3 +173,31 @@ func newClient(address string, timeout time.Duration) (mqtt.Client, error) { return client, nil } + +// contains checks if a topic is present +func (sub subscription) contains(topic string) bool { + return sub.indexOf(topic) != -1 +} + +// Finds the index of an item in the topics +func (sub subscription) indexOf(element string) int { + for k, v := range sub.topics { + if element == v { + return k + } + } + return -1 +} + +// Deletes a topic from the slice +func (sub subscription) delete(topic string) bool { + index := sub.indexOf(topic) + if index == -1 { + return false + } + topics := make([]string, len(sub.topics)-1) + copy(topics[:index], sub.topics[:index]) + copy(topics[index:], sub.topics[index+1:]) + sub.topics = topics + return true +} diff --git a/pkg/messaging/mqtt/subscriber.go b/pkg/messaging/mqtt/subscriber.go deleted file mode 100644 index ddb810b4..00000000 --- a/pkg/messaging/mqtt/subscriber.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package mqtt - -import ( - "fmt" - "sync" - "time" - - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/gogo/protobuf/proto" - - log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging" -) - -var ( - errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached") - errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached") - errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic") - errAlreadySubscribed = errors.New("already subscribed to topic") - errNotSubscribed = errors.New("not subscribed") - errEmptyTopic = errors.New("empty topic") - errEmptyID = errors.New("empty ID") -) - -var _ messaging.Subscriber = (*subscriber)(nil) - -type subscription struct { - client mqtt.Client - topics []string -} - -type subscriber struct { - address string - timeout time.Duration - logger log.Logger - subscriptions map[string]subscription - mu *sync.RWMutex -} - -// NewSubscriber returns a new MQTT message subscriber. -func NewSubscriber(address string, timeout time.Duration, logger log.Logger) (messaging.Subscriber, error) { - ret := subscriber{ - address: address, - timeout: timeout, - logger: logger, - subscriptions: make(map[string]subscription), - } - return ret, nil -} - -func (sub subscriber) Subscribe(id, topic string, handler messaging.MessageHandler) error { - if id == "" { - return errEmptyID - } - if topic == "" { - return errEmptyTopic - } - sub.mu.Lock() - defer sub.mu.Unlock() - // Check client ID - s, ok := sub.subscriptions[id] - switch ok { - case true: - // Check topic - if ok = s.contains(topic); ok { - return errAlreadySubscribed - } - s.topics = append(s.topics, topic) - default: - opts := mqtt.NewClientOptions().SetUsername(username).AddBroker(sub.address).SetClientID(id) - client := mqtt.NewClient(opts) - token := client.Connect() - if token.Error() != nil { - return token.Error() - } - s = subscription{ - client: client, - topics: []string{topic}, - } - } - token := s.client.Subscribe(topic, qos, sub.mqttHandler(handler)) - if token.Error() != nil { - return token.Error() - } - if ok := token.WaitTimeout(sub.timeout); !ok { - return errSubscribeTimeout - } - return token.Error() -} - -func (sub subscriber) Unsubscribe(id, topic string) error { - if id == "" { - return errEmptyID - } - if topic == "" { - return errEmptyTopic - } - sub.mu.Lock() - defer sub.mu.Unlock() - // Check client ID - s, ok := sub.subscriptions[id] - switch ok { - case true: - // Check topic - if ok := s.contains(topic); !ok { - return errNotSubscribed - } - default: - return errNotSubscribed - } - token := s.client.Unsubscribe(topic) - if token.Error() != nil { - return token.Error() - } - - ok = token.WaitTimeout(sub.timeout) - if !ok { - return errUnsubscribeTimeout - } - if ok := s.delete(topic); !ok { - return errUnsubscribeDeleteTopic - } - if len(s.topics) == 0 { - delete(sub.subscriptions, id) - } - return token.Error() -} - -func (sub subscriber) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler { - return func(c mqtt.Client, m mqtt.Message) { - var msg messaging.Message - if err := proto.Unmarshal(m.Payload(), &msg); err != nil { - sub.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err)) - return - } - if err := h.Handle(msg); err != nil { - sub.logger.Warn(fmt.Sprintf("Failed to handle Mainflux message: %s", err)) - } - } -} - -// contains checks if a topic is present -func (sub subscription) contains(topic string) bool { - return sub.indexOf(topic) != -1 -} - -// Finds the index of an item in the topics -func (sub subscription) indexOf(element string) int { - for k, v := range sub.topics { - if element == v { - return k - } - } - return -1 -} - -// Deletes a topic from the slice -func (sub subscription) delete(topic string) bool { - index := sub.indexOf(topic) - if index == -1 { - return false - } - topics := make([]string, len(sub.topics)-1) - copy(topics[:index], sub.topics[:index]) - copy(topics[index:], sub.topics[index+1:]) - sub.topics = topics - return true -} diff --git a/pkg/messaging/nats/publisher.go b/pkg/messaging/nats/publisher.go index 530d19f9..330aed2f 100644 --- a/pkg/messaging/nats/publisher.go +++ b/pkg/messaging/nats/publisher.go @@ -19,13 +19,9 @@ type publisher struct { // Publisher wraps messaging Publisher exposing // Close() method for NATS connection. -type Publisher interface { - messaging.Publisher - Close() -} // NewPublisher returns NATS message Publisher. -func NewPublisher(url string) (Publisher, error) { +func NewPublisher(url string) (messaging.Publisher, error) { conn, err := broker.Connect(url) if err != nil { return nil, err @@ -56,6 +52,7 @@ func (pub *publisher) Publish(topic string, msg messaging.Message) error { return nil } -func (pub *publisher) Close() { +func (pub *publisher) Close() error { pub.conn.Close() + return nil } diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index 532f089a..c7b40658 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -17,9 +17,6 @@ import ( const chansPrefix = "channels" -// SubjectAllChannels represents subject to subscribe for all the channels. -const SubjectAllChannels = "channels.>" - var ( ErrAlreadySubscribed = errors.New("already subscribed to topic") ErrNotSubscribed = errors.New("not subscribed") @@ -29,13 +26,6 @@ var ( var _ messaging.PubSub = (*pubsub)(nil) -// PubSub wraps messaging Publisher exposing -// Close() method for NATS connection. -type PubSub interface { - messaging.PubSub - Close() -} - type subscription struct { *broker.Subscription cancel func() error @@ -56,7 +46,7 @@ type pubsub struct { // from ordinary subscribe. For more information, please take a look // here: https://docs.nats.io/developing-with-nats/receiving/queues. // If the queue is empty, Subscribe will be used. -func NewPubSub(url, queue string, logger log.Logger) (PubSub, error) { +func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) { conn, err := broker.Connect(url) if err != nil { return nil, err diff --git a/pkg/messaging/pubsub.go b/pkg/messaging/pubsub.go index 89550bb8..9d067e8d 100644 --- a/pkg/messaging/pubsub.go +++ b/pkg/messaging/pubsub.go @@ -7,6 +7,9 @@ package messaging type Publisher interface { // Publishes message to the stream. Publish(topic string, msg Message) error + + // Close gracefully closes message publisher's connection. + Close() error } // MessageHandler represents Message handler for Subscriber. @@ -26,6 +29,9 @@ type Subscriber interface { // Unsubscribe unsubscribes from the message stream and // stops consuming messages. Unsubscribe(id, topic string) error + + // Close gracefully closes message subscriber's connection. + Close() error } // PubSub represents aggregation interface for publisher and subscriber. diff --git a/pkg/messaging/rabbitmq/publisher.go b/pkg/messaging/rabbitmq/publisher.go index 7b9fea0e..e734e36c 100644 --- a/pkg/messaging/rabbitmq/publisher.go +++ b/pkg/messaging/rabbitmq/publisher.go @@ -18,17 +18,9 @@ type publisher struct { ch *amqp.Channel } -// Publisher wraps messaging Publisher exposing -// Close() method for RabbitMQ connection. -type Publisher interface { - messaging.Publisher - Close() -} - // NewPublisher returns RabbitMQ message Publisher. -func NewPublisher(url string) (Publisher, error) { - endpoint := fmt.Sprintf("amqp://%s", url) - conn, err := amqp.Dial(endpoint) +func NewPublisher(url string) (messaging.Publisher, error) { + conn, err := amqp.Dial(url) if err != nil { return nil, err } @@ -55,7 +47,6 @@ func (pub *publisher) Publish(topic string, msg messaging.Message) error { if err != nil { return err } - subject := fmt.Sprintf("%s.%s", chansPrefix, topic) if msg.Subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic) @@ -79,6 +70,6 @@ func (pub *publisher) Publish(topic string, msg messaging.Message) error { return nil } -func (pub *publisher) Close() { - pub.conn.Close() +func (pub *publisher) Close() error { + return pub.conn.Close() } diff --git a/pkg/messaging/rabbitmq/pubsub.go b/pkg/messaging/rabbitmq/pubsub.go index 4d02d307..632f2f52 100644 --- a/pkg/messaging/rabbitmq/pubsub.go +++ b/pkg/messaging/rabbitmq/pubsub.go @@ -30,13 +30,6 @@ var ( var _ messaging.PubSub = (*pubsub)(nil) -// PubSub wraps messaging Publisher exposing -// Close() method for RabbitMQ connection. -type PubSub interface { - messaging.PubSub - Close() -} - type subscription struct { cancel func() error } @@ -48,9 +41,8 @@ type pubsub struct { } // NewPubSub returns RabbitMQ message publisher/subscriber. -func NewPubSub(url, queue string, logger log.Logger) (PubSub, error) { - endpoint := fmt.Sprintf("amqp://%s", url) - conn, err := amqp.Dial(endpoint) +func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) { + conn, err := amqp.Dial(url) if err != nil { return nil, err } diff --git a/pkg/messaging/rabbitmq/setup_test.go b/pkg/messaging/rabbitmq/setup_test.go index 259ab509..e644e2cf 100644 --- a/pkg/messaging/rabbitmq/setup_test.go +++ b/pkg/messaging/rabbitmq/setup_test.go @@ -28,13 +28,13 @@ func TestMain(m *testing.M) { log.Fatalf("Could not connect to docker: %s", err) } - container, err := pool.Run("rabbitmq", "3.9.10", []string{}) + container, err := pool.Run("rabbitmq", "3.9.20", []string{}) if err != nil { log.Fatalf("Could not start container: %s", err) } handleInterrupt(pool, container) - address := fmt.Sprintf("%s:%s", "localhost", container.GetPort("5672/tcp")) + address := fmt.Sprintf("amqp://%s:%s", "localhost", container.GetPort("5672/tcp")) if err := pool.Retry(func() error { publisher, err = rabbitmq.NewPublisher(address) return err diff --git a/scripts/ci.sh b/scripts/ci.sh index 3d1638de..f0a50342 100755 --- a/scripts/ci.sh +++ b/scripts/ci.sh @@ -64,6 +64,8 @@ setup_mf() { exit 1 fi done + echo "Compile check for rabbitmq..." + MF_BROKER_TYPE=rabbitmq make http make -j$NPROC } diff --git a/twins/README.md b/twins/README.md index acd06a03..d8cb281f 100644 --- a/twins/README.md +++ b/twins/README.md @@ -22,12 +22,12 @@ default values. | MF_TWINS_DB | Database name | mainflux | | MF_TWINS_DB_HOST | Database host address | localhost | | MF_TWINS_DB_PORT | Database host port | 27017 | -| MF_THINGS_STANDALONE_EMAIL | User email for standalone mode (no gRPC communication with users) | | -| MF_THINGS_STANDALONE_TOKEN | User token for standalone mode that should be passed in auth header | | +| MF_THINGS_STANDALONE_EMAIL | User email for standalone mode (no gRPC communication with users) | | +| MF_THINGS_STANDALONE_TOKEN | User token for standalone mode that should be passed in auth header | | | MF_TWINS_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | | MF_TWINS_CA_CERTS | Path to trusted CAs in PEM format | | -| MF_TWINS_CHANNEL_ID | NATS notifications channel ID | | -| MF_NATS_URL | Mainflux NATS broker URL | nats://localhost:4222 | +| MF_TWINS_CHANNEL_ID | Message broker notifications channel ID | | +| MF_BROKER_URL | Mainflux Message broker URL | nats://localhost:4222 | | MF_AUTH_GRPC_URL | Auth service gRPC URL | localhost:8181 | | MF_AUTH_GRPC_TIMEOUT | Auth service gRPC request timeout in seconds | 1s | | MF_TWINS_CACHE_URL | Cache database URL | localhost:6379 | @@ -67,8 +67,8 @@ MF_THINGS_STANDALONE_EMAIL=[User email for standalone mode (no gRPC communicatio MF_THINGS_STANDALONE_TOKEN=[User token for standalone mode that should be passed in auth header] \ MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on] \ MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format] \ -MF_TWINS_CHANNEL_ID: [NATS notifications channel ID] \ -MF_NATS_URL: [Mainflux NATS broker URL] \ +MF_TWINS_CHANNEL_ID: [Message broker notifications channel ID] \ +MF_BROKER_URL: [Mainflux Message broker URL] \ MF_AUTH_GRPC_URL: [Auth service gRPC URL] \ MF_AUTH_GRPC_TIMEOUT: [Auth service gRPC request timeout in seconds] \ $GOBIN/mainflux-twins @@ -78,7 +78,7 @@ $GOBIN/mainflux-twins ### Starting twins service -The twins service publishes notifications on a NATS subject of the format +The twins service publishes notifications on a Message broker subject of the format `channels..messages..`, where `crudOp` stands for the crud operation done on twin - create, update, delete or retrieve - or state - save state. In order to use twin service notifications, diff --git a/twins/mocks/messages.go b/twins/mocks/messages.go index c98a5374..59ec03e4 100644 --- a/twins/mocks/messages.go +++ b/twins/mocks/messages.go @@ -27,3 +27,7 @@ func (mb mockBroker) Publish(topic string, msg messaging.Message) error { } return nil } + +func (mb mockBroker) Close() error { + return nil +} diff --git a/twins/service.go b/twins/service.go index 0835e0b2..a1632d95 100644 --- a/twins/service.go +++ b/twins/service.go @@ -422,6 +422,6 @@ func (ts *twinsService) publish(twinID *string, err *error, succOp, failOp strin } if err := ts.publisher.Publish(msg.Channel, msg); err != nil { - ts.logger.Warn(fmt.Sprintf("Failed to publish notification on NATS: %s", err)) + ts.logger.Warn(fmt.Sprintf("Failed to publish notification on Message Broker: %s", err)) } }