mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-24 13:48:49 +08:00
NOISSUE - Make MQTT Broker Configurable (#1904)
* Minor changes on mqtt publisher using nats Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Remove vernemq dependencies Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Restore VerneMQ config files Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Fix Makefile to support custom Docker profiles The Makefile has been updated to support custom Docker profiles. Previously, the Makefile only supported the default profiles for the message broker and MQTT broker. Now, the Makefile allows for custom profiles to be specified using environment variables. If the MF_BROKER_TYPE or MF_MQTT_BROKER_TYPE variables are not set, the default values "nats" and "nats" will be used, respectively. This change enables more flexibility in configuring the Docker environment for the project. The `run` target has also been modified to use the correct broker configuration file based on the MF_BROKER_TYPE variable. The sed command in the `run` target now replaces the placeholder in the docker/docker-compose.yml file with the appropriate broker configuration file. This commit improves the Makefile to support custom Docker profiles and ensures the correct broker configuration file is used when running the project. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Fix queue binding issue in RabbitMQ pubsub The commit fixes an issue in the RabbitMQ pubsub implementation where the queue binding was not correctly set up. Instead of using the topic as the queue name, the commit now uses a unique client ID generated by combining the topic and subscriber ID. This ensures that each subscriber has its own dedicated queue. The commit also updates the queue binding to use the correct queue name. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Refactor Docker config editing in Makefile The commit refactors the `edit_docker_config` function in the Makefile to improve readability and maintainability. The changes include: - Removing unnecessary conditionals related to the `rabbitmq` broker These changes ensure that the Docker configuration is correctly updated based on the specified MQTT broker type. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Fix failing tests on RabbitMQ Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Refactor MQTT_BROKER comment in docker-compose.yml The MQTT_BROKER comment in the docker-compose.yml file has been updated to provide a more accurate description of its functionality. The comment now states that the MQTT_BROKER handles MQTT communication between MQTT adapters and the message broker, instead of Mainflux services. This change improves clarity and aligns with the actual purpose of the MQTT_BROKER. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Rename `MF_BROKER` to `MF_MESSAGE_BROKER` The Makefile and Semaphore configuration files have been refactored to update the variable names related to the message broker type. These changes ensure consistency and clarity in the codebase by using more descriptive variable names related to the message broker type. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Fix Docker profile configuration for nats_rabbitmq Update the Docker profile configuration for nats_rabbitmq by replacing the NATS URL in the .env file with the correct value. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Rename MF_BROKER_URL to MF_MESSAGE_BROKER_URL Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Fix MQTT QoS level in pubsub.go The MQTT QoS level in the pubsub.go file was set to 1, which is the default level. However, since NATS supports up to QoS 1, I updated the QoS level comment to reflect this. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Refactor NewPublisher to accept QoS parameter The NewPublisher function in the pkg/messaging/mqtt/publisher.go file has been refactored to accept a new parameter, qos, which represents the Quality of Service level for MQTT message publishing. This change allows for more flexibility in configuring the MQTT publisher. The NewPublisher function now has the following signature: ```go func NewPublisher(address string, qos uint8, timeout time.Duration) (messaging.Publisher, error) ``` This change ensures that the MQTT publisher can be created with the desired QoS level, enhancing the reliability and delivery guarantees of the published messages. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Fix test assertions in pubsub_test.go The test assertions in the pubsub_test.go file were incorrect. This commit fixes the assertions to properly compare the expected and received message values. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * Test configurable MQTT broker Signed-off-by: rodneyosodo <blackd0t@protonmail.com> --------- Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
This commit is contained in:
parent
a91a8f44d3
commit
790f8a6abf
@ -695,5 +695,5 @@ blocks:
|
||||
jobs:
|
||||
- name: Compile For RabbitMQ
|
||||
commands:
|
||||
- MF_BROKER_TYPE=rabbitmq make mqtt
|
||||
- MF_MESSAGE_BROKER_TYPE=rabbitmq make mqtt
|
||||
|
||||
|
57
Makefile
57
Makefile
@ -22,15 +22,23 @@ DOCKER_PROJECT ?= $(shell echo $(subst $(space),,$(USER_REPO)_$(BRANCH)) | tr -c
|
||||
DOCKER_COMPOSE_COMMANDS_SUPPORTED := up down config
|
||||
DEFAULT_DOCKER_COMPOSE_COMMAND := up
|
||||
GRPC_MTLS_CERT_FILES_EXISTS = 0
|
||||
ifneq ($(MF_BROKER_TYPE),)
|
||||
MF_BROKER_TYPE := $(MF_BROKER_TYPE)
|
||||
DOCKER_PROFILE ?= $(MF_MQTT_BROKER_TYPE)_$(MF_MESSAGE_BROKER_TYPE)
|
||||
ifneq ($(MF_MESSAGE_BROKER_TYPE),)
|
||||
MF_MESSAGE_BROKER_TYPE := $(MF_MESSAGE_BROKER_TYPE)
|
||||
else
|
||||
MF_BROKER_TYPE=nats
|
||||
MF_MESSAGE_BROKER_TYPE=nats
|
||||
endif
|
||||
|
||||
ifneq ($(MF_MQTT_BROKER_TYPE),)
|
||||
MF_MQTT_BROKER_TYPE := $(MF_MQTT_BROKER_TYPE)
|
||||
else
|
||||
MF_MQTT_BROKER_TYPE=nats
|
||||
endif
|
||||
|
||||
|
||||
define compile_service
|
||||
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \
|
||||
go build -mod=vendor -tags $(MF_BROKER_TYPE) -ldflags "-s -w \
|
||||
go build -mod=vendor -tags $(MF_MESSAGE_BROKER_TYPE) -ldflags "-s -w \
|
||||
-X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \
|
||||
-X 'github.com/mainflux/mainflux.Version=$(VERSION)' \
|
||||
-X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \
|
||||
@ -183,12 +191,45 @@ endif
|
||||
endif
|
||||
endif
|
||||
|
||||
run: check_certs
|
||||
sed -i "s,file: brokers/.*.yml,file: brokers/${MF_BROKER_TYPE}.yml," docker/docker-compose.yml
|
||||
sed -i "s,MF_BROKER_URL=.*,MF_BROKER_URL=$$\{MF_$(shell echo ${MF_BROKER_TYPE} | tr 'a-z' 'A-Z')_URL\}," docker/.env
|
||||
docker-compose -f docker/docker-compose.yml -p $(DOCKER_PROJECT) $(DOCKER_COMPOSE_COMMAND) $(args)
|
||||
define edit_docker_config
|
||||
sed -i "s/MF_MQTT_BROKER_TYPE=.*/MF_MQTT_BROKER_TYPE=$(1)/" docker/.env
|
||||
sed -i "s/MF_MQTT_BROKER_HEALTH_CHECK=.*/MF_MQTT_BROKER_HEALTH_CHECK=$$\{MF_$(shell echo ${MF_MQTT_BROKER_TYPE} | tr 'a-z' 'A-Z')_HEALTH_CHECK}/" docker/.env
|
||||
sed -i "s/MF_MQTT_ADAPTER_WS_TARGET_PATH=.*/MF_MQTT_ADAPTER_WS_TARGET_PATH=$$\{MF_$(shell echo ${MF_MQTT_BROKER_TYPE} | tr 'a-z' 'A-Z')_WS_TARGET_PATH}/" docker/.env
|
||||
sed -i "s/MF_MESSAGE_BROKER_TYPE=.*/MF_MESSAGE_BROKER_TYPE=$(2)/" docker/.env
|
||||
sed -i "s,file: .*.yml,file: $(2).yml," docker/brokers/docker-compose.yml
|
||||
sed -i "s,MF_MESSAGE_BROKER_URL=.*,MF_MESSAGE_BROKER_URL=$$\{MF_$(shell echo ${MF_MESSAGE_BROKER_TYPE} | tr 'a-z' 'A-Z')_URL\}," docker/.env
|
||||
sed -i "s,MF_MQTT_ADAPTER_MQTT_QOS=.*,MF_MQTT_ADAPTER_MQTT_QOS=$$\{MF_$(shell echo ${MF_MQTT_BROKER_TYPE} | tr 'a-z' 'A-Z')_MQTT_QOS\}," docker/.env
|
||||
endef
|
||||
|
||||
change_config:
|
||||
ifeq ($(DOCKER_PROFILE),nats_nats)
|
||||
sed -i "s/- broker/- nats/g" docker/docker-compose.yml
|
||||
sed -i "s/- rabbitmq/- nats/g" docker/docker-compose.yml
|
||||
sed -i "s,MF_NATS_URL=.*,MF_NATS_URL=nats://nats:$$\{MF_NATS_PORT}," docker/.env
|
||||
$(call edit_docker_config,nats,nats)
|
||||
else ifeq ($(DOCKER_PROFILE),nats_rabbitmq)
|
||||
sed -i "s/nats/broker/g" docker/docker-compose.yml
|
||||
sed -i "s,MF_NATS_URL=.*,MF_NATS_URL=nats://nats:$$\{MF_NATS_PORT}," docker/.env
|
||||
sed -i "s/rabbitmq/broker/g" docker/docker-compose.yml
|
||||
$(call edit_docker_config,nats,rabbitmq)
|
||||
else ifeq ($(DOCKER_PROFILE),vernemq_nats)
|
||||
sed -i "s/nats/broker/g" docker/docker-compose.yml
|
||||
sed -i "s/rabbitmq/broker/g" docker/docker-compose.yml
|
||||
sed -i "s,MF_NATS_URL=.*,MF_NATS_URL=nats://broker:$$\{MF_NATS_PORT}," docker/.env
|
||||
$(call edit_docker_config,vernemq,nats)
|
||||
else ifeq ($(DOCKER_PROFILE),vernemq_rabbitmq)
|
||||
sed -i "s/nats/broker/g" docker/docker-compose.yml
|
||||
sed -i "s/rabbitmq/broker/g" docker/docker-compose.yml
|
||||
$(call edit_docker_config,vernemq,rabbitmq)
|
||||
else
|
||||
$(error Invalid DOCKER_PROFILE $(DOCKER_PROFILE))
|
||||
endif
|
||||
|
||||
run: check_certs change_config
|
||||
docker-compose -f docker/docker-compose.yml --profile $(DOCKER_PROFILE) -p $(DOCKER_PROJECT) $(DOCKER_COMPOSE_COMMAND) $(args)
|
||||
|
||||
run_addons: check_certs
|
||||
$(call change_config)
|
||||
$(foreach SVC,$(RUN_ADDON_ARGS),$(if $(filter $(SVC),$(ADDON_SERVICES) $(EXTERNAL_SERVICES)),,$(error Invalid Service $(SVC))))
|
||||
@for SVC in $(RUN_ADDON_ARGS); do \
|
||||
MF_ADDONS_CERTS_PATH_PREFIX="../." docker-compose -f docker/addons/$$SVC/docker-compose.yml -p $(DOCKER_PROJECT) --env-file ./docker/.env $(DOCKER_COMPOSE_COMMAND) $(args) & \
|
||||
|
@ -40,7 +40,7 @@ const (
|
||||
type config struct {
|
||||
LogLevel string `env:"MF_CASSANDRA_WRITER_LOG_LEVEL" envDefault:"info"`
|
||||
ConfigPath string `env:"MF_CASSANDRA_WRITER_CONFIG_PATH" envDefault:"/config.toml"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_CASSANDRA_WRITER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -39,7 +39,7 @@ const (
|
||||
|
||||
type config struct {
|
||||
LogLevel string `env:"MF_COAP_ADAPTER_LOG_LEVEL" envDefault:"info"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_COAP_ADAPTER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -38,10 +38,10 @@ const (
|
||||
|
||||
type config struct {
|
||||
LogLevel string `env:"MF_HTTP_ADAPTER_LOG_LEVEL" envDefault:"info"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_HTTP_ADAPTER_INSTANCE_ID" envDefault:""`
|
||||
InstanceID string `env:"MF_HTTP_ADAPTER_INSTANCE_ID" envDefault:""`
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -38,7 +38,7 @@ const (
|
||||
type config struct {
|
||||
LogLevel string `env:"MF_INFLUX_WRITER_LOG_LEVEL" envDefault:"info"`
|
||||
ConfigPath string `env:"MF_INFLUX_WRITER_CONFIG_PATH" envDefault:"/config.toml"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_INFLUX_WRITER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -54,7 +54,7 @@ type config struct {
|
||||
LoraMsgTopic string `env:"MF_LORA_ADAPTER_MESSAGES_TOPIC" envDefault:"application/+/device/+/event/up"`
|
||||
LoraMsgTimeout time.Duration `env:"MF_LORA_ADAPTER_MESSAGES_TIMEOUT" envDefault:"30s"`
|
||||
ESConsumerName string `env:"MF_LORA_ADAPTER_EVENT_CONSUMER" envDefault:"lora-adapter"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_LORA_ADAPTER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -40,7 +40,7 @@ const (
|
||||
type config struct {
|
||||
LogLevel string `env:"MF_MONGO_WRITER_LOG_LEVEL" envDefault:"info"`
|
||||
ConfigPath string `env:"MF_MONGO_WRITER_CONFIG_PATH" envDefault:"/config.toml"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_MONGO_WRITER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"github.com/mainflux/mainflux/mqtt/events"
|
||||
mqtttracing "github.com/mainflux/mainflux/mqtt/tracing"
|
||||
"github.com/mainflux/mainflux/pkg/errors"
|
||||
"github.com/mainflux/mainflux/pkg/messaging"
|
||||
"github.com/mainflux/mainflux/pkg/messaging/brokers"
|
||||
brokerstracing "github.com/mainflux/mainflux/pkg/messaging/brokers/tracing"
|
||||
mqttpub "github.com/mainflux/mainflux/pkg/messaging/mqtt"
|
||||
@ -45,13 +44,14 @@ type config struct {
|
||||
MQTTTargetPort string `env:"MF_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
|
||||
MQTTForwarderTimeout time.Duration `env:"MF_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"`
|
||||
MQTTTargetHealthCheck string `env:"MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
|
||||
MQTTQoS uint8 `env:"MF_MQTT_ADAPTER_MQTT_QOS" envDefault:"1"`
|
||||
HTTPPort string `env:"MF_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
|
||||
HTTPTargetHost string `env:"MF_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
|
||||
HTTPTargetPort string `env:"MF_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
|
||||
HTTPTargetPath string `env:"MF_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"`
|
||||
Instance string `env:"MF_MQTT_ADAPTER_INSTANCE" envDefault:""`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
|
||||
ESURL string `env:"MF_MQTT_ADAPTER_ES_URL" envDefault:"redis://localhost:6379/0"`
|
||||
@ -113,16 +113,16 @@ func main() {
|
||||
}()
|
||||
tracer := tp.Tracer(svcName)
|
||||
|
||||
nps, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
|
||||
bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
defer nps.Close()
|
||||
nps = brokerstracing.NewPubSub(serverConfig, tracer, nps)
|
||||
defer bsub.Close()
|
||||
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)
|
||||
|
||||
mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTForwarderTimeout)
|
||||
mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
|
||||
exitCode = 1
|
||||
@ -132,7 +132,7 @@ func main() {
|
||||
|
||||
fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
|
||||
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
|
||||
if err := fwd.Forward(ctx, svcName, nps, mpub); err != nil {
|
||||
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
@ -164,7 +164,7 @@ func main() {
|
||||
|
||||
logger.Info("Successfully connected to things grpc server " + aHandler.Secure())
|
||||
|
||||
h := mqtt.NewHandler([]messaging.Publisher{np}, es, logger, auth)
|
||||
h := mqtt.NewHandler(np, es, logger, auth)
|
||||
h = mqtttracing.NewHandler(tracer, h)
|
||||
|
||||
if cfg.SendTelemetry {
|
||||
|
@ -48,7 +48,7 @@ const (
|
||||
type config struct {
|
||||
LogLevel string `env:"MF_OPCUA_ADAPTER_LOG_LEVEL" envDefault:"info"`
|
||||
ESConsumerName string `env:"MF_OPCUA_ADAPTER_EVENT_CONSUMER" envDefault:"opcua-adapter"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_OPCUA_ADAPTER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -41,7 +41,7 @@ const (
|
||||
type config struct {
|
||||
LogLevel string `env:"MF_POSTGRES_WRITER_LOG_LEVEL" envDefault:"info"`
|
||||
ConfigPath string `env:"MF_POSTGRES_WRITER_CONFIG_PATH" envDefault:"/config.toml"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_POSTGRES_WRITER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -47,7 +47,7 @@ type config struct {
|
||||
LogLevel string `env:"MF_SMPP_NOTIFIER_LOG_LEVEL" envDefault:"info"`
|
||||
From string `env:"MF_SMPP_NOTIFIER_FROM_ADDR" envDefault:""`
|
||||
ConfigPath string `env:"MF_SMPP_NOTIFIER_CONFIG_PATH" envDefault:"/config.toml"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_SMPP_NOTIFIER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -48,7 +48,7 @@ type config struct {
|
||||
LogLevel string `env:"MF_SMTP_NOTIFIER_LOG_LEVEL" envDefault:"info"`
|
||||
ConfigPath string `env:"MF_SMTP_NOTIFIER_CONFIG_PATH" envDefault:"/config.toml"`
|
||||
From string `env:"MF_SMTP_NOTIFIER_FROM_ADDR" envDefault:""`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_SMTP_NOTIFIER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -41,7 +41,7 @@ const (
|
||||
type config struct {
|
||||
LogLevel string `env:"MF_TIMESCALE_WRITER_LOG_LEVEL" envDefault:"info"`
|
||||
ConfigPath string `env:"MF_TIMESCALE_WRITER_CONFIG_PATH" envDefault:"/config.toml"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_TIMESCALE_WRITER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -51,7 +51,7 @@ type config struct {
|
||||
StandaloneID string `env:"MF_TWINS_STANDALONE_ID" envDefault:""`
|
||||
StandaloneToken string `env:"MF_TWINS_STANDALONE_TOKEN" envDefault:""`
|
||||
ChannelID string `env:"MF_TWINS_CHANNEL_ID" envDefault:""`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_TWINS_INSTANCE_ID" envDefault:""`
|
||||
|
@ -38,7 +38,7 @@ const (
|
||||
|
||||
type config struct {
|
||||
LogLevel string `env:"MF_WS_ADAPTER_LOG_LEVEL" envDefault:"info"`
|
||||
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
BrokerURL string `env:"MF_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_WS_ADAPTER_INSTANCE_ID" envDefault:""`
|
||||
|
@ -24,7 +24,7 @@ default values.
|
||||
| MF_THINGS_AUTH_GRPC_TIMEOUT | Things service Auth gRPC request timeout in seconds | 1s |
|
||||
| MF_THINGS_AUTH_GRPC_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
|
||||
| MF_THINGS_AUTH_GRPC_CA_CERTS | Path to trusted CAs in PEM format | |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_COAP_ADAPTER_INSTANCE_ID | CoAP adapter instance ID | |
|
||||
@ -63,7 +63,7 @@ MF_THINGS_AUTH_GRPC_URL=[Things service Auth gRPC URL] \
|
||||
MF_THINGS_AUTH_GRPC_TIMEOUT=[Things service Auth gRPC request timeout in seconds] \
|
||||
MF_THINGS_AUTH_GRPC_CLIENT_TLS=[Flag that indicates if TLS should be turned on] \
|
||||
MF_THINGS_AUTH_GRPC_CA_CERTS=[Path to trusted CAs in PEM format] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_JAEGER_URL=[Jaeger server URL] \
|
||||
MF_SEND_TELEMETRY=[Send telemetry to mainflux call home server] \
|
||||
MF_COAP_ADAPTER_INSTANCE_ID=[CoAP adapter instance ID] \
|
||||
|
@ -78,7 +78,7 @@ func (c *client) Handle(msg *messaging.Message) error {
|
||||
Code: codes.Content,
|
||||
Token: c.token,
|
||||
Context: c.client.Context(),
|
||||
Body: bytes.NewReader(msg.Payload),
|
||||
Body: bytes.NewReader(msg.GetPayload()),
|
||||
}
|
||||
|
||||
atomic.AddUint32(&c.observe, 1)
|
||||
|
@ -38,7 +38,7 @@ default values.
|
||||
| MF_AUTH_GRPC_TIMEOUT | Users service gRPC request timeout in seconds | 1s |
|
||||
| MF_AUTH_GRPC_CLIENT_TLS | Users client TLS flag | false |
|
||||
| MF_AUTH_GRPC_CA_CERT | Path to Auth client CA certs in pem format | "" |
|
||||
| MF_BROKER_URL | Message broker URL | nats://127.0.0.1:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker URL | nats://127.0.0.1:4222 |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_SMPP_NOTIFIER_INSTANCE_ID | SMPP Notifier instance ID | "" |
|
||||
|
@ -27,7 +27,7 @@ default values.
|
||||
| MF_SMTP_NOTIFIER_DB_SSL_KEY | Path to the PEM encoded certificate key | "" |
|
||||
| MF_SMTP_NOTIFIER_DB_SSL_ROOT_CERT | Path to the PEM encoded root certificate file | "" |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_BROKER_URL | Message broker URL | nats://127.0.0.1:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker URL | nats://127.0.0.1:4222 |
|
||||
| MF_EMAIL_HOST | Mail server host | localhost |
|
||||
| MF_EMAIL_PORT | Mail server port | 25 |
|
||||
| MF_EMAIL_USERNAME | Mail server username | |
|
||||
|
@ -21,7 +21,7 @@ default values.
|
||||
| MF_CASSANDRA_USER | Cassandra DB username | mainflux |
|
||||
| MF_CASSANDRA_PASS | Cassandra DB password | mainflux |
|
||||
| MF_CASSANDRA_PORT | Cassandra DB port | 9042 |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_CASSANDRA_WRITER_INSANCE_ID | Cassandra writer instance ID | |
|
||||
@ -56,7 +56,7 @@ MF_CASSANDRA_KEYSPACE=[Cassandra keyspace name] \
|
||||
MF_CASSANDRA_USER=[Cassandra DB username] \
|
||||
MF_CASSANDRA_PASS=[Cassandra DB password] \
|
||||
MF_CASSANDRA_PORT=[Cassandra DB port] \
|
||||
MF_BROKER_URL=[Message Broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message Broker instance URL] \
|
||||
MF_JAEGER_URL=[Jaeger server URL] \
|
||||
MF_SEND_TELEMETRY=[Send telemetry to mainflux call home server] \
|
||||
MF_CASSANDRA_WRITER_INSANCE_ID=[Cassandra writer instance ID] \
|
||||
|
@ -29,7 +29,7 @@ default values.
|
||||
| MF_INFLUXDB_USER_AGENT | InfluxDB user agent | |
|
||||
| MF_INFLUXDB_TIMEOUT | InfluxDB client connection readiness timeout | 1s |
|
||||
| MF_INFLUXDB_INSECURE_SKIP_VERIFY | InfluxDB client connection insecure skip verify | false |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_INFLUX_WRITER_INSTANCE_ID | InfluxDB writer instance ID | |
|
||||
@ -72,7 +72,7 @@ MF_INFLUXDB_DBURL=[InfluxDB database url] \
|
||||
MF_INFLUXDB_USER_AGENT=[InfluxDB user agent] \
|
||||
MF_INFLUXDB_TIMEOUT=[InfluxDB timeout] \
|
||||
MF_INFLUXDB_INSECURE_SKIP_VERIFY=[InfluxDB insecure skip verify] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_JAEGER_URL=[Jaeger server URL] \
|
||||
MF_SEND_TELEMETRY=[Send telemetry to mainflux call home server] \
|
||||
MF_INFLUX_WRITER_INSTANCE_ID=[Influx writer instance ID] \
|
||||
|
@ -19,7 +19,7 @@ default values.
|
||||
| MF_MONGO_NAME | Default MongoDB database name | messages |
|
||||
| MF_MONGO_HOST | Default MongoDB database host | localhost |
|
||||
| MF_MONGO_PORT | Default MongoDB database port | 27017 |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_MONGO_WRITER_INSTANCE_ID | MongoDB writer instance ID | "" |
|
||||
@ -52,7 +52,7 @@ MF_MONGO_WRITER_HTTP_SERVER_KEY=[Service HTTP server key] \
|
||||
MF_MONGO_NAME=[MongoDB database name] \
|
||||
MF_MONGO_HOST=[MongoDB database host] \
|
||||
MF_MONGO_PORT=[MongoDB database port] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_JAEGER_URL=[Jaeger server URL] \
|
||||
MF_SEND_TELEMETRY=[Send telemetry to mainflux call home server] \
|
||||
MF_MONGO_WRITER_INSTANCE_ID=[MongoDB writer instance ID] \
|
||||
|
@ -25,7 +25,7 @@ default values.
|
||||
| MF_POSTGRES_SSL_CERT | Postgres SSL certificate path | "" |
|
||||
| MF_POSTGRES_SSL_KEY | Postgres SSL key | "" |
|
||||
| MF_POSTGRES_SSL_ROOT_CERT | Postgres SSL root certificate path | "" |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_POSTGRES_WRITER_INSTANCE_ID | Service instance ID | "" |
|
||||
@ -64,7 +64,7 @@ MF_POSTGRES_SSL_MODE=[Postgres SSL mode] \
|
||||
MF_POSTGRES_SSL_CERT=[Postgres SSL cert] \
|
||||
MF_POSTGRES_SSL_KEY=[Postgres SSL key] \
|
||||
MF_POSTGRES_SSL_ROOT_CERT=[Postgres SSL Root cert] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_JAEGER_URL=[Jaeger server URL] \
|
||||
MF_SEND_TELEMETRY=[Send telemetry to mainflux call home server] \
|
||||
MF_POSTGRES_WRITER_INSTANCE_ID=[Service instance ID] \
|
||||
|
@ -25,7 +25,7 @@ default values.
|
||||
| MF_TIMESCALE_SSL_CERT | Timescale SSL certificate path | "" |
|
||||
| MF_TIMESCALE_SSL_KEY | Timescale SSL key | "" |
|
||||
| MF_TIMESCALE_SSL_ROOT_CERT | Timescale SSL root certificate path | "" |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_TIMESCALE_WRITER_INSTANCE_ID | Timescale writer instance ID | "" |
|
||||
@ -64,7 +64,7 @@ MF_TIMESCALE_SSL_MODE=[Timescale SSL mode] \
|
||||
MF_TIMESCALE_SSL_CERT=[Timescale SSL cert] \
|
||||
MF_TIMESCALE_SSL_KEY=[Timescale SSL key] \
|
||||
MF_TIMESCALE_SSL_ROOT_CERT=[Timescale SSL Root cert] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_JAEGER_URL=[Jaeger server URL] \
|
||||
MF_SEND_TELEMETRY=[Send telemetry to mainflux call home server] \
|
||||
MF_TIMESCALE_WRITER_INSTANCE_ID=[Timescale writer instance ID] \
|
||||
|
38
docker/.env
38
docker/.env
@ -10,7 +10,11 @@ MF_NGINX_MQTTS_PORT=8883
|
||||
MF_NATS_PORT=4222
|
||||
MF_NATS_HTTP_PORT=8222
|
||||
MF_NATS_JETSTREAM_KEY=u7wFoAPgXpDueXOFldBnXDh4xjnSOyEJ2Cb8Z5SZvGLzIZ3U4exWhhoIBZHzuNvh
|
||||
MF_NATS_URL=nats://broker:${MF_NATS_PORT}
|
||||
MF_NATS_URL=nats://nats:${MF_NATS_PORT}
|
||||
# Configs for nats as MQTT broker
|
||||
MF_NATS_HEALTH_CHECK=http://nats:${MF_NATS_HTTP_PORT}/healthz
|
||||
MF_NATS_WS_TARGET_PATH=
|
||||
MF_NATS_MQTT_QOS=1
|
||||
|
||||
## RabbitMQ
|
||||
MF_RABBITMQ_PORT=5672
|
||||
@ -22,8 +26,26 @@ MF_RABBITMQ_VHOST=/
|
||||
MF_RABBITMQ_URL=amqp://${MF_RABBITMQ_USER}:${MF_RABBITMQ_PASS}@broker:${MF_RABBITMQ_PORT}${MF_RABBITMQ_VHOST}
|
||||
|
||||
## Message Broker
|
||||
MF_BROKER_TYPE=nats
|
||||
MF_BROKER_URL=${MF_NATS_URL}
|
||||
MF_MESSAGE_BROKER_TYPE=nats
|
||||
MF_MESSAGE_BROKER_URL=${MF_NATS_URL}
|
||||
|
||||
## VERNEMQ
|
||||
MF_DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on
|
||||
MF_DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL=error
|
||||
MF_VERNEMQ_HEALTH_CHECK=http://vernemq:8888/health
|
||||
MF_VERNEMQ_WS_TARGET_PATH=/mqtt
|
||||
MF_VERNEMQ_MQTT_QOS=2
|
||||
|
||||
## MQTT Broker
|
||||
MF_MQTT_BROKER_TYPE=nats
|
||||
MF_MQTT_BROKER_HEALTH_CHECK=${MF_NATS_HEALTH_CHECK}
|
||||
MF_MQTT_ADAPTER_MQTT_QOS=${MF_NATS_MQTT_QOS}
|
||||
MF_MQTT_ADAPTER_MQTT_TARGET_HOST=${MF_MQTT_BROKER_TYPE}
|
||||
MF_MQTT_ADAPTER_MQTT_TARGET_PORT=1883
|
||||
MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${MF_MQTT_BROKER_HEALTH_CHECK}
|
||||
MF_MQTT_ADAPTER_WS_TARGET_HOST=${MF_MQTT_BROKER_TYPE}
|
||||
MF_MQTT_ADAPTER_WS_TARGET_PORT=8080
|
||||
MF_MQTT_ADAPTER_WS_TARGET_PATH=${MF_NATS_WS_TARGET_PATH}
|
||||
|
||||
## Redis
|
||||
MF_REDIS_TCP_PORT=6379
|
||||
@ -168,24 +190,14 @@ MF_HTTP_ADAPTER_INSTANCE_ID=
|
||||
### MQTT
|
||||
MF_MQTT_ADAPTER_LOG_LEVEL=debug
|
||||
MF_MQTT_ADAPTER_MQTT_PORT=1883
|
||||
MF_MQTT_ADAPTER_MQTT_TARGET_HOST=vernemq
|
||||
MF_MQTT_ADAPTER_MQTT_TARGET_PORT=1883
|
||||
MF_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s
|
||||
MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=http://vernemq:8888/health
|
||||
MF_MQTT_ADAPTER_WS_PORT=8080
|
||||
MF_MQTT_ADAPTER_WS_TARGET_HOST=vernemq
|
||||
MF_MQTT_ADAPTER_WS_TARGET_PORT=8080
|
||||
MF_MQTT_ADAPTER_WS_TARGET_PATH=/mqtt
|
||||
MF_MQTT_ADAPTER_INSTANCE=
|
||||
MF_MQTT_ADAPTER_ES_URL=es-redis:${MF_REDIS_TCP_PORT}
|
||||
MF_MQTT_ADAPTER_ES_PASS=
|
||||
MF_MQTT_ADAPTER_INSTANCE_ID=
|
||||
MF_MQTT_ADAPTER_ES_DB=0
|
||||
|
||||
### VERNEMQ
|
||||
MF_DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on
|
||||
MF_DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL=error
|
||||
|
||||
### CoAP
|
||||
MF_COAP_ADAPTER_LOG_LEVEL=debug
|
||||
MF_COAP_ADAPTER_HOST=coap-adapter
|
||||
|
@ -55,7 +55,7 @@ services:
|
||||
MF_CASSANDRA_KEYSPACE: ${MF_CASSANDRA_KEYSPACE}
|
||||
MF_CASSANDRA_USER: ${MF_CASSANDRA_USER}
|
||||
MF_CASSANDRA_PASS: ${MF_CASSANDRA_PASS}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_CASSANDRA_WRITER_INSANCE_ID: ${MF_CASSANDRA_WRITER_INSANCE_ID}
|
||||
|
@ -61,7 +61,7 @@ services:
|
||||
MF_INFLUXDB_USER_AGENT: ${MF_INFLUXDB_USER_AGENT}
|
||||
MF_INFLUXDB_TIMEOUT: ${MF_INFLUXDB_TIMEOUT}
|
||||
MF_INFLUXDB_INSECURE_SKIP_VERIFY: ${MF_INFLUXDB_INSECURE_SKIP_VERIFY}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_INFLUX_WRITER_INSTANCE_ID: ${MF_INFLUX_WRITER_INSTANCE_ID}
|
||||
|
@ -39,7 +39,7 @@ services:
|
||||
MF_LORA_ADAPTER_ROUTE_MAP_PASS: ${MF_LORA_ADAPTER_ROUTE_MAP_PASS}
|
||||
MF_LORA_ADAPTER_ROUTE_MAP_DB: ${MF_LORA_ADAPTER_ROUTE_MAP_DB}
|
||||
MF_LORA_ADAPTER_ES_URL: ${MF_ES_URL}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_LORA_ADAPTER_INSTANCE_ID: ${MF_LORA_ADAPTER_INSTANCE_ID}
|
||||
|
@ -48,7 +48,7 @@ services:
|
||||
MF_MONGO_HOST: ${MF_MONGO_HOST}
|
||||
MF_MONGO_PORT: ${MF_MONGO_PORT}
|
||||
MF_MONGO_NAME: ${MF_MONGO_NAME}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_MONGO_WRITER_INSTANCE_ID: ${MF_MONGO_WRITER_INSTANCE_ID}
|
||||
|
@ -40,7 +40,7 @@ services:
|
||||
MF_OPCUA_ADAPTER_ROUTE_MAP_URL: ${MF_OPCUA_ADAPTER_ROUTE_MAP_URL}
|
||||
MF_OPCUA_ADAPTER_ROUTE_MAP_PASS: ${MF_OPCUA_ADAPTER_ROUTE_MAP_PASS}
|
||||
MF_OPCUA_ADAPTER_ROUTE_MAP_DB: ${MF_OPCUA_ADAPTER_ROUTE_MAP_DB}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_OPCUA_ADAPTER_INSTANCE_ID: ${MF_OPCUA_ADAPTER_INSTANCE_ID}
|
||||
|
@ -52,7 +52,7 @@ services:
|
||||
MF_POSTGRES_SSL_CERT: ${MF_POSTGRES_SSL_CERT}
|
||||
MF_POSTGRES_SSL_KEY: ${MF_POSTGRES_SSL_KEY}
|
||||
MF_POSTGRES_SSL_ROOT_CERT: ${MF_POSTGRES_SSL_ROOT_CERT}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_POSTGRES_WRITER_INSTANCE_ID: ${MF_POSTGRES_WRITER_INSTANCE_ID}
|
||||
|
@ -64,7 +64,7 @@ services:
|
||||
MF_AUTH_GRPC_CLIENT_CERT: ${MF_AUTH_GRPC_CLIENT_CERT:+/users-grpc-client.crt}
|
||||
MF_AUTH_GRPC_CLIENT_KEY: ${MF_AUTH_GRPC_CLIENT_KEY:+/users-grpc-client.key}
|
||||
MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_AUTH_GRPC_SERVER_CA_CERTS:+/users-grpc-server-ca.crt}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_SMPP_NOTIFIER_INSTANCE_ID: ${MF_SMPP_NOTIFIER_INSTANCE_ID}
|
||||
|
@ -63,7 +63,7 @@ services:
|
||||
MF_EMAIL_FROM_ADDRESS: ${MF_EMAIL_FROM_ADDRESS}
|
||||
MF_EMAIL_FROM_NAME: ${MF_EMAIL_FROM_NAME}
|
||||
MF_EMAIL_TEMPLATE: ${MF_SMTP_NOTIFIER_EMAIL_TEMPLATE}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_SMTP_NOTIFIER_INSTANCE_ID: ${MF_SMTP_NOTIFIER_INSTANCE_ID}
|
||||
|
@ -52,7 +52,7 @@ services:
|
||||
MF_TIMESCALE_SSL_CERT: ${MF_TIMESCALE_SSL_CERT}
|
||||
MF_TIMESCALE_SSL_KEY: ${MF_TIMESCALE_SSL_KEY}
|
||||
MF_TIMESCALE_SSL_ROOT_CERT: ${MF_TIMESCALE_SSL_ROOT_CERT}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_TIMESCALE_WRITER_INSTANCE_ID: ${MF_TIMESCALE_WRITER_INSTANCE_ID}
|
||||
|
@ -65,7 +65,7 @@ services:
|
||||
MF_AUTH_GRPC_CLIENT_CERT: ${MF_AUTH_GRPC_CLIENT_CERT:+/users-grpc-client.crt}
|
||||
MF_AUTH_GRPC_CLIENT_KEY: ${MF_AUTH_GRPC_CLIENT_KEY:+/users-grpc-client.key}
|
||||
MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_AUTH_GRPC_SERVER_CA_CERTS:+/users-grpc-server-ca.crt}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_TWINS_INSTANCE_ID: ${MF_TWINS_INSTANCE_ID}
|
||||
ports:
|
||||
|
34
docker/brokers/README.md
Normal file
34
docker/brokers/README.md
Normal file
@ -0,0 +1,34 @@
|
||||
# Brokers Docker Compose
|
||||
|
||||
Mainflux supports configurable MQTT broker and Message broker.
|
||||
|
||||
## MQTT Broker
|
||||
|
||||
Mainflux supports VerneMQ and Nats as an MQTT broker.
|
||||
|
||||
## Message Broker
|
||||
|
||||
Mainflux supports NATS and RabbitMQ as a message broker.
|
||||
|
||||
## Profiles
|
||||
|
||||
This directory contains 4 docker-compose profiles for running Mainflux with different combinations of MQTT and message brokers.
|
||||
|
||||
The profiles are:
|
||||
|
||||
- `vernemq_nats` - VerneMQ as an MQTT broker and Nats as a message broker
|
||||
- `vernemq_rabbitmq` - VerneMQ as an MQTT broker and RabbitMQ as a message broker
|
||||
- `nats_nats` - Nats as an MQTT broker and Nats as a message broker
|
||||
- `nats_rabbitmq` - Nats as an MQTT broker and RabbitMQ as a message broker
|
||||
|
||||
The following command will run VerneMQ as an MQTT broker and Nats as a message broker:
|
||||
|
||||
```bash
|
||||
MF_MQTT_BROKER_TYPE=vernemq MF_MESSAGE_BROKER_TYPE=nats make run
|
||||
```
|
||||
|
||||
The following command will run VerneMQ as an MQTT broker and RabbitMQ as a message broker:
|
||||
|
||||
```bash
|
||||
MF_MQTT_BROKER_TYPE=vernemq MF_MESSAGE_BROKER_TYPE=rabbitmq make run
|
||||
```
|
48
docker/brokers/docker-compose.yml
Normal file
48
docker/brokers/docker-compose.yml
Normal file
@ -0,0 +1,48 @@
|
||||
# This file configures Mainflux brokers. Mainflux uses two types of brokers:
|
||||
# 1. MQTT_BROKER: Handles MQTT communication between MQTT adapters and message broker.
|
||||
# 2. MESSAGE_BROKER: Manages communication between adapters and Mainflux writer services.
|
||||
#
|
||||
# MQTT_BROKER can be either 'vernemq' or 'nats'.
|
||||
# MESSAGE_BROKER can be either 'nats' or 'rabbitmq'.
|
||||
#
|
||||
# Each broker has a unique profile for configuration. The available profiles are:
|
||||
# - vernemq_nats: Uses 'vernemq' as MQTT_BROKER and 'nats' as MESSAGE_BROKER.
|
||||
# - vernemq_rabbitmq: Uses 'vernemq' as MQTT_BROKER and 'rabbitmq' as MESSAGE_BROKER.
|
||||
# - nats_nats: Uses 'nats' as both MQTT_BROKER and MESSAGE_BROKER.
|
||||
# - nats_rabbitmq: Uses 'nats' as MQTT_BROKER and 'rabbitmq' as MESSAGE_BROKER.
|
||||
#
|
||||
|
||||
include:
|
||||
- path: profiles/nats.yml
|
||||
env_file: docker/.env
|
||||
|
||||
services:
|
||||
vernemq:
|
||||
image: mainflux/vernemq:${MF_RELEASE_TAG}
|
||||
container_name: mainflux-vernemq
|
||||
restart: on-failure
|
||||
environment:
|
||||
DOCKER_VERNEMQ_ALLOW_ANONYMOUS: ${MF_DOCKER_VERNEMQ_ALLOW_ANONYMOUS}
|
||||
DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL: ${MF_DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL}
|
||||
networks:
|
||||
- mainflux-base-net
|
||||
volumes:
|
||||
- mainflux-mqtt-broker-volume:/var/lib/vernemq
|
||||
profiles:
|
||||
- vernemq_nats
|
||||
- vernemq_rabbitmq
|
||||
|
||||
broker:
|
||||
extends:
|
||||
file: nats.yml
|
||||
service: broker
|
||||
container_name: mainflux-broker
|
||||
restart: on-failure
|
||||
networks:
|
||||
- mainflux-base-net
|
||||
volumes:
|
||||
- mainflux-broker-volume:/data
|
||||
profiles:
|
||||
- vernemq_nats
|
||||
- vernemq_rabbitmq
|
||||
- nats_rabbitmq
|
24
docker/brokers/profiles/README.md
Normal file
24
docker/brokers/profiles/README.md
Normal file
@ -0,0 +1,24 @@
|
||||
# Nats Docker Profiles
|
||||
|
||||
This directory contains 2 docker-compose profiles for running Nats as an MQTT broker. It is separated from the main profile at `../docker-compose.yml` because of name conflicts with the Nats message broker.
|
||||
|
||||
The configuration is the same as for the main profile, except that the MQTT broker is set to `nats` instead of `vernemq`.
|
||||
|
||||
The profiles are:
|
||||
|
||||
- `nats_nats.yml` - Nats as an MQTT broker and Nats as a message broker
|
||||
- `nats_rabbit.yml` - Nats as an MQTT broker and RabbitMQ as a message broker
|
||||
|
||||
They are automatically included in the main profile, so you can run them depending on the profile you want to use:
|
||||
|
||||
The following command will run Nats as an MQTT broker and Nats as a message broker:
|
||||
|
||||
```bash
|
||||
MF_MQTT_BROKER_TYPE=nats MF_MESSAGE_BROKER_TYPE=nats make run
|
||||
```
|
||||
|
||||
The following command will run Nats as an MQTT broker and RabbitMQ as a message broker:
|
||||
|
||||
```bash
|
||||
MF_MQTT_BROKER_TYPE=nats MF_MESSAGE_BROKER_TYPE=rabbit make run
|
||||
```
|
16
docker/brokers/profiles/nats.yml
Normal file
16
docker/brokers/profiles/nats.yml
Normal file
@ -0,0 +1,16 @@
|
||||
# This file is used to configure NATS broker.
|
||||
# It used when running nats both as an MQTT and Message broker.
|
||||
services:
|
||||
nats:
|
||||
extends:
|
||||
file: ../nats.yml
|
||||
service: broker
|
||||
container_name: mainflux-nats
|
||||
restart: on-failure
|
||||
networks:
|
||||
- mainflux-base-net
|
||||
volumes:
|
||||
- mainflux-broker-volume:/data
|
||||
profiles:
|
||||
- nats_nats
|
||||
- nats_rabbitmq
|
15
docker/brokers/profiles/nats_rabbitmq.yml
Normal file
15
docker/brokers/profiles/nats_rabbitmq.yml
Normal file
@ -0,0 +1,15 @@
|
||||
# This file is used to configure NATS broker.
|
||||
# It used when running nats as an MQTT broker and RabbitMQ as a Message broker.
|
||||
services:
|
||||
nats:
|
||||
extends:
|
||||
file: ../nats.yml
|
||||
service: broker
|
||||
container_name: mainflux-nats
|
||||
restart: on-failure
|
||||
networks:
|
||||
- mainflux-base-net
|
||||
volumes:
|
||||
- mainflux-mqtt-broker-volume:/data
|
||||
profiles:
|
||||
- nats_rabbitmq
|
@ -18,6 +18,10 @@ volumes:
|
||||
mainflux-spicedb-db-volume:
|
||||
mainflux-auth-db-volume:
|
||||
|
||||
include:
|
||||
- path: brokers/docker-compose.yml
|
||||
env_file: docker/.env
|
||||
|
||||
services:
|
||||
spicedb:
|
||||
image: "authzed/spicedb"
|
||||
@ -139,17 +143,6 @@ services:
|
||||
- http-adapter
|
||||
- ws-adapter
|
||||
|
||||
broker:
|
||||
extends:
|
||||
file: brokers/nats.yml
|
||||
service: broker
|
||||
container_name: mainflux-broker
|
||||
restart: on-failure
|
||||
networks:
|
||||
- mainflux-base-net
|
||||
volumes:
|
||||
- mainflux-broker-volume:/data
|
||||
|
||||
things-db:
|
||||
image: postgres:13.3-alpine
|
||||
container_name: mainflux-things-db
|
||||
@ -362,25 +355,12 @@ services:
|
||||
networks:
|
||||
- mainflux-base-net
|
||||
|
||||
vernemq:
|
||||
image: mainflux/vernemq:${MF_RELEASE_TAG}
|
||||
container_name: mainflux-vernemq
|
||||
restart: on-failure
|
||||
environment:
|
||||
DOCKER_VERNEMQ_ALLOW_ANONYMOUS: ${MF_DOCKER_VERNEMQ_ALLOW_ANONYMOUS}
|
||||
DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL: ${MF_DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL}
|
||||
networks:
|
||||
- mainflux-base-net
|
||||
volumes:
|
||||
- mainflux-mqtt-broker-volume:/var/lib/vernemq
|
||||
|
||||
mqtt-adapter:
|
||||
image: mainflux/mqtt:${MF_RELEASE_TAG}
|
||||
container_name: mainflux-mqtt
|
||||
depends_on:
|
||||
- vernemq
|
||||
- things
|
||||
- broker
|
||||
- nats
|
||||
restart: on-failure
|
||||
environment:
|
||||
MF_MQTT_ADAPTER_LOG_LEVEL: ${MF_MQTT_ADAPTER_LOG_LEVEL}
|
||||
@ -402,7 +382,7 @@ services:
|
||||
MF_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/client.key}
|
||||
MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/server_ca.crt}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
networks:
|
||||
- mainflux-base-net
|
||||
@ -429,7 +409,7 @@ services:
|
||||
container_name: mainflux-http
|
||||
depends_on:
|
||||
- things
|
||||
- broker
|
||||
- nats
|
||||
restart: on-failure
|
||||
environment:
|
||||
MF_HTTP_ADAPTER_LOG_LEVEL: ${MF_HTTP_ADAPTER_LOG_LEVEL}
|
||||
@ -442,7 +422,7 @@ services:
|
||||
MF_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/client.crt}
|
||||
MF_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/client.key}
|
||||
MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/server_ca.crt}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_HTTP_ADAPTER_INSTANCE_ID: ${MF_HTTP_ADAPTER_INSTANCE_ID}
|
||||
@ -482,7 +462,7 @@ services:
|
||||
container_name: mainflux-coap
|
||||
depends_on:
|
||||
- things
|
||||
- broker
|
||||
- nats
|
||||
restart: on-failure
|
||||
environment:
|
||||
MF_COAP_ADAPTER_LOG_LEVEL: ${MF_COAP_ADAPTER_LOG_LEVEL}
|
||||
@ -499,7 +479,7 @@ services:
|
||||
MF_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/client.crt}
|
||||
MF_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/client.key}
|
||||
MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/server_ca.crt}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_COAP_ADAPTER_INSTANCE_ID: ${MF_COAP_ADAPTER_INSTANCE_ID}
|
||||
@ -531,7 +511,7 @@ services:
|
||||
container_name: mainflux-ws
|
||||
depends_on:
|
||||
- things
|
||||
- broker
|
||||
- nats
|
||||
restart: on-failure
|
||||
environment:
|
||||
MF_WS_ADAPTER_LOG_LEVEL: ${MF_WS_ADAPTER_LOG_LEVEL}
|
||||
@ -544,7 +524,7 @@ services:
|
||||
MF_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/client.crt}
|
||||
MF_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/client.key}
|
||||
MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/server_ca.crt}
|
||||
MF_BROKER_URL: ${MF_BROKER_URL}
|
||||
MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_WS_ADAPTER_INSTANCE_ID: ${MF_WS_ADAPTER_INSTANCE_ID}
|
||||
|
@ -1,3 +1,4 @@
|
||||
server_name: "nats_internal_broker"
|
||||
max_payload: 1MB
|
||||
max_connections: 1M
|
||||
port: $MF_NATS_PORT
|
||||
@ -10,3 +11,14 @@ jetstream {
|
||||
key: $MF_NATS_JETSTREAM_KEY
|
||||
max_mem: 1G
|
||||
}
|
||||
|
||||
mqtt {
|
||||
port: 1883
|
||||
max_ack_pending: 1
|
||||
}
|
||||
|
||||
websocket {
|
||||
port: 8080
|
||||
|
||||
no_tls: true
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ RUN chown -R 10000:10000 /vernemq && \
|
||||
# 9100 9101 9102 9103 9104 9105 9106 9107 9108 9109 Specific Distributed Erlang Port Range
|
||||
|
||||
EXPOSE 1883 8883 8080 44053 4369 8888 \
|
||||
9100 9101 9102 9103 9104 9105 9106 9107 9108 9109
|
||||
9100 9101 9102 9103 9104 9105 9106 9107 9108 9109
|
||||
|
||||
|
||||
VOLUME ["/vernemq/log", "/vernemq/data", "/vernemq/etc"]
|
||||
|
0
docker/vernemq/bin/vernemq.sh
Executable file → Normal file
0
docker/vernemq/bin/vernemq.sh
Executable file → Normal file
@ -19,7 +19,7 @@ default values.
|
||||
| MF_THINGS_AUTH_GRPC_TIMEOUT | Things service Auth gRPC request timeout in seconds | 1s |
|
||||
| MF_THINGS_AUTH_GRPC_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
|
||||
| MF_THINGS_AUTH_GRPC_CA_CERTS | Path to trusted CAs in PEM format | |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_HTTP_ADAPTER_INSTANCE_ID | HTTP Adapter instance ID | |
|
||||
@ -53,7 +53,7 @@ MF_THINGS_AUTH_GRPC_URL=[Things service Auth gRPC URL] \
|
||||
MF_THINGS_AUTH_GRPC_TIMEOUT=[Things service Auth gRPC request timeout in seconds] \
|
||||
MF_THINGS_AUTH_GRPC_CLIENT_TLS=[Flag that indicates if TLS should be turned on] \
|
||||
MF_THINGS_AUTH_GRPC_CA_CERTS=[Path to trusted CAs in PEM format] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_JAEGER_URL=[Jaeger server URL] \
|
||||
MF_SEND_TELEMETRY=[Send telemetry to mainflux call home server] \
|
||||
MF_HTTP_ADAPTER_INSTANCE_ID=[HTTP Adapter instance ID] \
|
||||
|
@ -36,7 +36,6 @@ func New(publisher messaging.Publisher, auth mainflux.AuthzServiceClient) Servic
|
||||
|
||||
func (as *adapterService) Publish(ctx context.Context, token string, msg *messaging.Message) error {
|
||||
ar := &mainflux.AuthorizeReq{
|
||||
Namespace: "",
|
||||
SubjectType: "thing",
|
||||
Permission: "publish",
|
||||
Subject: token,
|
||||
|
@ -15,7 +15,7 @@ default values.
|
||||
|----------------------------------|----------------------------------------------|---------------------------------|
|
||||
| MF_LORA_ADAPTER_HTTP_PORT | Service HTTP port | 9017 |
|
||||
| MF_LORA_ADAPTER_LOG_LEVEL | Service Log level | info |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_LORA_ADAPTER_MESSAGES_URL | LoRa adapter MQTT broker URL | tcp://localhost:1883 |
|
||||
| MF_LORA_ADAPTER_MESSAGES_TOPIC | LoRa adapter MQTT subscriber Topic | application/+/device/+/event/up |
|
||||
| MF_LORA_ADAPTER_MESSAGES_USER | LoRa adapter MQTT subscriber Username | |
|
||||
@ -52,7 +52,7 @@ make install
|
||||
|
||||
# set the environment variables and run the service
|
||||
MF_LORA_ADAPTER_LOG_LEVEL=[Lora Adapter Log Level] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_LORA_ADAPTER_MESSAGES_URL=[LoRa adapter MQTT broker URL] \
|
||||
MF_LORA_ADAPTER_MESSAGES_TOPIC=[LoRa adapter MQTT subscriber Topic] \
|
||||
MF_LORA_ADAPTER_MESSAGES_USER=[LoRa adapter MQTT subscriber Username] \
|
||||
|
@ -16,6 +16,7 @@ default values.
|
||||
| MF_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 |
|
||||
| MF_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 |
|
||||
| MF_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 |
|
||||
| MF_MQTT_ADAPTER_MQTT_QOS | MQTT broker QoS | 1 |
|
||||
| MF_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s |
|
||||
| MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" |
|
||||
| MF_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 |
|
||||
@ -30,7 +31,7 @@ default values.
|
||||
| MF_MQTT_ADAPTER_ES_URL | Event sourcing URL | localhost:6379 |
|
||||
| MF_MQTT_ADAPTER_ES_PASS | Event sourcing password | "" |
|
||||
| MF_MQTT_ADAPTER_ES_DB | Event sourcing database | "0" |
|
||||
| MF_BROKER_URL | Message broker broker URL | nats://127.0.0.1:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker broker URL | nats://127.0.0.1:4222 |
|
||||
| MF_JAEGER_URL | URL of Jaeger tracing service | "http://jaeger:14268/api/traces" |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_MQTT_ADAPTER_INSTANCE_ID | Instance ID for telemetry | "" |
|
||||
@ -61,6 +62,7 @@ MF_MQTT_ADAPTER_MQTT_TARGET_HOST=[MQTT broker host] \
|
||||
MF_MQTT_ADAPTER_MQTT_TARGET_PORT=[MQTT broker MQTT port]] \
|
||||
MF_MQTT_ADAPTER_FORWARDER_TIMEOUT=[MQTT forwarder for multiprotocol support timeout] \
|
||||
MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=[MQTT health check URL] \
|
||||
MF_MQTT_ADAPTER_MQTT_QOS=[MQTT broker QoS] \
|
||||
MF_MQTT_ADAPTER_WS_PORT=[MQTT adapter WS port] \
|
||||
MF_MQTT_ADAPTER_WS_TARGET_HOST=[MQTT broker for MQTT over WS host] \
|
||||
MF_MQTT_ADAPTER_WS_TARGET_PORT=[MQTT broker for MQTT over WS port]] \
|
||||
@ -75,7 +77,7 @@ MF_MQTT_ADAPTER_CA_CERTS=[CA certs for gRPC client] \
|
||||
MF_MQTT_ADAPTER_ES_URL=[Event sourcing URL] \
|
||||
MF_MQTT_ADAPTER_ES_PASS=[Event sourcing pass] \
|
||||
MF_MQTT_ADAPTER_ES_DB=[Event sourcing database] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_JAEGER_URL=[Jaeger service URL] \
|
||||
MF_SEND_TELEMETRY=[Send telemetry to mainflux call home server] \
|
||||
MF_MQTT_ADAPTER_INSTANCE_ID=[Instance ID] \
|
||||
|
@ -55,19 +55,19 @@ var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]
|
||||
|
||||
// Event implements events.Event interface.
|
||||
type handler struct {
|
||||
publishers []messaging.Publisher
|
||||
auth mainflux.AuthzServiceClient
|
||||
logger logger.Logger
|
||||
es events.EventStore
|
||||
publisher messaging.Publisher
|
||||
auth mainflux.AuthzServiceClient
|
||||
logger logger.Logger
|
||||
es events.EventStore
|
||||
}
|
||||
|
||||
// NewHandler creates new Handler entity.
|
||||
func NewHandler(publishers []messaging.Publisher, es events.EventStore, logger logger.Logger, auth mainflux.AuthzServiceClient) session.Handler {
|
||||
func NewHandler(publisher messaging.Publisher, es events.EventStore, logger logger.Logger, auth mainflux.AuthzServiceClient) session.Handler {
|
||||
return &handler{
|
||||
es: es,
|
||||
logger: logger,
|
||||
publishers: publishers,
|
||||
auth: auth,
|
||||
es: es,
|
||||
logger: logger,
|
||||
publisher: publisher,
|
||||
auth: auth,
|
||||
}
|
||||
}
|
||||
|
||||
@ -168,11 +168,10 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
Created: time.Now().UnixNano(),
|
||||
}
|
||||
|
||||
for _, pub := range h.publishers {
|
||||
if err := pub.Publish(ctx, msg.Channel, &msg); err != nil {
|
||||
return errors.Wrap(ErrFailedPublishToMsgBroker, err)
|
||||
}
|
||||
if err := h.publisher.Publish(ctx, msg.Channel, &msg); err != nil {
|
||||
return errors.Wrap(ErrFailedPublishToMsgBroker, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
"github.com/mainflux/mainflux/mqtt"
|
||||
"github.com/mainflux/mainflux/mqtt/mocks"
|
||||
"github.com/mainflux/mainflux/pkg/errors"
|
||||
"github.com/mainflux/mainflux/pkg/messaging"
|
||||
"github.com/mainflux/mproxy/pkg/session"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@ -455,5 +454,5 @@ func newHandler() session.Handler {
|
||||
}
|
||||
auth := new(authmocks.Service)
|
||||
eventStore := mocks.NewEventStore()
|
||||
return mqtt.NewHandler([]messaging.Publisher{mocks.NewPublisher()}, eventStore, logger, auth)
|
||||
return mqtt.NewHandler(mocks.NewPublisher(), eventStore, logger, auth)
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ default values.
|
||||
|----------------------------------|--------------------------------------------------|----------------------------|
|
||||
| MF_OPCUA_ADAPTER_HTTP_PORT | Service HTTP port | 8180 |
|
||||
| MF_OPCUA_ADAPTER_LOG_LEVEL | Service Log level | info |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_OPCUA_ADAPTER_INTERVAL_MS | OPC-UA Server Interval in milliseconds | 1000 |
|
||||
| MF_OPCUA_ADAPTER_POLICY | OPC-UA Server Policy | |
|
||||
| MF_OPCUA_ADAPTER_MODE | OPC-UA Server Mode | |
|
||||
@ -53,7 +53,7 @@ make install
|
||||
# set the environment variables and run the service
|
||||
MF_OPCUA_ADAPTER_HTTP_PORT=[Service HTTP port] \
|
||||
MF_OPCUA_ADAPTER_LOG_LEVEL=[OPC-UA Adapter Log Level] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_OPCUA_ADAPTER_INTERVAL_MS: [OPC-UA Server Interval (milliseconds)] \
|
||||
MF_OPCUA_ADAPTER_POLICY=[OPC-UA Server Policy] \
|
||||
MF_OPCUA_ADAPTER_MODE=[OPC-UA Server Mode] \
|
||||
|
@ -19,10 +19,11 @@ var _ messaging.Publisher = (*publisher)(nil)
|
||||
type publisher struct {
|
||||
client mqtt.Client
|
||||
timeout time.Duration
|
||||
qos uint8
|
||||
}
|
||||
|
||||
// NewPublisher returns a new MQTT message publisher.
|
||||
func NewPublisher(address string, timeout time.Duration) (messaging.Publisher, error) {
|
||||
func NewPublisher(address string, qos uint8, timeout time.Duration) (messaging.Publisher, error) {
|
||||
client, err := newClient(address, "mqtt-publisher", timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -31,6 +32,7 @@ func NewPublisher(address string, timeout time.Duration) (messaging.Publisher, e
|
||||
ret := publisher{
|
||||
client: client,
|
||||
timeout: timeout,
|
||||
qos: qos,
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
@ -41,7 +43,7 @@ func (pub publisher) Publish(ctx context.Context, topic string, msg *messaging.M
|
||||
}
|
||||
|
||||
// Publish only the payload and not the whole message.
|
||||
token := pub.client.Publish(topic, qos, false, msg.GetPayload())
|
||||
token := pub.client.Publish(topic, byte(pub.qos), false, msg.GetPayload())
|
||||
if token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
|
@ -16,10 +16,7 @@ import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
const (
|
||||
username = "mainflux-mqtt"
|
||||
qos = 2
|
||||
)
|
||||
const username = "mainflux-mqtt"
|
||||
|
||||
var (
|
||||
// ErrConnect indicates that connection to MQTT broker failed.
|
||||
@ -62,7 +59,7 @@ type pubsub struct {
|
||||
}
|
||||
|
||||
// NewPubSub returns MQTT message publisher/subscriber.
|
||||
func NewPubSub(url string, timeout time.Duration, logger mflog.Logger) (messaging.PubSub, error) {
|
||||
func NewPubSub(url string, qos uint8, timeout time.Duration, logger mflog.Logger) (messaging.PubSub, error) {
|
||||
client, err := newClient(url, "mqtt-publisher", timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -71,6 +68,7 @@ func NewPubSub(url string, timeout time.Duration, logger mflog.Logger) (messagin
|
||||
publisher: publisher{
|
||||
client: client,
|
||||
timeout: timeout,
|
||||
qos: qos,
|
||||
},
|
||||
address: url,
|
||||
timeout: timeout,
|
||||
@ -113,14 +111,15 @@ func (ps *pubsub) Subscribe(ctx context.Context, id, topic string, handler messa
|
||||
s.topics = append(s.topics, topic)
|
||||
ps.subscriptions[id] = s
|
||||
|
||||
token := s.client.Subscribe(topic, qos, ps.mqttHandler(handler))
|
||||
token := s.client.Subscribe(topic, byte(ps.qos), ps.mqttHandler(handler))
|
||||
if token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
if ok := token.WaitTimeout(ps.timeout); !ok {
|
||||
return ErrSubscribeTimeout
|
||||
}
|
||||
return token.Error()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error {
|
||||
@ -181,15 +180,10 @@ func newClient(address, id string, timeout time.Duration) (mqtt.Client, error) {
|
||||
return nil, token.Error()
|
||||
}
|
||||
|
||||
ok := token.WaitTimeout(timeout)
|
||||
if !ok {
|
||||
if ok := token.WaitTimeout(timeout); !ok {
|
||||
return nil, ErrConnect
|
||||
}
|
||||
|
||||
if token.Error() != nil {
|
||||
return nil, token.Error()
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
@ -200,6 +194,7 @@ func (ps *pubsub) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler {
|
||||
ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.Handle(&msg); err != nil {
|
||||
ps.logger.Warn(fmt.Sprintf("Failed to handle Mainflux message: %s", err))
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ func TestMain(m *testing.M) {
|
||||
}
|
||||
|
||||
if err := pool.Retry(func() error {
|
||||
pubsub, err = mqttpubsub.NewPubSub(address, brokerTimeout, logger)
|
||||
pubsub, err = mqttpubsub.NewPubSub(address, 2, brokerTimeout, logger)
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Fatalf("Could not connect to docker: %s", err)
|
||||
|
@ -100,14 +100,18 @@ func (ps *pubsub) Subscribe(ctx context.Context, id, topic string, handler messa
|
||||
ps.subscriptions[topic] = s
|
||||
}
|
||||
|
||||
if _, err := ps.ch.QueueDeclare(topic, true, false, false, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ps.ch.QueueBind(topic, topic, exchangeName, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
clientID := fmt.Sprintf("%s-%s", topic, id)
|
||||
msgs, err := ps.ch.Consume(topic, clientID, true, false, false, false, nil)
|
||||
|
||||
queue, err := ps.ch.QueueDeclare(clientID, true, false, false, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ps.ch.QueueBind(queue.Name, topic, exchangeName, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgs, err := ps.ch.Consume(queue.Name, clientID, true, false, false, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -404,8 +404,6 @@ func TestPubSub(t *testing.T) {
|
||||
|
||||
switch tc.err {
|
||||
case nil:
|
||||
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", tc.desc, err))
|
||||
|
||||
// If no error, publish message, and receive after subscribing.
|
||||
expectedMsg := messaging.Message{
|
||||
Channel: channel,
|
||||
@ -417,10 +415,6 @@ func TestPubSub(t *testing.T) {
|
||||
|
||||
receivedMsg := <-msgChan
|
||||
assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
|
||||
assert.Equal(t, expectedMsg.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
|
||||
assert.Equal(t, expectedMsg.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
|
||||
assert.Equal(t, expectedMsg.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
|
||||
assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
|
||||
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
|
||||
|
||||
err = pubsub.Unsubscribe(context.TODO(), tc.clientID, fmt.Sprintf("%s.%s", chansPrefix, tc.topic))
|
||||
|
@ -66,7 +66,7 @@ setup_mf() {
|
||||
fi
|
||||
done
|
||||
echo "Compile check for rabbitmq..."
|
||||
MF_BROKER_TYPE=rabbitmq make http
|
||||
MF_MESSAGE_BROKER_TYPE=rabbitmq make http
|
||||
make -j$NPROC
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ default values.
|
||||
| MF_TWINS_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
|
||||
| MF_TWINS_CA_CERTS | Path to trusted CAs in PEM format | |
|
||||
| MF_TWINS_CHANNEL_ID | Message broker notifications channel ID | |
|
||||
| MF_BROKER_URL | Mainflux Message broker URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Mainflux Message broker URL | nats://localhost:4222 |
|
||||
| MF_AUTH_GRPC_URL | Users service gRPC URL | localhost:7001 |
|
||||
| MF_AUTH_GRPC_TIMEOUT | Users service gRPC request timeout in seconds | 1s |
|
||||
| MF_TWINS_CACHE_URL | Cache database URL | localhost:6379 |
|
||||
@ -69,7 +69,7 @@ MF_THINGS_STANDALONE_TOKEN=[User token for standalone mode that should be passed
|
||||
MF_TWINS_CLIENT_TLS=[Flag that indicates if TLS should be turned on] \
|
||||
MF_TWINS_CA_CERTS=[Path to trusted CAs in PEM format] \
|
||||
MF_TWINS_CHANNEL_ID=[Message broker notifications channel ID] \
|
||||
MF_BROKER_URL=[Mainflux Message broker URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Mainflux Message broker URL] \
|
||||
MF_AUTH_GRPC_URL=[Users service gRPC URL] \
|
||||
MF_AUTH_GRPC_TIMEOUT=[Users service gRPC request timeout in seconds] \
|
||||
$GOBIN/mainflux-twins
|
||||
|
@ -19,7 +19,7 @@ default values.
|
||||
| MF_THINGS_AUTH_GRPC_TIMEOUT | Things service Auth gRPC request timeout in seconds | 1s |
|
||||
| MF_THINGS_AUTH_GRPC_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
|
||||
| MF_THINGS_AUTH_GRPC_CA_CERTS | Path to trusted CAs in PEM format | |
|
||||
| MF_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
|
||||
| MF_JAEGER_URL | Jaeger server URL | http://jaeger:14268/api/traces |
|
||||
| MF_SEND_TELEMETRY | Send telemetry to mainflux call home server | true |
|
||||
| MF_WS_ADAPTER_INSTANCE_ID | Service instance ID | "" |
|
||||
@ -53,7 +53,7 @@ MF_THINGS_AUTH_GRPC_URL=[Things service Auth gRPC URL] \
|
||||
MF_THINGS_AUTH_GRPC_TIMEOUT=[Things service Auth gRPC request timeout in seconds] \
|
||||
MF_THINGS_AUTH_GRPC_CLIENT_TLS=[Flag that indicates if TLS should be turned on] \
|
||||
MF_THINGS_AUTH_GRPC_CA_CERTS=[Path to trusted CAs in PEM format] \
|
||||
MF_BROKER_URL=[Message broker instance URL] \
|
||||
MF_MESSAGE_BROKER_URL=[Message broker instance URL] \
|
||||
MF_JAEGER_URL=[Jaeger server URL] \
|
||||
MF_SEND_TELEMETRY=[Send telemetry to mainflux call home server] \
|
||||
MF_WS_ADAPTER_INSTANCE_ID=[Service instance ID] \
|
||||
|
Loading…
x
Reference in New Issue
Block a user