1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-26 13:48:59 +08:00
eventials.goevents/amqp/consumer.go

486 lines
10 KiB
Go
Raw Normal View History

2016-12-01 10:52:22 -02:00
package amqp
import (
	"errors"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/eventials/goevents/messaging"

	log "github.com/Sirupsen/logrus"
	amqplib "github.com/streadway/amqp"
)
2017-05-24 18:23:07 -03:00
const (
	// MaxInt32 is the largest value representable by an int32.
	MaxInt32 = (1 << 31) - 1

	// MaxRetries is the highest retry limit a handler can be given; it is
	// the largest value that fits the int32 maxRetries option.
	MaxRetries = MaxInt32
)
// logger is the log entry shared by the consumer code in this file; every
// record it emits carries the "type" and "sub_type" fields set here.
var logger = log.WithFields(log.Fields{
	"type":     "goevents",
	"sub_type": "consumer",
})
2016-12-01 10:52:22 -02:00
type handler struct {
2017-05-24 18:23:07 -03:00
action string
fn messaging.EventHandler
re *regexp.Regexp
maxRetries int32
retryDelay time.Duration
delayProgression bool
2016-12-01 10:52:22 -02:00
}
type Consumer struct {
2017-03-09 17:32:05 -03:00
config ConsumerConfig
2017-06-15 12:07:50 -03:00
m sync.Mutex
2017-03-09 17:32:05 -03:00
2016-12-01 10:52:22 -02:00
conn *Connection
autoAck bool
handlers []handler
2016-12-28 09:20:29 -02:00
2017-05-24 18:23:07 -03:00
channel *amqplib.Channel
queue *amqplib.Queue
retryQueue *amqplib.Queue
2016-12-28 09:20:29 -02:00
exchangeName string
queueName string
2017-03-09 17:32:05 -03:00
closed bool
}
// ConsumerConfig holds the settings used when creating a new consumer.
type ConsumerConfig struct {
	// ConsumeRetryInterval is how long Consume sleeps before trying again
	// after it fails to start consuming from the queue.
	ConsumeRetryInterval time.Duration

	// PrefetchCount is the prefetch count passed to the channel's Qos call.
	PrefetchCount int

	// RetryTimeoutBeforeRequeue is the longest a retried message is held
	// while waiting for its retry delay to elapse; once it expires the
	// message is published back to the queue instead.
	RetryTimeoutBeforeRequeue time.Duration
}
2016-12-01 16:17:55 -02:00
// NewConsumer returns a new AMQP Consumer.
2017-03-09 17:32:05 -03:00
// Uses a default ConsumerConfig with 2 second of consume retry interval.
2016-12-28 09:20:29 -02:00
func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (messaging.Consumer, error) {
return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{
2017-06-22 16:33:59 -03:00
ConsumeRetryInterval: 2 * time.Second,
PrefetchCount: 0,
RetryTimeoutBeforeRequeue: 30 * time.Second,
2017-03-09 17:32:05 -03:00
})
}
2016-12-01 10:52:22 -02:00
// NewConsumerConfig returns a new AMQP Consumer.
func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue string, config ConsumerConfig) (messaging.Consumer, error) {
2017-03-09 17:32:05 -03:00
consumer := &Consumer{
config: config,
conn: c.(*Connection),
autoAck: autoAck,
handlers: make([]handler, 0),
exchangeName: exchange,
queueName: queue,
}
err := consumer.setupTopology()
go consumer.handleReestablishedConnnection()
return consumer, err
}
// Close marks the consumer as closed and closes its current channel.
//
// It takes the same mutex as setupTopology so that it never observes a
// channel that is in the middle of being replaced. The channel may be nil if
// none was ever assigned, in which case there is nothing to close. A failure
// to close the channel is logged because Close has no error return.
func (c *Consumer) Close() {
	c.m.Lock()
	defer c.m.Unlock()

	c.closed = true

	if c.channel == nil {
		return
	}

	if err := c.channel.Close(); err != nil {
		logger.WithFields(log.Fields{
			"queue": c.queueName,
			"error": err,
		}).Warn("Error closing consumer channel.")
	}
}
func (c *Consumer) setupTopology() error {
c.m.Lock()
defer c.m.Unlock()
var err error
c.channel, err = c.conn.OpenChannel()
2016-12-28 09:20:29 -02:00
if err != nil {
2017-03-09 17:32:05 -03:00
return err
2016-12-28 09:20:29 -02:00
}
2017-06-15 16:28:08 -03:00
err = c.channel.Qos(c.config.PrefetchCount, 0, true)
if err != nil {
return err
}
2017-03-09 17:32:05 -03:00
err = c.channel.ExchangeDeclare(
c.exchangeName, // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // arguments
2016-12-28 09:20:29 -02:00
)
if err != nil {
2017-03-09 17:32:05 -03:00
return err
2016-12-28 09:20:29 -02:00
}
2017-03-09 17:32:05 -03:00
q, err := c.channel.QueueDeclare(
c.queueName, // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // arguments
2016-12-28 09:20:29 -02:00
)
if err != nil {
2017-03-09 17:32:05 -03:00
return err
2016-12-28 09:20:29 -02:00
}
2017-05-24 18:23:07 -03:00
c.queue = &q
2017-03-09 17:32:05 -03:00
return nil
2016-12-01 10:52:22 -02:00
}
2017-03-09 17:32:05 -03:00
func (c *Consumer) handleReestablishedConnnection() {
for !c.closed {
<-c.conn.NotifyReestablish()
err := c.setupTopology()
if err != nil {
2017-05-24 18:23:07 -03:00
logger.WithFields(log.Fields{
"error": err,
}).Error("Error setting up topology after reconnection.")
2017-03-09 17:32:05 -03:00
}
}
2016-12-28 09:20:29 -02:00
}
2016-12-01 10:52:22 -02:00
func (c *Consumer) dispatch(msg amqplib.Delivery) {
if h, ok := c.getHandler(msg); ok {
2017-06-22 16:33:59 -03:00
delay, isRetry := getXRetryDelayHeader(msg)
2017-05-24 18:23:07 -03:00
2017-06-22 16:33:59 -03:00
if !isRetry {
2017-05-24 18:23:07 -03:00
delay = h.retryDelay
}
retryCount, _ := getXRetryCountHeader(msg)
2016-12-01 10:52:22 -02:00
defer func() {
if err := recover(); err != nil {
2017-05-24 18:23:07 -03:00
if h.maxRetries > 0 {
2017-06-22 16:33:59 -03:00
c.retryMessage(msg, h, retryCount, delay)
2017-05-24 18:23:07 -03:00
} else {
logger.WithFields(log.Fields{
"error": err,
"message_id": msg.MessageId,
}).Error("Failed to process event.")
if !c.autoAck {
msg.Ack(false)
}
2016-12-01 10:52:22 -02:00
}
}
}()
2017-06-22 16:33:59 -03:00
death, isRetry := getXRetryDeathHeader(msg)
2016-12-01 10:52:22 -02:00
2017-06-22 16:33:59 -03:00
if isRetry {
2017-05-24 18:23:07 -03:00
since := time.Since(death)
2017-06-22 16:33:59 -03:00
tts := delay - since
2017-05-24 18:23:07 -03:00
2017-06-22 16:33:59 -03:00
logger.WithFields(log.Fields{
"max_retries": h.maxRetries,
"message_id": msg.MessageId,
"retry_in_millis": tts,
"retry_timeout": c.config.RetryTimeoutBeforeRequeue,
}).Info("Retrying message")
2017-05-24 18:23:07 -03:00
2017-06-22 16:33:59 -03:00
if since < delay {
select {
case <-time.After(c.config.RetryTimeoutBeforeRequeue):
2017-05-24 18:23:07 -03:00
logger.WithFields(log.Fields{
"max_retries": h.maxRetries,
"message_id": msg.MessageId,
2017-06-22 16:33:59 -03:00
}).Info("Requeue message")
2017-05-24 18:23:07 -03:00
2017-06-22 16:33:59 -03:00
c.requeueMessage(msg)
case <-time.After(tts):
2017-05-24 18:23:07 -03:00
logger.WithFields(log.Fields{
2017-06-22 16:33:59 -03:00
"max_retries": h.maxRetries,
"message_id": msg.MessageId,
}).Info("dispathing retry message")
2017-05-24 18:23:07 -03:00
2017-06-22 16:33:59 -03:00
c.doDispatch(msg, h, retryCount, delay)
2017-05-24 18:23:07 -03:00
}
2017-06-22 16:33:59 -03:00
return
2016-12-01 10:52:22 -02:00
}
}
2017-06-22 16:33:59 -03:00
c.doDispatch(msg, h, retryCount, delay)
2016-12-01 10:52:22 -02:00
} else {
// got wrong message?
2016-12-01 10:52:22 -02:00
// ignore and don't requeue.
2017-05-24 18:23:07 -03:00
if !c.autoAck {
msg.Nack(false, false)
}
2016-12-01 10:52:22 -02:00
}
}
2017-06-22 16:33:59 -03:00
// doDispatch invokes the handler with the message body and settles the
// message according to the outcome:
//
//   - success: the message is acked;
//   - failure with retries remaining: the message is handed to retryMessage;
//   - failure with retries exhausted or disabled: the failure is logged and
//     the message is acked so it does not stay unacknowledged on the channel.
//
// Acks are skipped when the consumer runs in auto-ack mode.
func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
	err := h.fn(msg.Body)
	if err == nil {
		if !c.autoAck {
			msg.Ack(false)
		}
		return
	}

	retriesEnabled := h.maxRetries > 0

	if retriesEnabled && retryCount < h.maxRetries {
		logger.WithFields(log.Fields{
			"error":      err,
			"message_id": msg.MessageId,
		}).Error("Failed to process event. Retrying...")

		c.retryMessage(msg, h, retryCount, delay)
		return
	}

	if retriesEnabled {
		logger.WithFields(log.Fields{
			"max_retries": h.maxRetries,
			"message_id":  msg.MessageId,
		}).Error("Maximum retries reached. Giving up.")
	} else {
		logger.WithFields(log.Fields{
			"error":      err,
			"message_id": msg.MessageId,
		}).Error("Failed to process event.")
	}

	// Settle the message in both give-up cases, matching what dispatch does
	// for a panicking handler that has no retries.
	if !c.autoAck {
		msg.Ack(false)
	}
}
func (c *Consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
2017-05-24 18:23:07 -03:00
delayNs := delay.Nanoseconds()
if h.delayProgression {
delayNs *= 2
}
retryMsg := amqplib.Publishing{
Headers: amqplib.Table{
"x-retry-death": time.Now().UTC(),
"x-retry-count": retryCount + 1,
"x-retry-max": h.maxRetries,
"x-retry-delay": delayNs,
"x-action-key": getAction(msg),
2017-05-24 18:23:07 -03:00
},
Timestamp: time.Now(),
DeliveryMode: msg.DeliveryMode,
2017-05-24 18:23:07 -03:00
Body: msg.Body,
MessageId: msg.MessageId,
}
err := c.channel.Publish("", c.queueName, false, false, retryMsg)
2017-05-24 18:23:07 -03:00
if err != nil {
logger.WithFields(log.Fields{
"error": err,
}).Error("Failed to retry.")
if !c.autoAck {
msg.Nack(false, true)
}
} else if !c.autoAck {
msg.Ack(false)
}
}
2017-06-22 16:33:59 -03:00
// requeueMessage publishes an unchanged copy of msg (same headers,
// timestamp, delivery mode, body and message id) straight to the consumer's
// queue through the default exchange.
//
// Unless auto-ack is on, the original message is acked once the copy is
// published, or nacked with requeue when publishing fails.
func (c *Consumer) requeueMessage(msg amqplib.Delivery) {
	clone := amqplib.Publishing{
		Headers:      msg.Headers,
		Timestamp:    msg.Timestamp,
		DeliveryMode: msg.DeliveryMode,
		Body:         msg.Body,
		MessageId:    msg.MessageId,
	}

	if err := c.channel.Publish("", c.queueName, false, false, clone); err != nil {
		logger.WithFields(log.Fields{
			"messageId": msg.MessageId,
			"error":     err,
		}).Error("Failed to requeue.")

		if !c.autoAck {
			msg.Nack(false, true)
		}
		return
	}

	if !c.autoAck {
		msg.Ack(false)
	}
}
func (c *Consumer) getHandler(msg amqplib.Delivery) (*handler, bool) {
action := getAction(msg)
2016-12-01 10:52:22 -02:00
for _, h := range c.handlers {
if h.re.MatchString(action) {
2017-05-24 18:23:07 -03:00
return &h, true
2016-12-01 10:52:22 -02:00
}
}
return nil, false
}
2017-05-24 18:23:07 -03:00
// Subscribe allows to subscribe an action handler.
// By default it won't retry any failed event.
2016-12-01 11:40:58 -02:00
func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) error {
2017-05-24 18:23:07 -03:00
return c.SubscribeWithOptions(action, handlerFn, time.Duration(0), false, 0)
}
// SubscribeWithOptions allows to subscribe an action handler with retry options.
func (c *Consumer) SubscribeWithOptions(action string, handlerFn messaging.EventHandler,
retryDelay time.Duration, delayProgression bool, maxRetries int32) error {
2016-12-01 10:52:22 -02:00
// TODO: Replace # pattern too.
pattern := strings.Replace(action, "*", "(.*)", 0)
re, err := regexp.Compile(pattern)
if err != nil {
return err
}
2016-12-28 09:20:29 -02:00
err = c.channel.QueueBind(
c.queueName, // queue name
action, // routing key
c.exchangeName, // exchange
false, // no-wait
nil, // arguments
2016-12-01 10:52:22 -02:00
)
if err != nil {
return err
}
c.handlers = append(c.handlers, handler{
2017-05-24 18:23:07 -03:00
action: action,
fn: handlerFn,
re: re,
maxRetries: maxRetries,
retryDelay: retryDelay,
delayProgression: delayProgression,
2016-12-01 10:52:22 -02:00
})
return nil
}
2016-12-01 16:17:55 -02:00
// Unsubscribe allows to unsubscribe an action handler.
2016-12-01 10:52:22 -02:00
func (c *Consumer) Unsubscribe(action string) error {
2016-12-28 09:20:29 -02:00
err := c.channel.QueueUnbind(
c.queueName, // queue name
action, // routing key
c.exchangeName, // exchange
nil, // arguments
2016-12-01 10:52:22 -02:00
)
if err != nil {
return err
}
idx := -1
for i, h := range c.handlers {
if h.action == action {
idx = i
break
}
}
if idx != -1 {
c.handlers = append(c.handlers[:idx], c.handlers[idx+1:]...)
}
return nil
}
2016-12-01 16:17:55 -02:00
// Listen start to listen for new messages.
2017-03-09 17:32:05 -03:00
func (c *Consumer) Consume() {
for !c.closed {
2017-05-24 18:23:07 -03:00
logger.WithFields(log.Fields{
"queue": c.queueName,
}).Debug("Setting up consumer channel...")
2017-03-09 17:32:05 -03:00
msgs, err := c.channel.Consume(
c.queueName, // queue
"", // consumer
c.autoAck, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
2017-05-24 18:23:07 -03:00
logger.WithFields(log.Fields{
"queue": c.queueName,
"error": err,
}).Error("Error setting up consumer...")
2017-03-09 17:32:05 -03:00
2017-06-15 12:07:50 -03:00
time.Sleep(c.config.ConsumeRetryInterval)
2017-03-09 17:32:05 -03:00
continue
}
2016-12-01 10:52:22 -02:00
2017-05-24 18:23:07 -03:00
logger.WithFields(log.Fields{
"queue": c.queueName,
2017-03-09 17:32:05 -03:00
}).Info("Consuming messages...")
2016-12-01 10:52:22 -02:00
2017-03-09 17:32:05 -03:00
for m := range msgs {
2017-06-15 16:28:08 -03:00
go c.dispatch(m)
2017-03-09 17:32:05 -03:00
}
2016-12-01 10:52:22 -02:00
2017-05-24 18:23:07 -03:00
logger.WithFields(log.Fields{
"queue": c.queueName,
"closed": c.closed,
}).Info("Consumption finished.")
}
}
// getAction returns the action a message belongs to: the "x-action-key"
// header when it is present and holds a string, otherwise the routing key of
// the delivery. A header of any other type is ignored instead of causing a
// panic.
func getAction(msg amqplib.Delivery) string {
	if action, ok := msg.Headers["x-action-key"].(string); ok {
		return action
	}

	return msg.RoutingKey
}
2017-05-24 18:23:07 -03:00
func getXRetryDeathHeader(msg amqplib.Delivery) (time.Time, bool) {
if d, ok := msg.Headers["x-retry-death"]; ok {
return d.(time.Time), true
2017-03-09 17:32:05 -03:00
}
2017-05-24 18:23:07 -03:00
return time.Time{}, false
}
// getXRetryCountHeader reads the "x-retry-count" header of msg.
// It returns the stored count and true when the header is present and holds
// an int32; otherwise it returns 0 and false. A header of an unexpected type
// is treated as absent instead of causing a panic, in line with
// getXRetryDelayHeader.
func getXRetryCountHeader(msg amqplib.Delivery) (int32, bool) {
	if count, ok := msg.Headers["x-retry-count"].(int32); ok {
		return count, true
	}

	return 0, false
}
func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) {
if d, ok := msg.Headers["x-retry-delay"]; ok {
f, ok := d.(int64)
if ok {
return time.Duration(f), true
}
}
return time.Duration(0), false
2016-12-01 10:52:22 -02:00
}