From 05a5b865efdd9e2667b63e0c8644a033976532f6 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Schweder" Date: Mon, 5 Nov 2018 14:24:32 -0200 Subject: [PATCH 1/2] Corrected environment variable usage in Dockerfile --- Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6d84e58..83386f0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,12 @@ FROM golang:1.10 ARG PLATFORM=linux -ENV PLATFORM $PLATFORM_VERSION +ENV PLATFORM $PLATFORM ARG ARCH=amd64 -ENV ARCH $ARCH_VERSION +ENV ARCH $ARCH -ARG DOCKERIZE_VERSION=v0.2.0 +ARG DOCKERIZE_VERSION=v0.6.1 ENV DOCKERIZE_VERSION $DOCKERIZE_VERSION ADD https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-$PLATFORM-$ARCH-$DOCKERIZE_VERSION.tar.gz /usr/local/bin @@ -26,4 +26,4 @@ RUN go get \ ENTRYPOINT ["dockerize"] -CMD ["-wait", "tcp://broker:5672", "-timeout", "60s", "go", "run", "examples/consumer/consumer.go"] +CMD ["-wait", "tcp://broker:5672", "-timeout", "60s", "go", "run", "examples/consumer/amqp/consumer.go"] From dbd0a872caf87e7a5d7eefa7728875cb1bba24da Mon Sep 17 00:00:00 2001 From: "Jonathan A. Schweder" Date: Mon, 5 Nov 2018 14:34:25 -0200 Subject: [PATCH 2/2] Add acknowledger events to Event struct --- amqp/consumer.go | 9 +++-- amqp/consumer_test.go | 80 +++++++++++++++++++++++++++++++++++++++++++ messaging/event.go | 3 ++ 3 files changed, 90 insertions(+), 2 deletions(-) diff --git a/amqp/consumer.go b/amqp/consumer.go index fef888c..3556df6 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -151,12 +151,17 @@ func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err err } }() - err = h.fn(messaging.Event{ + event := messaging.Event{ Id: msg.MessageId, Action: h.action, Body: msg.Body, Timestamp: msg.Timestamp, - }) + Ack: msg.Ack, + Nack: msg.Nack, + Reject: msg.Reject, + } + + err = h.fn(event) return } diff --git a/amqp/consumer_test.go b/amqp/consumer_test.go index 403f6a3..0906cf0 100644 --- a/amqp/consumer_test.go +++ b/amqp/consumer_test.go @@ -767,3 +767,83 @@ func TestBlankQueueWithPrefix(t *testing.T) { } } } + +func TestCallEventAckMethod(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + c, err := NewConsumer(conn, false, "multi", "TestSubscribeActions") + + if assert.Nil(t, err) { + defer c.Close() + + clearQueue(conn, c.queueName) + + c.Subscribe("multi", func(e messaging.Event) error { + e.Ack(false) + func1 <- true + return nil + }, nil) + + c.Subscribe("multi", func(e messaging.Event) error { + e.Ack(false) + func2 <- true + return nil + }, nil) + + go c.Consume() + + // take a time to setup topology + time.Sleep(SleepSetupTopology) + + p, err := NewProducer(conn, "multi") + + assert.Nil(t, err) + + p.Publish("multi", []byte("")) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(3 * time.Second): + assert.Fail(t, "timed out") + } + } +} + +func TestCallEventNackMethod(t *testing.T) { + c, err := NewConsumer(conn, false, "onlynack", "TestSubscribeActions") + + if assert.Nil(t, err) { + count := 0 + defer c.Close() + clearQueue(conn, c.queueName) + c.Subscribe("multi", func(e messaging.Event) error { + count += 1 + if count == 3 { + e.Ack(false) + } else { + e.Nack(false, true) + } + + return nil + }, nil) + + go c.Consume() + + // take a time to setup topology + time.Sleep(SleepSetupTopology) + + p, err := NewProducer(conn, "multi") + + assert.Nil(t, err) + + p.Publish("multi", []byte("")) + + // Wait for requeue + time.Sleep(10 * time.Second) + + assert.Equal(t, 3, count) + } +} diff --git a/messaging/event.go b/messaging/event.go index 90f8f7a..25f4789 100644 --- a/messaging/event.go +++ b/messaging/event.go @@ -10,6 +10,9 @@ type Event struct { Action string Timestamp time.Time Body []byte + Ack func(multiple bool) error + Nack func(multiple, requeue bool) error + Reject func(requeue bool) error ctx context.Context }