1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-26 13:48:59 +08:00

Merge pull request #8 from alexandrevicenzi/master

Fixes
This commit is contained in:
Alexandre Vicenzi 2017-06-15 14:14:41 -03:00 committed by GitHub
commit a05e56a148
6 changed files with 659 additions and 298 deletions

View File

@ -9,7 +9,7 @@ Go messaging library
An application produces events based on actions. An application produces events based on actions.
Another application consume these events and maybe create new events. Another application consume these events and maybe create new events.
*Scenario:* If an application produces an events "payment-received", another application may want to delivery the product to the buyer. *Scenario:* If an application produces an event "payment.received", another application may want to delivery the product to the buyer.
## Supported Transport ## Supported Transport

View File

@ -2,6 +2,7 @@ package amqp
import ( import (
"regexp" "regexp"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -35,7 +36,9 @@ type handler struct {
type Consumer struct { type Consumer struct {
config ConsumerConfig config ConsumerConfig
m sync.Mutex m sync.Mutex
s Semaphore
conn *Connection conn *Connection
autoAck bool autoAck bool
@ -53,14 +56,16 @@ type Consumer struct {
// ConsumerConfig to be used when creating a new producer. // ConsumerConfig to be used when creating a new producer.
type ConsumerConfig struct { type ConsumerConfig struct {
consumeRetryInterval time.Duration ConsumeRetryInterval time.Duration
MaxWorkers int
} }
// NewConsumer returns a new AMQP Consumer. // NewConsumer returns a new AMQP Consumer.
// Uses a default ConsumerConfig with 2 second of consume retry interval. // Uses a default ConsumerConfig with 2 second of consume retry interval.
func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (messaging.Consumer, error) { func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (messaging.Consumer, error) {
return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{ return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{
consumeRetryInterval: 2 * time.Second, ConsumeRetryInterval: 2 * time.Second,
MaxWorkers: runtime.NumCPU(),
}) })
} }
@ -68,6 +73,7 @@ func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (
func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue string, config ConsumerConfig) (messaging.Consumer, error) { func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue string, config ConsumerConfig) (messaging.Consumer, error) {
consumer := &Consumer{ consumer := &Consumer{
config: config, config: config,
s: NewSemaphore(config.MaxWorkers),
conn: c.(*Connection), conn: c.(*Connection),
autoAck: autoAck, autoAck: autoAck,
handlers: make([]handler, 0), handlers: make([]handler, 0),
@ -146,7 +152,7 @@ func (c *Consumer) handleReestablishedConnnection() {
} }
func (c *Consumer) dispatch(msg amqplib.Delivery) { func (c *Consumer) dispatch(msg amqplib.Delivery) {
if h, ok := c.getHandler(msg.RoutingKey); ok { if h, ok := c.getHandler(msg); ok {
delay, ok := getXRetryDelayHeader(msg) delay, ok := getXRetryDelayHeader(msg)
if !ok { if !ok {
@ -213,7 +219,7 @@ func (c *Consumer) dispatch(msg amqplib.Delivery) {
msg.Ack(false) msg.Ack(false)
} }
} else { } else {
// got a message from wrong exchange? // got wrong message?
// ignore and don't requeue. // ignore and don't requeue.
if !c.autoAck { if !c.autoAck {
msg.Nack(false, false) msg.Nack(false, false)
@ -234,14 +240,15 @@ func (c *Consumer) requeueMessage(msg amqplib.Delivery, h *handler, retryCount i
"x-retry-count": retryCount + 1, "x-retry-count": retryCount + 1,
"x-retry-max": h.maxRetries, "x-retry-max": h.maxRetries,
"x-retry-delay": delayNs, "x-retry-delay": delayNs,
"x-action-key": getAction(msg),
}, },
DeliveryMode: amqplib.Persistent,
Timestamp: time.Now(), Timestamp: time.Now(),
DeliveryMode: msg.DeliveryMode,
Body: msg.Body, Body: msg.Body,
MessageId: msg.MessageId, MessageId: msg.MessageId,
} }
err := c.channel.Publish(msg.Exchange, msg.RoutingKey, false, false, retryMsg) err := c.channel.Publish("", c.queueName, false, false, retryMsg)
if err != nil { if err != nil {
logger.WithFields(log.Fields{ logger.WithFields(log.Fields{
@ -256,7 +263,9 @@ func (c *Consumer) requeueMessage(msg amqplib.Delivery, h *handler, retryCount i
} }
} }
func (c *Consumer) getHandler(action string) (*handler, bool) { func (c *Consumer) getHandler(msg amqplib.Delivery) (*handler, bool) {
action := getAction(msg)
for _, h := range c.handlers { for _, h := range c.handlers {
if h.re.MatchString(action) { if h.re.MatchString(action) {
return &h, true return &h, true
@ -360,7 +369,7 @@ func (c *Consumer) Consume() {
"error": err, "error": err,
}).Error("Error setting up consumer...") }).Error("Error setting up consumer...")
time.Sleep(c.config.consumeRetryInterval) time.Sleep(c.config.ConsumeRetryInterval)
continue continue
} }
@ -370,7 +379,12 @@ func (c *Consumer) Consume() {
}).Info("Consuming messages...") }).Info("Consuming messages...")
for m := range msgs { for m := range msgs {
go c.dispatch(m) c.s.Acquire()
go func() {
c.dispatch(m)
c.s.Release()
}()
} }
logger.WithFields(log.Fields{ logger.WithFields(log.Fields{
@ -380,6 +394,14 @@ func (c *Consumer) Consume() {
} }
} }
func getAction(msg amqplib.Delivery) string {
if ac, ok := msg.Headers["x-action-key"]; ok {
return ac.(string)
} else {
return msg.RoutingKey
}
}
func getXRetryDeathHeader(msg amqplib.Delivery) (time.Time, bool) { func getXRetryDeathHeader(msg amqplib.Delivery) (time.Time, bool) {
if d, ok := msg.Headers["x-retry-death"]; ok { if d, ok := msg.Headers["x-retry-death"]; ok {
return d.(time.Time), true return d.(time.Time), true

521
amqp/consumer_test.go Normal file
View File

@ -0,0 +1,521 @@
package amqp
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestSubscribeActions(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeActions")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action_1", func(body []byte) error {
func1 <- true
return nil
})
c.Subscribe("my_action_2", func(body []byte) error {
func2 <- true
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("my_action_1", []byte(""))
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(1 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestSubscribeWildcardActions(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeWildcardActions")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) error {
func1 <- true
return nil
})
c.Subscribe("foobar.*", func(body []byte) error {
func2 <- true
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("webinar.state_changed", []byte(""))
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(1 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestSubscribeWildcardActionOrder1(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeWildcardActionOrder1")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) error {
func1 <- true
return nil
})
c.Subscribe("webinar.state_changed", func(body []byte) error {
func2 <- true
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("webinar.state_changed", []byte(""))
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(1 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestSubscribeWildcardActionOrder2(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeWildcardActionOrder2")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.state_changed", func(body []byte) error {
func1 <- true
return nil
})
c.Subscribe("webinar.*", func(body []byte) error {
func2 <- true
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("webinar.state_changed", []byte(""))
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(1 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestDontRequeueMessageIfFailsToProcess(t *testing.T) {
timesCalled := 0
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestDontRequeueMessageIfFailsToProcess")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) error {
defer func() { timesCalled++ }()
if timesCalled == 0 {
return fmt.Errorf("Error")
}
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("my_action", []byte(""))
select {
case <-time.After(1 * time.Second):
assert.Equal(t, 1, timesCalled, "Consumer got wrong quantity of messages.")
}
}
func TestRequeueMessageIfFailsToProcess(t *testing.T) {
timesCalled := 0
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestRequeueMessageIfFailsToProcess")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.SubscribeWithOptions("my_action", func(body []byte) error {
defer func() { timesCalled++ }()
if timesCalled == 0 {
return fmt.Errorf("Error")
}
return nil
}, 100*time.Millisecond, false, 5)
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("my_action", []byte(""))
select {
case <-time.After(1 * time.Second):
assert.Equal(t, 2, timesCalled, "Consumer got wrong quantity of messages.")
}
}
func TestRequeueMessageIfPanicsToProcess(t *testing.T) {
timesCalled := 0
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestRequeueMessageIfPanicsToProcess")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.SubscribeWithOptions("my_action", func(body []byte) error {
defer func() { timesCalled++ }()
if timesCalled == 0 {
panic("this is a panic!")
}
return nil
}, 100*time.Millisecond, false, 5)
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("my_action", []byte(""))
select {
case <-time.After(1 * time.Second):
assert.Equal(t, 2, timesCalled, "Consumer got wrong quantity of messages.")
}
}
func TestRequeueMessageToTheSameQueue(t *testing.T) {
timesCalled1 := 0
timesCalled2 := 0
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c1, err := NewConsumer(conn, false, "webhooks", "TestRequeueMessageToTheSameQueue_1")
assert.Nil(t, err)
c2, err := NewConsumer(conn, false, "webhooks", "TestRequeueMessageToTheSameQueue_2")
assert.Nil(t, err)
defer c1.Close()
defer c2.Close()
// Clean all messages if any...
consumer1 := c1.(*Consumer)
consumer1.channel.QueuePurge(consumer1.queueName, false)
consumer2 := c2.(*Consumer)
consumer2.channel.QueuePurge(consumer2.queueName, false)
c1.Subscribe("my_action", func(body []byte) error {
timesCalled2++
return nil
})
c2.SubscribeWithOptions("my_action", func(body []byte) error {
defer func() { timesCalled1++ }()
if timesCalled1 == 0 {
return fmt.Errorf("Error.")
} else {
return nil
}
}, 100*time.Millisecond, false, 5)
go c1.Consume()
go c2.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("my_action", []byte(""))
select {
case <-time.After(1 * time.Second):
assert.Equal(t, 2, timesCalled1, "Consumer 1 got wrong quantity of messages.")
assert.Equal(t, 1, timesCalled2, "Consumer 2 got wrong quantity of messages.")
}
}
func TestActionExitsMaxRetries(t *testing.T) {
timesCalled := 0
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestActionExitsMaxRetries")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
// It runs once and get an error, it will try five times more until it stops.
c.SubscribeWithOptions("my_action", func(body []byte) error {
defer func() { timesCalled++ }()
return fmt.Errorf("Error.")
}, 100*time.Millisecond, false, 5)
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("my_action", []byte(""))
select {
case <-time.After(1 * time.Second):
assert.Equal(t, 6, timesCalled, "Consumer got wrong quantity of messages.")
}
}
func TestActionExitsMaxRetriesWhenDelayed(t *testing.T) {
timesCalled := 0
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestActionExitsMaxRetriesWhenDelayed")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
// It runs once and get an error, it will try five times more until it stops.
c.SubscribeWithOptions("my_action", func(body []byte) error {
defer func() { timesCalled++ }()
return fmt.Errorf("Error.")
}, 100*time.Millisecond, true, 3)
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("my_action", []byte(""))
select {
case <-time.After(1 * time.Second):
assert.Equal(t, 4, timesCalled, "Consumer got wrong quantity of messages.")
}
}
func TestConsumeWorkers(t *testing.T) {
timesCalled := 0
wait := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumerConfig(conn, false, "webhooks", "TestConsumeWorkers", ConsumerConfig{
MaxWorkers: 5,
ConsumeRetryInterval: 15 * time.Second,
})
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) error {
timesCalled++
<-wait
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
for i := 0; i < 10; i++ {
p.Publish("my_action", []byte(""))
}
<-time.After(100 * time.Millisecond)
assert.Equal(t, 5, timesCalled, "Consumer got wrong quantity of messages.")
// release one
wait <- true
<-time.After(100 * time.Millisecond)
assert.Equal(t, 6, timesCalled, "Consumer got wrong quantity of messages.")
// release all
for i := 0; i < 5; i++ {
wait <- true
}
<-time.After(100 * time.Millisecond)
assert.Equal(t, 10, timesCalled, "Consumer got wrong quantity of messages.")
// release all
for i := 0; i < 4; i++ {
wait <- true
}
}

View File

@ -1,287 +0,0 @@
package amqp
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestPublishConsume(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "events-exchange", "events-queue")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action_1", func(body []byte) error {
func1 <- true
return nil
})
c.Subscribe("my_action_2", func(body []byte) error {
func2 <- true
return nil
})
go c.Consume()
p, err := NewProducer(conn, "events-exchange")
assert.Nil(t, err)
p.Publish("my_action_1", []byte(""))
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeWildcardAction(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) error {
func1 <- true
return nil
})
c.Subscribe("foobar.*", func(body []byte) error {
func2 <- true
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("webinar.state_changed", []byte(""))
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) error {
func1 <- true
return nil
})
c.Subscribe("webinar.state_changed", func(body []byte) error {
func2 <- true
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("webinar.state_changed", []byte(""))
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.state_changed", func(body []byte) error {
func1 <- true
return nil
})
c.Subscribe("webinar.*", func(body []byte) error {
func2 <- true
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("webinar.state_changed", []byte(""))
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeRequeueIfFail(t *testing.T) {
calledOnce := false
called := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.SubscribeWithOptions("my_action", func(body []byte) error {
if calledOnce {
called <- true
return nil
} else {
calledOnce = true
return fmt.Errorf("Error.")
}
}, 1*time.Second, false, 5)
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("my_action", []byte(""))
select {
case <-called:
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeRequeueIfPanic(t *testing.T) {
calledOnce := false
called := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.SubscribeWithOptions("my_action", func(body []byte) error {
if calledOnce {
called <- true
return nil
} else {
calledOnce = true
panic("this is a panic!")
}
}, 1*time.Second, false, 5)
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("my_action", []byte(""))
select {
case <-called:
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}

86
amqp/producer_test.go Normal file
View File

@ -0,0 +1,86 @@
package amqp
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestPublish(t *testing.T) {
timesCalled := 0
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestPublish")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("action.name", func(body []byte) error {
defer func() { timesCalled++ }()
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
p.Publish("action.name", []byte(""))
select {
case <-time.After(1 * time.Second):
assert.Equal(t, 1, timesCalled, "Message wasn't published.")
}
}
func TestPublishMultipleTimes(t *testing.T) {
timesCalled := 0
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
c, err := NewConsumer(conn, false, "webhooks", "TestPublishMultipleTimes")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("action.name", func(body []byte) error {
defer func() { timesCalled++ }()
return nil
})
go c.Consume()
p, err := NewProducer(conn, "webhooks")
assert.Nil(t, err)
for i := 0; i < 5; i++ {
p.Publish("action.name", []byte(""))
}
select {
case <-time.After(1 * time.Second):
assert.Equal(t, 5, timesCalled, "One or more messages weren't published.")
}
}

19
amqp/semaphore.go Normal file
View File

@ -0,0 +1,19 @@
package amqp
type Semaphore struct {
c chan bool
}
func NewSemaphore(size int) Semaphore {
return Semaphore{
c: make(chan bool, size),
}
}
func (s *Semaphore) Acquire() {
s.c <- true
}
func (s *Semaphore) Release() {
<-s.c
}