diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index c34d226d..6401dc7f 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -26,7 +26,8 @@ import ( ) const ( - sep = "," + queue = "cassandra-writer" + sep = "," defNatsURL = nats.DefaultURL defPort = "8180" @@ -58,7 +59,7 @@ func main() { defer session.Close() repo := newService(session, logger) - if err := writers.Start(nc, logger, repo); err != nil { + if err := writers.Start(nc, repo, queue, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err)) } diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 81eda3f4..752c3a59 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -25,6 +25,8 @@ import ( ) const ( + queue = "influxdb-writer" + defNatsURL = nats.DefaultURL defPort = "8180" defPointName = "messages" @@ -80,7 +82,7 @@ func main() { counter, latency := makeMetrics() repo = writers.LoggingMiddleware(repo, logger) repo = writers.MetricsMiddleware(repo, counter, latency) - if err := writers.Start(nc, logger, repo); err != nil { + if err := writers.Start(nc, repo, queue, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start message writer: %s", err)) os.Exit(1) } diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index 782f4fe5..f2e26de8 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -26,6 +26,8 @@ import ( ) const ( + queue = "mongodb-writer" + defNatsURL = nats.DefaultURL defPort = "8180" defDBName = "mainflux" @@ -70,7 +72,7 @@ func main() { counter, latency := makeMetrics() repo = writers.LoggingMiddleware(repo, logger) repo = writers.MetricsMiddleware(repo, counter, latency) - if err := writers.Start(nc, logger, repo); err != nil { + if err := writers.Start(nc, repo, queue, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start message writer: %s", err)) os.Exit(1) } diff --git a/writers/writer.go b/writers/writer.go index 27df5446..e137ca10 100644 --- a/writers/writer.go +++ b/writers/writer.go @@ -23,14 +23,14 @@ type consumer struct { } // Start method starts to consume normalized messages received from NATS. -func Start(nc *nats.Conn, logger log.Logger, repo MessageRepository) error { +func Start(nc *nats.Conn, repo MessageRepository, queue string, logger log.Logger) error { c := consumer{ nc: nc, logger: logger, repo: repo, } - _, err := nc.Subscribe(mainflux.OutputSenML, c.consume) + _, err := nc.QueueSubscribe(mainflux.OutputSenML, queue, c.consume) return err }