diff --git a/docker/brokers/nats.yml b/docker/brokers/nats.yml index efbbf2a7..f2a53d97 100644 --- a/docker/brokers/nats.yml +++ b/docker/brokers/nats.yml @@ -1,7 +1,7 @@ services: broker: image: nats:2.2.4-alpine - command: "-c /etc/nats/nats.conf" + command: "-c /etc/nats/nats.conf -DV" volumes: - ./../nats/:/etc/nats ports: diff --git a/pkg/messaging/nats/publisher.go b/pkg/messaging/nats/publisher.go index 330aed2f..d9de2c19 100644 --- a/pkg/messaging/nats/publisher.go +++ b/pkg/messaging/nats/publisher.go @@ -11,6 +11,11 @@ import ( broker "github.com/nats-io/nats.go" ) +// A maximum number of reconnect attempts before NATS connection closes permanently. +// Value -1 represents an unlimited number of reconnect retries, i.e. the client +// will never give up on retrying to re-establish connection to NATS server. +const maxReconnects = -1 + var _ messaging.Publisher = (*publisher)(nil) type publisher struct { @@ -22,7 +27,7 @@ type publisher struct { // NewPublisher returns NATS message Publisher. func NewPublisher(url string) (messaging.Publisher, error) { - conn, err := broker.Connect(url) + conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects)) if err != nil { return nil, err } diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index 397ce880..07a020b0 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -17,12 +17,11 @@ import ( const chansPrefix = "channels" +// Publisher and Subscriber errors. var ( - ErrAlreadySubscribed = errors.New("already subscribed to topic") - ErrNotSubscribed = errors.New("not subscribed") - ErrEmptyTopic = errors.New("empty topic") - ErrEmptyID = errors.New("empty id") - ErrFailed = errors.New("failed") + ErrNotSubscribed = errors.New("not subscribed") + ErrEmptyTopic = errors.New("empty topic") + ErrEmptyID = errors.New("empty id") ) var _ messaging.PubSub = (*pubsub)(nil) @@ -48,7 +47,7 @@ type pubsub struct { // here: https://docs.nats.io/developing-with-nats/receiving/queues. // If the queue is empty, Subscribe will be used. func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) { - conn, err := broker.Connect(url) + conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects)) if err != nil { return nil, err } diff --git a/pkg/messaging/nats/pubsub_test.go b/pkg/messaging/nats/pubsub_test.go index 428bb197..3bebdd16 100644 --- a/pkg/messaging/nats/pubsub_test.go +++ b/pkg/messaging/nats/pubsub_test.go @@ -4,6 +4,7 @@ package nats_test import ( + "errors" "fmt" "testing" @@ -22,8 +23,9 @@ const ( ) var ( - msgChan = make(chan messaging.Message) - data = []byte("payload") + msgChan = make(chan messaging.Message) + data = []byte("payload") + errFailed = errors.New("failed") ) func TestPublisher(t *testing.T) { @@ -230,7 +232,7 @@ func TestPubsub(t *testing.T) { desc: "Subscribe to another already subscribed topic with an ID with Unsubscribe failing", topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"), clientID: "clientid3", - errorMessage: nats.ErrFailed, + errorMessage: errFailed, pubsub: true, handler: handler{true}, }, @@ -246,7 +248,7 @@ func TestPubsub(t *testing.T) { desc: "Unsubscribe from a topic with an ID with failing handler", topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"), clientID: "clientid4", - errorMessage: nats.ErrFailed, + errorMessage: errFailed, pubsub: false, handler: handler{true}, }, @@ -282,7 +284,7 @@ func (h handler) Handle(msg messaging.Message) error { func (h handler) Cancel() error { if h.fail { - return nats.ErrFailed + return errFailed } return nil }