mirror of
https://github.com/eventials/goevents.git
synced 2025-04-26 13:48:59 +08:00
fix delay
This commit is contained in:
parent
bfd7ed11f2
commit
030c26dba8
@ -173,7 +173,12 @@ func (c *consumer) dispatch(msg amqplib.Delivery) {
|
|||||||
delay, isRetry := getXRetryDelayHeader(msg)
|
delay, isRetry := getXRetryDelayHeader(msg)
|
||||||
|
|
||||||
if isRetry {
|
if isRetry {
|
||||||
<-time.After(delay)
|
logger.WithFields(log.Fields{
|
||||||
|
"delay": delay.String(),
|
||||||
|
"message_id": msg.MessageId,
|
||||||
|
}).Info("Delaying message.")
|
||||||
|
|
||||||
|
time.Sleep(delay)
|
||||||
}
|
}
|
||||||
|
|
||||||
retryCount, _ := getXRetryCountHeader(msg)
|
retryCount, _ := getXRetryCountHeader(msg)
|
||||||
@ -278,10 +283,8 @@ func (c *consumer) publishMessage(msg amqplib.Publishing, queue string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
|
func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
|
||||||
delayNs := delay.Nanoseconds()
|
|
||||||
|
|
||||||
if h.delayedRetry {
|
if h.delayedRetry {
|
||||||
delayNs *= 2
|
delay *= 2
|
||||||
}
|
}
|
||||||
|
|
||||||
retryMsg := amqplib.Publishing{
|
retryMsg := amqplib.Publishing{
|
||||||
@ -289,7 +292,7 @@ func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int
|
|||||||
"x-retry-death": time.Now().UTC(),
|
"x-retry-death": time.Now().UTC(),
|
||||||
"x-retry-count": retryCount + 1,
|
"x-retry-count": retryCount + 1,
|
||||||
"x-retry-max": h.maxRetries,
|
"x-retry-max": h.maxRetries,
|
||||||
"x-retry-delay": delayNs,
|
"x-retry-delay": delay.String(),
|
||||||
"x-action-key": getAction(msg),
|
"x-action-key": getAction(msg),
|
||||||
},
|
},
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
@ -479,10 +482,9 @@ func getXRetryCountHeader(msg amqplib.Delivery) (int32, bool) {
|
|||||||
|
|
||||||
func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) {
|
func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) {
|
||||||
if d, ok := msg.Headers["x-retry-delay"]; ok {
|
if d, ok := msg.Headers["x-retry-delay"]; ok {
|
||||||
f, ok := d.(int64)
|
t, err := time.ParseDuration(d.(string))
|
||||||
|
if err != nil {
|
||||||
if ok {
|
return t, true
|
||||||
return time.Duration(f), true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user