From 26439cfd493b0c7e4188037b9b45115963f09a72 Mon Sep 17 00:00:00 2001 From: skrater Date: Thu, 3 May 2018 14:38:57 -0300 Subject: [PATCH] methods to bind actions after consume is started --- .travis.yml | 2 +- amqp/consumer.go | 76 ++++++++++++++++++++++++++----------- amqp/consumer_test.go | 88 +++++++++++++++++++++++++++++++++++++++++++ messaging/consumer.go | 2 + mock/consumer.go | 10 +++++ 5 files changed, 155 insertions(+), 23 deletions(-) diff --git a/.travis.yml b/.travis.yml index 0d0a14b..fbc7ecb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: go go: - - 1.8 - 1.9 + - 1.10 - tip install: - go get github.com/streadway/amqp diff --git a/amqp/consumer.go b/amqp/consumer.go index e9f1dc1..fd0bd83 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -34,7 +34,7 @@ type handler struct { type consumer struct { config ConsumerConfig - m sync.Mutex + m sync.RWMutex wg sync.WaitGroup conn *connection @@ -282,8 +282,8 @@ func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int } func (c *consumer) getHandler(msg amqplib.Delivery) (*handler, bool) { - c.m.Lock() - defer c.m.Unlock() + c.m.RLock() + defer c.m.RUnlock() action := getAction(msg) @@ -314,6 +314,9 @@ func (c *consumer) Subscribe(action string, handlerFn messaging.EventHandler, op } } + c.m.Lock() + defer c.m.Unlock() + c.handlers = append(c.handlers, handler{ action: action, fn: handlerFn, @@ -328,27 +331,11 @@ func (c *consumer) Subscribe(action string, handlerFn messaging.EventHandler, op // Unsubscribe allows to unsubscribe an action handler. func (c *consumer) Unsubscribe(action string) error { - channel, err := c.conn.openChannel() - - if err != nil { - return err - } - - defer channel.Close() - - err = channel.QueueUnbind( - c.queueName, // queue name - action, // routing key - c.exchangeName, // exchange - nil, // arguments - ) - - if err != nil { - return err - } - idx := -1 + c.m.Lock() + defer c.m.Unlock() + for i, h := range c.handlers { if h.action == action { idx = i @@ -363,6 +350,51 @@ func (c *consumer) Unsubscribe(action string) error { return nil } +func (c *consumer) BindActions(actions ...string) error { + channel, err := c.conn.openChannel() + + if err != nil { + return err + } + + defer channel.Close() + + for _, action := range actions { + err := c.bindActionToQueue(channel, c.queueName, action) + + if err != nil { + return err + } + } + + return nil +} + +func (c *consumer) UnbindActions(actions ...string) error { + channel, err := c.conn.openChannel() + + if err != nil { + return err + } + + defer channel.Close() + + for _, action := range actions { + err := channel.QueueUnbind( + c.queueName, // queue name + action, // routing key + c.exchangeName, // exchange + nil, // arguments + ) + + if err != nil { + return err + } + } + + return nil +} + func (c *consumer) bindActionToQueue(channel *amqplib.Channel, queueName string, action string) error { return channel.QueueBind( queueName, // queue name diff --git a/amqp/consumer_test.go b/amqp/consumer_test.go index dac06b1..403f6a3 100644 --- a/amqp/consumer_test.go +++ b/amqp/consumer_test.go @@ -86,6 +86,94 @@ func TestSubscribeActions(t *testing.T) { } } +func TestSubscribeActionsByBindAfterConsume(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeActionsByBindAfterConsume") + + if assert.Nil(t, err) { + defer c.Close() + + clearQueue(conn, c.queueName) + + go c.Consume() + + // take a time to setup topology + time.Sleep(SleepSetupTopology) + + c.Subscribe("my_action_1", func(e messaging.Event) error { + func1 <- true + return nil + }, nil) + + c.Subscribe("my_action_2", func(e messaging.Event) error { + func2 <- true + return nil + }, nil) + + assert.NoError(t, c.BindActions("my_action_1", "my_action_2")) + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("my_action_1", []byte("")) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(3 * time.Second): + assert.Fail(t, "timed out") + } + } +} + +func TestSubscribeActionsUnbindAfterConsume(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeActionsUnbindAfterConsume") + + if assert.Nil(t, err) { + defer c.Close() + + clearQueue(conn, c.queueName) + + c.Subscribe("my_action_1", func(e messaging.Event) error { + func1 <- true + return nil + }, nil) + + c.Subscribe("my_action_2", func(e messaging.Event) error { + func2 <- true + return nil + }, nil) + + go c.Consume() + + // take a time to setup topology + time.Sleep(SleepSetupTopology) + + assert.NoError(t, c.UnbindActions("my_action_2")) + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("my_action_2", []byte("")) + + select { + case <-func1: + assert.Fail(t, "called wrong action") + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(3 * time.Second): + } + } +} + func TestSubscribeWildcardActions(t *testing.T) { func1 := make(chan bool) func2 := make(chan bool) diff --git a/messaging/consumer.go b/messaging/consumer.go index edd0cf4..7e9cc0e 100644 --- a/messaging/consumer.go +++ b/messaging/consumer.go @@ -25,6 +25,8 @@ type EventHandler func(Event) error type Consumer interface { Subscribe(action string, handler EventHandler, options *SubscribeOptions) error Unsubscribe(action string) error + BindActions(actions ...string) error + UnbindActions(actions ...string) error Consume() Close() } diff --git a/mock/consumer.go b/mock/consumer.go index f4e4c87..b665cdf 100644 --- a/mock/consumer.go +++ b/mock/consumer.go @@ -30,3 +30,13 @@ func (c *Consumer) Consume() { func (c *Consumer) Close() { c.Called() } + +func (c *Consumer) BindActions(actions ...string) error { + args := c.Called(actions) + return args.Error(0) +} + +func (c *Consumer) UnbindActions(actions ...string) error { + args := c.Called(actions) + return args.Error(0) +}