1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-28 13:48:49 +08:00

Manual ack nack or reject messages

This commit is contained in:
skrater 2019-06-24 18:56:34 -03:00
parent f20dc5ff95
commit 764344199d
3 changed files with 35 additions and 19 deletions

View File

@ -30,6 +30,11 @@ type handler struct {
maxRetries int32 maxRetries int32
retryDelay time.Duration retryDelay time.Duration
delayedRetry bool delayedRetry bool
manualMode bool
}
func (h *handler) manual() {
h.manualMode = true
} }
type consumer struct { type consumer struct {
@ -126,7 +131,9 @@ func (c *consumer) dispatch(msg amqplib.Delivery) {
retryCount, _ := getXRetryCountHeader(msg) retryCount, _ := getXRetryCountHeader(msg)
c.doDispatch(msg, h, retryCount, delay) c.doDispatch(msg, h, retryCount, delay)
} else { return
}
if !c.autoAck { if !c.autoAck {
err := msg.Nack(false, true) err := msg.Nack(false, true)
@ -138,7 +145,6 @@ func (c *consumer) dispatch(msg amqplib.Delivery) {
} }
} }
} }
}
func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err error) { func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err error) {
defer func() { defer func() {
@ -156,6 +162,8 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err
} }
}() }()
h.manualMode = false
event := messaging.Event{ event := messaging.Event{
Id: msg.MessageId, Id: msg.MessageId,
Action: h.action, Action: h.action,
@ -164,6 +172,7 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err
Ack: msg.Ack, Ack: msg.Ack,
Nack: msg.Nack, Nack: msg.Nack,
Reject: msg.Reject, Reject: msg.Reject,
Manual: h.manual,
} }
err = h.fn(event) err = h.fn(event)
@ -174,6 +183,10 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err
func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
err := c.callAndHandlePanic(msg, h) err := c.callAndHandlePanic(msg, h)
if c.autoAck || h.manualMode {
return
}
log := logger.WithFields(logrus.Fields{ log := logger.WithFields(logrus.Fields{
"action": h.action, "action": h.action,
"body": string(msg.Body), "body": string(msg.Body),
@ -183,9 +196,7 @@ func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32
if err == nil { if err == nil {
log.Debug("Message handled successfully.") log.Debug("Message handled successfully.")
if !c.autoAck {
msg.Ack(false) msg.Ack(false)
}
return return
} }
@ -200,9 +211,7 @@ func (c *consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32
if retryCount >= h.maxRetries { if retryCount >= h.maxRetries {
log.Error("Maximum retries reached. Giving up.") log.Error("Maximum retries reached. Giving up.")
if !c.autoAck {
msg.Ack(false) msg.Ack(false)
}
return return
} }

View File

@ -772,7 +772,7 @@ func TestCallEventAckMethod(t *testing.T) {
func1 := make(chan bool) func1 := make(chan bool)
func2 := make(chan bool) func2 := make(chan bool)
c, err := NewConsumer(conn, false, "multi", "TestSubscribeActions") c, err := NewConsumer(conn, false, "multi", "TestCallEventAckMethod")
if assert.Nil(t, err) { if assert.Nil(t, err) {
defer c.Close() defer c.Close()
@ -780,12 +780,16 @@ func TestCallEventAckMethod(t *testing.T) {
clearQueue(conn, c.queueName) clearQueue(conn, c.queueName)
c.Subscribe("multi", func(e messaging.Event) error { c.Subscribe("multi", func(e messaging.Event) error {
e.Manual()
e.Ack(false) e.Ack(false)
func1 <- true func1 <- true
return nil return nil
}, nil) }, nil)
c.Subscribe("multi", func(e messaging.Event) error { c.Subscribe("multi_2", func(e messaging.Event) error {
e.Manual()
e.Ack(false) e.Ack(false)
func2 <- true func2 <- true
return nil return nil
@ -813,13 +817,15 @@ func TestCallEventAckMethod(t *testing.T) {
} }
func TestCallEventNackMethod(t *testing.T) { func TestCallEventNackMethod(t *testing.T) {
c, err := NewConsumer(conn, false, "onlynack", "TestSubscribeActions") c, err := NewConsumer(conn, false, "onlynack", "TestCallEventNackMethod")
if assert.Nil(t, err) { if assert.Nil(t, err) {
count := 0 count := 0
defer c.Close() defer c.Close()
clearQueue(conn, c.queueName) clearQueue(conn, c.queueName)
c.Subscribe("multi", func(e messaging.Event) error { c.Subscribe("multi", func(e messaging.Event) error {
e.Manual()
count += 1 count += 1
if count == 3 { if count == 3 {
e.Ack(false) e.Ack(false)
@ -835,7 +841,7 @@ func TestCallEventNackMethod(t *testing.T) {
// take a time to setup topology // take a time to setup topology
time.Sleep(SleepSetupTopology) time.Sleep(SleepSetupTopology)
p, err := NewProducer(conn, "multi") p, err := NewProducer(conn, "onlynack")
assert.Nil(t, err) assert.Nil(t, err)

View File

@ -13,6 +13,7 @@ type Event struct {
Ack func(multiple bool) error Ack func(multiple bool) error
Nack func(multiple, requeue bool) error Nack func(multiple, requeue bool) error
Reject func(requeue bool) error Reject func(requeue bool) error
Manual func()
ctx context.Context ctx context.Context
} }