2016-12-01 10:52:22 -02:00
|
|
|
package amqp
|
|
|
|
|
|
|
|
import (
|
2016-12-28 10:20:09 -02:00
|
|
|
"errors"
|
2017-03-09 17:32:05 -03:00
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
log "github.com/Sirupsen/logrus"
|
2016-12-01 10:52:22 -02:00
|
|
|
amqplib "github.com/streadway/amqp"
|
2017-03-09 17:32:05 -03:00
|
|
|
|
|
|
|
"github.com/eventials/goevents/messaging"
|
2016-12-01 10:52:22 -02:00
|
|
|
)
|
|
|
|
|
2017-03-09 17:32:05 -03:00
|
|
|
// Connection with an AMQP peer.
|
2016-12-01 10:52:22 -02:00
|
|
|
type Connection struct {
|
2017-03-09 17:32:05 -03:00
|
|
|
config ConnectionConfig
|
|
|
|
m sync.Mutex
|
|
|
|
url string
|
2016-12-01 10:52:22 -02:00
|
|
|
connection *amqplib.Connection
|
2017-03-09 17:32:05 -03:00
|
|
|
closed bool
|
|
|
|
|
|
|
|
reestablishs []chan bool
|
|
|
|
}
|
|
|
|
|
|
|
|
// ConnectionConfig to be used when creating a new connection.
|
|
|
|
type ConnectionConfig struct {
|
|
|
|
reconnectInterval time.Duration
|
|
|
|
reconnectRetries int
|
2016-12-01 10:52:22 -02:00
|
|
|
}
|
|
|
|
|
2016-12-01 16:17:55 -02:00
|
|
|
// NewConnection returns an AMQP Connection.
|
2017-03-09 17:32:05 -03:00
|
|
|
// Uses a default ConnectionConfig with 2 second of reconnect interval and 10 retries.
|
2016-12-28 09:20:29 -02:00
|
|
|
func NewConnection(url string) (messaging.Connection, error) {
|
2017-03-09 17:32:05 -03:00
|
|
|
return NewConnectionWithConfig(url, ConnectionConfig{
|
|
|
|
reconnectInterval: 2 * time.Second,
|
|
|
|
reconnectRetries: 10,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewConnectionWithConfig returns an AMQP Connection.
|
|
|
|
func NewConnectionWithConfig(url string, config ConnectionConfig) (messaging.Connection, error) {
|
2016-12-01 10:52:22 -02:00
|
|
|
conn, err := amqplib.Dial(url)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-03-09 17:32:05 -03:00
|
|
|
connection := &Connection{
|
|
|
|
url: url,
|
|
|
|
connection: conn,
|
|
|
|
config: config,
|
|
|
|
}
|
|
|
|
|
|
|
|
go connection.handleConnectionClose()
|
|
|
|
|
|
|
|
return connection, nil
|
2016-12-01 10:52:22 -02:00
|
|
|
}
|
|
|
|
|
2017-03-09 17:32:05 -03:00
|
|
|
// NotifyConnectionClose returns a channel to notify when the connection closes.
|
2016-12-28 10:20:09 -02:00
|
|
|
func (c *Connection) NotifyConnectionClose() <-chan error {
|
|
|
|
ch := make(chan error)
|
|
|
|
|
|
|
|
go func() {
|
2017-03-09 17:32:05 -03:00
|
|
|
se := <-c.connection.NotifyClose(make(chan *amqplib.Error))
|
|
|
|
|
|
|
|
ch <- errors.New(se.Error())
|
2016-12-28 10:20:09 -02:00
|
|
|
}()
|
|
|
|
|
|
|
|
return ch
|
2016-12-28 09:20:29 -02:00
|
|
|
}
|
|
|
|
|
2017-03-09 17:32:05 -03:00
|
|
|
// NotifyReestablish returns a channel to notify when the connection is restablished.
|
|
|
|
func (c *Connection) NotifyReestablish() <-chan bool {
|
|
|
|
receiver := make(chan bool)
|
|
|
|
c.reestablishs = append(c.reestablishs, receiver)
|
|
|
|
|
|
|
|
return receiver
|
|
|
|
}
|
|
|
|
|
2016-12-01 16:17:55 -02:00
|
|
|
// Consumer returns an AMQP Consumer.
|
2016-12-28 09:20:29 -02:00
|
|
|
func (c *Connection) Consumer(autoAck bool, exchange, queue string) (messaging.Consumer, error) {
|
|
|
|
return NewConsumer(c, autoAck, exchange, queue)
|
2016-12-01 10:52:22 -02:00
|
|
|
}
|
|
|
|
|
2017-03-09 17:32:05 -03:00
|
|
|
// OpenChannel returns an AMQP Consumer.
|
|
|
|
func (c *Connection) OpenChannel() (*amqplib.Channel, error) {
|
|
|
|
c.m.Lock()
|
|
|
|
defer c.m.Unlock()
|
|
|
|
|
|
|
|
return c.connection.Channel()
|
|
|
|
}
|
|
|
|
|
2016-12-01 16:17:55 -02:00
|
|
|
// Producer returns an AMQP Producer.
|
2017-03-09 17:32:05 -03:00
|
|
|
func (c *Connection) Producer(exchange string) (messaging.Producer, error) {
|
|
|
|
return NewProducer(c, exchange)
|
2016-12-01 10:52:22 -02:00
|
|
|
}
|
|
|
|
|
2016-12-01 16:17:55 -02:00
|
|
|
// Close closes the AMQP connection.
|
2016-12-01 10:52:22 -02:00
|
|
|
func (c *Connection) Close() {
|
2017-03-09 17:32:05 -03:00
|
|
|
c.m.Lock()
|
|
|
|
defer c.m.Unlock()
|
|
|
|
|
|
|
|
c.closed = true
|
2016-12-01 10:52:22 -02:00
|
|
|
c.connection.Close()
|
|
|
|
}
|
2016-12-28 09:20:29 -02:00
|
|
|
|
2017-03-09 17:32:05 -03:00
|
|
|
// WaitUntilConnectionCloses holds the execution until the connection closes.
|
2016-12-28 09:33:59 -02:00
|
|
|
func (c *Connection) WaitUntilConnectionCloses() {
|
|
|
|
<-c.NotifyConnectionClose()
|
2016-12-28 09:20:29 -02:00
|
|
|
}
|
2017-03-09 17:32:05 -03:00
|
|
|
|
|
|
|
func (c *Connection) reestablish() error {
|
|
|
|
conn, err := amqplib.Dial(c.url)
|
|
|
|
|
|
|
|
c.m.Lock()
|
|
|
|
defer c.m.Unlock()
|
|
|
|
|
|
|
|
c.connection = conn
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Connection) handleConnectionClose() {
|
|
|
|
for !c.closed {
|
|
|
|
c.WaitUntilConnectionCloses()
|
|
|
|
|
|
|
|
for i := 1; i <= c.config.reconnectRetries; i++ {
|
|
|
|
err := c.reestablish()
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
log.WithFields(log.Fields{
|
|
|
|
"type": "amqp",
|
|
|
|
"attempt": i,
|
|
|
|
"max_retries": c.config.reconnectRetries,
|
|
|
|
}).Info("Connection reestablished")
|
|
|
|
|
|
|
|
for _, c := range c.reestablishs {
|
|
|
|
c <- true
|
|
|
|
}
|
|
|
|
|
|
|
|
break
|
|
|
|
} else {
|
|
|
|
if i < c.config.reconnectRetries {
|
|
|
|
log.WithFields(log.Fields{
|
|
|
|
"type": "amqp",
|
|
|
|
"error": err,
|
|
|
|
"attempt": i,
|
|
|
|
"max_retries": c.config.reconnectRetries,
|
|
|
|
}).Error("Error reestablishing connection. Retrying...")
|
|
|
|
|
|
|
|
time.Sleep(c.config.reconnectInterval)
|
|
|
|
} else {
|
|
|
|
log.WithFields(log.Fields{
|
|
|
|
"type": "amqp",
|
|
|
|
"error": err,
|
|
|
|
"attempt": i,
|
|
|
|
"max_retries": c.config.reconnectRetries,
|
|
|
|
}).Panic("Error reestablishing connection. Max retries reached, giving up...")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|