From 06a8443d6f1ba638f594a03ce4d20163f30ea505 Mon Sep 17 00:00:00 2001 From: skrater Date: Tue, 19 Dec 2017 12:02:09 -0200 Subject: [PATCH] more options to declare queue --- amqp/consumer.go | 25 ++++++++++++++++++------ amqp/consumer_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/amqp/consumer.go b/amqp/consumer.go index 9a07c34..2cb3d8a 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -52,6 +52,9 @@ type consumer struct { type ConsumerConfig struct { ConsumeRetryInterval time.Duration PrefetchCount int + DurableQueue bool + AutoDelete bool + PrefixName string } // NewConsumer returns a new AMQP Consumer. @@ -60,6 +63,8 @@ func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) ( return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{ ConsumeRetryInterval: 2 * time.Second, PrefetchCount: 0, + DurableQueue: true, + AutoDelete: false, }) } @@ -102,6 +107,10 @@ func (c *consumer) Close() { c.wg.Wait() } +func (c *consumer) uniqueNameWithPrefix() string { + return fmt.Sprint("%s%d", c.config.PrefixName, time.Now().UnixNano()) +} + func (c *consumer) setupTopology() error { c.m.Lock() defer c.m.Unlock() @@ -138,13 +147,17 @@ func (c *consumer) setupTopology() error { return err } + if c.queueName == "" && c.config.PrefixName != "" { + c.queueName = c.uniqueNameWithPrefix() + } + q, err := c.channel.QueueDeclare( - c.queueName, // name - true, // durable - false, // auto-delete - false, // exclusive - false, // no-wait - nil, // arguments + c.queueName, // name + c.config.DurableQueue, // durable + c.config.AutoDelete, // auto-delete + false, // exclusive + false, // no-wait + nil, // arguments ) if err != nil { diff --git a/amqp/consumer_test.go b/amqp/consumer_test.go index 8f6cc5c..0c98433 100644 --- a/amqp/consumer_test.go +++ b/amqp/consumer_test.go @@ -626,3 +626,48 @@ func TestConsumePrefetch(t *testing.T) { wait <- true } } + +func TestBlankQueueWithPrefix(t *testing.T) { + myActionTimesCalled := 0 + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumerConfig(conn, false, "webhooks", "", ConsumerConfig{ + ConsumeRetryInterval: 2 * time.Second, + PrefetchCount: 1, + PrefixName: "test@", + }) + + if assert.Nil(t, err) { + defer c.Close() + + // Clean all messages if any... + c.channel.QueuePurge(c.queueName, false) + + c.Subscribe("test1", func(e messaging.Event) error { + defer func() { + myActionTimesCalled++ + }() + return nil + }, nil) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("test1", []byte("")) + + time.Sleep(200 * time.Millisecond) + + select { + case <-time.After(1 * time.Second): + assert.Equal(t, 1, myActionTimesCalled, "Consumer got wrong quantity of messages.") + } + } +}