From 43a263d7da08c8cc75c86b6d781669360a7aeee9 Mon Sep 17 00:00:00 2001 From: b1ackd0t <28790446+rodneyosodo@users.noreply.github.com> Date: Tue, 24 Oct 2023 18:22:53 +0300 Subject: [PATCH] NOISSUE - Introduce NATS Jetstream as Default ES (#1907) * Refactor message broker implementation This commit refactors the Nats message broker implementation to include pubsub options. These changes include: - Adding `Option` func that takes in the URL and prefix - Implement `WithStream` option which can create a different stream for nats stream - Implement `WithExchange` option which can create a different exchaange for rabbitmq channel - Implement `WithPrefix` option which allows to you change the publisher prefix These changes improve the organization and readability of the codebase. Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Use redis URL to configure username, password and db Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Make event store configurable on dev deployment Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Fix adds options to messaging `PubSub` interface Adding options to PubSub interface allows the use of messaging package to do es. The changes in this commit ensure that the code handles errors properly and provides more informative error messages when encountering unexpected types. Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Add NATS event publisher implementation This commit adds the implementation of the NATS event publisher. The NATS event publisher is responsible for publishing events to a NATS messaging system. It uses the `messaging` package to interact with the messaging system. The implementation includes the following features: - Publishing events to NATS using the `Publish` method. - Marshaling events to JSON before publishing. - Setting the message subject and headers based on the event. - Handling errors during publishing. This implementation is built with the `!rabbitmq` build tag, which means it will only be compiled if the `rabbitmq` build tag is not present. The NATS event publisher is part of the Mainflux events package and provides support for the Mainflux NATS events source service functionality. Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Add RabbitMQ event publisher implementation This commit adds the implementation of the RabbitMQ event publisher. The RabbitMQ event publisher is responsible for publishing events to a RabbitMQ messaging system. It uses the `messaging` package to interact with the messaging system. The implementation includes the following features: - Publishing events to RabbitMQ using the `Publish` method. - Marshaling events to JSON before publishing. - Setting the message subject and headers based on the event. - Handling errors during publishing. This implementation is built with the `rabbitmq` build tag, which means it will only be compiled if the `rabbitmq` build tag is present. The RabbitMQ event publisher is part of the Mainflux events package and provides support for the Mainflux RabbitMQ events source service functionality. Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Add configurable implementation for events store This commit adds a new file `brokers_*.go` which contains the implementation for the different event store. The file includes functions for creating a new publisher and subscriber using different es store. This commit also includes an `init` function that logs a message indicating that the binary was built using the respective package as the events store. The purpose of this commit is to add support for alternative events store. Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Fix build flags Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Refactor Makefile and Semaphore configuration The Makefile has been refactored to include the `MF_ES_STORE_TYPE` tag in the `go build` command. Additionally, the Semaphore configuration has been updated to include a new task for compiling with Redis as the broker type. This commit addresses the need to compile the codebase with Redis as the event store type and includes the necessary changes in the Makefile and Semaphore configuration. Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Reduced due to memory on testing Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Fix tests for es Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Fix grammar Co-authored-by: Sammy Kerata Oina <44265300+SammyOina@users.noreply.github.com> Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Fix linting Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * feat(docker): update environment variables for message broker The commit updates the environment variable `MF_ES_STORE_TYPE` in the `docker/.env` file. The variable is changed from `${MF_MQTT_BROKER_TYPE}` to `${MF_MESSAGE_BROKER_TYPE}` to accurately reflect the type of message broker being used. This change ensures that the correct message broker is configured for the Event Store. Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * feat: Update docker environment variables - Removed the unused MF_ES_URL variable in the .env file - Updated the MF_ES_STORE_TYPE and MF_ES_STORE_URL variables in the .env file to match the MF_MESSAGE_BROKER_TYPE and MF_NATS_URL variables respectively Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo * Fix after rebase Signed-off-by: rodneyosodo * Add godocs for option parameters for brokers Signed-off-by: rodneyosodo * pass by value exchange and prefix names Signed-off-by: rodneyosodo * Rename option functions Signed-off-by: rodneyosodo * move variables to constants Signed-off-by: rodneyosodo * fix: option example comment Signed-off-by: rodneyosodo --------- Signed-off-by: Rodney Osodo Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Signed-off-by: rodneyosodo Co-authored-by: Sammy Kerata Oina <44265300+SammyOina@users.noreply.github.com> --- .semaphore/semaphore.yml | 86 ++--- Makefile | 15 +- bootstrap/events/producer/streams.go | 4 +- cmd/bootstrap/main.go | 4 +- cmd/lora/main.go | 4 +- cmd/opcua/main.go | 4 +- docker/.env | 3 +- docker/addons/bootstrap/docker-compose.yml | 2 +- docker/addons/lora-adapter/docker-compose.yml | 2 +- .../addons/opcua-adapter/docker-compose.yml | 2 +- docker/addons/twins/docker-compose.yml | 2 +- docker/brokers/profiles/nats_rabbitmq.yml | 15 - docker/docker-compose.yml | 19 +- docker/es/docker-compose.yml | 14 + internal/groups/events/streams.go | 4 +- mqtt/events/streams.go | 4 +- pkg/events/events.go | 4 +- pkg/events/nats/doc.go | 8 + pkg/events/nats/publisher.go | 90 ++++++ pkg/events/nats/publisher_test.go | 302 ++++++++++++++++++ pkg/events/nats/setup_test.go | 97 ++++++ pkg/events/nats/subscriber.go | 145 +++++++++ pkg/events/rabbitmq/doc.go | 8 + pkg/events/rabbitmq/publisher.go | 111 +++++++ pkg/events/rabbitmq/publisher_test.go | 302 ++++++++++++++++++ pkg/events/rabbitmq/setup_test.go | 93 ++++++ pkg/events/rabbitmq/subscriber.go | 127 ++++++++ pkg/events/redis/publisher.go | 18 +- pkg/events/redis/publisher_test.go | 109 ++++++- pkg/events/redis/setup_test.go | 86 +++-- pkg/events/redis/subscriber.go | 3 + pkg/events/store/brokers_nats.go | 38 +++ pkg/events/store/brokers_rabbitmq.go | 38 +++ pkg/events/store/brokers_redis.go | 38 +++ pkg/messaging/brokers/brokers_nats.go | 8 +- pkg/messaging/brokers/brokers_rabbitmq.go | 14 +- pkg/messaging/nats/options.go | 56 ++++ pkg/messaging/nats/publisher.go | 41 ++- pkg/messaging/nats/pubsub.go | 13 +- pkg/messaging/nats/setup_test.go | 2 +- pkg/messaging/pubsub.go | 19 ++ pkg/messaging/rabbitmq/options.go | 60 ++++ pkg/messaging/rabbitmq/publisher.go | 33 +- pkg/messaging/rabbitmq/pubsub.go | 31 +- pkg/messaging/rabbitmq/pubsub_test.go | 2 +- pkg/messaging/rabbitmq/setup_test.go | 1 + scripts/ci.sh | 2 + things/events/streams.go | 4 +- twins/events/setup_test.go | 17 +- twins/events/streams.go | 4 +- users/events/streams.go | 4 +- 51 files changed, 1923 insertions(+), 189 deletions(-) delete mode 100644 docker/brokers/profiles/nats_rabbitmq.yml create mode 100644 docker/es/docker-compose.yml create mode 100644 pkg/events/nats/doc.go create mode 100644 pkg/events/nats/publisher.go create mode 100644 pkg/events/nats/publisher_test.go create mode 100644 pkg/events/nats/setup_test.go create mode 100644 pkg/events/nats/subscriber.go create mode 100644 pkg/events/rabbitmq/doc.go create mode 100644 pkg/events/rabbitmq/publisher.go create mode 100644 pkg/events/rabbitmq/publisher_test.go create mode 100644 pkg/events/rabbitmq/setup_test.go create mode 100644 pkg/events/rabbitmq/subscriber.go create mode 100644 pkg/events/store/brokers_nats.go create mode 100644 pkg/events/store/brokers_rabbitmq.go create mode 100644 pkg/events/store/brokers_redis.go create mode 100644 pkg/messaging/nats/options.go create mode 100644 pkg/messaging/rabbitmq/options.go diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 22779b55..a270f5f9 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -19,13 +19,13 @@ blocks: jobs: - name: Setup Codecov commands: - - 'curl -Os https://uploader.codecov.io/latest/linux/codecov' + - "curl -Os https://uploader.codecov.io/latest/linux/codecov" - chmod +x codecov - ./codecov - cache store codecov ./codecov - name: Setup Golangci-lint commands: - - 'curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.53.3' + - "curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.53.3" - cache store linter ./bin/golangci-lint secrets: - name: codecov @@ -50,10 +50,10 @@ blocks: - go install google.golang.org/protobuf/cmd/protoc-gen-go@$PROTOC_GEN_VERSION - go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@$PROTOC_GRPC_VERSION - + - export PATH=$PATH:/usr/local/bin/protoc - export PATH=$PATH:$HOME/go/bin - + - | echo "Setting up Mainflux..." for p in $(ls ./*.pb.go); do @@ -70,7 +70,7 @@ blocks: exit 1 fi done - - | + - | for p in $(ls pkg/messaging/*.pb.go); do if ! cmp -s $p $p.tmp; then echo "Proto file and generated Go file $p are out of sync!" @@ -89,7 +89,7 @@ blocks: commands: - cd users - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Users commands: - cd users @@ -110,7 +110,7 @@ blocks: commands: - make docker_users secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Things run: when: "change_in(['things', 'cmd/things', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -123,7 +123,7 @@ blocks: commands: - cd things - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Things commands: - cd things @@ -144,7 +144,7 @@ blocks: commands: - make docker_things secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test CoAP run: when: "change_in(['coap', 'cmd/coap', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -157,7 +157,7 @@ blocks: commands: - cd coap - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test CoAP commands: - cd coap @@ -178,7 +178,7 @@ blocks: commands: - make docker_coap secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test HTTP run: when: "change_in(['http', 'cmd/http', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -191,7 +191,7 @@ blocks: commands: - cd http - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test HTTP commands: - cd http @@ -213,7 +213,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/http:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test MQTT run: when: "change_in(['mqtt', 'cmd/mqtt', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -226,7 +226,7 @@ blocks: commands: - cd mqtt - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test MQTT commands: - cd mqtt @@ -248,7 +248,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/mqtt:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test WS run: when: "change_in(['ws', 'cmd/ws', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -261,7 +261,7 @@ blocks: commands: - cd ws - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test WS commands: - cd ws @@ -283,7 +283,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/ws:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Bootstrap run: when: "change_in(['bootstrap', 'cmd/bootstrap','things/policies/postgres/policies.go', 'things/policies/api/grpc/client.go'])" @@ -296,7 +296,7 @@ blocks: commands: - cd bootstrap - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Bootstrap commands: - cd bootstrap @@ -318,7 +318,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/bootstrap:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Certs run: when: "change_in(['certs', 'cmd/certs', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -331,7 +331,7 @@ blocks: commands: - cd certs - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Certs commands: - cd certs @@ -353,7 +353,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/certs:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Provision dependencies: - Setup @@ -366,7 +366,7 @@ blocks: commands: - cd provision - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Provision commands: - cd provision @@ -388,7 +388,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/http:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Twins run: when: "change_in(['twins', 'cmd/twins', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -401,7 +401,7 @@ blocks: commands: - cd twins - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Twins commands: - cd twins @@ -423,7 +423,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/twins:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Readers run: when: "change_in(['/readers', 'cmd/cassandra-reader', 'cmd/influxdb-reader', 'cmd/mongodb-reader', 'cmd/postgres-reader', 'cmd/timescale-reader', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -436,7 +436,7 @@ blocks: commands: - cd readers - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Twins commands: - cd readers @@ -462,7 +462,7 @@ blocks: - docker push mainflux/postgres-reader:latest - docker push mainflux/timescale-reader:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Consumers run: when: "change_in(['consumers', 'cmd/cassandra-writer', 'cmd/influxdb-writer', 'cmd/mongodb-writer', 'cmd/postgres-writer', 'cmd/timescale-writer', 'cmd/smpp-notifier', 'cmd/smtp-notifier', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -475,7 +475,7 @@ blocks: commands: - cd consumers - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Consumers commands: - cd consumers @@ -502,7 +502,7 @@ blocks: - docker push mainflux/timescale-writer:latest - docker push mainflux/smtp-notifier:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test CLI run: when: "change_in(['cli', 'cmd/cli'])" @@ -515,7 +515,7 @@ blocks: commands: - cd cli - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test CLI commands: - cd cli @@ -537,7 +537,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/cli:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test LoRa run: when: "change_in(['lora', 'cmd/lora'])" @@ -550,7 +550,7 @@ blocks: commands: - cd lora - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test LoRa commands: - cd lora @@ -572,7 +572,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/lora:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test OPC-UA run: when: "change_in(['/opcua', 'cmd/opcua', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -585,7 +585,7 @@ blocks: commands: - cd opcua - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test OPC-UA commands: - cd opcua @@ -607,7 +607,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/opcua:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Internal run: when: "change_in('/internal')" @@ -620,7 +620,7 @@ blocks: commands: - cd internal - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Internal commands: - cd internal @@ -640,7 +640,7 @@ blocks: commands: - cd logger - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Logger commands: - cd logger @@ -660,7 +660,7 @@ blocks: commands: - cd pkg - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test PKG commands: - cd pkg @@ -680,7 +680,7 @@ blocks: commands: - cd tools - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Tools commands: - cd tools @@ -697,3 +697,11 @@ blocks: commands: - MF_MESSAGE_BROKER_TYPE=rabbitmq make mqtt + - name: Compile Check For Redis + dependencies: + - Setup + task: + jobs: + - name: Compile For Redis + commands: + - MF_ES_STORE_TYPE=redis make mqtt diff --git a/Makefile b/Makefile index 398e08e2..eaf648fc 100644 --- a/Makefile +++ b/Makefile @@ -35,10 +35,15 @@ else MF_MQTT_BROKER_TYPE=nats endif +ifneq ($(MF_ES_STORE_TYPE),) + MF_ES_STORE_TYPE := $(MF_ES_STORE_TYPE) +else + MF_ES_STORE_TYPE=nats +endif define compile_service CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \ - go build -mod=vendor -tags $(MF_MESSAGE_BROKER_TYPE) -ldflags "-s -w \ + go build -mod=vendor -tags $(MF_MESSAGE_BROKER_TYPE) --tags $(MF_ES_STORE_TYPE) -ldflags "-s -w \ -X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \ -X 'github.com/mainflux/mainflux.Version=$(VERSION)' \ -X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \ @@ -226,7 +231,15 @@ else endif run: check_certs change_config +ifeq ($(MF_ES_STORE_TYPE), redis) + sed -i "s/MF_ES_STORE_TYPE=.*/MF_ES_STORE_TYPE=redis/" docker/.env + sed -i "s/MF_ES_STORE_URL=.*/MF_ES_STORE_URL=$$\{MF_REDIS_URL}/" docker/.env + docker-compose -f docker/docker-compose.yml --profile $(DOCKER_PROFILE) --profile redis -p $(DOCKER_PROJECT) $(DOCKER_COMPOSE_COMMAND) $(args) +else + sed -i "s,MF_ES_STORE_TYPE=.*,MF_ES_STORE_TYPE=$$\{MF_MESSAGE_BROKER_TYPE}," docker/.env + sed -i "s,MF_ES_STORE_URL=.*,MF_ES_STORE_URL=$$\{MF_$(shell echo ${MF_MESSAGE_BROKER_TYPE} | tr 'a-z' 'A-Z')_URL\}," docker/.env docker-compose -f docker/docker-compose.yml --profile $(DOCKER_PROFILE) -p $(DOCKER_PROJECT) $(DOCKER_COMPOSE_COMMAND) $(args) +endif run_addons: check_certs $(call change_config) diff --git a/bootstrap/events/producer/streams.go b/bootstrap/events/producer/streams.go index 461ebd3e..54e88cb7 100644 --- a/bootstrap/events/producer/streams.go +++ b/bootstrap/events/producer/streams.go @@ -8,7 +8,7 @@ import ( "github.com/mainflux/mainflux/bootstrap" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" ) const streamID = "mainflux.bootstrap" @@ -23,7 +23,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around bootstrap service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc bootstrap.Service, url string) (bootstrap.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 56053b4c..b22e8c5e 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -28,7 +28,7 @@ import ( "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" mfsdk "github.com/mainflux/mainflux/pkg/sdk/go" "github.com/mainflux/mainflux/pkg/uuid" "go.opentelemetry.io/otel/trace" @@ -184,7 +184,7 @@ func newService(ctx context.Context, auth mainflux.AuthServiceClient, db *sqlx.D } func subscribeToThingsES(ctx context.Context, svc bootstrap.Service, cfg config, logger mflog.Logger) error { - subscriber, err := redis.NewSubscriber(cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) if err != nil { return err } diff --git a/cmd/lora/main.go b/cmd/lora/main.go index 20b64809..33cf00f2 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -26,7 +26,7 @@ import ( "github.com/mainflux/mainflux/lora/api" "github.com/mainflux/mainflux/lora/events" "github.com/mainflux/mainflux/lora/mqtt" - mfredis "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/brokers" brokerstracing "github.com/mainflux/mainflux/pkg/messaging/brokers/tracing" @@ -195,7 +195,7 @@ func subscribeToLoRaBroker(svc lora.Service, mc mqttpaho.Client, timeout time.Du } func subscribeToThingsES(ctx context.Context, svc lora.Service, cfg config, logger mflog.Logger) error { - subscriber, err := mfredis.NewSubscriber(cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) if err != nil { return err } diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index 61489a50..30a8f87c 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -25,7 +25,7 @@ import ( "github.com/mainflux/mainflux/opcua/db" "github.com/mainflux/mainflux/opcua/events" "github.com/mainflux/mainflux/opcua/gopcua" - mfredis "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/pkg/messaging/brokers" brokerstracing "github.com/mainflux/mainflux/pkg/messaging/brokers/tracing" "github.com/mainflux/mainflux/pkg/uuid" @@ -178,7 +178,7 @@ func subscribeToStoredSubs(ctx context.Context, sub opcua.Subscriber, cfg opcua. } func subscribeToThingsES(ctx context.Context, svc opcua.Service, cfg config, logger mflog.Logger) error { - subscriber, err := mfredis.NewSubscriber(cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) if err != nil { return err } diff --git a/docker/.env b/docker/.env index c9e907b8..c1100ec6 100644 --- a/docker/.env +++ b/docker/.env @@ -52,7 +52,8 @@ MF_REDIS_TCP_PORT=6379 MF_REDIS_URL=redis://es-redis:${MF_REDIS_TCP_PORT}/0 ## Event Store -MF_ES_URL=${MF_REDIS_URL} +MF_ES_STORE_TYPE=${MF_MESSAGE_BROKER_TYPE} +MF_ES_STORE_URL=${MF_NATS_URL} ## Jaeger MF_JAEGER_PORT=6831 diff --git a/docker/addons/bootstrap/docker-compose.yml b/docker/addons/bootstrap/docker-compose.yml index 0e2c100f..1383aa91 100644 --- a/docker/addons/bootstrap/docker-compose.yml +++ b/docker/addons/bootstrap/docker-compose.yml @@ -41,7 +41,7 @@ services: MF_BOOTSTRAP_LOG_LEVEL: ${MF_BOOTSTRAP_LOG_LEVEL} MF_BOOTSTRAP_ENCRYPT_KEY: ${MF_BOOTSTRAP_ENCRYPT_KEY} MF_BOOTSTRAP_EVENT_CONSUMER: ${MF_BOOTSTRAP_EVENT_CONSUMER} - MF_BOOTSTRAP_ES_URL: ${MF_ES_URL} + MF_BOOTSTRAP_ES_URL: ${MF_ES_STORE_URL} MF_BOOTSTRAP_HTTP_HOST: ${MF_BOOTSTRAP_HTTP_HOST} MF_BOOTSTRAP_HTTP_PORT: ${MF_BOOTSTRAP_HTTP_PORT} MF_BOOTSTRAP_HTTP_SERVER_CERT: ${MF_BOOTSTRAP_HTTP_SERVER_CERT} diff --git a/docker/addons/lora-adapter/docker-compose.yml b/docker/addons/lora-adapter/docker-compose.yml index 5c18705f..99ab3cc7 100644 --- a/docker/addons/lora-adapter/docker-compose.yml +++ b/docker/addons/lora-adapter/docker-compose.yml @@ -36,7 +36,7 @@ services: MF_LORA_ADAPTER_HTTP_SERVER_CERT: ${MF_LORA_ADAPTER_HTTP_SERVER_CERT} MF_LORA_ADAPTER_HTTP_SERVER_KEY: ${MF_LORA_ADAPTER_HTTP_SERVER_KEY} MF_LORA_ADAPTER_ROUTE_MAP_URL: ${MF_LORA_ADAPTER_ROUTE_MAP_URL} - MF_LORA_ADAPTER_ES_URL: ${MF_ES_URL} + MF_LORA_ADAPTER_ES_URL: ${MF_ES_STORE_URL} MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} diff --git a/docker/addons/opcua-adapter/docker-compose.yml b/docker/addons/opcua-adapter/docker-compose.yml index c0bf5a0c..7bdfe44b 100644 --- a/docker/addons/opcua-adapter/docker-compose.yml +++ b/docker/addons/opcua-adapter/docker-compose.yml @@ -36,7 +36,7 @@ services: MF_OPCUA_ADAPTER_HTTP_PORT: ${MF_OPCUA_ADAPTER_HTTP_PORT} MF_OPCUA_ADAPTER_HTTP_SERVER_CERT: ${MF_OPCUA_ADAPTER_HTTP_SERVER_CERT} MF_OPCUA_ADAPTER_HTTP_SERVER_KEY: ${MF_OPCUA_ADAPTER_HTTP_SERVER_KEY} - MF_OPCUA_ADAPTER_ES_URL: ${MF_ES_URL} + MF_OPCUA_ADAPTER_ES_URL: ${MF_ES_STORE_URL} MF_OPCUA_ADAPTER_ROUTE_MAP_URL: ${MF_OPCUA_ADAPTER_ROUTE_MAP_URL} MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} diff --git a/docker/addons/twins/docker-compose.yml b/docker/addons/twins/docker-compose.yml index 336dde19..eb93683d 100644 --- a/docker/addons/twins/docker-compose.yml +++ b/docker/addons/twins/docker-compose.yml @@ -52,7 +52,7 @@ services: MF_TWINS_HTTP_SERVER_CERT: ${MF_TWINS_HTTP_SERVER_CERT} MF_TWINS_HTTP_SERVER_KEY: ${MF_TWINS_HTTP_SERVER_KEY} MF_TWINS_CACHE_URL: ${MF_TWINS_CACHE_URL} - MF_TWINS_ES_URL: ${MF_ES_URL} + MF_TWINS_ES_URL: ${MF_ES_STORE_URL} MF_THINGS_STANDALONE_ID: ${MF_THINGS_STANDALONE_ID} MF_THINGS_STANDALONE_TOKEN: ${MF_THINGS_STANDALONE_TOKEN} MF_TWINS_DB_HOST: ${MF_TWINS_DB_HOST} diff --git a/docker/brokers/profiles/nats_rabbitmq.yml b/docker/brokers/profiles/nats_rabbitmq.yml deleted file mode 100644 index f928ad3d..00000000 --- a/docker/brokers/profiles/nats_rabbitmq.yml +++ /dev/null @@ -1,15 +0,0 @@ -# This file is used to configure NATS broker. -# It used when running nats as an MQTT broker and RabbitMQ as a Message broker. -services: - nats: - extends: - file: ../nats.yml - service: broker - container_name: mainflux-nats - restart: on-failure - networks: - - mainflux-base-net - volumes: - - mainflux-mqtt-broker-volume:/data - profiles: - - nats_rabbitmq diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index b1aae17a..325331c6 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -12,15 +12,17 @@ volumes: mainflux-users-db-volume: mainflux-things-db-volume: mainflux-things-redis-volume: - mainflux-es-redis-volume: mainflux-mqtt-broker-volume: mainflux-broker-volume: + mainflux-es-volume: mainflux-spicedb-db-volume: mainflux-auth-db-volume: include: - path: brokers/docker-compose.yml env_file: docker/.env + - path: es/docker-compose.yml + env_file: docker/.env services: spicedb: @@ -190,7 +192,7 @@ services: MF_THINGS_AUTH_GRPC_SERVER_KEY: ${MF_THINGS_AUTH_GRPC_SERVER_KEY:+/things-grpc-server.key} MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/things-grpc-server-ca.crt} MF_THINGS_AUTH_GRPC_CLIENT_CA_CERTS: ${MF_THINGS_AUTH_GRPC_CLIENT_CA_CERTS:+/things-grpc-client-ca.crt} - MF_THINGS_ES_URL: ${MF_ES_URL} + MF_THINGS_ES_URL: ${MF_ES_STORE_URL} MF_THINGS_CACHE_URL: ${MF_THINGS_CACHE_URL} MF_THINGS_DB_HOST: ${MF_THINGS_DB_HOST} MF_THINGS_DB_PORT: ${MF_THINGS_DB_PORT} @@ -308,7 +310,7 @@ services: MF_EMAIL_FROM_ADDRESS: ${MF_EMAIL_FROM_ADDRESS} MF_EMAIL_FROM_NAME: ${MF_EMAIL_FROM_NAME} MF_EMAIL_TEMPLATE: ${MF_EMAIL_TEMPLATE} - MF_USERS_ES_URL: ${MF_ES_URL} + MF_USERS_ES_URL: ${MF_ES_STORE_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} MF_AUTH_GRPC_URL: ${MF_AUTH_GRPC_URL} @@ -373,7 +375,7 @@ services: MF_MQTT_ADAPTER_WS_TARGET_PORT: ${MF_MQTT_ADAPTER_WS_TARGET_PORT} MF_MQTT_ADAPTER_WS_TARGET_PATH: ${MF_MQTT_ADAPTER_WS_TARGET_PATH} MF_MQTT_ADAPTER_INSTANCE: ${MF_MQTT_ADAPTER_INSTANCE} - MF_MQTT_ADAPTER_ES_URL: ${MF_ES_URL} + MF_MQTT_ADAPTER_ES_URL: ${MF_ES_STORE_URL} MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} MF_THINGS_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/things-grpc-client.crt} @@ -446,15 +448,6 @@ services: bind: create_host_path: true - es-redis: - image: redis:7.2.0-alpine - container_name: mainflux-es-redis - restart: on-failure - networks: - - mainflux-base-net - volumes: - - mainflux-es-redis-volume:/data - coap-adapter: image: mainflux/coap:${MF_RELEASE_TAG} container_name: mainflux-coap diff --git a/docker/es/docker-compose.yml b/docker/es/docker-compose.yml new file mode 100644 index 00000000..93065d3e --- /dev/null +++ b/docker/es/docker-compose.yml @@ -0,0 +1,14 @@ +volumes: + mainflux-es-redis-volume: + +services: + es-redis: + image: redis:7.2.0-alpine + container_name: mainflux-es-redis + restart: on-failure + networks: + - mainflux-base-net + volumes: + - mainflux-es-volume:/data + profiles: + - redis diff --git a/internal/groups/events/streams.go b/internal/groups/events/streams.go index 485b9008..0881131b 100644 --- a/internal/groups/events/streams.go +++ b/internal/groups/events/streams.go @@ -7,7 +7,7 @@ import ( "context" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/pkg/groups" ) @@ -23,7 +23,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around things service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc groups.Service, url string) (groups.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/mqtt/events/streams.go b/mqtt/events/streams.go index 0f966b63..002137e0 100644 --- a/mqtt/events/streams.go +++ b/mqtt/events/streams.go @@ -7,7 +7,7 @@ import ( "context" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" ) const streamID = "mainflux.mqtt" @@ -26,7 +26,7 @@ type eventStore struct { // NewEventStore returns wrapper around mProxy service that sends // events to event store. func NewEventStore(ctx context.Context, url, instance string) (EventStore, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/pkg/events/events.go b/pkg/events/events.go index 174e0796..2da70830 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -11,8 +11,8 @@ import ( const ( UnpublishedEventsCheckInterval = 1 * time.Minute ConnCheckInterval = 100 * time.Millisecond - MaxUnpublishedEvents uint64 = 1e6 - MaxEventStreamLen int64 = 1e9 + MaxUnpublishedEvents uint64 = 1e4 + MaxEventStreamLen int64 = 1e6 ) // Event represents an event. diff --git a/pkg/events/nats/doc.go b/pkg/events/nats/doc.go new file mode 100644 index 00000000..c0e7bf94 --- /dev/null +++ b/pkg/events/nats/doc.go @@ -0,0 +1,8 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package redis contains the domain concept definitions needed to support +// Mainflux redis events source service functionality. +// +// It provides the abstraction of the redis stream and its operations. +package nats diff --git a/pkg/events/nats/publisher.go b/pkg/events/nats/publisher.go new file mode 100644 index 00000000..785ab785 --- /dev/null +++ b/pkg/events/nats/publisher.go @@ -0,0 +1,90 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats + +import ( + "context" + "encoding/json" + "time" + + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/messaging" + broker "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// Max message payload size is 1MB. +var reconnectBufSize = 1024 * 1024 * int(events.MaxUnpublishedEvents) + +type pubEventStore struct { + url string + conn *nats.Conn + publisher messaging.Publisher + stream string +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + conn, err := nats.Connect(url, nats.MaxReconnects(maxReconnects), nats.ReconnectBufSize(reconnectBufSize)) + if err != nil { + return nil, err + } + js, err := jetstream.New(conn) + if err != nil { + return nil, err + } + if _, err := js.CreateStream(ctx, jsStreamConfig); err != nil { + return nil, err + } + + publisher, err := broker.NewPublisher(ctx, url, broker.Prefix(eventsPrefix), broker.JSStream(js)) + if err != nil { + return nil, err + } + + es := &pubEventStore{ + url: url, + conn: conn, + publisher: publisher, + stream: stream, + } + + go es.StartPublishingRoutine(ctx) + + return es, nil +} + +func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error { + values, err := event.Encode() + if err != nil { + return err + } + values["occurred_at"] = time.Now().UnixNano() + + data, err := json.Marshal(values) + if err != nil { + return err + } + + record := &messaging.Message{ + Payload: data, + } + + return es.publisher.Publish(ctx, es.stream, record) +} + +func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) { + // Nats doesn't need to check for unpublished events + // since the events are published to a buffer. + // The buffer is flushed when the connection is reestablished. + // https://docs.nats.io/using-nats/developer/connecting/reconnect/buffer + + <-ctx.Done() +} + +func (es *pubEventStore) Close() error { + es.conn.Close() + + return es.publisher.Close() +} diff --git a/pkg/events/nats/publisher_test.go b/pkg/events/nats/publisher_test.go new file mode 100644 index 00000000..7c93163a --- /dev/null +++ b/pkg/events/nats/publisher_test.go @@ -0,0 +1,302 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats_test + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/nats" + "github.com/stretchr/testify/assert" +) + +var ( + streamTopic = "test-topic" + eventsChan = make(chan map[string]interface{}) + logger = mflog.NewMock() + errFailed = errors.New("failed") +) + +type testEvent struct { + Data map[string]interface{} +} + +func (te testEvent) Encode() (map[string]interface{}, error) { + data := make(map[string]interface{}) + for k, v := range te.Data { + switch v.(type) { + case string: + data[k] = v + case float64: + data[k] = v + default: + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + data[k] = string(b) + } + } + + return data, nil +} + +func TestPublish(t *testing.T) { + publisher, err := nats.NewPublisher(ctx, natsURL, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + _, err = nats.NewSubscriber(ctx, "http://invaliurl.com", stream, consumer, logger) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + subcriber, err := nats.NewSubscriber(ctx, natsURL, stream, consumer, logger) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = subcriber.Subscribe(ctx, handler{}) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) + + cases := []struct { + desc string + event map[string]interface{} + err error + }{ + { + desc: "publish event successfully", + err: nil, + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": "Earth", + "status": "normal", + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "publish with nil event", + err: nil, + event: nil, + }, + { + desc: "publish event with invalid event location", + err: fmt.Errorf("json: unsupported type: chan int"), + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": make(chan int), + "status": "normal", + "timestamp": "invalid", + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "publish event with nested sting value", + err: nil, + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": map[string]string{ + "lat": fmt.Sprintf("%f", rand.Float64()), + "lng": fmt.Sprintf("%f", rand.Float64()), + }, + "status": "normal", + "timestamp": "invalid", + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + } + + for _, tc := range cases { + event := testEvent{Data: tc.event} + + err := publisher.Publish(ctx, event) + switch tc.err { + case nil: + assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) + + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) + assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) + + default: + assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) + } + } +} + +func TestUnavailablePublish(t *testing.T) { + client, err := startContainer() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on starting container: %s", err)) + + _, err = nats.NewPublisher(ctx, "http://invaliurl.com", stream) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + publisher, err := nats.NewPublisher(ctx, client.url, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = client.pool.Client.PauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) + + spawnGoroutines(publisher, t) + + err = client.pool.Client.UnpauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) + + // Wait for the events to be published. + time.Sleep(events.UnpublishedEventsCheckInterval) + + err = publisher.Close() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) + + err = client.pool.Purge(client.container) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on purging container: %s", err)) +} + +func generateRandomEvent() testEvent { + return testEvent{ + Data: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), + "location": fmt.Sprintf("%f", rand.Float64()), + "status": fmt.Sprintf("%d", rand.Intn(1000)), + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + }, + } +} + +func spawnGoroutines(publisher events.Publisher, t *testing.T) { + for i := 0; i < 1e4; i++ { + go func() { + for i := 0; i < 10; i++ { + event := generateRandomEvent() + err := publisher.Publish(ctx, event) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + } + }() + } +} + +func TestPubsub(t *testing.T) { + subcases := []struct { + desc string + stream string + consumer string + errorMessage error + handler events.EventHandler + }{ + { + desc: "Subscribe to a stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + errorMessage: nats.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + errorMessage: nats.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: "", + errorMessage: nats.ErrEmptyConsumer, + handler: handler{false}, + }, + { + desc: "Subscribe to another stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{true}, + }, + } + + for _, pc := range subcases { + subcriber, err := nats.NewSubscriber(ctx, natsURL, pc.stream, pc.consumer, logger) + if err != nil { + assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + + continue + } + + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + + switch err := subcriber.Subscribe(context.TODO(), pc.handler); { + case err == nil: + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + default: + assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + } + + err = subcriber.Close() + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + } +} + +type handler struct { + fail bool +} + +func (h handler) Handle(_ context.Context, event events.Event) error { + if h.fail { + return errFailed + } + data, err := event.Encode() + if err != nil { + return err + } + + eventsChan <- data + + return nil +} diff --git a/pkg/events/nats/setup_test.go b/pkg/events/nats/setup_test.go new file mode 100644 index 00000000..6304b7b6 --- /dev/null +++ b/pkg/events/nats/setup_test.go @@ -0,0 +1,97 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats_test + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "testing" + + "github.com/mainflux/mainflux/pkg/events/nats" + "github.com/ory/dockertest/v3" +) + +type client struct { + url string + pool *dockertest.Pool + container *dockertest.Resource +} + +var ( + natsURL string + stream = "tests.events" + consumer = "tests-consumer" + ctx = context.Background() +) + +func TestMain(m *testing.M) { + client, err := startContainer() + if err != nil { + log.Fatalf(err.Error()) + } + natsURL = client.url + + code := m.Run() + + if err := client.pool.Purge(client.container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + + os.Exit(code) +} + +func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + <-c + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + os.Exit(0) + }() +} + +func startContainer() (client, error) { + var cli client + var err error + cli.pool, err = dockertest.NewPool("") + if err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + + cli.container, err = cli.pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "nats", + Tag: "2.9.21-alpine", + Cmd: []string{"-DVV", "-js"}, + }) + if err != nil { + return client{}, fmt.Errorf("Could not start container: %s", err) + } + + handleInterrupt(cli.pool, cli.container) + + cli.url = fmt.Sprintf("nats://%s:%s", "localhost", cli.container.GetPort("4222/tcp")) + + if err := cli.pool.Retry(func() error { + _, err = nats.NewPublisher(ctx, cli.url, stream) + return err + }); err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + + if err := cli.pool.Retry(func() error { + _, err = nats.NewSubscriber(ctx, cli.url, stream, consumer, logger) + return err + }); err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + + return cli, nil +} diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go new file mode 100644 index 00000000..7dfa5d42 --- /dev/null +++ b/pkg/events/nats/subscriber.go @@ -0,0 +1,145 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/messaging" + broker "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +const ( + maxReconnects = -1 +) + +var _ events.Subscriber = (*subEventStore)(nil) + +var ( + eventsPrefix = "events" + + jsStreamConfig = jetstream.StreamConfig{ + Name: "events", + Description: "Mainflux stream for sending and receiving messages in between Mainflux events", + Subjects: []string{"events.>"}, + Retention: jetstream.LimitsPolicy, + MaxMsgsPerSubject: 1e9, + MaxAge: time.Hour * 24, + MaxMsgSize: 1024 * 1024, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, + } + + // ErrEmptyStream is returned when stream name is empty. + ErrEmptyStream = errors.New("stream name cannot be empty") + + // ErrEmptyConsumer is returned when consumer name is empty. + ErrEmptyConsumer = errors.New("consumer name cannot be empty") +) + +type subEventStore struct { + conn *nats.Conn + pubsub messaging.PubSub + stream string + consumer string + logger mflog.Logger +} + +func NewSubscriber(ctx context.Context, url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + if stream == "" { + return nil, ErrEmptyStream + } + + if consumer == "" { + return nil, ErrEmptyConsumer + } + + conn, err := nats.Connect(url, nats.MaxReconnects(maxReconnects)) + if err != nil { + return nil, err + } + js, err := jetstream.New(conn) + if err != nil { + return nil, err + } + jsStream, err := js.CreateStream(ctx, jsStreamConfig) + if err != nil { + return nil, err + } + + pubsub, err := broker.NewPubSub(ctx, url, logger, broker.Stream(jsStream)) + if err != nil { + return nil, err + } + + return &subEventStore{ + conn: conn, + pubsub: pubsub, + stream: stream, + consumer: consumer, + logger: logger, + }, nil +} + +func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { + subCfg := messaging.SubscriberConfig{ + ID: es.consumer, + Topic: eventsPrefix + "." + es.stream, + Handler: &eventHandler{ + handler: handler, + ctx: ctx, + logger: es.logger, + }, + DeliveryPolicy: messaging.DeliverNewPolicy, + } + + return es.pubsub.Subscribe(ctx, subCfg) +} + +func (es *subEventStore) Close() error { + es.conn.Close() + return es.pubsub.Close() +} + +type event struct { + Data map[string]interface{} +} + +func (re event) Encode() (map[string]interface{}, error) { + return re.Data, nil +} + +type eventHandler struct { + handler events.EventHandler + ctx context.Context + logger mflog.Logger +} + +func (eh *eventHandler) Handle(msg *messaging.Message) error { + event := event{ + Data: make(map[string]interface{}), + } + + if err := json.Unmarshal(msg.GetPayload(), &event.Data); err != nil { + return err + } + + if err := eh.handler.Handle(eh.ctx, event); err != nil { + eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + } + + return nil +} + +func (eh *eventHandler) Cancel() error { + return nil +} diff --git a/pkg/events/rabbitmq/doc.go b/pkg/events/rabbitmq/doc.go new file mode 100644 index 00000000..b5eef9fb --- /dev/null +++ b/pkg/events/rabbitmq/doc.go @@ -0,0 +1,8 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package redis contains the domain concept definitions needed to support +// Mainflux redis events source service functionality. +// +// It provides the abstraction of the redis stream and its operations. +package rabbitmq diff --git a/pkg/events/rabbitmq/publisher.go b/pkg/events/rabbitmq/publisher.go new file mode 100644 index 00000000..a67c177f --- /dev/null +++ b/pkg/events/rabbitmq/publisher.go @@ -0,0 +1,111 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/messaging" + broker "github.com/mainflux/mainflux/pkg/messaging/rabbitmq" + amqp "github.com/rabbitmq/amqp091-go" +) + +type pubEventStore struct { + conn *amqp.Connection + publisher messaging.Publisher + unpublishedEvents chan amqp.Return + stream string + mu sync.Mutex +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + conn, err := amqp.Dial(url) + if err != nil { + return nil, err + } + ch, err := conn.Channel() + if err != nil { + return nil, err + } + if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil { + return nil, err + } + + publisher, err := broker.NewPublisher(url, broker.Prefix(eventsPrefix), broker.Exchange(exchangeName), broker.Channel(ch)) + if err != nil { + return nil, err + } + + es := &pubEventStore{ + conn: conn, + publisher: publisher, + unpublishedEvents: make(chan amqp.Return, events.MaxUnpublishedEvents), + stream: stream, + } + + ch.NotifyReturn(es.unpublishedEvents) + + go es.StartPublishingRoutine(ctx) + + return es, nil +} + +func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error { + values, err := event.Encode() + if err != nil { + return err + } + values["occurred_at"] = time.Now().UnixNano() + + data, err := json.Marshal(values) + if err != nil { + return err + } + + record := &messaging.Message{ + Payload: data, + } + + return es.publisher.Publish(ctx, es.stream, record) +} + +func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) { + defer close(es.unpublishedEvents) + + ticker := time.NewTicker(events.UnpublishedEventsCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if ok := es.conn.IsClosed(); !ok { + es.mu.Lock() + for i := len(es.unpublishedEvents) - 1; i >= 0; i-- { + record := <-es.unpublishedEvents + msg := &messaging.Message{ + Payload: record.Body, + } + if err := es.publisher.Publish(ctx, es.stream, msg); err != nil { + es.unpublishedEvents <- record + + break + } + } + es.mu.Unlock() + } + case <-ctx.Done(): + return + } + } +} + +func (es *pubEventStore) Close() error { + es.conn.Close() + + return es.publisher.Close() +} diff --git a/pkg/events/rabbitmq/publisher_test.go b/pkg/events/rabbitmq/publisher_test.go new file mode 100644 index 00000000..1ed51c5d --- /dev/null +++ b/pkg/events/rabbitmq/publisher_test.go @@ -0,0 +1,302 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq_test + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/rabbitmq" + "github.com/stretchr/testify/assert" +) + +var ( + streamTopic = "test-topic" + eventsChan = make(chan map[string]interface{}) + logger = mflog.NewMock() + errFailed = errors.New("failed") +) + +type testEvent struct { + Data map[string]interface{} +} + +func (te testEvent) Encode() (map[string]interface{}, error) { + data := make(map[string]interface{}) + for k, v := range te.Data { + switch v.(type) { + case string: + data[k] = v + case float64: + data[k] = v + default: + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + data[k] = string(b) + } + } + + return data, nil +} + +func TestPublish(t *testing.T) { + publisher, err := rabbitmq.NewPublisher(ctx, rabbitmqURL, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + _, err = rabbitmq.NewSubscriber("http://invaliurl.com", stream, consumer, logger) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = subcriber.Subscribe(ctx, handler{}) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) + + cases := []struct { + desc string + event map[string]interface{} + err error + }{ + { + desc: "publish event successfully", + err: nil, + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": "Earth", + "status": "normal", + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "publish with nil event", + err: nil, + event: nil, + }, + { + desc: "publish event with invalid event location", + err: fmt.Errorf("json: unsupported type: chan int"), + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": make(chan int), + "status": "normal", + "timestamp": "invalid", + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "publish event with nested sting value", + err: nil, + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": map[string]string{ + "lat": fmt.Sprintf("%f", rand.Float64()), + "lng": fmt.Sprintf("%f", rand.Float64()), + }, + "status": "normal", + "timestamp": "invalid", + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + } + + for _, tc := range cases { + event := testEvent{Data: tc.event} + + err := publisher.Publish(ctx, event) + switch tc.err { + case nil: + assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) + + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) + assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) + + default: + assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) + } + } +} + +func TestUnavailablePublish(t *testing.T) { + client, err := startContainer() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on starting container: %s", err)) + + _, err = rabbitmq.NewPublisher(ctx, "http://invaliurl.com", stream) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + publisher, err := rabbitmq.NewPublisher(ctx, client.url, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = client.pool.Client.PauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) + + spawnGoroutines(publisher, t) + + err = client.pool.Client.UnpauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) + + // Wait for the events to be published. + time.Sleep(2 * events.UnpublishedEventsCheckInterval) + + err = publisher.Close() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) + + err = client.pool.Purge(client.container) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on purging container: %s", err)) +} + +func generateRandomEvent() testEvent { + return testEvent{ + Data: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), + "location": fmt.Sprintf("%f", rand.Float64()), + "status": fmt.Sprintf("%d", rand.Intn(1000)), + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + }, + } +} + +func spawnGoroutines(publisher events.Publisher, t *testing.T) { + for i := 0; i < 1e4; i++ { + go func() { + for i := 0; i < 10; i++ { + event := generateRandomEvent() + err := publisher.Publish(ctx, event) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + } + }() + } +} + +func TestPubsub(t *testing.T) { + subcases := []struct { + desc string + stream string + consumer string + errorMessage error + handler events.EventHandler + }{ + { + desc: "Subscribe to a stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + errorMessage: rabbitmq.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + errorMessage: rabbitmq.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: "", + errorMessage: rabbitmq.ErrEmptyConsumer, + handler: handler{false}, + }, + { + desc: "Subscribe to another stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{true}, + }, + } + + for _, pc := range subcases { + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, pc.stream, pc.consumer, logger) + if err != nil { + assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + + continue + } + + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + + switch err := subcriber.Subscribe(ctx, pc.handler); { + case err == nil: + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + default: + assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + } + + err = subcriber.Close() + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + } +} + +type handler struct { + fail bool +} + +func (h handler) Handle(_ context.Context, event events.Event) error { + if h.fail { + return errFailed + } + data, err := event.Encode() + if err != nil { + return err + } + + eventsChan <- data + + return nil +} diff --git a/pkg/events/rabbitmq/setup_test.go b/pkg/events/rabbitmq/setup_test.go new file mode 100644 index 00000000..bd7babe4 --- /dev/null +++ b/pkg/events/rabbitmq/setup_test.go @@ -0,0 +1,93 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq_test + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "testing" + + "github.com/mainflux/mainflux/pkg/events/rabbitmq" + "github.com/ory/dockertest/v3" +) + +type client struct { + url string + pool *dockertest.Pool + container *dockertest.Resource +} + +var ( + rabbitmqURL string + stream = "tests.events" + consumer = "tests-consumer" + ctx = context.TODO() +) + +func TestMain(m *testing.M) { + client, err := startContainer() + if err != nil { + log.Fatalf(err.Error()) + } + rabbitmqURL = client.url + + code := m.Run() + + if err := client.pool.Purge(client.container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + + os.Exit(code) +} + +func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + <-c + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + os.Exit(0) + }() +} + +func startContainer() (client, error) { + var cli client + var err error + cli.pool, err = dockertest.NewPool("") + if err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + + cli.container, err = cli.pool.Run("rabbitmq", "3.9.20", []string{}) + if err != nil { + log.Fatalf("Could not start container: %s", err) + } + + handleInterrupt(cli.pool, cli.container) + + cli.url = fmt.Sprintf("amqp://%s:%s", "localhost", cli.container.GetPort("5672/tcp")) + + if err := cli.pool.Retry(func() error { + _, err = rabbitmq.NewPublisher(ctx, cli.url, stream) + return err + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + if err := cli.pool.Retry(func() error { + _, err = rabbitmq.NewSubscriber(cli.url, stream, consumer, logger) + return err + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + return cli, nil +} diff --git a/pkg/events/rabbitmq/subscriber.go b/pkg/events/rabbitmq/subscriber.go new file mode 100644 index 00000000..4a44c258 --- /dev/null +++ b/pkg/events/rabbitmq/subscriber.go @@ -0,0 +1,127 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/messaging" + broker "github.com/mainflux/mainflux/pkg/messaging/rabbitmq" + amqp "github.com/rabbitmq/amqp091-go" +) + +var _ events.Subscriber = (*subEventStore)(nil) + +var ( + exchangeName = "events" + eventsPrefix = "events" + + // ErrEmptyStream is returned when stream name is empty. + ErrEmptyStream = errors.New("stream name cannot be empty") + + // ErrEmptyConsumer is returned when consumer name is empty. + ErrEmptyConsumer = errors.New("consumer name cannot be empty") +) + +type subEventStore struct { + conn *amqp.Connection + pubsub messaging.PubSub + stream string + consumer string + logger mflog.Logger +} + +func NewSubscriber(url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + if stream == "" { + return nil, ErrEmptyStream + } + + if consumer == "" { + return nil, ErrEmptyConsumer + } + + conn, err := amqp.Dial(url) + if err != nil { + return nil, err + } + ch, err := conn.Channel() + if err != nil { + return nil, err + } + if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil { + return nil, err + } + + pubsub, err := broker.NewPubSub(url, logger, broker.Channel(ch), broker.Exchange(exchangeName)) + if err != nil { + return nil, err + } + + return &subEventStore{ + conn: conn, + pubsub: pubsub, + stream: stream, + consumer: consumer, + logger: logger, + }, nil +} + +func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { + subCfg := messaging.SubscriberConfig{ + ID: es.consumer, + Topic: eventsPrefix + "." + es.stream, + Handler: &eventHandler{ + handler: handler, + ctx: ctx, + logger: es.logger, + }, + DeliveryPolicy: messaging.DeliverNewPolicy, + } + + return es.pubsub.Subscribe(ctx, subCfg) +} + +func (es *subEventStore) Close() error { + es.conn.Close() + return es.pubsub.Close() +} + +type event struct { + Data map[string]interface{} +} + +func (re event) Encode() (map[string]interface{}, error) { + return re.Data, nil +} + +type eventHandler struct { + handler events.EventHandler + ctx context.Context + logger mflog.Logger +} + +func (eh *eventHandler) Handle(msg *messaging.Message) error { + event := event{ + Data: make(map[string]interface{}), + } + + if err := json.Unmarshal(msg.GetPayload(), &event.Data); err != nil { + return err + } + + if err := eh.handler.Handle(eh.ctx, event); err != nil { + eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + } + + return nil +} + +func (eh *eventHandler) Cancel() error { + return nil +} diff --git a/pkg/events/redis/publisher.go b/pkg/events/redis/publisher.go index 570925dc..4dfb40f1 100644 --- a/pkg/events/redis/publisher.go +++ b/pkg/events/redis/publisher.go @@ -1,6 +1,9 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + package redis import ( @@ -50,21 +53,22 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error Values: values, } - if err := es.checkRedisConnection(ctx); err != nil { + switch err := es.checkRedisConnection(ctx); err { + case nil: + return es.client.XAdd(ctx, record).Err() + default: es.mu.Lock() defer es.mu.Unlock() - select { - case es.unpublishedEvents <- record: - default: - // If the channel is full (rarely happens), drop the events. + // If the channel is full (rarely happens), drop the events. + if len(es.unpublishedEvents) == int(events.MaxUnpublishedEvents) { return nil } + es.unpublishedEvents <- record + return nil } - - return es.client.XAdd(ctx, record).Err() } func (es *pubEventStore) startPublishingRoutine(ctx context.Context) { diff --git a/pkg/events/redis/publisher_test.go b/pkg/events/redis/publisher_test.go index 3a698d7a..c2cb897d 100644 --- a/pkg/events/redis/publisher_test.go +++ b/pkg/events/redis/publisher_test.go @@ -1,6 +1,9 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + package redis_test import ( @@ -26,6 +29,7 @@ var ( eventsChan = make(chan map[string]interface{}) logger = mflog.NewMock() errFailed = errors.New("failed") + ctx = context.TODO() ) type testEvent struct { @@ -53,16 +57,19 @@ func (te testEvent) Encode() (map[string]interface{}, error) { } func TestPublish(t *testing.T) { - err := redisClient.FlushAll(context.Background()).Err() + err := redisClient.FlushAll(ctx).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName) + publisher, err := redis.NewPublisher(ctx, redisURL, streamName) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - subcriber, err := redis.NewSubscriber(redisURL, streamName, consumer, logger) + subcriber, err := redis.NewSubscriber("http://invaliurl.com", streamName, consumer, logger) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + subcriber, err = redis.NewSubscriber(redisURL, streamName, consumer, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + err = subcriber.Subscribe(ctx, handler{}) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -125,7 +132,7 @@ func TestPublish(t *testing.T) { for _, tc := range cases { event := testEvent{Data: tc.event} - err := publisher.Publish(context.Background(), event) + err := publisher.Publish(ctx, event) switch tc.err { case nil: assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) @@ -152,44 +159,127 @@ func TestPublish(t *testing.T) { } } +func TestUnavailablePublish(t *testing.T) { + client, err := startContainer() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on starting container: %s", err)) + + err = client.Client.FlushAll(ctx).Err() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) + + publisher, err := redis.NewPublisher(ctx, "http://invaliurl.com", streamName) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + publisher, err = redis.NewPublisher(ctx, client.url, streamName) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = client.pool.Client.PauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) + + spawnGoroutines(publisher, t) + + err = client.pool.Client.UnpauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) + + // Wait for the events to be published. + time.Sleep(events.UnpublishedEventsCheckInterval) + + err = publisher.Close() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) + + err = client.pool.Purge(client.container) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on purging container: %s", err)) +} + +func generateRandomEvent() testEvent { + return testEvent{ + Data: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), + "location": fmt.Sprintf("%f", rand.Float64()), + "status": fmt.Sprintf("%d", rand.Intn(1000)), + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + }, + } +} + +func spawnGoroutines(publisher events.Publisher, t *testing.T) { + for i := 0; i < 1e4; i++ { + go func() { + for i := 0; i < 10; i++ { + event := generateRandomEvent() + err := publisher.Publish(ctx, event) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + } + }() + } +} + func TestPubsub(t *testing.T) { - err := redisClient.FlushAll(context.Background()).Err() + err := redisClient.FlushAll(ctx).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) subcases := []struct { desc string stream string + consumer string errorMessage error handler events.EventHandler }{ { desc: "Subscribe to a stream", stream: fmt.Sprintf("%s.%s", streamName, streamTopic), + consumer: consumer, errorMessage: nil, handler: handler{false}, }, { desc: "Subscribe to the same stream", stream: fmt.Sprintf("%s.%s", streamName, streamTopic), + consumer: consumer, errorMessage: nil, handler: handler{false}, }, { - desc: "Subscribe to an empty stream", + desc: "Subscribe to an empty stream with an empty consumer", stream: "", + consumer: "", errorMessage: redis.ErrEmptyStream, handler: handler{false}, }, + { + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + errorMessage: redis.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("%s.%s", streamName, streamTopic), + consumer: "", + errorMessage: redis.ErrEmptyConsumer, + handler: handler{false}, + }, { desc: "Subscribe to another stream", stream: fmt.Sprintf("%s.%s", streamName, streamTopic+"1"), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("%s.%s", streamName, streamTopic), + consumer: consumer, errorMessage: nil, handler: handler{true}, }, } for _, pc := range subcases { - subcriber, err := redis.NewSubscriber(redisURL, pc.stream, consumer, logger) + subcriber, err := redis.NewSubscriber(redisURL, pc.stream, pc.consumer, logger) if err != nil { assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) @@ -204,6 +294,9 @@ func TestPubsub(t *testing.T) { default: assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) } + + err = subcriber.Close() + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) } } diff --git a/pkg/events/redis/setup_test.go b/pkg/events/redis/setup_test.go index cdc77d4b..f733987f 100644 --- a/pkg/events/redis/setup_test.go +++ b/pkg/events/redis/setup_test.go @@ -1,54 +1,94 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + package redis_test import ( - "context" "fmt" "log" "os" + "os/signal" + "syscall" "testing" "github.com/go-redis/redis/v8" "github.com/ory/dockertest/v3" ) +type client struct { + *redis.Client + url string + pool *dockertest.Pool + container *dockertest.Resource +} + var ( redisClient *redis.Client redisURL string ) func TestMain(m *testing.M) { - pool, err := dockertest.NewPool("") + client, err := startContainer() if err != nil { - log.Fatalf("Could not connect to docker: %s", err) - } - - container, err := pool.Run("redis", "7.2.0-alpine", nil) - if err != nil { - log.Fatalf("Could not start container: %s", err) - } - - redisURL = fmt.Sprintf("redis://localhost:%s/0", container.GetPort("6379/tcp")) - opts, err := redis.ParseURL(redisURL) - if err != nil { - log.Fatalf("Could not parse redis URL: %s", err) - } - - if err := pool.Retry(func() error { - redisClient = redis.NewClient(opts) - - return redisClient.Ping(context.Background()).Err() - }); err != nil { - log.Fatalf("Could not connect to docker: %s", err) + log.Fatalf(err.Error()) } + redisClient = client.Client + redisURL = client.url code := m.Run() - if err := pool.Purge(container); err != nil { + if err := client.pool.Purge(client.container); err != nil { log.Fatalf("Could not purge container: %s", err) } os.Exit(code) } + +func startContainer() (client, error) { + var cli client + pool, err := dockertest.NewPool("") + if err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + cli.pool = pool + + container, err := cli.pool.Run("redis", "7.2.0-alpine", nil) + if err != nil { + return client{}, fmt.Errorf("Could not start container: %s", err) + } + cli.container = container + + handleInterrupt(cli.pool, cli.container) + + cli.url = fmt.Sprintf("redis://localhost:%s/0", cli.container.GetPort("6379/tcp")) + opts, err := redis.ParseURL(cli.url) + if err != nil { + return client{}, fmt.Errorf("Could not parse redis URL: %s", err) + } + + if err := pool.Retry(func() error { + cli.Client = redis.NewClient(opts) + + return cli.Client.Ping(ctx).Err() + }); err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + + return cli, nil +} + +func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + <-c + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + os.Exit(0) + }() +} diff --git a/pkg/events/redis/subscriber.go b/pkg/events/redis/subscriber.go index e5c4d198..f4767611 100644 --- a/pkg/events/redis/subscriber.go +++ b/pkg/events/redis/subscriber.go @@ -1,6 +1,9 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + package redis import ( diff --git a/pkg/events/store/brokers_nats.go b/pkg/events/store/brokers_nats.go new file mode 100644 index 00000000..cde4a8bf --- /dev/null +++ b/pkg/events/store/brokers_nats.go @@ -0,0 +1,38 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +//go:build nats +// +build nats + +package store + +import ( + "context" + "log" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/nats" +) + +func init() { + log.Println("The binary was build using nats as the events store") +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + pb, err := nats.NewPublisher(ctx, url, stream) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewSubscriber(ctx context.Context, url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + pb, err := nats.NewSubscriber(ctx, url, stream, consumer, logger) + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/pkg/events/store/brokers_rabbitmq.go b/pkg/events/store/brokers_rabbitmq.go new file mode 100644 index 00000000..c190543b --- /dev/null +++ b/pkg/events/store/brokers_rabbitmq.go @@ -0,0 +1,38 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +//go:build rabbitmq +// +build rabbitmq + +package store + +import ( + "context" + "log" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/rabbitmq" +) + +func init() { + log.Println("The binary was build using rabbitmq as the events store") +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + pb, err := rabbitmq.NewPublisher(ctx, url, stream) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewSubscriber(_ context.Context, url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + pb, err := rabbitmq.NewSubscriber(url, stream, consumer, logger) + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/pkg/events/store/brokers_redis.go b/pkg/events/store/brokers_redis.go new file mode 100644 index 00000000..8a824384 --- /dev/null +++ b/pkg/events/store/brokers_redis.go @@ -0,0 +1,38 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + +package store + +import ( + "context" + "log" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/redis" +) + +func init() { + log.Println("The binary was build using redis as the events store") +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + pb, err := redis.NewPublisher(ctx, url, stream) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewSubscriber(_ context.Context, url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + pb, err := redis.NewSubscriber(url, stream, consumer, logger) + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/pkg/messaging/brokers/brokers_nats.go b/pkg/messaging/brokers/brokers_nats.go index b8a15696..8a787a1f 100644 --- a/pkg/messaging/brokers/brokers_nats.go +++ b/pkg/messaging/brokers/brokers_nats.go @@ -22,8 +22,8 @@ func init() { log.Println("The binary was build using Nats as the message broker") } -func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) { - pb, err := nats.NewPublisher(ctx, url) +func NewPublisher(ctx context.Context, url string, opts ...messaging.Option) (messaging.Publisher, error) { + pb, err := nats.NewPublisher(ctx, url, opts...) if err != nil { return nil, err } @@ -31,8 +31,8 @@ func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) return pb, nil } -func NewPubSub(ctx context.Context, url string, logger mflog.Logger) (messaging.PubSub, error) { - pb, err := nats.NewPubSub(ctx, url, logger) +func NewPubSub(ctx context.Context, url string, logger mflog.Logger, opts ...messaging.Option) (messaging.PubSub, error) { + pb, err := nats.NewPubSub(ctx, url, logger, opts...) if err != nil { return nil, err } diff --git a/pkg/messaging/brokers/brokers_rabbitmq.go b/pkg/messaging/brokers/brokers_rabbitmq.go index 536279e6..215ae374 100644 --- a/pkg/messaging/brokers/brokers_rabbitmq.go +++ b/pkg/messaging/brokers/brokers_rabbitmq.go @@ -1,9 +1,9 @@ -//go:build rabbitmq -// +build rabbitmq - // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build rabbitmq +// +build rabbitmq + package brokers import ( @@ -22,8 +22,8 @@ func init() { log.Println("The binary was build using RabbitMQ as the message broker") } -func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) { - pb, err := rabbitmq.NewPublisher(url) +func NewPublisher(_ context.Context, url string, opts ...messaging.Option) (messaging.Publisher, error) { + pb, err := rabbitmq.NewPublisher(url, opts...) if err != nil { return nil, err } @@ -31,8 +31,8 @@ func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) { return pb, nil } -func NewPubSub(_ context.Context, url string, logger mflog.Logger) (messaging.PubSub, error) { - pb, err := rabbitmq.NewPubSub(url, logger) +func NewPubSub(_ context.Context, url string, logger mflog.Logger, opts ...messaging.Option) (messaging.PubSub, error) { + pb, err := rabbitmq.NewPubSub(url, logger, opts...) if err != nil { return nil, err } diff --git a/pkg/messaging/nats/options.go b/pkg/messaging/nats/options.go new file mode 100644 index 00000000..a24dbee7 --- /dev/null +++ b/pkg/messaging/nats/options.go @@ -0,0 +1,56 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats + +import ( + "errors" + + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/nats-io/nats.go/jetstream" +) + +// ErrInvalidType is returned when the provided value is not of the expected type. +var ErrInvalidType = errors.New("invalid type") + +// Prefix sets the prefix for the publisher. +func Prefix(prefix string) messaging.Option { + return func(val interface{}) error { + p, ok := val.(*publisher) + if !ok { + return ErrInvalidType + } + + p.prefix = prefix + + return nil + } +} + +// JSStream sets the JetStream for the publisher. +func JSStream(stream jetstream.JetStream) messaging.Option { + return func(val interface{}) error { + p, ok := val.(*publisher) + if !ok { + return ErrInvalidType + } + + p.js = stream + + return nil + } +} + +// Stream sets the Stream for the subscriber. +func Stream(stream jetstream.Stream) messaging.Option { + return func(val interface{}) error { + p, ok := val.(*pubsub) + if !ok { + return ErrInvalidType + } + + p.stream = stream + + return nil + } +} diff --git a/pkg/messaging/nats/publisher.go b/pkg/messaging/nats/publisher.go index fdaf62ee..ca5c522f 100644 --- a/pkg/messaging/nats/publisher.go +++ b/pkg/messaging/nats/publisher.go @@ -7,30 +7,35 @@ import ( "context" "fmt" + "github.com/mainflux/mainflux/pkg/events" "github.com/mainflux/mainflux/pkg/messaging" broker "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "google.golang.org/protobuf/proto" ) -// A maximum number of reconnect attempts before NATS connection closes permanently. -// Value -1 represents an unlimited number of reconnect retries, i.e. the client -// will never give up on retrying to re-establish connection to NATS server. -const maxReconnects = -1 +const ( + // A maximum number of reconnect attempts before NATS connection closes permanently. + // Value -1 represents an unlimited number of reconnect retries, i.e. the client + // will never give up on retrying to re-establish connection to NATS server. + maxReconnects = -1 + + // reconnectBufSize is obtained from the maximum number of unpublished events + // multiplied by the approximate maximum size of a single event. + reconnectBufSize = events.MaxUnpublishedEvents * (1024 * 1024) +) var _ messaging.Publisher = (*publisher)(nil) type publisher struct { - js jetstream.JetStream - conn *broker.Conn + js jetstream.JetStream + conn *broker.Conn + prefix string } -// Publisher wraps messaging Publisher exposing -// Close() method for NATS connection. - // NewPublisher returns NATS message Publisher. -func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) { - conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects)) +func NewPublisher(ctx context.Context, url string, opts ...messaging.Option) (messaging.Publisher, error) { + conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects), broker.ReconnectBufSize(int(reconnectBufSize))) if err != nil { return nil, err } @@ -41,9 +46,17 @@ func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) if _, err := js.CreateStream(ctx, jsStreamConfig); err != nil { return nil, err } + ret := &publisher{ - js: js, - conn: conn, + js: js, + conn: conn, + prefix: chansPrefix, + } + + for _, opt := range opts { + if err := opt(ret); err != nil { + return nil, err + } } return ret, nil @@ -59,7 +72,7 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging. return err } - subject := fmt.Sprintf("%s.%s", chansPrefix, topic) + subject := fmt.Sprintf("%s.%s", pub.prefix, topic) if msg.Subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic) } diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index f59e2f7c..a6055739 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -53,7 +53,7 @@ type pubsub struct { // from ordinary subscribe. For more information, please take a look // here: https://docs.nats.io/developing-with-nats/receiving/queues. // If the queue is empty, Subscribe will be used. -func NewPubSub(ctx context.Context, url string, logger mflog.Logger) (messaging.PubSub, error) { +func NewPubSub(ctx context.Context, url string, logger mflog.Logger, opts ...messaging.Option) (messaging.PubSub, error) { conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects)) if err != nil { return nil, err @@ -69,13 +69,20 @@ func NewPubSub(ctx context.Context, url string, logger mflog.Logger) (messaging. ret := &pubsub{ publisher: publisher{ - js: js, - conn: conn, + js: js, + conn: conn, + prefix: chansPrefix, }, stream: stream, logger: logger, } + for _, opt := range opts { + if err := opt(ret); err != nil { + return nil, err + } + } + return ret, nil } diff --git a/pkg/messaging/nats/setup_test.go b/pkg/messaging/nats/setup_test.go index 7c3e6d60..1b1be179 100644 --- a/pkg/messaging/nats/setup_test.go +++ b/pkg/messaging/nats/setup_test.go @@ -39,7 +39,7 @@ func TestMain(m *testing.M) { } handleInterrupt(pool, container) - address := fmt.Sprintf("%s:%s", "localhost", container.GetPort("4222/tcp")) + address := fmt.Sprintf("nats://%s:%s", "localhost", container.GetPort("4222/tcp")) if err := pool.Retry(func() error { publisher, err = nats.NewPublisher(context.Background(), address) return err diff --git a/pkg/messaging/pubsub.go b/pkg/messaging/pubsub.go index 7ed5c93e..ed845b54 100644 --- a/pkg/messaging/pubsub.go +++ b/pkg/messaging/pubsub.go @@ -59,3 +59,22 @@ type PubSub interface { Publisher Subscriber } + +// Option represents optional configuration for message broker. +// +// This is used to provide optional configuration parameters to the +// underlying publisher and pubsub implementation so that it can be +// configured to meet the specific needs. +// +// For example, it can be used to set the message prefix so that +// brokers can be used for event sourcing as well as internal message broker. +// Using value of type interface is not recommended but is the most suitable +// for this use case as options should be compiled with respect to the +// underlying broker which can either be RabbitMQ or NATS. +// +// The example below shows how to set the prefix and jetstream stream for NATS. +// +// Example: +// +// broker.NewPublisher(ctx, url, broker.Prefix(eventsPrefix), broker.JSStream(js)) +type Option func(vals interface{}) error diff --git a/pkg/messaging/rabbitmq/options.go b/pkg/messaging/rabbitmq/options.go new file mode 100644 index 00000000..3d206535 --- /dev/null +++ b/pkg/messaging/rabbitmq/options.go @@ -0,0 +1,60 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq + +import ( + "errors" + + "github.com/mainflux/mainflux/pkg/messaging" + amqp "github.com/rabbitmq/amqp091-go" +) + +// ErrInvalidType is returned when the provided value is not of the expected type. +var ErrInvalidType = errors.New("invalid type") + +// Prefix sets the prefix for the publisher. +func Prefix(prefix string) messaging.Option { + return func(val interface{}) error { + p, ok := val.(*publisher) + if !ok { + return ErrInvalidType + } + + p.prefix = prefix + + return nil + } +} + +// Channel sets the channel for the publisher or subscriber. +func Channel(channel *amqp.Channel) messaging.Option { + return func(val interface{}) error { + switch v := val.(type) { + case *publisher: + v.channel = channel + case *pubsub: + v.channel = channel + default: + return ErrInvalidType + } + + return nil + } +} + +// Exchange sets the exchange for the publisher or subscriber. +func Exchange(exchange string) messaging.Option { + return func(val interface{}) error { + switch v := val.(type) { + case *publisher: + v.exchange = exchange + case *pubsub: + v.exchange = exchange + default: + return ErrInvalidType + } + + return nil + } +} diff --git a/pkg/messaging/rabbitmq/publisher.go b/pkg/messaging/rabbitmq/publisher.go index db438491..bb9ca1ed 100644 --- a/pkg/messaging/rabbitmq/publisher.go +++ b/pkg/messaging/rabbitmq/publisher.go @@ -16,17 +16,18 @@ import ( var _ messaging.Publisher = (*publisher)(nil) type publisher struct { - conn *amqp.Connection - ch *amqp.Channel + conn *amqp.Connection + channel *amqp.Channel + prefix string + exchange string } // NewPublisher returns RabbitMQ message Publisher. -func NewPublisher(url string) (messaging.Publisher, error) { +func NewPublisher(url string, opts ...messaging.Option) (messaging.Publisher, error) { conn, err := amqp.Dial(url) if err != nil { return nil, err } - ch, err := conn.Channel() if err != nil { return nil, err @@ -34,10 +35,20 @@ func NewPublisher(url string) (messaging.Publisher, error) { if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil { return nil, err } + ret := &publisher{ - conn: conn, - ch: ch, + conn: conn, + channel: ch, + prefix: chansPrefix, + exchange: exchangeName, } + + for _, opt := range opts { + if err := opt(ret); err != nil { + return nil, err + } + } + return ret, nil } @@ -49,15 +60,16 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging. if err != nil { return err } - subject := fmt.Sprintf("%s.%s", chansPrefix, topic) + + subject := fmt.Sprintf("%s.%s", pub.prefix, topic) if msg.Subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic) } subject = formatTopic(subject) - err = pub.ch.PublishWithContext( + err = pub.channel.PublishWithContext( ctx, - exchangeName, + pub.exchange, subject, false, false, @@ -76,9 +88,6 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging. } func (pub *publisher) Close() error { - if err := pub.ch.Close(); err != nil { - return err - } return pub.conn.Close() } diff --git a/pkg/messaging/rabbitmq/pubsub.go b/pkg/messaging/rabbitmq/pubsub.go index 2327344b..e50cb6e9 100644 --- a/pkg/messaging/rabbitmq/pubsub.go +++ b/pkg/messaging/rabbitmq/pubsub.go @@ -16,10 +16,11 @@ import ( ) const ( - chansPrefix = "channels" // SubjectAllChannels represents subject to subscribe for all the channels. SubjectAllChannels = "channels.#" - exchangeName = "mainflux" + + exchangeName = "messages" + chansPrefix = "channels" ) var ( @@ -45,7 +46,7 @@ type pubsub struct { } // NewPubSub returns RabbitMQ message publisher/subscriber. -func NewPubSub(url string, logger mflog.Logger) (messaging.PubSub, error) { +func NewPubSub(url string, logger mflog.Logger, opts ...messaging.Option) (messaging.PubSub, error) { conn, err := amqp.Dial(url) if err != nil { return nil, err @@ -57,14 +58,24 @@ func NewPubSub(url string, logger mflog.Logger) (messaging.PubSub, error) { if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil { return nil, err } + ret := &pubsub{ publisher: publisher{ - conn: conn, - ch: ch, + conn: conn, + channel: ch, + exchange: exchangeName, + prefix: chansPrefix, }, logger: logger, subscriptions: make(map[string]map[string]subscription), } + + for _, opt := range opts { + if err := opt(ret); err != nil { + return nil, err + } + } + return ret, nil } @@ -102,23 +113,23 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) clientID := fmt.Sprintf("%s-%s", cfg.Topic, cfg.ID) - queue, err := ps.ch.QueueDeclare(clientID, true, false, false, false, nil) + queue, err := ps.channel.QueueDeclare(clientID, true, false, false, false, nil) if err != nil { return err } - if err := ps.ch.QueueBind(queue.Name, cfg.Topic, exchangeName, false, nil); err != nil { + if err := ps.channel.QueueBind(queue.Name, cfg.Topic, ps.exchange, false, nil); err != nil { return err } - msgs, err := ps.ch.Consume(queue.Name, clientID, true, false, false, false, nil) + msgs, err := ps.channel.Consume(queue.Name, clientID, true, false, false, false, nil) if err != nil { return err } go ps.handle(msgs, cfg.Handler) s[cfg.ID] = subscription{ cancel: func() error { - if err := ps.ch.Cancel(clientID, false); err != nil { + if err := ps.channel.Cancel(clientID, false); err != nil { return err } return cfg.Handler.Cancel() @@ -154,7 +165,7 @@ func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error { return err } } - if err := ps.ch.QueueUnbind(topic, topic, exchangeName, nil); err != nil { + if err := ps.channel.QueueUnbind(topic, topic, exchangeName, nil); err != nil { return err } diff --git a/pkg/messaging/rabbitmq/pubsub_test.go b/pkg/messaging/rabbitmq/pubsub_test.go index 202d0baa..ea960900 100644 --- a/pkg/messaging/rabbitmq/pubsub_test.go +++ b/pkg/messaging/rabbitmq/pubsub_test.go @@ -22,7 +22,7 @@ const ( channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b" subtopic = "engine" clientID = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b" - exchangeName = "mainflux" + exchangeName = "messages" ) var ( diff --git a/pkg/messaging/rabbitmq/setup_test.go b/pkg/messaging/rabbitmq/setup_test.go index 7f059996..d9277b01 100644 --- a/pkg/messaging/rabbitmq/setup_test.go +++ b/pkg/messaging/rabbitmq/setup_test.go @@ -65,6 +65,7 @@ func TestMain(m *testing.M) { } code := m.Run() + if err := pool.Purge(container); err != nil { log.Fatalf("Could not purge container: %s", err) } diff --git a/scripts/ci.sh b/scripts/ci.sh index f5dc9d1f..35c0a470 100755 --- a/scripts/ci.sh +++ b/scripts/ci.sh @@ -67,6 +67,8 @@ setup_mf() { done echo "Compile check for rabbitmq..." MF_MESSAGE_BROKER_TYPE=rabbitmq make http + echo "Compile check for redis..." + MF_ES_STORE_TYPE=redis make http make -j$NPROC } diff --git a/things/events/streams.go b/things/events/streams.go index 5ef9f842..5a1eed5f 100644 --- a/things/events/streams.go +++ b/things/events/streams.go @@ -9,7 +9,7 @@ import ( "github.com/mainflux/mainflux" mfclients "github.com/mainflux/mainflux/pkg/clients" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/things" ) @@ -25,7 +25,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around things service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc things.Service, url string) (things.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/twins/events/setup_test.go b/twins/events/setup_test.go index dd4e4bbf..b1a5765a 100644 --- a/twins/events/setup_test.go +++ b/twins/events/setup_test.go @@ -14,7 +14,10 @@ import ( "github.com/ory/dockertest/v3" ) -var redisClient *redis.Client +var ( + redisClient *redis.Client + redisURL string +) func TestMain(m *testing.M) { pool, err := dockertest.NewPool("") @@ -27,12 +30,14 @@ func TestMain(m *testing.M) { log.Fatalf("Could not start container: %s", err) } + redisURL = fmt.Sprintf("redis://localhost:%s/0", container.GetPort("6379/tcp")) + opts, err := redis.ParseURL(redisURL) + if err != nil { + log.Fatalf("Could not parse redis URL: %s", err) + } + if err := pool.Retry(func() error { - redisClient = redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("localhost:%s", container.GetPort("6379/tcp")), - Password: "", - DB: 0, - }) + redisClient = redis.NewClient(opts) return redisClient.Ping(context.Background()).Err() }); err != nil { diff --git a/twins/events/streams.go b/twins/events/streams.go index 72a6704c..3466c717 100644 --- a/twins/events/streams.go +++ b/twins/events/streams.go @@ -7,7 +7,7 @@ import ( "context" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/twins" ) @@ -24,7 +24,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around things service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc twins.Service, url string) (twins.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/users/events/streams.go b/users/events/streams.go index 5e2942bf..c146124e 100644 --- a/users/events/streams.go +++ b/users/events/streams.go @@ -9,7 +9,7 @@ import ( "github.com/mainflux/mainflux" mfclients "github.com/mainflux/mainflux/pkg/clients" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/users" ) @@ -25,7 +25,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around users service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc users.Service, url string) (users.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err }