1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-26 13:48:59 +08:00

Retry feature.

This commit is contained in:
Alexandre Vicenzi 2017-05-24 18:23:07 -03:00
parent 7deff6a9de
commit c801a363b3
7 changed files with 279 additions and 106 deletions

View File

@ -126,9 +126,9 @@ func (c *Connection) handleConnectionClose() {
if err == nil { if err == nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
"subType": "connection", "sub_type": "connection",
"attempt": i, "attempt": i,
}).Info("Connection reestablished") }).Info("Connection reestablished.")
for _, c := range c.reestablishs { for _, c := range c.reestablishs {
c <- true c <- true
@ -138,7 +138,7 @@ func (c *Connection) handleConnectionClose() {
} else { } else {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
"subType": "connection", "sub_type": "connection",
"error": err, "error": err,
"attempt": i, "attempt": i,
}).Error("Error reestablishing connection. Retrying...") }).Error("Error reestablishing connection. Retrying...")

View File

@ -12,10 +12,25 @@ import (
amqplib "github.com/streadway/amqp" amqplib "github.com/streadway/amqp"
) )
const (
MaxInt32 = 1<<31 - 1
MaxRetries = MaxInt32
)
var (
logger = log.WithFields(log.Fields{
"type": "goevents",
"sub_type": "consumer",
})
)
type handler struct { type handler struct {
action string action string
handler messaging.EventHandler fn messaging.EventHandler
re *regexp.Regexp re *regexp.Regexp
maxRetries int32
retryDelay time.Duration
delayProgression bool
} }
type Consumer struct { type Consumer struct {
@ -28,6 +43,7 @@ type Consumer struct {
channel *amqplib.Channel channel *amqplib.Channel
queue *amqplib.Queue queue *amqplib.Queue
retryQueue *amqplib.Queue
exchangeName string exchangeName string
queueName string queueName string
@ -106,12 +122,12 @@ func (c *Consumer) setupTopology() error {
nil, // arguments nil, // arguments
) )
c.queue = &q
if err != nil { if err != nil {
return err return err
} }
c.queue = &q
return nil return nil
} }
@ -122,53 +138,144 @@ func (c *Consumer) handleReestablishedConnnection() {
err := c.setupTopology() err := c.setupTopology()
if err != nil { if err != nil {
log.WithFields(log.Fields{ logger.WithFields(log.Fields{
"type": "goevents",
"subType": "consumer",
"error": err, "error": err,
}).Error("Error setting up topology after reconnection") }).Error("Error setting up topology after reconnection.")
} }
} }
} }
func (c *Consumer) dispatch(msg amqplib.Delivery) { func (c *Consumer) dispatch(msg amqplib.Delivery) {
if fn, ok := c.getHandler(msg.RoutingKey); ok { if h, ok := c.getHandler(msg.RoutingKey); ok {
delay, ok := getXRetryDelayHeader(msg)
if !ok {
delay = h.retryDelay
}
retryCount, _ := getXRetryCountHeader(msg)
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
if h.maxRetries > 0 {
c.requeueMessage(msg, h, retryCount, delay)
} else {
logger.WithFields(log.Fields{
"error": err,
"message_id": msg.MessageId,
}).Error("Failed to process event.")
if !c.autoAck { if !c.autoAck {
msg.Nack(false, true) msg.Ack(false)
}
} }
} }
}() }()
ok := fn(msg.Body) death, ok := getXRetryDeathHeader(msg)
if ok {
since := time.Since(death)
if since < delay {
time.Sleep(delay - since)
}
}
err := h.fn(msg.Body)
if err != nil {
if h.maxRetries > 0 {
if retryCount >= h.maxRetries {
logger.WithFields(log.Fields{
"max_retries": h.maxRetries,
"message_id": msg.MessageId,
}).Error("Maximum retries reached. Giving up.")
if !c.autoAck { if !c.autoAck {
if ok {
msg.Ack(false) msg.Ack(false)
} else {
msg.Nack(false, true)
} }
} else {
logger.WithFields(log.Fields{
"error": err,
"message_id": msg.MessageId,
}).Error("Failed to process event. Retrying...")
c.requeueMessage(msg, h, retryCount, delay)
}
} else {
logger.WithFields(log.Fields{
"error": err,
"message_id": msg.MessageId,
}).Error("Failed to process event.")
}
} else if !c.autoAck {
msg.Ack(false)
} }
} else { } else {
// got a message from wrong exchange? // got a message from wrong exchange?
// ignore and don't requeue. // ignore and don't requeue.
if !c.autoAck {
msg.Nack(false, false) msg.Nack(false, false)
} }
} }
}
func (c *Consumer) getHandler(action string) (messaging.EventHandler, bool) { func (c *Consumer) requeueMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
delayNs := delay.Nanoseconds()
if h.delayProgression {
delayNs *= 2
}
retryMsg := amqplib.Publishing{
Headers: amqplib.Table{
"x-retry-death": time.Now().UTC(),
"x-retry-count": retryCount + 1,
"x-retry-max": h.maxRetries,
"x-retry-delay": delayNs,
},
DeliveryMode: amqplib.Persistent,
Timestamp: time.Now(),
Body: msg.Body,
MessageId: msg.MessageId,
}
err := c.channel.Publish(msg.Exchange, msg.RoutingKey, false, false, retryMsg)
if err != nil {
logger.WithFields(log.Fields{
"error": err,
}).Error("Failed to retry.")
if !c.autoAck {
msg.Nack(false, true)
}
} else if !c.autoAck {
msg.Ack(false)
}
}
func (c *Consumer) getHandler(action string) (*handler, bool) {
for _, h := range c.handlers { for _, h := range c.handlers {
if h.re.MatchString(action) { if h.re.MatchString(action) {
return h.handler, true return &h, true
} }
} }
return nil, false return nil, false
} }
// Subscribe allow to subscribe an action handler. // Subscribe allows to subscribe an action handler.
// By default it won't retry any failed event.
func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) error { func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) error {
return c.SubscribeWithOptions(action, handlerFn, time.Duration(0), false, 0)
}
// SubscribeWithOptions allows to subscribe an action handler with retry options.
func (c *Consumer) SubscribeWithOptions(action string, handlerFn messaging.EventHandler,
retryDelay time.Duration, delayProgression bool, maxRetries int32) error {
// TODO: Replace # pattern too. // TODO: Replace # pattern too.
pattern := strings.Replace(action, "*", "(.*)", 0) pattern := strings.Replace(action, "*", "(.*)", 0)
re, err := regexp.Compile(pattern) re, err := regexp.Compile(pattern)
@ -190,9 +297,12 @@ func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) er
} }
c.handlers = append(c.handlers, handler{ c.handlers = append(c.handlers, handler{
action, action: action,
handlerFn, fn: handlerFn,
re, re: re,
maxRetries: maxRetries,
retryDelay: retryDelay,
delayProgression: delayProgression,
}) })
return nil return nil
@ -230,9 +340,7 @@ func (c *Consumer) Unsubscribe(action string) error {
// Listen start to listen for new messages. // Listen start to listen for new messages.
func (c *Consumer) Consume() { func (c *Consumer) Consume() {
for !c.closed { for !c.closed {
log.WithFields(log.Fields{ logger.WithFields(log.Fields{
"type": "goevents",
"subType": "consumer",
"queue": c.queueName, "queue": c.queueName,
}).Debug("Setting up consumer channel...") }).Debug("Setting up consumer channel...")
@ -247,9 +355,7 @@ func (c *Consumer) Consume() {
) )
if err != nil { if err != nil {
log.WithFields(log.Fields{ logger.WithFields(log.Fields{
"type": "goevents",
"subType": "consumer",
"queue": c.queueName, "queue": c.queueName,
"error": err, "error": err,
}).Error("Error setting up consumer...") }).Error("Error setting up consumer...")
@ -259,21 +365,45 @@ func (c *Consumer) Consume() {
continue continue
} }
log.WithFields(log.Fields{ logger.WithFields(log.Fields{
"type": "goevents",
"subType": "consumer",
"queue": c.queueName, "queue": c.queueName,
}).Info("Consuming messages...") }).Info("Consuming messages...")
for m := range msgs { for m := range msgs {
c.dispatch(m) go c.dispatch(m)
} }
log.WithFields(log.Fields{ logger.WithFields(log.Fields{
"type": "goevents",
"subType": "consumer",
"queue": c.queueName, "queue": c.queueName,
"closed": c.closed, "closed": c.closed,
}).Info("Consumption finished") }).Info("Consumption finished.")
} }
} }
func getXRetryDeathHeader(msg amqplib.Delivery) (time.Time, bool) {
if d, ok := msg.Headers["x-retry-death"]; ok {
return d.(time.Time), true
}
return time.Time{}, false
}
func getXRetryCountHeader(msg amqplib.Delivery) (int32, bool) {
if c, ok := msg.Headers["x-retry-count"]; ok {
return c.(int32), true
}
return 0, false
}
func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) {
if d, ok := msg.Headers["x-retry-delay"]; ok {
f, ok := d.(int64)
if ok {
return time.Duration(f), true
}
}
return time.Duration(0), false
}

