diff --git a/amqp/connection.go b/amqp/connection.go index b0cc085..d5b22c2 100644 --- a/amqp/connection.go +++ b/amqp/connection.go @@ -26,14 +26,14 @@ type connection struct { // ConnectionConfig to be used when creating a new connection. type ConnectionConfig struct { - reconnectInterval time.Duration + ReconnectInterval time.Duration } // NewConnection returns an AMQP Connection. // Uses a default ConnectionConfig with 2 second of reconnect interval. func NewConnection(url string) (*connection, error) { return NewConnectionConfig(url, ConnectionConfig{ - reconnectInterval: 2 * time.Second, + ReconnectInterval: 2 * time.Second, }) } @@ -153,7 +153,7 @@ func (c *connection) handleConnectionClose() { "attempt": i, }).Error("Error reestablishing connection. Retrying...") - time.Sleep(c.config.reconnectInterval) + time.Sleep(c.config.ReconnectInterval) } } } diff --git a/amqp/consumer.go b/amqp/consumer.go index 9bfa2b4..a915552 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -57,6 +57,7 @@ var consumerTagSeq uint64 // ConsumerConfig to be used when creating a new producer. type ConsumerConfig struct { ConsumeRetryInterval time.Duration + MaxRetryDelay time.Duration PrefetchCount int PrefixName string } @@ -66,6 +67,7 @@ type ConsumerConfig struct { func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (*consumer, error) { return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{ ConsumeRetryInterval: 2 * time.Second, + MaxRetryDelay: 5 * time.Minute, PrefetchCount: 0, }) } @@ -120,6 +122,10 @@ func (c *consumer) dispatch(msg amqplib.Delivery) { delay, isRetry := getXRetryDelayHeader(msg) if isRetry { + if delay > c.config.MaxRetryDelay { + delay = c.config.MaxRetryDelay + } + logger.WithFields(logrus.Fields{ "delay": delay.String(), "message_id": msg.MessageId,