mirror of
https://github.com/eventials/goevents.git
synced 2025-04-24 13:48:53 +08:00
Add acknowledger events to Event struct
This commit is contained in:
parent
05a5b865ef
commit
dbd0a872ca
@ -151,12 +151,17 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err
|
||||
}
|
||||
}()
|
||||
|
||||
err = h.fn(messaging.Event{
|
||||
event := messaging.Event{
|
||||
Id: msg.MessageId,
|
||||
Action: h.action,
|
||||
Body: msg.Body,
|
||||
Timestamp: msg.Timestamp,
|
||||
})
|
||||
Ack: msg.Ack,
|
||||
Nack: msg.Nack,
|
||||
Reject: msg.Reject,
|
||||
}
|
||||
|
||||
err = h.fn(event)
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -767,3 +767,83 @@ func TestBlankQueueWithPrefix(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCallEventAckMethod(t *testing.T) {
|
||||
func1 := make(chan bool)
|
||||
func2 := make(chan bool)
|
||||
|
||||
c, err := NewConsumer(conn, false, "multi", "TestSubscribeActions")
|
||||
|
||||
if assert.Nil(t, err) {
|
||||
defer c.Close()
|
||||
|
||||
clearQueue(conn, c.queueName)
|
||||
|
||||
c.Subscribe("multi", func(e messaging.Event) error {
|
||||
e.Ack(false)
|
||||
func1 <- true
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
c.Subscribe("multi", func(e messaging.Event) error {
|
||||
e.Ack(false)
|
||||
func2 <- true
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
go c.Consume()
|
||||
|
||||
// take a time to setup topology
|
||||
time.Sleep(SleepSetupTopology)
|
||||
|
||||
p, err := NewProducer(conn, "multi")
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
p.Publish("multi", []byte(""))
|
||||
|
||||
select {
|
||||
case <-func1:
|
||||
case <-func2:
|
||||
assert.Fail(t, "called wrong action")
|
||||
case <-time.After(3 * time.Second):
|
||||
assert.Fail(t, "timed out")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCallEventNackMethod(t *testing.T) {
|
||||
c, err := NewConsumer(conn, false, "onlynack", "TestSubscribeActions")
|
||||
|
||||
if assert.Nil(t, err) {
|
||||
count := 0
|
||||
defer c.Close()
|
||||
clearQueue(conn, c.queueName)
|
||||
c.Subscribe("multi", func(e messaging.Event) error {
|
||||
count += 1
|
||||
if count == 3 {
|
||||
e.Ack(false)
|
||||
} else {
|
||||
e.Nack(false, true)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
go c.Consume()
|
||||
|
||||
// take a time to setup topology
|
||||
time.Sleep(SleepSetupTopology)
|
||||
|
||||
p, err := NewProducer(conn, "multi")
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
p.Publish("multi", []byte(""))
|
||||
|
||||
// Wait for requeue
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
assert.Equal(t, 3, count)
|
||||
}
|
||||
}
|
||||
|
@ -10,6 +10,9 @@ type Event struct {
|
||||
Action string
|
||||
Timestamp time.Time
|
||||
Body []byte
|
||||
Ack func(multiple bool) error
|
||||
Nack func(multiple, requeue bool) error
|
||||
Reject func(requeue bool) error
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user