mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-26 13:48:53 +08:00
MF-1681 - Failed to reconnect to NATS (#1686)
* Make NATS reconnect never give up Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Extract maxReconnects to a constant Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
This commit is contained in:
parent
e6e9d22133
commit
f438d24c41
@ -1,7 +1,7 @@
|
|||||||
services:
|
services:
|
||||||
broker:
|
broker:
|
||||||
image: nats:2.2.4-alpine
|
image: nats:2.2.4-alpine
|
||||||
command: "-c /etc/nats/nats.conf"
|
command: "-c /etc/nats/nats.conf -DV"
|
||||||
volumes:
|
volumes:
|
||||||
- ./../nats/:/etc/nats
|
- ./../nats/:/etc/nats
|
||||||
ports:
|
ports:
|
||||||
|
@ -11,6 +11,11 @@ import (
|
|||||||
broker "github.com/nats-io/nats.go"
|
broker "github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// A maximum number of reconnect attempts before NATS connection closes permanently.
|
||||||
|
// Value -1 represents an unlimited number of reconnect retries, i.e. the client
|
||||||
|
// will never give up on retrying to re-establish connection to NATS server.
|
||||||
|
const maxReconnects = -1
|
||||||
|
|
||||||
var _ messaging.Publisher = (*publisher)(nil)
|
var _ messaging.Publisher = (*publisher)(nil)
|
||||||
|
|
||||||
type publisher struct {
|
type publisher struct {
|
||||||
@ -22,7 +27,7 @@ type publisher struct {
|
|||||||
|
|
||||||
// NewPublisher returns NATS message Publisher.
|
// NewPublisher returns NATS message Publisher.
|
||||||
func NewPublisher(url string) (messaging.Publisher, error) {
|
func NewPublisher(url string) (messaging.Publisher, error) {
|
||||||
conn, err := broker.Connect(url)
|
conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -17,12 +17,11 @@ import (
|
|||||||
|
|
||||||
const chansPrefix = "channels"
|
const chansPrefix = "channels"
|
||||||
|
|
||||||
|
// Publisher and Subscriber errors.
|
||||||
var (
|
var (
|
||||||
ErrAlreadySubscribed = errors.New("already subscribed to topic")
|
ErrNotSubscribed = errors.New("not subscribed")
|
||||||
ErrNotSubscribed = errors.New("not subscribed")
|
ErrEmptyTopic = errors.New("empty topic")
|
||||||
ErrEmptyTopic = errors.New("empty topic")
|
ErrEmptyID = errors.New("empty id")
|
||||||
ErrEmptyID = errors.New("empty id")
|
|
||||||
ErrFailed = errors.New("failed")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ messaging.PubSub = (*pubsub)(nil)
|
var _ messaging.PubSub = (*pubsub)(nil)
|
||||||
@ -48,7 +47,7 @@ type pubsub struct {
|
|||||||
// here: https://docs.nats.io/developing-with-nats/receiving/queues.
|
// here: https://docs.nats.io/developing-with-nats/receiving/queues.
|
||||||
// If the queue is empty, Subscribe will be used.
|
// If the queue is empty, Subscribe will be used.
|
||||||
func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) {
|
func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) {
|
||||||
conn, err := broker.Connect(url)
|
conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
package nats_test
|
package nats_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -22,8 +23,9 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
msgChan = make(chan messaging.Message)
|
msgChan = make(chan messaging.Message)
|
||||||
data = []byte("payload")
|
data = []byte("payload")
|
||||||
|
errFailed = errors.New("failed")
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPublisher(t *testing.T) {
|
func TestPublisher(t *testing.T) {
|
||||||
@ -230,7 +232,7 @@ func TestPubsub(t *testing.T) {
|
|||||||
desc: "Subscribe to another already subscribed topic with an ID with Unsubscribe failing",
|
desc: "Subscribe to another already subscribed topic with an ID with Unsubscribe failing",
|
||||||
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
|
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
|
||||||
clientID: "clientid3",
|
clientID: "clientid3",
|
||||||
errorMessage: nats.ErrFailed,
|
errorMessage: errFailed,
|
||||||
pubsub: true,
|
pubsub: true,
|
||||||
handler: handler{true},
|
handler: handler{true},
|
||||||
},
|
},
|
||||||
@ -246,7 +248,7 @@ func TestPubsub(t *testing.T) {
|
|||||||
desc: "Unsubscribe from a topic with an ID with failing handler",
|
desc: "Unsubscribe from a topic with an ID with failing handler",
|
||||||
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
|
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
|
||||||
clientID: "clientid4",
|
clientID: "clientid4",
|
||||||
errorMessage: nats.ErrFailed,
|
errorMessage: errFailed,
|
||||||
pubsub: false,
|
pubsub: false,
|
||||||
handler: handler{true},
|
handler: handler{true},
|
||||||
},
|
},
|
||||||
@ -282,7 +284,7 @@ func (h handler) Handle(msg messaging.Message) error {
|
|||||||
|
|
||||||
func (h handler) Cancel() error {
|
func (h handler) Cancel() error {
|
||||||
if h.fail {
|
if h.fail {
|
||||||
return nats.ErrFailed
|
return errFailed
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user