2016-12-01 10:52:22 -02:00
|
|
|
package amqp
|
|
|
|
|
|
|
|
import (
|
2017-10-10 17:49:53 -03:00
|
|
|
"errors"
|
2017-10-10 18:06:42 -03:00
|
|
|
"fmt"
|
2018-01-02 11:11:48 -02:00
|
|
|
"os"
|
2016-12-01 10:52:22 -02:00
|
|
|
"regexp"
|
2019-03-12 13:32:19 -03:00
|
|
|
"runtime/debug"
|
2016-12-01 10:52:22 -02:00
|
|
|
"strings"
|
2017-03-09 17:32:05 -03:00
|
|
|
"sync"
|
2018-01-02 11:11:48 -02:00
|
|
|
"sync/atomic"
|
2017-03-09 17:32:05 -03:00
|
|
|
"time"
|
2018-02-10 13:23:49 -02:00
|
|
|
|
|
|
|
"github.com/eventials/goevents/messaging"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
amqplib "github.com/streadway/amqp"
|
2016-12-01 10:52:22 -02:00
|
|
|
)
|
|
|
|
|
2017-05-24 18:23:07 -03:00
|
|
|
var (
|
|
|
|
logger = log.WithFields(log.Fields{
|
|
|
|
"type": "goevents",
|
|
|
|
"sub_type": "consumer",
|
|
|
|
})
|
|
|
|
)
|
|
|
|
|
2016-12-01 10:52:22 -02:00
|
|
|
type handler struct {
|
2017-10-05 15:19:54 -03:00
|
|
|
action string
|
|
|
|
fn messaging.EventHandler
|
|
|
|
re *regexp.Regexp
|
|
|
|
maxRetries int32
|
|
|
|
retryDelay time.Duration
|
|
|
|
delayedRetry bool
|
2016-12-01 10:52:22 -02:00
|
|
|
}
|
|
|
|
|
2017-10-10 18:13:28 -03:00
|
|
|
type consumer struct {
|
2017-03-09 17:32:05 -03:00
|
|
|
config ConsumerConfig
|
2017-06-15 12:07:50 -03:00
|
|
|
|
2018-05-03 14:38:57 -03:00
|
|
|
m sync.RWMutex
|
2017-10-10 09:54:28 -03:00
|
|
|
wg sync.WaitGroup
|
2017-03-09 17:32:05 -03:00
|
|
|
|
2018-02-10 14:38:29 -02:00
|
|
|
conn *connection
|
2016-12-01 10:52:22 -02:00
|
|
|
autoAck bool
|
|
|
|
handlers []handler
|
2016-12-28 09:20:29 -02:00
|
|
|
|
2018-02-10 13:23:49 -02:00
|
|
|
exchangeName string
|
|
|
|
queueName string
|
|
|
|
closed bool
|
2017-03-09 17:32:05 -03:00
|
|
|
}
|
|
|
|
|
2018-01-02 11:11:48 -02:00
|
|
|
// consumerTagSeq is a package-wide counter, incremented atomically by
// createUniqueConsumerTagName, that supplies the numeric suffix of each
// generated consumer tag.
var consumerTagSeq uint64
|
|
|
|
|
2017-03-09 17:32:05 -03:00
|
|
|
// ConsumerConfig holds the settings used when creating a new consumer.
type ConsumerConfig struct {
	// ConsumeRetryInterval is the consume retry interval (NewConsumer
	// defaults it to 2 seconds).
	ConsumeRetryInterval time.Duration

	// PrefetchCount is the prefetch count handed to the channel's QoS
	// setting when the consumer sets up its topology.
	PrefetchCount int

	// PrefixName, when non-empty, makes the consumer declare a non-durable,
	// exclusive queue named with this prefix followed by a nanosecond
	// timestamp, instead of using the queue name it was created with.
	PrefixName string
}
|
|
|
|
|
2016-12-01 16:17:55 -02:00
|
|
|
// NewConsumer returns a new AMQP Consumer.
|
2017-03-09 17:32:05 -03:00
|
|
|
// Uses a default ConsumerConfig with 2 second of consume retry interval.
|
2017-10-10 18:13:28 -03:00
|
|
|
func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (*consumer, error) {
|
2017-03-09 22:24:53 -03:00
|
|
|
return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{
|
2017-10-10 19:47:07 -03:00
|
|
|
ConsumeRetryInterval: 2 * time.Second,
|
|
|
|
PrefetchCount: 0,
|
2017-03-09 17:32:05 -03:00
|
|
|
})
|
|
|
|
}
|
2016-12-01 10:52:22 -02:00
|
|
|
|
2018-02-10 13:23:49 -02:00
|
|
|
func createUniqueConsumerTagName() string {
|
2018-01-02 12:05:51 -02:00
|
|
|
hostname, err := os.Hostname()
|
|
|
|
if err != nil {
|
|
|
|
hostname = "unknown"
|
|
|
|
}
|
|
|
|
|
2018-02-10 13:23:49 -02:00
|
|
|
return fmt.Sprintf("ctag-%s-%s-%d", hostname, os.Args[0], atomic.AddUint64(&consumerTagSeq, 1))
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewConsumerConfig returns a new AMQP Consumer.
|
|
|
|
func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue string, config ConsumerConfig) (*consumer, error) {
|
2017-10-10 18:13:28 -03:00
|
|
|
consumer := &consumer{
|
2018-02-10 13:23:49 -02:00
|
|
|
config: config,
|
2018-02-10 14:38:29 -02:00
|
|
|
conn: c.(*connection),
|
2018-02-10 13:23:49 -02:00
|
|
|
autoAck: autoAck,
|
|
|
|
handlers: make([]handler, 0),
|
|
|
|
exchangeName: exchange,
|
|
|
|
queueName: queue,
|
2017-03-09 17:32:05 -03:00
|
|
|
}
|
|
|
|
|
2018-02-11 14:56:48 -02:00
|
|
|
return consumer, nil
|
2017-10-10 18:06:42 -03:00
|
|
|
|
2017-03-09 17:32:05 -03:00
|
|
|
}
|
|
|
|
|
2019-03-12 16:52:17 -03:00
|
|
|
func (c *consumer) closeAndClearHandlers() {
|
|
|
|
c.m.Lock()
|
|
|
|
defer c.m.Unlock()
|
2017-10-10 09:54:28 -03:00
|
|
|
|
2019-03-12 16:52:17 -03:00
|
|
|
// Unsubscribe all handlers
|
|
|
|
c.handlers = make([]handler, 0)
|
2017-10-09 19:33:27 -03:00
|
|
|
|
2019-03-12 16:52:17 -03:00
|
|
|
c.closed = true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *consumer) Close() {
|
|
|
|
c.closeAndClearHandlers()
|
2017-10-09 19:33:27 -03:00
|
|
|
|
2017-10-10 09:54:28 -03:00
|
|
|
// Wait all go routine finish.
|
|
|
|
c.wg.Wait()
|
2017-03-09 17:32:05 -03:00
|
|
|
}
|
|
|
|
|
2017-12-19 12:02:09 -02:00
|
|
|
func (c *consumer) uniqueNameWithPrefix() string {
|
2017-12-19 12:15:41 -02:00
|
|
|
return fmt.Sprintf("%s%d", c.config.PrefixName, time.Now().UnixNano())
|
2017-12-19 12:02:09 -02:00
|
|
|
}
|
|
|
|
|
2017-10-10 18:13:28 -03:00
|
|
|
func (c *consumer) dispatch(msg amqplib.Delivery) {
|
2017-06-14 18:33:16 -03:00
|
|
|
if h, ok := c.getHandler(msg); ok {
|
2017-06-22 16:33:59 -03:00
|
|
|
delay, isRetry := getXRetryDelayHeader(msg)
|
2017-05-24 18:23:07 -03:00
|
|
|
|
2017-10-10 17:49:53 -03:00
|
|
|
if isRetry {
|
2017-10-10 18:55:26 -03:00
|
|
|
logger.WithFields(log.Fields{
|
|
|
|
"delay": delay.String(),
|
|
|
|
"message_id": msg.MessageId,
|
2018-05-04 09:55:55 -03:00
|
|
|
}).Debug("Delaying message.")
|
2017-10-10 18:55:26 -03:00
|
|
|
|
|
|
|
time.Sleep(delay)
|
2017-05-24 18:23:07 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
retryCount, _ := getXRetryCountHeader(msg)
|
|
|
|
|
2017-06-22 16:33:59 -03:00
|
|
|
c.doDispatch(msg, h, retryCount, delay)
|
2016-12-01 10:52:22 -02:00
|
|
|
} else {
|
2017-05-24 18:23:07 -03:00
|
|
|
if !c.autoAck {
|
2017-10-09 19:33:27 -03:00
|
|
|
err := msg.Nack(false, true)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
logger.WithFields(log.Fields{
|
|
|
|
"error": err,
|
|
|
|
"message_id": msg.MessageId,
|
|
|
|
}).Error("Failed to nack message.")
|
|
|
|
}
|
2017-05-24 18:23:07 -03:00
|
|
|
}
|
2016-12-01 10:52:22 -02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-10-10 18:13:28 -03:00
|
|
|
func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err error) {
|
2017-10-10 17:49:53 -03:00
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
2019-03-12 13:32:19 -03:00
|
|
|
debug.PrintStack()
|
|
|
|
|
2017-10-10 17:49:53 -03:00
|
|
|
switch x := r.(type) {
|
|
|
|
case string:
|
|
|
|
err = errors.New(x)
|
|
|
|
case error:
|
|
|
|
err = x
|
|
|
|
default:
|
|
|
|
err = errors.New("Unknown panic")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2018-11-05 14:34:25 -02:00
|
|
|
event := messaging.Event{
|
2018-04-24 15:41:26 -03:00
|
|
|
Id: msg.MessageId,
|
|
|
|
Action: h.action,
|
|
|
|
Body: msg.Body,
|
|
|
|
Timestamp: msg.Timestamp,
|
2018-11-05 14:34:25 -02:00
|
|
|
Ack: msg.Ack,
|
|
|
|
Nack: msg.Nack,
|
|
|
|
Reject: msg.Reject,
|
|
|
|
}
|
|
|
|
|
|
|
|
err = h.fn(event)
|
2017-06-22 16:33:59 -03:00
|
|
|
|
2017-10-10 17:49:53 -03:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2017-10-10 18:13:28 -03:00
|
|
|
func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
|
2017-10-10 17:49:53 -03:00
|
|
|
err := c.callAndHandlePanic(msg, h)
|
|
|
|
|
2017-10-30 16:51:47 -02:00
|
|
|
if err == nil {
|
2017-12-19 16:05:18 -02:00
|
|
|
logger.WithFields(log.Fields{
|
|
|
|
"action": h.action,
|
|
|
|
"body": string(msg.Body),
|
|
|
|
"message_id": msg.MessageId,
|
2018-05-04 09:55:55 -03:00
|
|
|
}).Debug("Message handled successfully.")
|
2017-12-19 16:05:18 -02:00
|
|
|
|
2017-10-30 16:51:47 -02:00
|
|
|
if !c.autoAck {
|
|
|
|
msg.Ack(false)
|
|
|
|
}
|
|
|
|
} else {
|
2017-06-22 16:33:59 -03:00
|
|
|
if h.maxRetries > 0 {
|
|
|
|
if retryCount >= h.maxRetries {
|
|
|
|
logger.WithFields(log.Fields{
|
2017-10-30 16:51:47 -02:00
|
|
|
"action": h.action,
|
2017-12-19 16:01:48 -02:00
|
|
|
"body": string(msg.Body),
|
2017-10-30 16:51:47 -02:00
|
|
|
"error": err,
|
|
|
|
"message_id": msg.MessageId,
|
2017-06-22 16:33:59 -03:00
|
|
|
}).Error("Maximum retries reached. Giving up.")
|
|
|
|
|
|
|
|
if !c.autoAck {
|
|
|
|
msg.Ack(false)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
logger.WithFields(log.Fields{
|
2017-10-30 16:51:47 -02:00
|
|
|
"action": h.action,
|
2017-12-19 16:01:48 -02:00
|
|
|
"body": string(msg.Body),
|
2017-06-22 16:33:59 -03:00
|
|
|
"error": err,
|
|
|
|
"message_id": msg.MessageId,
|
|
|
|
}).Error("Failed to process event. Retrying...")
|
|
|
|
|
|
|
|
c.retryMessage(msg, h, retryCount, delay)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
logger.WithFields(log.Fields{
|
2017-10-30 16:51:47 -02:00
|
|
|
"action": h.action,
|
2017-12-19 16:01:48 -02:00
|
|
|
"body": string(msg.Body),
|
2017-06-22 16:33:59 -03:00
|
|
|
"error": err,
|
|
|
|
"message_id": msg.MessageId,
|
|
|
|
}).Error("Failed to process event.")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-10-10 18:13:28 -03:00
|
|
|
func (c *consumer) publishMessage(msg amqplib.Publishing, queue string) error {
|
2018-02-10 14:38:29 -02:00
|
|
|
channel, err := c.conn.openChannel()
|
2017-10-10 18:06:42 -03:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
defer channel.Close()
|
|
|
|
|
|
|
|
if err := channel.Confirm(false); err != nil {
|
|
|
|
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
confirms := channel.NotifyPublish(make(chan amqplib.Confirmation, 1))
|
|
|
|
|
|
|
|
err = channel.Publish("", queue, false, false, msg)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
} else {
|
|
|
|
if confirmed := <-confirms; !confirmed.Ack {
|
|
|
|
return ErrNotAcked
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-10-10 18:13:28 -03:00
|
|
|
func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
|
2017-10-10 19:47:07 -03:00
|
|
|
if delay > 0 {
|
|
|
|
if h.delayedRetry {
|
|
|
|
delay *= 2
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
delay = h.retryDelay
|
2017-05-24 18:23:07 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
retryMsg := amqplib.Publishing{
|
|
|
|
Headers: amqplib.Table{
|
|
|
|
"x-retry-count": retryCount + 1,
|
|
|
|
"x-retry-max": h.maxRetries,
|
2017-10-10 18:55:26 -03:00
|
|
|
"x-retry-delay": delay.String(),
|
2017-06-14 18:33:16 -03:00
|
|
|
"x-action-key": getAction(msg),
|
2017-05-24 18:23:07 -03:00
|
|
|
},
|
2018-04-24 15:41:26 -03:00
|
|
|
Timestamp: msg.Timestamp,
|
2017-06-14 18:33:16 -03:00
|
|
|
DeliveryMode: msg.DeliveryMode,
|
2017-05-24 18:23:07 -03:00
|
|
|
Body: msg.Body,
|
|
|
|
MessageId: msg.MessageId,
|
|
|
|
}
|
|
|
|
|
2017-10-10 18:06:42 -03:00
|
|
|
err := c.publishMessage(retryMsg, c.queueName)
|
2017-06-22 16:33:59 -03:00
|
|
|
|
2017-10-10 18:06:42 -03:00
|
|
|
if err != nil {
|
|
|
|
logger.WithFields(log.Fields{
|
|
|
|
"error": err,
|
|
|
|
}).Error("Failed to retry.")
|
|
|
|
|
|
|
|
if !c.autoAck {
|
|
|
|
msg.Nack(false, true)
|
|
|
|
}
|
|
|
|
} else if !c.autoAck {
|
2017-06-22 16:33:59 -03:00
|
|
|
msg.Ack(false)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-10-10 18:13:28 -03:00
|
|
|
func (c *consumer) getHandler(msg amqplib.Delivery) (*handler, bool) {
|
2018-05-03 14:38:57 -03:00
|
|
|
c.m.RLock()
|
|
|
|
defer c.m.RUnlock()
|
2017-10-09 19:33:27 -03:00
|
|
|
|
2017-06-14 18:33:16 -03:00
|
|
|
action := getAction(msg)
|
|
|
|
|
2016-12-01 10:52:22 -02:00
|
|
|
for _, h := range c.handlers {
|
|
|
|
if h.re.MatchString(action) {
|
2017-05-24 18:23:07 -03:00
|
|
|
return &h, true
|
2016-12-01 10:52:22 -02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
|
2017-05-24 18:23:07 -03:00
|
|
|
// Subscribe allows to subscribe an action handler.
|
2017-10-10 18:13:28 -03:00
|
|
|
func (c *consumer) Subscribe(action string, handlerFn messaging.EventHandler, options *messaging.SubscribeOptions) error {
|
2016-12-01 10:52:22 -02:00
|
|
|
// TODO: Replace # pattern too.
|
2017-10-05 16:22:28 -03:00
|
|
|
pattern := strings.Replace(action, "*", "(.*)", 0)
|
2016-12-01 10:52:22 -02:00
|
|
|
re, err := regexp.Compile(pattern)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-10-05 16:22:28 -03:00
|
|
|
if options == nil {
|
|
|
|
options = &messaging.SubscribeOptions{
|
|
|
|
RetryDelay: time.Duration(0),
|
|
|
|
DelayedRetry: false,
|
|
|
|
MaxRetries: 0,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-03 14:38:57 -03:00
|
|
|
c.m.Lock()
|
|
|
|
defer c.m.Unlock()
|
|
|
|
|
2016-12-01 10:52:22 -02:00
|
|
|
c.handlers = append(c.handlers, handler{
|
2017-10-05 16:22:28 -03:00
|
|
|
action: action,
|
|
|
|
fn: handlerFn,
|
2017-10-05 15:19:54 -03:00
|
|
|
re: re,
|
|
|
|
maxRetries: options.MaxRetries,
|
|
|
|
retryDelay: options.RetryDelay,
|
|
|
|
delayedRetry: options.DelayedRetry,
|
2016-12-01 10:52:22 -02:00
|
|
|
})
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-12-01 16:17:55 -02:00
|
|
|
// Unsubscribe allows to unsubscribe an action handler.
|
2017-10-10 18:13:28 -03:00
|
|
|
func (c *consumer) Unsubscribe(action string) error {
|
2018-05-03 14:38:57 -03:00
|
|
|
idx := -1
|
|
|
|
|
|
|
|
c.m.Lock()
|
|
|
|
defer c.m.Unlock()
|
|
|
|
|
|
|
|
for i, h := range c.handlers {
|
|
|
|
if h.action == action {
|
|
|
|
idx = i
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if idx != -1 {
|
|
|
|
c.handlers = append(c.handlers[:idx], c.handlers[idx+1:]...)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *consumer) BindActions(actions ...string) error {
|
2018-02-10 14:38:29 -02:00
|
|
|
channel, err := c.conn.openChannel()
|
2018-02-10 13:23:49 -02:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
defer channel.Close()
|
|
|
|
|
2018-05-03 14:38:57 -03:00
|
|
|
for _, action := range actions {
|
|
|
|
err := c.bindActionToQueue(channel, c.queueName, action)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *consumer) UnbindActions(actions ...string) error {
|
|
|
|
channel, err := c.conn.openChannel()
|
2016-12-01 10:52:22 -02:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-05-03 14:38:57 -03:00
|
|
|
defer channel.Close()
|
2016-12-01 10:52:22 -02:00
|
|
|
|
2018-05-03 14:38:57 -03:00
|
|
|
for _, action := range actions {
|
|
|
|
err := channel.QueueUnbind(
|
|
|
|
c.queueName, // queue name
|
|
|
|
action, // routing key
|
|
|
|
c.exchangeName, // exchange
|
|
|
|
nil, // arguments
|
|
|
|
)
|
2016-12-01 10:52:22 -02:00
|
|
|
|
2018-05-03 14:38:57 -03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-12-01 10:52:22 -02:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-02-12 10:08:18 -02:00
|
|
|
func (c *consumer) bindActionToQueue(channel *amqplib.Channel, queueName string, action string) error {
|
|
|
|
return channel.QueueBind(
|
|
|
|
queueName, // queue name
|
|
|
|
action, // routing key
|
|
|
|
c.exchangeName, // exchange
|
|
|
|
false, // no-wait
|
|
|
|
nil, // arguments
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *consumer) bindAllActionsQueue(channel *amqplib.Channel, queueName string) error {
|
2019-03-12 16:52:17 -03:00
|
|
|
c.m.RLock()
|
|
|
|
defer c.m.RUnlock()
|
|
|
|
|
2018-02-12 10:08:18 -02:00
|
|
|
for _, h := range c.handlers {
|
|
|
|
err := c.bindActionToQueue(channel, queueName, h.action)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-02-12 00:54:10 -02:00
|
|
|
func (c *consumer) setupTopology(channel *amqplib.Channel) (err error) {
|
|
|
|
err = channel.Qos(c.config.PrefetchCount, 0, true)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = channel.ExchangeDeclare(
|
|
|
|
c.exchangeName, // name
|
|
|
|
"topic", // type
|
|
|
|
true, // durable
|
|
|
|
false, // auto-delete
|
|
|
|
false, // internal
|
|
|
|
false, // no-wait
|
|
|
|
nil, // arguments
|
|
|
|
)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
durable := true
|
|
|
|
exclusive := false
|
|
|
|
|
|
|
|
if c.config.PrefixName != "" {
|
|
|
|
c.queueName = c.uniqueNameWithPrefix()
|
|
|
|
durable = false
|
|
|
|
exclusive = true
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err = channel.QueueDeclare(
|
|
|
|
c.queueName, // name
|
|
|
|
durable, // durable
|
|
|
|
false, // auto-delete
|
|
|
|
exclusive, // exclusive
|
|
|
|
false, // no-wait
|
|
|
|
nil, // arguments
|
|
|
|
)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-02-12 10:08:18 -02:00
|
|
|
return c.bindAllActionsQueue(channel, c.queueName)
|
2018-02-12 00:54:10 -02:00
|
|
|
}
|
|
|
|
|
2018-02-10 13:23:49 -02:00
|
|
|
func (c *consumer) doConsume() error {
|
|
|
|
logger.WithFields(log.Fields{
|
|
|
|
"queue": c.queueName,
|
|
|
|
}).Debug("Setting up consumer channel...")
|
|
|
|
|
2018-02-10 14:38:29 -02:00
|
|
|
channel, err := c.conn.openChannel()
|
2018-02-10 13:23:49 -02:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
defer channel.Close()
|
|
|
|
|
2018-02-11 14:56:48 -02:00
|
|
|
err = c.setupTopology(channel)
|
2018-02-10 14:38:29 -02:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2018-02-10 13:23:49 -02:00
|
|
|
msgs, err := channel.Consume(
|
|
|
|
c.queueName, // queue
|
|
|
|
createUniqueConsumerTagName(), // consumer
|
|
|
|
c.autoAck, // auto ack
|
|
|
|
false, // exclusive
|
|
|
|
false, // no local
|
|
|
|
false, // no wait
|
|
|
|
nil, // args
|
|
|
|
)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
logger.WithFields(log.Fields{
|
|
|
|
"queue": c.queueName,
|
|
|
|
}).Info("Consuming messages...")
|
|
|
|
|
|
|
|
for m := range msgs {
|
|
|
|
c.wg.Add(1)
|
|
|
|
|
|
|
|
go func(msg amqplib.Delivery) {
|
|
|
|
c.dispatch(msg)
|
|
|
|
c.wg.Done()
|
|
|
|
}(m)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-03-12 16:52:17 -03:00
|
|
|
func (c *consumer) isClosed() bool {
|
|
|
|
c.m.Lock()
|
|
|
|
defer c.m.Unlock()
|
|
|
|
|
|
|
|
return c.closed
|
|
|
|
}
|
|
|
|
|
2016-12-01 16:17:55 -02:00
|
|
|
// Listen start to listen for new messages.
|
2017-10-10 18:13:28 -03:00
|
|
|
func (c *consumer) Consume() {
|
2017-10-03 16:50:41 -03:00
|
|
|
logger.Info("Registered handlers:")
|
|
|
|
|
|
|
|
for _, handler := range c.handlers {
|
|
|
|
logger.Infof(" %s", handler.action)
|
|
|
|
}
|
|
|
|
|
2018-06-12 14:48:18 -03:00
|
|
|
rs := c.conn.NotifyReestablish()
|
|
|
|
|
2019-03-12 16:52:17 -03:00
|
|
|
for !c.isClosed() {
|
2017-12-19 15:12:30 -02:00
|
|
|
if !c.conn.IsConnected() {
|
2018-02-11 14:56:48 -02:00
|
|
|
logger.Info("Connection not established. Waiting connection to be reestablished.")
|
2017-12-19 15:12:30 -02:00
|
|
|
|
2018-06-12 14:48:18 -03:00
|
|
|
<-rs
|
2017-12-19 15:12:30 -02:00
|
|
|
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2018-02-10 13:23:49 -02:00
|
|
|
err := c.doConsume()
|
2017-03-09 17:32:05 -03:00
|
|
|
|
2018-02-10 13:23:49 -02:00
|
|
|
if err == nil {
|
|
|
|
logger.WithFields(log.Fields{
|
|
|
|
"queue": c.queueName,
|
|
|
|
"closed": c.closed,
|
|
|
|
}).Info("Consumption finished.")
|
|
|
|
} else {
|
2017-05-24 18:23:07 -03:00
|
|
|
logger.WithFields(log.Fields{
|
|
|
|
"queue": c.queueName,
|
|
|
|
"error": err,
|
2018-02-10 13:23:49 -02:00
|
|
|
}).Error("Error consuming events.")
|
2017-03-09 17:32:05 -03:00
|
|
|
}
|
2017-05-24 18:23:07 -03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-06-14 18:33:16 -03:00
|
|
|
func getAction(msg amqplib.Delivery) string {
|
|
|
|
if ac, ok := msg.Headers["x-action-key"]; ok {
|
|
|
|
return ac.(string)
|
|
|
|
} else {
|
|
|
|
return msg.RoutingKey
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-05-24 18:23:07 -03:00
|
|
|
func getXRetryCountHeader(msg amqplib.Delivery) (int32, bool) {
|
|
|
|
if c, ok := msg.Headers["x-retry-count"]; ok {
|
|
|
|
return c.(int32), true
|
|
|
|
}
|
|
|
|
|
|
|
|
return 0, false
|
|
|
|
}
|
|
|
|
|
|
|
|
func getXRetryDelayHeader(msg amqplib.Delivery) (time.Duration, bool) {
|
|
|
|
if d, ok := msg.Headers["x-retry-delay"]; ok {
|
2017-10-10 18:55:26 -03:00
|
|
|
t, err := time.ParseDuration(d.(string))
|
2017-10-10 19:22:58 -03:00
|
|
|
if err == nil {
|
2017-10-10 18:55:26 -03:00
|
|
|
return t, true
|
2017-05-24 18:23:07 -03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return time.Duration(0), false
|
2016-12-01 10:52:22 -02:00
|
|
|
}
|