diff --git a/amqp/consumer.go b/amqp/consumer.go index 4b57afc..e4349c8 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -173,7 +173,12 @@ func (c *consumer) dispatch(msg amqplib.Delivery) { delay, isRetry := getXRetryDelayHeader(msg) if isRetry { - <-time.After(delay) + logger.WithFields(log.Fields{ + "delay": delay.String(), + "message_id": msg.MessageId, + }).Info("Delaying message.") + + time.Sleep(delay) } retryCount, _ := getXRetryCountHeader(msg) @@ -278,10 +283,8 @@ func (c *consumer) publishMessage(msg amqplib.Publishing, queue string) error { } func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { - delayNs := delay.Nanoseconds() - if h.delayedRetry { - delayNs *= 2 + delay *= 2 } retryMsg := amqplib.Publishing{ @@ -289,7 +292,7 @@ func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int "x-retry-death": time.Now().UTC(), "x-retry-count": retryCount + 1, "x-retry-max": h.maxRetries, - "x-retry-delay": delayNs, + "x-retry-delay": delay.String(), "x-action-key": getAction(msg), }, Timestamp: time.Now(), @@ -479,10 +482,9 @@ func getXRetryCountHeader(msg amqplib.Delivery) (int32, bool) { func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) { if d, ok := msg.Headers["x-retry-delay"]; ok { - f, ok := d.(int64) - - if ok { - return time.Duration(f), true + t, err := time.ParseDuration(d.(string)) + if err != nil { + return t, true } }