1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00

Fix NATS subscription bug in writers (#391)

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>
This commit is contained in:
Aleksandar Novaković 2018-09-13 10:06:34 +02:00 committed by Dušan Borovčanin
parent 845b69a4ac
commit 6408b8a9d3
4 changed files with 11 additions and 6 deletions

View File

@ -26,7 +26,8 @@ import (
)
const (
sep = ","
queue = "cassandra-writer"
sep = ","
defNatsURL = nats.DefaultURL
defPort = "8180"
@ -58,7 +59,7 @@ func main() {
defer session.Close()
repo := newService(session, logger)
if err := writers.Start(nc, logger, repo); err != nil {
if err := writers.Start(nc, repo, queue, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
}

View File

@ -25,6 +25,8 @@ import (
)
const (
queue = "influxdb-writer"
defNatsURL = nats.DefaultURL
defPort = "8180"
defPointName = "messages"
@ -80,7 +82,7 @@ func main() {
counter, latency := makeMetrics()
repo = writers.LoggingMiddleware(repo, logger)
repo = writers.MetricsMiddleware(repo, counter, latency)
if err := writers.Start(nc, logger, repo); err != nil {
if err := writers.Start(nc, repo, queue, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start message writer: %s", err))
os.Exit(1)
}

View File

@ -26,6 +26,8 @@ import (
)
const (
queue = "mongodb-writer"
defNatsURL = nats.DefaultURL
defPort = "8180"
defDBName = "mainflux"
@ -70,7 +72,7 @@ func main() {
counter, latency := makeMetrics()
repo = writers.LoggingMiddleware(repo, logger)
repo = writers.MetricsMiddleware(repo, counter, latency)
if err := writers.Start(nc, logger, repo); err != nil {
if err := writers.Start(nc, repo, queue, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start message writer: %s", err))
os.Exit(1)
}

View File

@ -23,14 +23,14 @@ type consumer struct {
}
// Start method starts to consume normalized messages received from NATS.
func Start(nc *nats.Conn, logger log.Logger, repo MessageRepository) error {
func Start(nc *nats.Conn, repo MessageRepository, queue string, logger log.Logger) error {
c := consumer{
nc: nc,
logger: logger,
repo: repo,
}
_, err := nc.Subscribe(mainflux.OutputSenML, c.consume)
_, err := nc.QueueSubscribe(mainflux.OutputSenML, queue, c.consume)
return err
}