diff --git a/README.md b/README.md index 8768208..ee35339 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Go messaging library An application produces events based on actions. Another application consume these events and maybe create new events. -*Scenario:* If an application produces an events "payment-received", another application may want to delivery the product to the buyer. +*Scenario:* If an application produces an event "payment.received", another application may want to delivery the product to the buyer. ## Supported Transport diff --git a/amqp/consumer.go b/amqp/consumer.go index 8a73844..41f506b 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -2,6 +2,7 @@ package amqp import ( "regexp" + "runtime" "strings" "sync" "time" @@ -35,7 +36,9 @@ type handler struct { type Consumer struct { config ConsumerConfig - m sync.Mutex + + m sync.Mutex + s Semaphore conn *Connection autoAck bool @@ -53,14 +56,16 @@ type Consumer struct { // ConsumerConfig to be used when creating a new producer. type ConsumerConfig struct { - consumeRetryInterval time.Duration + ConsumeRetryInterval time.Duration + MaxWorkers int } // NewConsumer returns a new AMQP Consumer. // Uses a default ConsumerConfig with 2 second of consume retry interval. func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (messaging.Consumer, error) { return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{ - consumeRetryInterval: 2 * time.Second, + ConsumeRetryInterval: 2 * time.Second, + MaxWorkers: runtime.NumCPU(), }) } @@ -68,6 +73,7 @@ func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) ( func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue string, config ConsumerConfig) (messaging.Consumer, error) { consumer := &Consumer{ config: config, + s: NewSemaphore(config.MaxWorkers), conn: c.(*Connection), autoAck: autoAck, handlers: make([]handler, 0), @@ -146,7 +152,7 @@ func (c *Consumer) handleReestablishedConnnection() { } func (c *Consumer) dispatch(msg amqplib.Delivery) { - if h, ok := c.getHandler(msg.RoutingKey); ok { + if h, ok := c.getHandler(msg); ok { delay, ok := getXRetryDelayHeader(msg) if !ok { @@ -213,7 +219,7 @@ func (c *Consumer) dispatch(msg amqplib.Delivery) { msg.Ack(false) } } else { - // got a message from wrong exchange? + // got wrong message? // ignore and don't requeue. if !c.autoAck { msg.Nack(false, false) @@ -234,14 +240,15 @@ func (c *Consumer) requeueMessage(msg amqplib.Delivery, h *handler, retryCount i "x-retry-count": retryCount + 1, "x-retry-max": h.maxRetries, "x-retry-delay": delayNs, + "x-action-key": getAction(msg), }, - DeliveryMode: amqplib.Persistent, Timestamp: time.Now(), + DeliveryMode: msg.DeliveryMode, Body: msg.Body, MessageId: msg.MessageId, } - err := c.channel.Publish(msg.Exchange, msg.RoutingKey, false, false, retryMsg) + err := c.channel.Publish("", c.queueName, false, false, retryMsg) if err != nil { logger.WithFields(log.Fields{ @@ -256,7 +263,9 @@ func (c *Consumer) requeueMessage(msg amqplib.Delivery, h *handler, retryCount i } } -func (c *Consumer) getHandler(action string) (*handler, bool) { +func (c *Consumer) getHandler(msg amqplib.Delivery) (*handler, bool) { + action := getAction(msg) + for _, h := range c.handlers { if h.re.MatchString(action) { return &h, true @@ -360,7 +369,7 @@ func (c *Consumer) Consume() { "error": err, }).Error("Error setting up consumer...") - time.Sleep(c.config.consumeRetryInterval) + time.Sleep(c.config.ConsumeRetryInterval) continue } @@ -370,7 +379,12 @@ func (c *Consumer) Consume() { }).Info("Consuming messages...") for m := range msgs { - go c.dispatch(m) + c.s.Acquire() + + go func() { + c.dispatch(m) + c.s.Release() + }() } logger.WithFields(log.Fields{ @@ -380,6 +394,14 @@ func (c *Consumer) Consume() { } } +func getAction(msg amqplib.Delivery) string { + if ac, ok := msg.Headers["x-action-key"]; ok { + return ac.(string) + } else { + return msg.RoutingKey + } +} + func getXRetryDeathHeader(msg amqplib.Delivery) (time.Time, bool) { if d, ok := msg.Headers["x-retry-death"]; ok { return d.(time.Time), true diff --git a/amqp/consumer_test.go b/amqp/consumer_test.go new file mode 100644 index 0000000..0fd69cc --- /dev/null +++ b/amqp/consumer_test.go @@ -0,0 +1,521 @@ +package amqp + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSubscribeActions(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeActions") + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.Subscribe("my_action_1", func(body []byte) error { + func1 <- true + return nil + }) + + c.Subscribe("my_action_2", func(body []byte) error { + func2 <- true + return nil + }) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("my_action_1", []byte("")) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(1 * time.Second): + assert.Fail(t, "timed out") + } +} + +func TestSubscribeWildcardActions(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeWildcardActions") + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.Subscribe("webinar.*", func(body []byte) error { + func1 <- true + return nil + }) + + c.Subscribe("foobar.*", func(body []byte) error { + func2 <- true + return nil + }) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("webinar.state_changed", []byte("")) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(1 * time.Second): + assert.Fail(t, "timed out") + } +} + +func TestSubscribeWildcardActionOrder1(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeWildcardActionOrder1") + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.Subscribe("webinar.*", func(body []byte) error { + func1 <- true + return nil + }) + + c.Subscribe("webinar.state_changed", func(body []byte) error { + func2 <- true + return nil + }) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("webinar.state_changed", []byte("")) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(1 * time.Second): + assert.Fail(t, "timed out") + } +} + +func TestSubscribeWildcardActionOrder2(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeWildcardActionOrder2") + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.Subscribe("webinar.state_changed", func(body []byte) error { + func1 <- true + return nil + }) + + c.Subscribe("webinar.*", func(body []byte) error { + func2 <- true + return nil + }) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("webinar.state_changed", []byte("")) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(1 * time.Second): + assert.Fail(t, "timed out") + } +} + +func TestDontRequeueMessageIfFailsToProcess(t *testing.T) { + timesCalled := 0 + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestDontRequeueMessageIfFailsToProcess") + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.Subscribe("my_action", func(body []byte) error { + defer func() { timesCalled++ }() + + if timesCalled == 0 { + return fmt.Errorf("Error") + } + + return nil + }) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("my_action", []byte("")) + + select { + case <-time.After(1 * time.Second): + assert.Equal(t, 1, timesCalled, "Consumer got wrong quantity of messages.") + } +} + +func TestRequeueMessageIfFailsToProcess(t *testing.T) { + timesCalled := 0 + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestRequeueMessageIfFailsToProcess") + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.SubscribeWithOptions("my_action", func(body []byte) error { + defer func() { timesCalled++ }() + + if timesCalled == 0 { + return fmt.Errorf("Error") + } + + return nil + }, 100*time.Millisecond, false, 5) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("my_action", []byte("")) + + select { + case <-time.After(1 * time.Second): + assert.Equal(t, 2, timesCalled, "Consumer got wrong quantity of messages.") + } +} + +func TestRequeueMessageIfPanicsToProcess(t *testing.T) { + timesCalled := 0 + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestRequeueMessageIfPanicsToProcess") + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.SubscribeWithOptions("my_action", func(body []byte) error { + defer func() { timesCalled++ }() + + if timesCalled == 0 { + panic("this is a panic!") + } + + return nil + }, 100*time.Millisecond, false, 5) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("my_action", []byte("")) + + select { + case <-time.After(1 * time.Second): + assert.Equal(t, 2, timesCalled, "Consumer got wrong quantity of messages.") + } +} + +func TestRequeueMessageToTheSameQueue(t *testing.T) { + timesCalled1 := 0 + timesCalled2 := 0 + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c1, err := NewConsumer(conn, false, "webhooks", "TestRequeueMessageToTheSameQueue_1") + assert.Nil(t, err) + + c2, err := NewConsumer(conn, false, "webhooks", "TestRequeueMessageToTheSameQueue_2") + assert.Nil(t, err) + + defer c1.Close() + defer c2.Close() + + // Clean all messages if any... + consumer1 := c1.(*Consumer) + consumer1.channel.QueuePurge(consumer1.queueName, false) + + consumer2 := c2.(*Consumer) + consumer2.channel.QueuePurge(consumer2.queueName, false) + + c1.Subscribe("my_action", func(body []byte) error { + timesCalled2++ + return nil + }) + + c2.SubscribeWithOptions("my_action", func(body []byte) error { + defer func() { timesCalled1++ }() + + if timesCalled1 == 0 { + return fmt.Errorf("Error.") + } else { + return nil + } + }, 100*time.Millisecond, false, 5) + + go c1.Consume() + go c2.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("my_action", []byte("")) + + select { + case <-time.After(1 * time.Second): + assert.Equal(t, 2, timesCalled1, "Consumer 1 got wrong quantity of messages.") + assert.Equal(t, 1, timesCalled2, "Consumer 2 got wrong quantity of messages.") + } +} + +func TestActionExitsMaxRetries(t *testing.T) { + timesCalled := 0 + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestActionExitsMaxRetries") + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + // It runs once and get an error, it will try five times more until it stops. + c.SubscribeWithOptions("my_action", func(body []byte) error { + defer func() { timesCalled++ }() + return fmt.Errorf("Error.") + }, 100*time.Millisecond, false, 5) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("my_action", []byte("")) + + select { + case <-time.After(1 * time.Second): + assert.Equal(t, 6, timesCalled, "Consumer got wrong quantity of messages.") + } +} + +func TestActionExitsMaxRetriesWhenDelayed(t *testing.T) { + timesCalled := 0 + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestActionExitsMaxRetriesWhenDelayed") + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + // It runs once and get an error, it will try five times more until it stops. + c.SubscribeWithOptions("my_action", func(body []byte) error { + defer func() { timesCalled++ }() + return fmt.Errorf("Error.") + }, 100*time.Millisecond, true, 3) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("my_action", []byte("")) + + select { + case <-time.After(1 * time.Second): + assert.Equal(t, 4, timesCalled, "Consumer got wrong quantity of messages.") + } +} + +func TestConsumeWorkers(t *testing.T) { + timesCalled := 0 + wait := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumerConfig(conn, false, "webhooks", "TestConsumeWorkers", ConsumerConfig{ + MaxWorkers: 5, + ConsumeRetryInterval: 15 * time.Second, + }) + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.Subscribe("my_action", func(body []byte) error { + timesCalled++ + <-wait + return nil + }) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + for i := 0; i < 10; i++ { + p.Publish("my_action", []byte("")) + } + + <-time.After(100 * time.Millisecond) + assert.Equal(t, 5, timesCalled, "Consumer got wrong quantity of messages.") + + // release one + wait <- true + + <-time.After(100 * time.Millisecond) + assert.Equal(t, 6, timesCalled, "Consumer got wrong quantity of messages.") + + // release all + for i := 0; i < 5; i++ { + wait <- true + } + + <-time.After(100 * time.Millisecond) + assert.Equal(t, 10, timesCalled, "Consumer got wrong quantity of messages.") + + // release all + for i := 0; i < 4; i++ { + wait <- true + } +} diff --git a/amqp/integration_test.go b/amqp/integration_test.go deleted file mode 100644 index 467b12f..0000000 --- a/amqp/integration_test.go +++ /dev/null @@ -1,287 +0,0 @@ -package amqp - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestPublishConsume(t *testing.T) { - func1 := make(chan bool) - func2 := make(chan bool) - - conn, err := NewConnection("amqp://guest:guest@broker:5672/") - - assert.Nil(t, err) - - defer conn.Close() - - c, err := NewConsumer(conn, false, "events-exchange", "events-queue") - - assert.Nil(t, err) - - defer c.Close() - - // Clean all messages if any... - consumer := c.(*Consumer) - consumer.channel.QueuePurge(consumer.queueName, false) - - c.Subscribe("my_action_1", func(body []byte) error { - func1 <- true - return nil - }) - - c.Subscribe("my_action_2", func(body []byte) error { - func2 <- true - return nil - }) - - go c.Consume() - - p, err := NewProducer(conn, "events-exchange") - - assert.Nil(t, err) - - p.Publish("my_action_1", []byte("")) - - select { - case <-func1: - case <-func2: - assert.Fail(t, "called wrong action") - case <-time.After(5 * time.Second): - assert.Fail(t, "timed out") - } -} - -func TestPublishConsumeWildcardAction(t *testing.T) { - func1 := make(chan bool) - func2 := make(chan bool) - - conn, err := NewConnection("amqp://guest:guest@broker:5672/") - - assert.Nil(t, err) - - defer conn.Close() - - c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer") - - assert.Nil(t, err) - - defer c.Close() - - // Clean all messages if any... - consumer := c.(*Consumer) - consumer.channel.QueuePurge(consumer.queueName, false) - - c.Subscribe("webinar.*", func(body []byte) error { - func1 <- true - return nil - }) - - c.Subscribe("foobar.*", func(body []byte) error { - func2 <- true - return nil - }) - - go c.Consume() - - p, err := NewProducer(conn, "webhooks") - - assert.Nil(t, err) - - p.Publish("webinar.state_changed", []byte("")) - - select { - case <-func1: - case <-func2: - assert.Fail(t, "called wrong action") - case <-time.After(5 * time.Second): - assert.Fail(t, "timed out") - } -} - -func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) { - func1 := make(chan bool) - func2 := make(chan bool) - - conn, err := NewConnection("amqp://guest:guest@broker:5672/") - - assert.Nil(t, err) - - defer conn.Close() - - c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer") - - assert.Nil(t, err) - - defer c.Close() - - // Clean all messages if any... - consumer := c.(*Consumer) - consumer.channel.QueuePurge(consumer.queueName, false) - - c.Subscribe("webinar.*", func(body []byte) error { - func1 <- true - return nil - }) - - c.Subscribe("webinar.state_changed", func(body []byte) error { - func2 <- true - return nil - }) - - go c.Consume() - - p, err := NewProducer(conn, "webhooks") - - assert.Nil(t, err) - - p.Publish("webinar.state_changed", []byte("")) - - select { - case <-func1: - case <-func2: - assert.Fail(t, "called wrong action") - case <-time.After(5 * time.Second): - assert.Fail(t, "timed out") - } -} - -func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) { - func1 := make(chan bool) - func2 := make(chan bool) - - conn, err := NewConnection("amqp://guest:guest@broker:5672/") - - assert.Nil(t, err) - - defer conn.Close() - - c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer") - - assert.Nil(t, err) - - defer c.Close() - - // Clean all messages if any... - consumer := c.(*Consumer) - consumer.channel.QueuePurge(consumer.queueName, false) - - c.Subscribe("webinar.state_changed", func(body []byte) error { - func1 <- true - return nil - }) - - c.Subscribe("webinar.*", func(body []byte) error { - func2 <- true - return nil - }) - - go c.Consume() - - p, err := NewProducer(conn, "webhooks") - - assert.Nil(t, err) - - p.Publish("webinar.state_changed", []byte("")) - - select { - case <-func1: - case <-func2: - assert.Fail(t, "called wrong action") - case <-time.After(5 * time.Second): - assert.Fail(t, "timed out") - } -} - -func TestPublishConsumeRequeueIfFail(t *testing.T) { - calledOnce := false - called := make(chan bool) - - conn, err := NewConnection("amqp://guest:guest@broker:5672/") - - assert.Nil(t, err) - - defer conn.Close() - - c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer") - - assert.Nil(t, err) - - defer c.Close() - - // Clean all messages if any... - consumer := c.(*Consumer) - consumer.channel.QueuePurge(consumer.queueName, false) - - c.SubscribeWithOptions("my_action", func(body []byte) error { - if calledOnce { - called <- true - return nil - } else { - calledOnce = true - return fmt.Errorf("Error.") - } - }, 1*time.Second, false, 5) - - go c.Consume() - - p, err := NewProducer(conn, "webhooks") - - assert.Nil(t, err) - - p.Publish("my_action", []byte("")) - - select { - case <-called: - case <-time.After(5 * time.Second): - assert.Fail(t, "timed out") - } -} - -func TestPublishConsumeRequeueIfPanic(t *testing.T) { - calledOnce := false - called := make(chan bool) - - conn, err := NewConnection("amqp://guest:guest@broker:5672/") - - assert.Nil(t, err) - - defer conn.Close() - - c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer") - - assert.Nil(t, err) - - defer c.Close() - - // Clean all messages if any... - consumer := c.(*Consumer) - consumer.channel.QueuePurge(consumer.queueName, false) - - c.SubscribeWithOptions("my_action", func(body []byte) error { - if calledOnce { - called <- true - return nil - } else { - calledOnce = true - panic("this is a panic!") - } - }, 1*time.Second, false, 5) - - go c.Consume() - - p, err := NewProducer(conn, "webhooks") - - assert.Nil(t, err) - - p.Publish("my_action", []byte("")) - - select { - case <-called: - case <-time.After(5 * time.Second): - assert.Fail(t, "timed out") - } -} diff --git a/amqp/producer_test.go b/amqp/producer_test.go new file mode 100644 index 0000000..2a89988 --- /dev/null +++ b/amqp/producer_test.go @@ -0,0 +1,86 @@ +package amqp + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPublish(t *testing.T) { + timesCalled := 0 + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestPublish") + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.Subscribe("action.name", func(body []byte) error { + defer func() { timesCalled++ }() + return nil + }) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + p.Publish("action.name", []byte("")) + + select { + case <-time.After(1 * time.Second): + assert.Equal(t, 1, timesCalled, "Message wasn't published.") + } +} + +func TestPublishMultipleTimes(t *testing.T) { + timesCalled := 0 + + conn, err := NewConnection("amqp://guest:guest@broker:5672/") + + assert.Nil(t, err) + + defer conn.Close() + + c, err := NewConsumer(conn, false, "webhooks", "TestPublishMultipleTimes") + + assert.Nil(t, err) + + defer c.Close() + + // Clean all messages if any... + consumer := c.(*Consumer) + consumer.channel.QueuePurge(consumer.queueName, false) + + c.Subscribe("action.name", func(body []byte) error { + defer func() { timesCalled++ }() + return nil + }) + + go c.Consume() + + p, err := NewProducer(conn, "webhooks") + + assert.Nil(t, err) + + for i := 0; i < 5; i++ { + p.Publish("action.name", []byte("")) + } + + select { + case <-time.After(1 * time.Second): + assert.Equal(t, 5, timesCalled, "One or more messages weren't published.") + } +} diff --git a/amqp/semaphore.go b/amqp/semaphore.go new file mode 100644 index 0000000..4509efe --- /dev/null +++ b/amqp/semaphore.go @@ -0,0 +1,19 @@ +package amqp + +type Semaphore struct { + c chan bool +} + +func NewSemaphore(size int) Semaphore { + return Semaphore{ + c: make(chan bool, size), + } +} + +func (s *Semaphore) Acquire() { + s.c <- true +} + +func (s *Semaphore) Release() { + <-s.c +}