1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Merge pull request #44 from jaswdr/bypass-acknowledger

Bypass acknowledger
This commit is contained in:
Guilherme Emilio Raduenz 2018-11-05 14:42:41 -02:00 committed by GitHub
commit 573d7b9997
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 6 deletions

View File

@ -1,12 +1,12 @@
FROM golang:1.10
ARG PLATFORM=linux
ENV PLATFORM $PLATFORM_VERSION
ENV PLATFORM $PLATFORM
ARG ARCH=amd64
ENV ARCH $ARCH_VERSION
ENV ARCH $ARCH
ARG DOCKERIZE_VERSION=v0.2.0
ARG DOCKERIZE_VERSION=v0.6.1
ENV DOCKERIZE_VERSION $DOCKERIZE_VERSION
ADD https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-$PLATFORM-$ARCH-$DOCKERIZE_VERSION.tar.gz /usr/local/bin
@ -26,4 +26,4 @@ RUN go get \
ENTRYPOINT ["dockerize"]
CMD ["-wait", "tcp://broker:5672", "-timeout", "60s", "go", "run", "examples/consumer/consumer.go"]
CMD ["-wait", "tcp://broker:5672", "-timeout", "60s", "go", "run", "examples/consumer/amqp/consumer.go"]

View File

@ -151,12 +151,17 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err
}
}()
err = h.fn(messaging.Event{
event := messaging.Event{
Id: msg.MessageId,
Action: h.action,
Body: msg.Body,
Timestamp: msg.Timestamp,
})
Ack: msg.Ack,
Nack: msg.Nack,
Reject: msg.Reject,
}
err = h.fn(event)
return
}

View File

@ -767,3 +767,83 @@ func TestBlankQueueWithPrefix(t *testing.T) {
}
}
}
func TestCallEventAckMethod(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
c, err := NewConsumer(conn, false, "multi", "TestSubscribeActions")
if assert.Nil(t, err) {
defer c.Close()
clearQueue(conn, c.queueName)
c.Subscribe("multi", func(e messaging.Event) error {
e.Ack(false)
func1 <- true
return nil
}, nil)
c.Subscribe("multi", func(e messaging.Event) error {
e.Ack(false)
func2 <- true
return nil
}, nil)
go c.Consume()
// take a time to setup topology
time.Sleep(SleepSetupTopology)
p, err := NewProducer(conn, "multi")
assert.Nil(t, err)
p.Publish("multi", []byte(""))
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(3 * time.Second):
assert.Fail(t, "timed out")
}
}
}
func TestCallEventNackMethod(t *testing.T) {
c, err := NewConsumer(conn, false, "onlynack", "TestSubscribeActions")
if assert.Nil(t, err) {
count := 0
defer c.Close()
clearQueue(conn, c.queueName)
c.Subscribe("multi", func(e messaging.Event) error {
count += 1
if count == 3 {
e.Ack(false)
} else {
e.Nack(false, true)
}
return nil
}, nil)
go c.Consume()
// take a time to setup topology
time.Sleep(SleepSetupTopology)
p, err := NewProducer(conn, "multi")
assert.Nil(t, err)
p.Publish("multi", []byte(""))
// Wait for requeue
time.Sleep(10 * time.Second)
assert.Equal(t, 3, count)
}
}

View File

@ -10,6 +10,9 @@ type Event struct {
Action string
Timestamp time.Time
Body []byte
Ack func(multiple bool) error
Nack func(multiple, requeue bool) error
Reject func(requeue bool) error
ctx context.Context
}