diff --git a/amqp/connection.go b/amqp/connection.go index 8828d4b..a4e4833 100644 --- a/amqp/connection.go +++ b/amqp/connection.go @@ -24,15 +24,13 @@ type Connection struct { // ConnectionConfig to be used when creating a new connection. type ConnectionConfig struct { reconnectInterval time.Duration - reconnectRetries int } // NewConnection returns an AMQP Connection. -// Uses a default ConnectionConfig with 2 second of reconnect interval and 10 retries. +// Uses a default ConnectionConfig with 2 second of reconnect interval. func NewConnection(url string) (messaging.Connection, error) { return NewConnectionConfig(url, ConnectionConfig{ reconnectInterval: 2 * time.Second, - reconnectRetries: 10, }) } @@ -122,14 +120,13 @@ func (c *Connection) handleConnectionClose() { for !c.closed { c.WaitUntilConnectionCloses() - for i := 1; i <= c.config.reconnectRetries; i++ { + for i := 0; !c.closed; i++ { err := c.reestablish() if err == nil { log.WithFields(log.Fields{ - "type": "amqp", - "attempt": i, - "max_retries": c.config.reconnectRetries, + "type": "amqp", + "attempt": i, }).Info("Connection reestablished") for _, c := range c.reestablishs { @@ -138,23 +135,13 @@ func (c *Connection) handleConnectionClose() { break } else { - if i < c.config.reconnectRetries { - log.WithFields(log.Fields{ - "type": "amqp", - "error": err, - "attempt": i, - "max_retries": c.config.reconnectRetries, - }).Error("Error reestablishing connection. Retrying...") + log.WithFields(log.Fields{ + "type": "amqp", + "error": err, + "attempt": i, + }).Error("Error reestablishing connection. Retrying...") - time.Sleep(c.config.reconnectInterval) - } else { - log.WithFields(log.Fields{ - "type": "amqp", - "error": err, - "attempt": i, - "max_retries": c.config.reconnectRetries, - }).Panic("Error reestablishing connection. Max retries reached, giving up...") - } + time.Sleep(c.config.reconnectInterval) } } } diff --git a/amqp/consumer.go b/amqp/consumer.go index c39ab2f..3ede1eb 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -232,7 +232,7 @@ func (c *Consumer) Consume() { log.WithFields(log.Fields{ "type": "amqp", "queue": c.queueName, - }).Info("Setting up consumer channel...") + }).Debug("Setting up consumer channel...") msgs, err := c.channel.Consume( c.queueName, // queue @@ -249,7 +249,7 @@ func (c *Consumer) Consume() { "type": "amqp", "queue": c.queueName, "error": err, - }).Info("Error setting up consumer...") + }).Error("Error setting up consumer...") time.Sleep(c.config.consumeRetryInterval) @@ -269,6 +269,6 @@ func (c *Consumer) Consume() { "type": "amqp", "queue": c.queueName, "closed": c.closed, - }).Info("Consumption stopped") + }).Info("Consumption finished") } } diff --git a/amqp/producer.go b/amqp/producer.go index a8070ee..e660691 100644 --- a/amqp/producer.go +++ b/amqp/producer.go @@ -38,15 +38,13 @@ type Producer struct { // ProducerConfig to be used when creating a new producer. type ProducerConfig struct { publishInterval time.Duration - publishRetries int } // NewProducer returns a new AMQP Producer. -// Uses a default ProducerConfig with 2 second of publish interval and 10 publish retries. +// Uses a default ProducerConfig with 2 second of publish interval. func NewProducer(c messaging.Connection, exchange string) (messaging.Producer, error) { return NewProducerConfig(c, exchange, ProducerConfig{ publishInterval: 2 * time.Second, - publishRetries: 10, }) } @@ -87,6 +85,10 @@ func (p *Producer) Close() { } func (p *Producer) setupTopology() error { + log.WithFields(log.Fields{ + "type": "amqp", + }).Debug("Setting up topology...") + p.m.Lock() defer p.m.Unlock() @@ -120,12 +122,18 @@ func (p *Producer) setupTopology() error { return err } + log.WithFields(log.Fields{ + "type": "amqp", + }).Debug("Topology ready") + return nil } func (p *Producer) handleReestablishedConnnection() { + reestablishChannel := p.conn.NotifyReestablish() + for !p.closed { - <-p.conn.NotifyReestablish() + <-reestablishChannel err := p.setupTopology() @@ -140,8 +148,7 @@ func (p *Producer) handleReestablishedConnnection() { func (p *Producer) drainInternalQueue() { for m := range p.internalQueue { - // try to publish in N attempts. - for i := 1; i <= p.config.publishRetries; i++ { + for i := 0; !p.closed; i++ { msg := amqplib.Publishing{ DeliveryMode: amqplib.Persistent, Timestamp: time.Now(), @@ -153,54 +160,34 @@ func (p *Producer) drainInternalQueue() { defer p.m.Unlock() log.WithFields(log.Fields{ - "type": "amqp", - "attempt": i, - "max_retries": p.config.publishRetries, + "type": "amqp", + "attempt": i, }).Debug("Publishing message to the exchange") return p.channel.Publish(p.exchangeName, m.action, false, false, msg) }() if err != nil { - if i < p.config.publishRetries { - log.WithFields(log.Fields{ - "type": "amqp", - "error": err, - "attempt": i, - "max_retries": p.config.publishRetries, - }).Error("Error publishing message to the exchange. Retrying...") + log.WithFields(log.Fields{ + "type": "amqp", + "error": err, + "attempt": i, + }).Error("Error publishing message to the exchange. Retrying...") - time.Sleep(p.config.publishInterval) - continue - } else { - log.WithFields(log.Fields{ - "type": "amqp", - "error": err, - "attempt": i, - "max_retries": p.config.publishRetries, - }).Error("Error publishing message to the exchange. Max retries reached, giving up...") - } + time.Sleep(p.config.publishInterval) + continue } select { case <-p.ackChannel: goto outer // 😈 case <-p.nackChannel: - if i < p.config.publishRetries { - log.WithFields(log.Fields{ - "type": "amqp", - "attempt": i, - "max_retries": p.config.publishRetries, - }).Error("Error publishing message to the exchange. Retrying...") + log.WithFields(log.Fields{ + "type": "amqp", + "attempt": i, + }).Error("Error publishing message to the exchange. Retrying...") - time.Sleep(p.config.publishInterval) - } else { - log.WithFields(log.Fields{ - "type": "amqp", - "attempt": i, - "max_retries": p.config.publishRetries, - }).Error("Error publishing message to the exchange. Max retries reached, giving up...") - } + time.Sleep(p.config.publishInterval) } } outer: