mirror of
https://github.com/eventials/goevents.git
synced 2025-05-01 13:48:56 +08:00
Don't log errors in consumer event
This commit is contained in:
parent
245a35bec3
commit
c0b8bd193a
@ -12,12 +12,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/eventials/goevents/messaging"
|
"github.com/eventials/goevents/messaging"
|
||||||
log "github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
amqplib "github.com/streadway/amqp"
|
amqplib "github.com/streadway/amqp"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
logger = log.WithFields(log.Fields{
|
logger = logrus.WithFields(logrus.Fields{
|
||||||
"type": "goevents",
|
"type": "goevents",
|
||||||
"sub_type": "consumer",
|
"sub_type": "consumer",
|
||||||
})
|
})
|
||||||
@ -115,7 +115,7 @@ func (c *consumer) dispatch(msg amqplib.Delivery) {
|
|||||||
delay, isRetry := getXRetryDelayHeader(msg)
|
delay, isRetry := getXRetryDelayHeader(msg)
|
||||||
|
|
||||||
if isRetry {
|
if isRetry {
|
||||||
logger.WithFields(log.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
"delay": delay.String(),
|
"delay": delay.String(),
|
||||||
"message_id": msg.MessageId,
|
"message_id": msg.MessageId,
|
||||||
}).Debug("Delaying message.")
|
}).Debug("Delaying message.")
|
||||||
@ -131,7 +131,7 @@ func (c *consumer) dispatch(msg amqplib.Delivery) {
|
|||||||
err := msg.Nack(false, true)
|
err := msg.Nack(false, true)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithFields(log.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
"error": err,
|
"error": err,
|
||||||
"message_id": msg.MessageId,
|
"message_id": msg.MessageId,
|
||||||
}).Error("Failed to nack message.")
|
}).Error("Failed to nack message.")
|
||||||
@ -174,48 +174,42 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err
|
|||||||
func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
|
func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
|
||||||
err := c.callAndHandlePanic(msg, h)
|
err := c.callAndHandlePanic(msg, h)
|
||||||
|
|
||||||
|
log := logger.WithFields(logrus.Fields{
|
||||||
|
"action": h.action,
|
||||||
|
"body": string(msg.Body),
|
||||||
|
"message_id": msg.MessageId,
|
||||||
|
})
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
logger.WithFields(log.Fields{
|
log.Debug("Message handled successfully.")
|
||||||
"action": h.action,
|
|
||||||
"body": string(msg.Body),
|
|
||||||
"message_id": msg.MessageId,
|
|
||||||
}).Debug("Message handled successfully.")
|
|
||||||
|
|
||||||
if !c.autoAck {
|
if !c.autoAck {
|
||||||
msg.Ack(false)
|
msg.Ack(false)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
if h.maxRetries > 0 {
|
|
||||||
if retryCount >= h.maxRetries {
|
|
||||||
logger.WithFields(log.Fields{
|
|
||||||
"action": h.action,
|
|
||||||
"body": string(msg.Body),
|
|
||||||
"error": err,
|
|
||||||
"message_id": msg.MessageId,
|
|
||||||
}).Error("Maximum retries reached. Giving up.")
|
|
||||||
|
|
||||||
if !c.autoAck {
|
return
|
||||||
msg.Ack(false)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.WithFields(log.Fields{
|
|
||||||
"action": h.action,
|
|
||||||
"body": string(msg.Body),
|
|
||||||
"error": err,
|
|
||||||
"message_id": msg.MessageId,
|
|
||||||
}).Error("Failed to process event. Retrying...")
|
|
||||||
|
|
||||||
c.retryMessage(msg, h, retryCount, delay)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.WithFields(log.Fields{
|
|
||||||
"action": h.action,
|
|
||||||
"body": string(msg.Body),
|
|
||||||
"error": err,
|
|
||||||
"message_id": msg.MessageId,
|
|
||||||
}).Error("Failed to process event.")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log = log.WithError(err)
|
||||||
|
|
||||||
|
if h.maxRetries == 0 {
|
||||||
|
log.Warn("Failed to process event.")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if retryCount >= h.maxRetries {
|
||||||
|
log.Error("Maximum retries reached. Giving up.")
|
||||||
|
|
||||||
|
if !c.autoAck {
|
||||||
|
msg.Ack(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug("Failed to process event. Retrying...")
|
||||||
|
|
||||||
|
c.retryMessage(msg, h, retryCount, delay)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *consumer) publishMessage(msg amqplib.Publishing, queue string) error {
|
func (c *consumer) publishMessage(msg amqplib.Publishing, queue string) error {
|
||||||
@ -271,9 +265,7 @@ func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int
|
|||||||
err := c.publishMessage(retryMsg, c.queueName)
|
err := c.publishMessage(retryMsg, c.queueName)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithFields(log.Fields{
|
logger.WithError(err).Error("Failed to retry.")
|
||||||
"error": err,
|
|
||||||
}).Error("Failed to retry.")
|
|
||||||
|
|
||||||
if !c.autoAck {
|
if !c.autoAck {
|
||||||
msg.Nack(false, true)
|
msg.Nack(false, true)
|
||||||
@ -468,9 +460,11 @@ func (c *consumer) setupTopology(channel *amqplib.Channel) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *consumer) doConsume() error {
|
func (c *consumer) doConsume() error {
|
||||||
logger.WithFields(log.Fields{
|
log := logger.WithFields(logrus.Fields{
|
||||||
"queue": c.queueName,
|
"queue": c.queueName,
|
||||||
}).Debug("Setting up consumer channel...")
|
})
|
||||||
|
|
||||||
|
log.Debug("Setting up consumer channel...")
|
||||||
|
|
||||||
channel, err := c.conn.openChannel()
|
channel, err := c.conn.openChannel()
|
||||||
|
|
||||||
@ -500,9 +494,7 @@ func (c *consumer) doConsume() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.WithFields(log.Fields{
|
log.Info("Consuming messages...")
|
||||||
"queue": c.queueName,
|
|
||||||
}).Info("Consuming messages...")
|
|
||||||
|
|
||||||
for m := range msgs {
|
for m := range msgs {
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
@ -545,12 +537,12 @@ func (c *consumer) Consume() {
|
|||||||
err := c.doConsume()
|
err := c.doConsume()
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
logger.WithFields(log.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
"queue": c.queueName,
|
"queue": c.queueName,
|
||||||
"closed": c.closed,
|
"closed": c.closed,
|
||||||
}).Info("Consumption finished.")
|
}).Info("Consumption finished.")
|
||||||
} else {
|
} else {
|
||||||
logger.WithFields(log.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
"queue": c.queueName,
|
"queue": c.queueName,
|
||||||
"error": err,
|
"error": err,
|
||||||
}).Error("Error consuming events.")
|
}).Error("Error consuming events.")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user