1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-26 13:48:59 +08:00
eventials.goevents/amqp/consumer.go

567 lines
11 KiB
Go
Raw Normal View History

2016-12-01 10:52:22 -02:00
package amqp
import (
2017-10-10 17:49:53 -03:00
"errors"
2017-10-10 18:06:42 -03:00
"fmt"
"os"
2016-12-01 10:52:22 -02:00
"regexp"
"strings"
2017-03-09 17:32:05 -03:00
"sync"
"sync/atomic"
2017-03-09 17:32:05 -03:00
"time"
"github.com/eventials/goevents/messaging"
log "github.com/sirupsen/logrus"
amqplib "github.com/streadway/amqp"
2016-12-01 10:52:22 -02:00
)
2017-05-24 18:23:07 -03:00
// logger is the package-level logrus entry used throughout this file. Every
// record written through it carries the "type" and "sub_type" fields set
// below.
var (
logger = log.WithFields(log.Fields{
"type": "goevents",
"sub_type": "consumer",
})
)
2016-12-01 10:52:22 -02:00
type handler struct {
2017-10-05 15:19:54 -03:00
action string
fn messaging.EventHandler
re *regexp.Regexp
maxRetries int32
retryDelay time.Duration
delayedRetry bool
2016-12-01 10:52:22 -02:00
}
2017-10-10 18:13:28 -03:00
type consumer struct {
2017-03-09 17:32:05 -03:00
config ConsumerConfig
2017-06-15 12:07:50 -03:00
m sync.RWMutex
2017-10-10 09:54:28 -03:00
wg sync.WaitGroup
2017-03-09 17:32:05 -03:00
2018-02-10 14:38:29 -02:00
conn *connection
2016-12-01 10:52:22 -02:00
autoAck bool
handlers []handler
2016-12-28 09:20:29 -02:00
exchangeName string
queueName string
closed bool
2017-03-09 17:32:05 -03:00
}
// consumerTagSeq is a package-wide counter, incremented atomically by
// createUniqueConsumerTagName to give each consumer tag a distinct suffix.
var consumerTagSeq uint64
2017-03-09 17:32:05 -03:00
// ConsumerConfig holds the settings used when creating a new consumer.
type ConsumerConfig struct {
	// ConsumeRetryInterval is the intended pause between consume attempts.
	ConsumeRetryInterval time.Duration

	// PrefetchCount is the prefetch count passed to the channel's Qos call
	// when the topology is set up.
	PrefetchCount int

	// PrefixName, when non-empty, makes the consumer declare a non-durable,
	// exclusive queue named PrefixName followed by the current Unix time in
	// nanoseconds, instead of the queue name given at construction.
	PrefixName string
}
2016-12-01 16:17:55 -02:00
// NewConsumer returns a new AMQP Consumer.
2017-03-09 17:32:05 -03:00
// Uses a default ConsumerConfig with 2 second of consume retry interval.
2017-10-10 18:13:28 -03:00
func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (*consumer, error) {
return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{
2017-10-10 19:47:07 -03:00
ConsumeRetryInterval: 2 * time.Second,
PrefetchCount: 0,
2017-03-09 17:32:05 -03:00
})
}
2016-12-01 10:52:22 -02:00
// createUniqueConsumerTagName builds a consumer tag of the form
// "ctag-<hostname>-<os.Args[0]>-<sequence>". The hostname falls back to
// "unknown" when it cannot be determined, and the sequence number comes from
// atomically incrementing consumerTagSeq.
func createUniqueConsumerTagName() string {
	host, hostErr := os.Hostname()
	if hostErr != nil {
		host = "unknown"
	}

	program := os.Args[0]
	seq := atomic.AddUint64(&consumerTagSeq, 1)

	return fmt.Sprintf("ctag-%s-%s-%d", host, program, seq)
}
// NewConsumerConfig returns a new AMQP Consumer.
func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue string, config ConsumerConfig) (*consumer, error) {
2017-10-10 18:13:28 -03:00
consumer := &consumer{
config: config,
2018-02-10 14:38:29 -02:00
conn: c.(*connection),
autoAck: autoAck,
handlers: make([]handler, 0),
exchangeName: exchange,
queueName: queue,
2017-03-09 17:32:05 -03:00
}
return consumer, nil
2017-10-10 18:06:42 -03:00
2017-03-09 17:32:05 -03:00
}
2017-10-10 18:13:28 -03:00
func (c *consumer) Close() {
2017-10-10 09:54:28 -03:00
func() {
c.m.Lock()
defer c.m.Unlock()
// Unsubscribe all handlers
c.handlers = make([]handler, 0)
2017-10-09 19:33:27 -03:00
2017-10-10 09:54:28 -03:00
c.closed = true
}()
2017-10-09 19:33:27 -03:00
2017-10-10 09:54:28 -03:00
// Wait all go routine finish.
c.wg.Wait()
2017-03-09 17:32:05 -03:00
}
2017-12-19 12:02:09 -02:00
func (c *consumer) uniqueNameWithPrefix() string {
2017-12-19 12:15:41 -02:00
return fmt.Sprintf("%s%d", c.config.PrefixName, time.Now().UnixNano())
2017-12-19 12:02:09 -02:00
}
2017-10-10 18:13:28 -03:00
func (c *consumer) dispatch(msg amqplib.Delivery) {
if h, ok := c.getHandler(msg); ok {
2017-06-22 16:33:59 -03:00
delay, isRetry := getXRetryDelayHeader(msg)
2017-05-24 18:23:07 -03:00
2017-10-10 17:49:53 -03:00
if isRetry {
2017-10-10 18:55:26 -03:00
logger.WithFields(log.Fields{
"delay": delay.String(),
"message_id": msg.MessageId,
2018-05-04 09:55:55 -03:00
}).Debug("Delaying message.")
2017-10-10 18:55:26 -03:00
time.Sleep(delay)
2017-05-24 18:23:07 -03:00
}
retryCount, _ := getXRetryCountHeader(msg)
2017-06-22 16:33:59 -03:00
c.doDispatch(msg, h, retryCount, delay)
2016-12-01 10:52:22 -02:00
} else {
2017-05-24 18:23:07 -03:00
if !c.autoAck {
2017-10-09 19:33:27 -03:00
err := msg.Nack(false, true)
if err != nil {
logger.WithFields(log.Fields{
"error": err,
"message_id": msg.MessageId,
}).Error("Failed to nack message.")
}
2017-05-24 18:23:07 -03:00
}
2016-12-01 10:52:22 -02:00
}
}
2017-10-10 18:13:28 -03:00
func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err error) {
2017-10-10 17:49:53 -03:00
defer func() {
if r := recover(); r != nil {
switch x := r.(type) {
case string:
err = errors.New(x)
case error:
err = x
default:
err = errors.New("Unknown panic")
}
}
}()
err = h.fn(messaging.Event{
2018-04-24 15:41:26 -03:00
Id: msg.MessageId,
Action: h.action,
Body: msg.Body,
Timestamp: msg.Timestamp,
2017-10-05 15:19:54 -03:00
})
2017-06-22 16:33:59 -03:00
2017-10-10 17:49:53 -03:00
return
}
2017-10-10 18:13:28 -03:00
func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
2017-10-10 17:49:53 -03:00
err := c.callAndHandlePanic(msg, h)
2017-10-30 16:51:47 -02:00
if err == nil {
2017-12-19 16:05:18 -02:00
logger.WithFields(log.Fields{
"action": h.action,
"body": string(msg.Body),
"message_id": msg.MessageId,
2018-05-04 09:55:55 -03:00
}).Debug("Message handled successfully.")
2017-12-19 16:05:18 -02:00
2017-10-30 16:51:47 -02:00
if !c.autoAck {
msg.Ack(false)
}
} else {
2017-06-22 16:33:59 -03:00
if h.maxRetries > 0 {
if retryCount >= h.maxRetries {
logger.WithFields(log.Fields{
2017-10-30 16:51:47 -02:00
"action": h.action,
2017-12-19 16:01:48 -02:00
"body": string(msg.Body),
2017-10-30 16:51:47 -02:00
"error": err,
"message_id": msg.MessageId,
2017-06-22 16:33:59 -03:00
}).Error("Maximum retries reached. Giving up.")
if !c.autoAck {
msg.Ack(false)
}
} else {
logger.WithFields(log.Fields{
2017-10-30 16:51:47 -02:00
"action": h.action,
2017-12-19 16:01:48 -02:00
"body": string(msg.Body),
2017-06-22 16:33:59 -03:00
"error": err,
"message_id": msg.MessageId,
}).Error("Failed to process event. Retrying...")
c.retryMessage(msg, h, retryCount, delay)
}
} else {
logger.WithFields(log.Fields{
2017-10-30 16:51:47 -02:00
"action": h.action,
2017-12-19 16:01:48 -02:00
"body": string(msg.Body),
2017-06-22 16:33:59 -03:00
"error": err,
"message_id": msg.MessageId,
}).Error("Failed to process event.")
}
}
}
2017-10-10 18:13:28 -03:00
func (c *consumer) publishMessage(msg amqplib.Publishing, queue string) error {
2018-02-10 14:38:29 -02:00
channel, err := c.conn.openChannel()
2017-10-10 18:06:42 -03:00
if err != nil {
return err
}
defer channel.Close()
if err := channel.Confirm(false); err != nil {
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
}
confirms := channel.NotifyPublish(make(chan amqplib.Confirmation, 1))
err = channel.Publish("", queue, false, false, msg)
if err != nil {
return err
} else {
if confirmed := <-confirms; !confirmed.Ack {
return ErrNotAcked
}
}
return nil
}
2017-10-10 18:13:28 -03:00
func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
2017-10-10 19:47:07 -03:00
if delay > 0 {
if h.delayedRetry {
delay *= 2
}
} else {
delay = h.retryDelay
2017-05-24 18:23:07 -03:00
}
retryMsg := amqplib.Publishing{
Headers: amqplib.Table{
"x-retry-count": retryCount + 1,
"x-retry-max": h.maxRetries,
2017-10-10 18:55:26 -03:00
"x-retry-delay": delay.String(),
"x-action-key": getAction(msg),
2017-05-24 18:23:07 -03:00
},
2018-04-24 15:41:26 -03:00
Timestamp: msg.Timestamp,
DeliveryMode: msg.DeliveryMode,
2017-05-24 18:23:07 -03:00
Body: msg.Body,
MessageId: msg.MessageId,
}
2017-10-10 18:06:42 -03:00
err := c.publishMessage(retryMsg, c.queueName)
2017-06-22 16:33:59 -03:00
2017-10-10 18:06:42 -03:00
if err != nil {
logger.WithFields(log.Fields{
"error": err,
}).Error("Failed to retry.")
if !c.autoAck {
msg.Nack(false, true)
}
} else if !c.autoAck {
2017-06-22 16:33:59 -03:00
msg.Ack(false)
}
}
2017-10-10 18:13:28 -03:00
func (c *consumer) getHandler(msg amqplib.Delivery) (*handler, bool) {
c.m.RLock()
defer c.m.RUnlock()
2017-10-09 19:33:27 -03:00
action := getAction(msg)
2016-12-01 10:52:22 -02:00
for _, h := range c.handlers {
if h.re.MatchString(action) {
2017-05-24 18:23:07 -03:00
return &h, true
2016-12-01 10:52:22 -02:00
}
}
return nil, false
}
2017-05-24 18:23:07 -03:00
// Subscribe allows to subscribe an action handler.
2017-10-10 18:13:28 -03:00
func (c *consumer) Subscribe(action string, handlerFn messaging.EventHandler, options *messaging.SubscribeOptions) error {
2016-12-01 10:52:22 -02:00
// TODO: Replace # pattern too.
2017-10-05 16:22:28 -03:00
pattern := strings.Replace(action, "*", "(.*)", 0)
2016-12-01 10:52:22 -02:00
re, err := regexp.Compile(pattern)
if err != nil {
return err
}
2017-10-05 16:22:28 -03:00
if options == nil {
options = &messaging.SubscribeOptions{
RetryDelay: time.Duration(0),
DelayedRetry: false,
MaxRetries: 0,
}
}
c.m.Lock()
defer c.m.Unlock()
2016-12-01 10:52:22 -02:00
c.handlers = append(c.handlers, handler{
2017-10-05 16:22:28 -03:00
action: action,
fn: handlerFn,
2017-10-05 15:19:54 -03:00
re: re,
maxRetries: options.MaxRetries,
retryDelay: options.RetryDelay,
delayedRetry: options.DelayedRetry,
2016-12-01 10:52:22 -02:00
})
return nil
}
2016-12-01 16:17:55 -02:00
// Unsubscribe allows to unsubscribe an action handler.
2017-10-10 18:13:28 -03:00
func (c *consumer) Unsubscribe(action string) error {
idx := -1
c.m.Lock()
defer c.m.Unlock()
for i, h := range c.handlers {
if h.action == action {
idx = i
break
}
}
if idx != -1 {
c.handlers = append(c.handlers[:idx], c.handlers[idx+1:]...)
}
return nil
}
func (c *consumer) BindActions(actions ...string) error {
2018-02-10 14:38:29 -02:00
channel, err := c.conn.openChannel()
if err != nil {
return err
}
defer channel.Close()
for _, action := range actions {
err := c.bindActionToQueue(channel, c.queueName, action)
if err != nil {
return err
}
}
return nil
}
func (c *consumer) UnbindActions(actions ...string) error {
channel, err := c.conn.openChannel()
2016-12-01 10:52:22 -02:00
if err != nil {
return err
}
defer channel.Close()
2016-12-01 10:52:22 -02:00
for _, action := range actions {
err := channel.QueueUnbind(
c.queueName, // queue name
action, // routing key
c.exchangeName, // exchange
nil, // arguments
)
2016-12-01 10:52:22 -02:00
if err != nil {
return err
}
2016-12-01 10:52:22 -02:00
}
return nil
}
2018-02-12 10:08:18 -02:00
// bindActionToQueue binds queueName to the consumer's exchange using action
// as the routing key, with no-wait disabled and no extra arguments.
func (c *consumer) bindActionToQueue(channel *amqplib.Channel, queueName string, action string) error {
	const noWait = false

	var args amqplib.Table // nil: no binding arguments

	return channel.QueueBind(queueName, action, c.exchangeName, noWait, args)
}
// bindAllActionsQueue binds the action of every registered handler to
// queueName. It stops at the first failing bind and returns that error.
//
// The handler list is guarded by c.m elsewhere in this type (Subscribe,
// Unsubscribe, getHandler, Close), so the actions are snapshotted under the
// read lock. The bind calls themselves run outside the lock, so channel I/O
// never blocks writers. The caller must not hold c.m.
func (c *consumer) bindAllActionsQueue(channel *amqplib.Channel, queueName string) error {
	c.m.RLock()
	actions := make([]string, len(c.handlers))
	for i := range c.handlers {
		actions[i] = c.handlers[i].action
	}
	c.m.RUnlock()

	for _, action := range actions {
		if err := c.bindActionToQueue(channel, queueName, action); err != nil {
			return err
		}
	}

	return nil
}
2018-02-12 00:54:10 -02:00
func (c *consumer) setupTopology(channel *amqplib.Channel) (err error) {
err = channel.Qos(c.config.PrefetchCount, 0, true)
if err != nil {
return err
}
err = channel.ExchangeDeclare(
c.exchangeName, // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
durable := true
exclusive := false
if c.config.PrefixName != "" {
c.queueName = c.uniqueNameWithPrefix()
durable = false
exclusive = true
}
_, err = channel.QueueDeclare(
c.queueName, // name
durable, // durable
false, // auto-delete
exclusive, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
2018-02-12 10:08:18 -02:00
return c.bindAllActionsQueue(channel, c.queueName)
2018-02-12 00:54:10 -02:00
}
func (c *consumer) doConsume() error {
logger.WithFields(log.Fields{
"queue": c.queueName,
}).Debug("Setting up consumer channel...")
2018-02-10 14:38:29 -02:00
channel, err := c.conn.openChannel()
if err != nil {
return err
}
defer channel.Close()
err = c.setupTopology(channel)
2018-02-10 14:38:29 -02:00
if err != nil {
return err
}
msgs, err := channel.Consume(
c.queueName, // queue
createUniqueConsumerTagName(), // consumer
c.autoAck, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
return err
}
logger.WithFields(log.Fields{
"queue": c.queueName,
}).Info("Consuming messages...")
for m := range msgs {
c.wg.Add(1)
go func(msg amqplib.Delivery) {
c.dispatch(msg)
c.wg.Done()
}(m)
}
return nil
}
2016-12-01 16:17:55 -02:00
// Listen start to listen for new messages.
2017-10-10 18:13:28 -03:00
func (c *consumer) Consume() {
2017-10-03 16:50:41 -03:00
logger.Info("Registered handlers:")
for _, handler := range c.handlers {
logger.Infof(" %s", handler.action)
}
rs := c.conn.NotifyReestablish()
2017-03-09 17:32:05 -03:00
for !c.closed {
2017-12-19 15:12:30 -02:00
if !c.conn.IsConnected() {
logger.Info("Connection not established. Waiting connection to be reestablished.")
2017-12-19 15:12:30 -02:00
<-rs
2017-12-19 15:12:30 -02:00
continue
}
err := c.doConsume()
2017-03-09 17:32:05 -03:00
if err == nil {
logger.WithFields(log.Fields{
"queue": c.queueName,
"closed": c.closed,
}).Info("Consumption finished.")
} else {
2017-05-24 18:23:07 -03:00
logger.WithFields(log.Fields{
"queue": c.queueName,
"error": err,
}).Error("Error consuming events.")
2017-03-09 17:32:05 -03:00
}
2017-05-24 18:23:07 -03:00
}
}
// getAction returns the action of a delivery: the "x-action-key" header when
// it is present and holds a string, otherwise the delivery's routing key.
//
// The header comes from the message itself, so its type is checked instead of
// asserted; a non-string value falls back to the routing key rather than
// panicking.
func getAction(msg amqplib.Delivery) string {
	if action, ok := msg.Headers["x-action-key"].(string); ok {
		return action
	}

	return msg.RoutingKey
}
2017-05-24 18:23:07 -03:00
// getXRetryCountHeader returns the value of the "x-retry-count" header and
// whether a usable value was found.
//
// The header comes from the message itself, so any signed integer width (and
// uint8) is accepted as long as the value fits in an int32. A missing header,
// a non-integer value or an out-of-range value yields (0, false) instead of a
// panic.
func getXRetryCountHeader(msg amqplib.Delivery) (int32, bool) {
	switch v := msg.Headers["x-retry-count"].(type) {
	case int32:
		return v, true
	case int8:
		return int32(v), true
	case int16:
		return int32(v), true
	case uint8:
		return int32(v), true
	case int:
		// The round trip detects values that do not fit in an int32.
		if n := int32(v); int(n) == v {
			return n, true
		}
	case int64:
		if n := int32(v); int64(n) == v {
			return n, true
		}
	}

	return 0, false
}
func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) {
if d, ok := msg.Headers["x-retry-delay"]; ok {
2017-10-10 18:55:26 -03:00
t, err := time.ParseDuration(d.(string))
2017-10-10 19:22:58 -03:00
if err == nil {
2017-10-10 18:55:26 -03:00
return t, true
2017-05-24 18:23:07 -03:00
}
}
return time.Duration(0), false
2016-12-01 10:52:22 -02:00
}