View File

@ -1,6 +1,7 @@
package amqp package amqp
import ( import (
"fmt"
"testing" "testing"
"time" "time"
@ -27,14 +28,14 @@ func TestPublishConsume(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action_1", func(body []byte) bool { c.Subscribe("my_action_1", func(body []byte) error {
func1 <- true func1 <- true
return true return nil
}) })
c.Subscribe("my_action_2", func(body []byte) bool { c.Subscribe("my_action_2", func(body []byte) error {
func2 <- true func2 <- true
return true return nil
}) })
go c.Consume() go c.Consume()
@ -74,14 +75,14 @@ func TestPublishConsumeWildcardAction(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) bool { c.Subscribe("webinar.*", func(body []byte) error {
func1 <- true func1 <- true
return true return nil
}) })
c.Subscribe("foobar.*", func(body []byte) bool { c.Subscribe("foobar.*", func(body []byte) error {
func2 <- true func2 <- true
return true return nil
}) })
go c.Consume() go c.Consume()
@ -121,14 +122,14 @@ func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) bool { c.Subscribe("webinar.*", func(body []byte) error {
func1 <- true func1 <- true
return true return nil
}) })
c.Subscribe("webinar.state_changed", func(body []byte) bool { c.Subscribe("webinar.state_changed", func(body []byte) error {
func2 <- true func2 <- true
return true return nil
}) })
go c.Consume() go c.Consume()
@ -168,14 +169,14 @@ func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.state_changed", func(body []byte) bool { c.Subscribe("webinar.state_changed", func(body []byte) error {
func1 <- true func1 <- true
return true return nil
}) })
c.Subscribe("webinar.*", func(body []byte) bool { c.Subscribe("webinar.*", func(body []byte) error {
func2 <- true func2 <- true
return true return nil
}) })
go c.Consume() go c.Consume()
@ -215,15 +216,15 @@ func TestPublishConsumeRequeueIfFail(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) bool { c.SubscribeWithOptions("my_action", func(body []byte) error {
if calledOnce { if calledOnce {
called <- true called <- true
return true return nil
} else { } else {
calledOnce = true calledOnce = true
return false return fmt.Errorf("Error.")
} }
}) }, 1*time.Second, false, 5)
go c.Consume() go c.Consume()
@ -260,15 +261,15 @@ func TestPublishConsumeRequeueIfPanic(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) bool { c.SubscribeWithOptions("my_action", func(body []byte) error {
if calledOnce { if calledOnce {
called <- true called <- true
return true return nil
} else { } else {
calledOnce = true calledOnce = true
panic("this is a panic!") panic("this is a panic!")
} }
}) }, 1*time.Second, false, 5)
go c.Consume() go c.Consume()

