1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00
eventials.goevents/amqp/consumer.go

609 lines
12 KiB
Go
Raw Permalink Normal View History

2016-12-01 10:52:22 -02:00
package amqp
import (
2017-10-10 17:49:53 -03:00
"errors"
2017-10-10 18:06:42 -03:00
"fmt"
"os"
2016-12-01 10:52:22 -02:00
"regexp"
"runtime/debug"
2016-12-01 10:52:22 -02:00
"strings"
2017-03-09 17:32:05 -03:00
"sync"
"sync/atomic"
2017-03-09 17:32:05 -03:00
"time"
"github.com/eventials/goevents/messaging"
2019-05-20 16:45:16 -03:00
"github.com/sirupsen/logrus"
amqplib "github.com/streadway/amqp"
2016-12-01 10:52:22 -02:00
)
2017-05-24 18:23:07 -03:00
var (
2019-05-20 16:45:16 -03:00
logger = logrus.WithFields(logrus.Fields{
2017-05-24 18:23:07 -03:00
"type": "goevents",
"sub_type": "consumer",
})
)
2016-12-01 10:52:22 -02:00
type handler struct {
2017-10-05 15:19:54 -03:00
action string
fn messaging.EventHandler
re *regexp.Regexp
maxRetries int32
retryDelay time.Duration
delayedRetry bool
2019-06-24 18:56:34 -03:00
manualMode bool
}
func (h *handler) manual() {
h.manualMode = true
2016-12-01 10:52:22 -02:00
}
2017-10-10 18:13:28 -03:00
type consumer struct {
2017-03-09 17:32:05 -03:00
config ConsumerConfig
2017-06-15 12:07:50 -03:00
m sync.RWMutex
2017-10-10 09:54:28 -03:00
wg sync.WaitGroup
2017-03-09 17:32:05 -03:00
2018-02-10 14:38:29 -02:00
conn *connection
2016-12-01 10:52:22 -02:00
autoAck bool
handlers []handler
2016-12-28 09:20:29 -02:00
exchangeName string
queueName string
closed bool
2017-03-09 17:32:05 -03:00
}
// consumerTagSeq is a package-level counter, incremented atomically, that is
// appended to generated consumer tags to keep them distinct.
var consumerTagSeq uint64
2017-03-09 17:32:05 -03:00
// ConsumerConfig holds the settings used when creating a new consumer.
type ConsumerConfig struct {
	// ConsumeRetryInterval is the consume retry interval (NewConsumer
	// defaults it to 2 seconds).
	ConsumeRetryInterval time.Duration
	// MaxRetryDelay is the upper bound applied to a retried message's delay
	// before it is dispatched.
	MaxRetryDelay time.Duration
	// PrefetchCount is the prefetch count passed to the channel's Qos call.
	PrefetchCount int
	// PrefixName, when non-empty, makes the consumer declare a uniquely
	// named, exclusive, non-durable queue whose name starts with this prefix.
	PrefixName string
}
2016-12-01 16:17:55 -02:00
// NewConsumer returns a new AMQP Consumer.
2017-03-09 17:32:05 -03:00
// Uses a default ConsumerConfig with 2 second of consume retry interval.
2017-10-10 18:13:28 -03:00
func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (*consumer, error) {
return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{
2017-10-10 19:47:07 -03:00
ConsumeRetryInterval: 2 * time.Second,
2020-02-29 15:09:53 -03:00
MaxRetryDelay: 5 * time.Minute,
2017-10-10 19:47:07 -03:00
PrefetchCount: 0,
2017-03-09 17:32:05 -03:00
})
}
2016-12-01 10:52:22 -02:00
// createUniqueConsumerTagName builds a consumer tag from the host name, the
// program name (os.Args[0]) and a process-wide sequence number. When the host
// name cannot be determined, "unknown" is used in its place.
func createUniqueConsumerTagName() string {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown"
	}

	seq := atomic.AddUint64(&consumerTagSeq, 1)

	return fmt.Sprintf("ctag-%s-%s-%d", host, os.Args[0], seq)
}
// NewConsumerConfig returns a new AMQP Consumer.
func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue string, config ConsumerConfig) (*consumer, error) {
2017-10-10 18:13:28 -03:00
consumer := &consumer{
config: config,
2018-02-10 14:38:29 -02:00
conn: c.(*connection),
autoAck: autoAck,
handlers: make([]handler, 0),
exchangeName: exchange,
queueName: queue,
2017-03-09 17:32:05 -03:00
}
return consumer, nil
2017-10-10 18:06:42 -03:00
2017-03-09 17:32:05 -03:00
}
func (c *consumer) closeAndClearHandlers() {
c.m.Lock()
defer c.m.Unlock()
2017-10-10 09:54:28 -03:00
// Unsubscribe all handlers
c.handlers = make([]handler, 0)
2017-10-09 19:33:27 -03:00
c.closed = true
}
func (c *consumer) Close() {
c.closeAndClearHandlers()
2017-10-09 19:33:27 -03:00
2017-10-10 09:54:28 -03:00
// Wait all go routine finish.
c.wg.Wait()
2017-03-09 17:32:05 -03:00
}
2017-12-19 12:02:09 -02:00
func (c *consumer) uniqueNameWithPrefix() string {
2017-12-19 12:15:41 -02:00
return fmt.Sprintf("%s%d", c.config.PrefixName, time.Now().UnixNano())
2017-12-19 12:02:09 -02:00
}
2017-10-10 18:13:28 -03:00
func (c *consumer) dispatch(msg amqplib.Delivery) {
if h, ok := c.getHandler(msg); ok {
2017-06-22 16:33:59 -03:00
delay, isRetry := getXRetryDelayHeader(msg)
2017-05-24 18:23:07 -03:00
2017-10-10 17:49:53 -03:00
if isRetry {
2020-02-29 15:09:53 -03:00
if delay > c.config.MaxRetryDelay {
delay = c.config.MaxRetryDelay
}
2019-05-20 16:45:16 -03:00
logger.WithFields(logrus.Fields{
2017-10-10 18:55:26 -03:00
"delay": delay.String(),
"message_id": msg.MessageId,
2018-05-04 09:55:55 -03:00
}).Debug("Delaying message.")
2017-10-10 18:55:26 -03:00
time.Sleep(delay)
2017-05-24 18:23:07 -03:00
}
retryCount, _ := getXRetryCountHeader(msg)
2017-06-22 16:33:59 -03:00
c.doDispatch(msg, h, retryCount, delay)
2019-06-24 18:56:34 -03:00
return
}
if !c.autoAck {
err := msg.Nack(false, true)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err,
"message_id": msg.MessageId,
}).Error("Failed to nack message.")
2017-05-24 18:23:07 -03:00
}
2016-12-01 10:52:22 -02:00
}
}
2017-10-10 18:13:28 -03:00
func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err error) {
2017-10-10 17:49:53 -03:00
defer func() {
if r := recover(); r != nil {
debug.PrintStack()
2017-10-10 17:49:53 -03:00
switch x := r.(type) {
case string:
err = errors.New(x)
case error:
err = x
default:
err = errors.New("Unknown panic")
}
}
}()
2019-06-24 18:56:34 -03:00
h.manualMode = false
event := messaging.Event{
2018-04-24 15:41:26 -03:00
Id: msg.MessageId,
Action: h.action,
Body: msg.Body,
2020-10-21 19:41:11 -03:00
Timestamp: getXEpochMilli(msg),
Ack: msg.Ack,
Nack: msg.Nack,
Reject: msg.Reject,
2019-06-24 18:56:34 -03:00
Manual: h.manual,
}
err = h.fn(event)
2017-06-22 16:33:59 -03:00
2017-10-10 17:49:53 -03:00
return
}
2017-10-10 18:13:28 -03:00
func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
2017-10-10 17:49:53 -03:00
err := c.callAndHandlePanic(msg, h)
2019-06-24 18:56:34 -03:00
if c.autoAck || h.manualMode {
return
}
2019-05-20 16:45:16 -03:00
log := logger.WithFields(logrus.Fields{
2020-10-21 21:38:41 -03:00
"action": h.action,
"body": string(msg.Body),
"message_id": msg.MessageId,
"published_at": msg.Timestamp.String(),
2019-05-20 16:45:16 -03:00
})
2017-10-30 16:51:47 -02:00
if err == nil {
2019-05-20 16:45:16 -03:00
log.Debug("Message handled successfully.")
2017-12-19 16:05:18 -02:00
2019-06-24 18:56:34 -03:00
msg.Ack(false)
2017-06-22 16:33:59 -03:00
2019-05-20 16:45:16 -03:00
return
}
log = log.WithError(err)
if h.maxRetries == 0 {
log.Warn("Failed to process event.")
return
}
if retryCount >= h.maxRetries {
log.Error("Maximum retries reached. Giving up.")
2019-06-24 18:56:34 -03:00
msg.Ack(false)
2019-05-20 16:45:16 -03:00
return
2017-06-22 16:33:59 -03:00
}
2019-05-20 16:45:16 -03:00
log.Debug("Failed to process event. Retrying...")
c.retryMessage(msg, h, retryCount, delay)
2017-06-22 16:33:59 -03:00
}
2017-10-10 18:13:28 -03:00
func (c *consumer) publishMessage(msg amqplib.Publishing, queue string) error {
2018-02-10 14:38:29 -02:00
channel, err := c.conn.openChannel()
2017-10-10 18:06:42 -03:00
if err != nil {
return err
}
defer channel.Close()
if err := channel.Confirm(false); err != nil {
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
}
confirms := channel.NotifyPublish(make(chan amqplib.Confirmation, 1))
err = channel.Publish("", queue, false, false, msg)
if err != nil {
return err
} else {
if confirmed := <-confirms; !confirmed.Ack {
return ErrNotAcked
}
}
return nil
}
2017-10-10 18:13:28 -03:00
func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
2017-10-10 19:47:07 -03:00
if delay > 0 {
if h.delayedRetry {
delay *= 2
}
} else {
delay = h.retryDelay
2017-05-24 18:23:07 -03:00
}
2020-10-21 19:41:11 -03:00
headers := msg.Headers
headers["x-retry-count"] = retryCount + 1
headers["x-retry-max"] = h.maxRetries
headers["x-retry-delay"] = delay.String()
headers["x-action-key"] = getAction(msg)
2017-05-24 18:23:07 -03:00
retryMsg := amqplib.Publishing{
2020-10-21 19:41:11 -03:00
Headers: headers,
2018-04-24 15:41:26 -03:00
Timestamp: msg.Timestamp,
DeliveryMode: msg.DeliveryMode,
2017-05-24 18:23:07 -03:00
Body: msg.Body,
MessageId: msg.MessageId,
}
2017-10-10 18:06:42 -03:00
err := c.publishMessage(retryMsg, c.queueName)
2017-06-22 16:33:59 -03:00
2017-10-10 18:06:42 -03:00
if err != nil {
2019-05-20 16:45:16 -03:00
logger.WithError(err).Error("Failed to retry.")
2017-10-10 18:06:42 -03:00
if !c.autoAck {
msg.Nack(false, true)
}
} else if !c.autoAck {
2017-06-22 16:33:59 -03:00
msg.Ack(false)
}
}
2017-10-10 18:13:28 -03:00
func (c *consumer) getHandler(msg amqplib.Delivery) (*handler, bool) {
c.m.RLock()
defer c.m.RUnlock()
2017-10-09 19:33:27 -03:00
action := getAction(msg)
2016-12-01 10:52:22 -02:00
for _, h := range c.handlers {
if h.re.MatchString(action) {
2017-05-24 18:23:07 -03:00
return &h, true
2016-12-01 10:52:22 -02:00
}
}
return nil, false
}
2017-05-24 18:23:07 -03:00
// Subscribe allows to subscribe an action handler.
2017-10-10 18:13:28 -03:00
func (c *consumer) Subscribe(action string, handlerFn messaging.EventHandler, options *messaging.SubscribeOptions) error {
2016-12-01 10:52:22 -02:00
// TODO: Replace # pattern too.
2017-10-05 16:22:28 -03:00
pattern := strings.Replace(action, "*", "(.*)", 0)
2016-12-01 10:52:22 -02:00
re, err := regexp.Compile(pattern)
if err != nil {
return err
}
2017-10-05 16:22:28 -03:00
if options == nil {
options = &messaging.SubscribeOptions{
RetryDelay: time.Duration(0),
DelayedRetry: false,
MaxRetries: 0,
}
}
c.m.Lock()
defer c.m.Unlock()
2016-12-01 10:52:22 -02:00
c.handlers = append(c.handlers, handler{
2017-10-05 16:22:28 -03:00
action: action,
fn: handlerFn,
2017-10-05 15:19:54 -03:00
re: re,
maxRetries: options.MaxRetries,
retryDelay: options.RetryDelay,
delayedRetry: options.DelayedRetry,
2016-12-01 10:52:22 -02:00
})
return nil
}
2016-12-01 16:17:55 -02:00
// Unsubscribe allows to unsubscribe an action handler.
2017-10-10 18:13:28 -03:00
func (c *consumer) Unsubscribe(action string) error {
idx := -1
c.m.Lock()
defer c.m.Unlock()
for i, h := range c.handlers {
if h.action == action {
idx = i
break
}
}
if idx != -1 {
c.handlers = append(c.handlers[:idx], c.handlers[idx+1:]...)
}
return nil
}
func (c *consumer) BindActions(actions ...string) error {
2018-02-10 14:38:29 -02:00
channel, err := c.conn.openChannel()
if err != nil {
return err
}
defer channel.Close()
for _, action := range actions {
err := c.bindActionToQueue(channel, c.queueName, action)
if err != nil {
return err
}
}
return nil
}
func (c *consumer) UnbindActions(actions ...string) error {
channel, err := c.conn.openChannel()
2016-12-01 10:52:22 -02:00
if err != nil {
return err
}
defer channel.Close()
2016-12-01 10:52:22 -02:00
for _, action := range actions {
err := channel.QueueUnbind(
c.queueName, // queue name
action, // routing key
c.exchangeName, // exchange
nil, // arguments
)
2016-12-01 10:52:22 -02:00
if err != nil {
return err
}
2016-12-01 10:52:22 -02:00
}
return nil
}
2018-02-12 10:08:18 -02:00
// bindActionToQueue binds queueName to the consumer's exchange using action
// as the routing key, without the no-wait flag and without extra arguments.
func (c *consumer) bindActionToQueue(channel *amqplib.Channel, queueName string, action string) error {
	const noWait = false

	exchange := c.exchangeName

	return channel.QueueBind(queueName, action, exchange, noWait, nil)
}
func (c *consumer) bindAllActionsQueue(channel *amqplib.Channel, queueName string) error {
c.m.RLock()
defer c.m.RUnlock()
2018-02-12 10:08:18 -02:00
for _, h := range c.handlers {
err := c.bindActionToQueue(channel, queueName, h.action)
if err != nil {
return err
}
}
return nil
}
2018-02-12 00:54:10 -02:00
func (c *consumer) setupTopology(channel *amqplib.Channel) (err error) {
err = channel.Qos(c.config.PrefetchCount, 0, true)
if err != nil {
return err
}
err = channel.ExchangeDeclare(
c.exchangeName, // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
durable := true
exclusive := false
if c.config.PrefixName != "" {
c.queueName = c.uniqueNameWithPrefix()
durable = false
exclusive = true
}
_, err = channel.QueueDeclare(
c.queueName, // name
durable, // durable
false, // auto-delete
exclusive, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
2018-02-12 10:08:18 -02:00
return c.bindAllActionsQueue(channel, c.queueName)
2018-02-12 00:54:10 -02:00
}
func (c *consumer) doConsume() error {
2019-05-20 16:45:16 -03:00
log := logger.WithFields(logrus.Fields{
"queue": c.queueName,
2019-05-20 16:45:16 -03:00
})
log.Debug("Setting up consumer channel...")
2018-02-10 14:38:29 -02:00
channel, err := c.conn.openChannel()
if err != nil {
return err
}
defer channel.Close()
err = c.setupTopology(channel)
2018-02-10 14:38:29 -02:00
if err != nil {
return err
}
msgs, err := channel.Consume(
c.queueName, // queue
createUniqueConsumerTagName(), // consumer
c.autoAck, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
return err
}
2019-05-20 16:45:16 -03:00
log.Info("Consuming messages...")
for m := range msgs {
c.wg.Add(1)
go func(msg amqplib.Delivery) {
c.dispatch(msg)
c.wg.Done()
}(m)
}
return nil
}
// isClosed reports, under the lock, whether the consumer has been closed.
func (c *consumer) isClosed() bool {
	c.m.Lock()
	closed := c.closed
	c.m.Unlock()

	return closed
}
2016-12-01 16:17:55 -02:00
// Listen start to listen for new messages.
2017-10-10 18:13:28 -03:00
func (c *consumer) Consume() {
2017-10-03 16:50:41 -03:00
logger.Info("Registered handlers:")
for _, handler := range c.handlers {
logger.Infof(" %s", handler.action)
}
rs := c.conn.NotifyReestablish()
for !c.isClosed() {
2017-12-19 15:12:30 -02:00
if !c.conn.IsConnected() {
logger.Info("Connection not established. Waiting connection to be reestablished.")
2017-12-19 15:12:30 -02:00
<-rs
2017-12-19 15:12:30 -02:00
continue
}
err := c.doConsume()
2017-03-09 17:32:05 -03:00
if err == nil {
2019-05-20 16:45:16 -03:00
logger.WithFields(logrus.Fields{
"queue": c.queueName,
"closed": c.closed,
}).Info("Consumption finished.")
} else {
2019-05-20 16:45:16 -03:00
logger.WithFields(logrus.Fields{
2017-05-24 18:23:07 -03:00
"queue": c.queueName,
"error": err,
}).Error("Error consuming events.")
2017-03-09 17:32:05 -03:00
}
2017-05-24 18:23:07 -03:00
}
}
// getAction returns the action of a delivery: the "x-action-key" header when
// it is present and holds a string, otherwise the delivery's routing key.
//
// A header of any other type is ignored rather than asserted, so a malformed
// message cannot panic the dispatch goroutine.
func getAction(msg amqplib.Delivery) string {
	if action, ok := msg.Headers["x-action-key"].(string); ok {
		return action
	}

	return msg.RoutingKey
}
2017-05-24 18:23:07 -03:00
// getXRetryCountHeader returns the value of the "x-retry-count" header and
// whether a usable value was found.
//
// The consumer itself writes the header as an int32, but other publishers may
// use a different integer width, so the common integer types are accepted and
// converted to int32. A missing header or a value of any other type yields
// (0, false) instead of a type-assertion panic.
func getXRetryCountHeader(msg amqplib.Delivery) (int32, bool) {
	switch count := msg.Headers["x-retry-count"].(type) {
	case int32:
		return count, true
	case int:
		return int32(count), true
	case int64:
		return int32(count), true
	case int16:
		return int32(count), true
	case int8:
		return int32(count), true
	case uint8:
		return int32(count), true
	}

	return 0, false
}
func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) {
if d, ok := msg.Headers["x-retry-delay"]; ok {
2017-10-10 18:55:26 -03:00
t, err := time.ParseDuration(d.(string))
2017-10-10 19:22:58 -03:00
if err == nil {
2017-10-10 18:55:26 -03:00
return t, true
2017-05-24 18:23:07 -03:00
}
}
return time.Duration(0), false
2016-12-01 10:52:22 -02:00
}
2020-10-21 19:41:11 -03:00
// getXEpochMilli returns the time encoded in the "x-epoch-milli" header,
// interpreted as milliseconds since the Unix epoch. When the header is absent
// or is neither an int64 nor an int, the delivery's own Timestamp is
// returned.
func getXEpochMilli(msg amqplib.Delivery) time.Time {
	var millis int64

	switch v := msg.Headers["x-epoch-milli"].(type) {
	case int64:
		millis = v
	case int:
		millis = int64(v)
	default:
		return msg.Timestamp
	}

	return time.Unix(0, millis*int64(time.Millisecond))
}