diff --git a/amqp/consumer.go b/amqp/consumer.go index e7f24cd..9bfa2b4 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -30,6 +30,11 @@ type handler struct { maxRetries int32 retryDelay time.Duration delayedRetry bool + manualMode bool +} + +func (h *handler) manual() { + h.manualMode = true } type consumer struct { @@ -126,16 +131,17 @@ func (c *consumer) dispatch(msg amqplib.Delivery) { retryCount, _ := getXRetryCountHeader(msg) c.doDispatch(msg, h, retryCount, delay) - } else { - if !c.autoAck { - err := msg.Nack(false, true) + return + } - if err != nil { - logger.WithFields(logrus.Fields{ - "error": err, - "message_id": msg.MessageId, - }).Error("Failed to nack message.") - } + if !c.autoAck { + err := msg.Nack(false, true) + + if err != nil { + logger.WithFields(logrus.Fields{ + "error": err, + "message_id": msg.MessageId, + }).Error("Failed to nack message.") } } } @@ -156,6 +162,8 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err } }() + h.manualMode = false + event := messaging.Event{ Id: msg.MessageId, Action: h.action, @@ -164,6 +172,7 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err Ack: msg.Ack, Nack: msg.Nack, Reject: msg.Reject, + Manual: h.manual, } err = h.fn(event) @@ -174,6 +183,10 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { err := c.callAndHandlePanic(msg, h) + if c.autoAck || h.manualMode { + return + } + log := logger.WithFields(logrus.Fields{ "action": h.action, "body": string(msg.Body), @@ -183,9 +196,7 @@ func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32 if err == nil { log.Debug("Message handled successfully.") - if !c.autoAck { - msg.Ack(false) - } + msg.Ack(false) return } @@ -200,9 +211,7 @@ func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32 if retryCount >= h.maxRetries { log.Error("Maximum retries reached. Giving up.") - if !c.autoAck { - msg.Ack(false) - } + msg.Ack(false) return } diff --git a/amqp/consumer_test.go b/amqp/consumer_test.go index 0906cf0..17444a4 100644 --- a/amqp/consumer_test.go +++ b/amqp/consumer_test.go @@ -772,7 +772,7 @@ func TestCallEventAckMethod(t *testing.T) { func1 := make(chan bool) func2 := make(chan bool) - c, err := NewConsumer(conn, false, "multi", "TestSubscribeActions") + c, err := NewConsumer(conn, false, "multi", "TestCallEventAckMethod") if assert.Nil(t, err) { defer c.Close() @@ -780,12 +780,16 @@ func TestCallEventAckMethod(t *testing.T) { clearQueue(conn, c.queueName) c.Subscribe("multi", func(e messaging.Event) error { + e.Manual() + e.Ack(false) func1 <- true return nil }, nil) - c.Subscribe("multi", func(e messaging.Event) error { + c.Subscribe("multi_2", func(e messaging.Event) error { + e.Manual() + e.Ack(false) func2 <- true return nil @@ -813,13 +817,15 @@ func TestCallEventAckMethod(t *testing.T) { } func TestCallEventNackMethod(t *testing.T) { - c, err := NewConsumer(conn, false, "onlynack", "TestSubscribeActions") + c, err := NewConsumer(conn, false, "onlynack", "TestCallEventNackMethod") if assert.Nil(t, err) { count := 0 defer c.Close() clearQueue(conn, c.queueName) c.Subscribe("multi", func(e messaging.Event) error { + e.Manual() + count += 1 if count == 3 { e.Ack(false) @@ -835,7 +841,7 @@ func TestCallEventNackMethod(t *testing.T) { // take a time to setup topology time.Sleep(SleepSetupTopology) - p, err := NewProducer(conn, "multi") + p, err := NewProducer(conn, "onlynack") assert.Nil(t, err) diff --git a/messaging/event.go b/messaging/event.go index 25f4789..74afcdf 100644 --- a/messaging/event.go +++ b/messaging/event.go @@ -13,6 +13,7 @@ type Event struct { Ack func(multiple bool) error Nack func(multiple, requeue bool) error Reject func(requeue bool) error + Manual func() ctx context.Context }