View File

@ -87,7 +87,7 @@ func (p *Producer) Close() {
func (p *Producer) setupTopology() error { func (p *Producer) setupTopology() error {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
"subType": "producer", "sub_type": "producer",
}).Debug("Setting up topology...") }).Debug("Setting up topology...")
p.m.Lock() p.m.Lock()
@ -125,8 +125,8 @@ func (p *Producer) setupTopology() error {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
"subType": "producer", "sub_type": "producer",
}).Debug("Topology ready") }).Debug("Topology ready.")
return nil return nil
} }
@ -143,7 +143,7 @@ func (p *Producer) handleReestablishedConnnection() {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": "amqp", "type": "amqp",
"error": err, "error": err,
}).Error("Error setting up topology after reconnection") }).Error("Error setting up topology after reconnection.")
} }
} }
} }
@ -151,7 +151,10 @@ func (p *Producer) handleReestablishedConnnection() {
func (p *Producer) drainInternalQueue() { func (p *Producer) drainInternalQueue() {
for m := range p.internalQueue { for m := range p.internalQueue {
for i := 0; !p.closed; i++ { for i := 0; !p.closed; i++ {
messageId, _ := NewUUIDv4()
msg := amqplib.Publishing{ msg := amqplib.Publishing{
MessageId: messageId,
DeliveryMode: amqplib.Persistent, DeliveryMode: amqplib.Persistent,
Timestamp: time.Now(), Timestamp: time.Now(),
Body: m.data, Body: m.data,
@ -163,9 +166,9 @@ func (p *Producer) drainInternalQueue() {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
"subType": "producer", "sub_type": "producer",
"attempt": i, "attempt": i,
}).Debug("Publishing message to the exchange") }).Debug("Publishing message to the exchange.")
return p.channel.Publish(p.exchangeName, m.action, false, false, msg) return p.channel.Publish(p.exchangeName, m.action, false, false, msg)
}() }()
@ -173,7 +176,7 @@ func (p *Producer) drainInternalQueue() {
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
"subType": "producer", "sub_type": "producer",
"error": err, "error": err,
"attempt": i, "attempt": i,
}).Error("Error publishing message to the exchange. Retrying...") }).Error("Error publishing message to the exchange. Retrying...")
@ -188,7 +191,7 @@ func (p *Producer) drainInternalQueue() {
case <-p.nackChannel: case <-p.nackChannel:
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
"subType": "producer", "sub_type": "producer",
"attempt": i, "attempt": i,
}).Error("Error publishing message to the exchange. Retrying...") }).Error("Error publishing message to the exchange. Retrying...")

22
amqp/uuid.go Normal file
View File

@ -0,0 +1,22 @@
package amqp
import (
"crypto/rand"
"fmt"
"io"
)
func NewUUIDv4() (string, error) {
var uuid [16]byte
_, err := io.ReadFull(rand.Reader, uuid[:])
if err != nil {
return "", err
}
uuid[6] = (uuid[6] & 0x0f) | 0x40 // Version 4
uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10
return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
}

View File

@ -1,12 +1,13 @@
package main package main
import ( import (
"fmt"
"os" "os"
"os/signal" "os/signal"
"sync" "sync"
"syscall" "syscall"
"time"
"fmt"
"github.com/eventials/goevents/amqp" "github.com/eventials/goevents/amqp"
) )
@ -23,30 +24,40 @@ func main() {
panic(err) panic(err)
} }
consumerA.Subscribe("object.eventA", func(body []byte) bool { consumerA.Subscribe("object.eventA", func(body []byte) error {
fmt.Println("object.eventA:", string(body)) fmt.Println("object.eventA:", string(body))
return true return nil
}) })
consumerA.Subscribe("object.eventB", func(body []byte) bool { consumerA.Subscribe("object.eventB", func(body []byte) error {
fmt.Println("object.eventB:", string(body)) fmt.Println("object.eventB:", string(body))
return true return nil
}) })
consumerA.SubscribeWithOptions("object.eventToRetryDelay", func(body []byte) error {
fmt.Println("object.eventToRetryDelay:", string(body))
return fmt.Errorf("Try again.")
}, 1*time.Second, true, 5)
consumerA.SubscribeWithOptions("object.eventToRetry", func(body []byte) error {
fmt.Println("object.eventToRetry:", string(body))
return fmt.Errorf("Try again.")
}, 1*time.Second, false, 10)
consumerB, err := conn.Consumer(false, "events-exchange", "events-queue-b") consumerB, err := conn.Consumer(false, "events-exchange", "events-queue-b")
if err != nil { if err != nil {
panic(err) panic(err)
} }
consumerB.Subscribe("object.eventC", func(body []byte) bool { consumerB.Subscribe("object.eventC", func(body []byte) error {
fmt.Println("object.eventC:", string(body)) fmt.Println("object.eventC:", string(body))
return true return nil
}) })
consumerB.Subscribe("object.eventD", func(body []byte) bool { consumerB.Subscribe("object.eventD", func(body []byte) error {
fmt.Println("object.eventD:", string(body)) fmt.Println("object.eventD:", string(body))
return true return nil
}) })
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -1,9 +1,15 @@
package messaging package messaging
type EventHandler func(body []byte) bool import (
"time"
)
type EventHandler func(body []byte) error
type Consumer interface { type Consumer interface {
Subscribe(action string, handler EventHandler) error Subscribe(action string, handler EventHandler) error
SubscribeWithOptions(action string, handlerFn EventHandler,
retryDelay time.Duration, delayPow2 bool, maxRetries int32) error
Unsubscribe(action string) error Unsubscribe(action string) error
Consume() Consume()
Close() Close()