diff --git a/amqp/connection.go b/amqp/connection.go index de9f193..53e580b 100644 --- a/amqp/connection.go +++ b/amqp/connection.go @@ -113,7 +113,6 @@ func (c *Connection) reestablish() error { c.connection = conn return err - } func (c *Connection) handleConnectionClose() { diff --git a/amqp/consumer.go b/amqp/consumer.go index 0432642..ba10071 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -1,6 +1,7 @@ package amqp import ( + "context" "regexp" "strings" "sync" @@ -12,11 +13,6 @@ import ( amqplib "github.com/streadway/amqp" ) -const ( - MaxInt32 = 1<<31 - 1 - MaxRetries = MaxInt32 -) - var ( logger = log.WithFields(log.Fields{ "type": "goevents", @@ -25,12 +21,12 @@ var ( ) type handler struct { - action string - fn messaging.EventHandler - re *regexp.Regexp - maxRetries int32 - retryDelay time.Duration - delayProgression bool + action string + fn messaging.EventHandler + re *regexp.Regexp + maxRetries int32 + retryDelay time.Duration + delayedRetry bool } type Consumer struct { @@ -229,7 +225,11 @@ func (c *Consumer) dispatch(msg amqplib.Delivery) { } func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { - err := h.fn(msg.Body) + err := h.fn(messaging.Event{ + Action: h.action, + Body: msg.Body, + Context: context.Background(), + }) if err != nil { if h.maxRetries > 0 { @@ -264,7 +264,7 @@ func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32 func (c *Consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { delayNs := delay.Nanoseconds() - if h.delayProgression { + if h.delayedRetry { delayNs *= 2 } @@ -335,15 +335,7 @@ func (c *Consumer) getHandler(msg amqplib.Delivery) (*handler, bool) { } // Subscribe allows to subscribe an action handler. -// By default it won't retry any failed event. -func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) error { - return c.SubscribeWithOptions(action, handlerFn, time.Duration(0), false, 0) -} - -// SubscribeWithOptions allows to subscribe an action handler with retry options. -func (c *Consumer) SubscribeWithOptions(action string, handlerFn messaging.EventHandler, - retryDelay time.Duration, delayProgression bool, maxRetries int32) error { - +func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler, options *messaging.SubscribeOptions) error { // TODO: Replace # pattern too. pattern := strings.Replace(action, "*", "(.*)", 0) re, err := regexp.Compile(pattern) @@ -364,13 +356,21 @@ func (c *Consumer) SubscribeWithOptions(action string, handlerFn messaging.Event return err } + if options == nil { + options = &messaging.SubscribeOptions{ + RetryDelay: time.Duration(0), + DelayedRetry: false, + MaxRetries: 0, + } + } + c.handlers = append(c.handlers, handler{ - action: action, - fn: handlerFn, - re: re, - maxRetries: maxRetries, - retryDelay: retryDelay, - delayProgression: delayProgression, + action: action, + fn: handlerFn, + re: re, + maxRetries: options.MaxRetries, + retryDelay: options.RetryDelay, + delayedRetry: options.DelayedRetry, }) return nil diff --git a/amqp/consumer_test.go b/amqp/consumer_test.go index b6443c5..3db8529 100644 --- a/amqp/consumer_test.go +++ b/amqp/consumer_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/eventials/goevents/messaging" "github.com/stretchr/testify/assert" ) @@ -28,15 +29,15 @@ func TestSubscribeActions(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("my_action_1", func(body []byte) error { + c.Subscribe("my_action_1", func(e messaging.Event) error { func1 <- true return nil - }) + }, nil) - c.Subscribe("my_action_2", func(body []byte) error { + c.Subscribe("my_action_2", func(e messaging.Event) error { func2 <- true return nil - }) + }, nil) go c.Consume() @@ -75,15 +76,15 @@ func TestSubscribeWildcardActions(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("webinar.*", func(body []byte) error { + c.Subscribe("webinar.*", func(e messaging.Event) error { func1 <- true return nil - }) + }, nil) - c.Subscribe("foobar.*", func(body []byte) error { + c.Subscribe("foobar.*", func(e messaging.Event) error { func2 <- true return nil - }) + }, nil) go c.Consume() @@ -122,15 +123,15 @@ func TestSubscribeWildcardActionOrder1(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("webinar.*", func(body []byte) error { + c.Subscribe("webinar.*", func(e messaging.Event) error { func1 <- true return nil - }) + }, nil) - c.Subscribe("webinar.state_changed", func(body []byte) error { + c.Subscribe("webinar.state_changed", func(e messaging.Event) error { func2 <- true return nil - }) + }, nil) go c.Consume() @@ -169,15 +170,15 @@ func TestSubscribeWildcardActionOrder2(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("webinar.state_changed", func(body []byte) error { + c.Subscribe("webinar.state_changed", func(e messaging.Event) error { func1 <- true return nil - }) + }, nil) - c.Subscribe("webinar.*", func(body []byte) error { + c.Subscribe("webinar.*", func(e messaging.Event) error { func2 <- true return nil - }) + }, nil) go c.Consume() @@ -215,7 +216,7 @@ func TestDontRetryMessageIfFailsToProcess(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("my_action", func(body []byte) error { + c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() if timesCalled == 0 { @@ -223,7 +224,7 @@ func TestDontRetryMessageIfFailsToProcess(t *testing.T) { } return nil - }) + }, nil) go c.Consume() @@ -258,7 +259,7 @@ func TestRetryMessageIfFailsToProcess(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.SubscribeWithOptions("my_action", func(body []byte) error { + c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() if timesCalled == 0 { @@ -266,7 +267,11 @@ func TestRetryMessageIfFailsToProcess(t *testing.T) { } return nil - }, 100*time.Millisecond, false, 5) + }, &messaging.SubscribeOptions{ + RetryDelay: 100 * time.Millisecond, + DelayedRetry: false, + MaxRetries: 5, + }) go c.Consume() @@ -301,7 +306,7 @@ func TestRetryMessageIfPanicsToProcess(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.SubscribeWithOptions("my_action", func(body []byte) error { + c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() if timesCalled == 0 { @@ -309,7 +314,11 @@ func TestRetryMessageIfPanicsToProcess(t *testing.T) { } return nil - }, 100*time.Millisecond, false, 5) + }, &messaging.SubscribeOptions{ + RetryDelay: 100 * time.Millisecond, + DelayedRetry: false, + MaxRetries: 5, + }) go c.Consume() @@ -351,12 +360,12 @@ func TestRetryMessageToTheSameQueue(t *testing.T) { consumer2 := c2.(*Consumer) consumer2.channel.QueuePurge(consumer2.queueName, false) - c1.Subscribe("my_action", func(body []byte) error { + c1.Subscribe("my_action", func(e messaging.Event) error { timesCalled2++ return nil - }) + }, nil) - c2.SubscribeWithOptions("my_action", func(body []byte) error { + c2.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled1++ }() if timesCalled1 == 0 { @@ -364,7 +373,11 @@ func TestRetryMessageToTheSameQueue(t *testing.T) { } else { return nil } - }, 100*time.Millisecond, false, 5) + }, &messaging.SubscribeOptions{ + RetryDelay: 100 * time.Millisecond, + DelayedRetry: false, + MaxRetries: 5, + }) go c1.Consume() go c2.Consume() @@ -401,10 +414,14 @@ func TestActionExitsMaxRetries(t *testing.T) { consumer.channel.QueuePurge(consumer.queueName, false) // It runs once and get an error, it will try five times more until it stops. - c.SubscribeWithOptions("my_action", func(body []byte) error { + c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() return fmt.Errorf("Error.") - }, 100*time.Millisecond, false, 5) + }, &messaging.SubscribeOptions{ + RetryDelay: 100 * time.Millisecond, + DelayedRetry: false, + MaxRetries: 5, + }) go c.Consume() @@ -439,10 +456,14 @@ func TestActionExitsMaxRetriesWhenDelayed(t *testing.T) { consumer.channel.QueuePurge(consumer.queueName, false) // It runs once and get an error, it will try three times more until it stops. - c.SubscribeWithOptions("my_action", func(body []byte) error { + c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() return fmt.Errorf("Error.") - }, 100*time.Millisecond, true, 3) + }, &messaging.SubscribeOptions{ + RetryDelay: 100 * time.Millisecond, + DelayedRetry: true, + MaxRetries: 3, + }) go c.Consume() @@ -477,10 +498,14 @@ func TestActionExitsMaxRetriesWhenDelayedWindow(t *testing.T) { consumer.channel.QueuePurge(consumer.queueName, false) // It runs once and get an error, it will try three times more until it stops. - c.SubscribeWithOptions("my_action", func(body []byte) error { + c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() return fmt.Errorf("Error.") - }, 100*time.Millisecond, true, 5) + }, &messaging.SubscribeOptions{ + RetryDelay: 100 * time.Millisecond, + DelayedRetry: true, + MaxRetries: 5, + }) go c.Consume() @@ -520,19 +545,23 @@ func TestActionRetryTimeout(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.SubscribeWithOptions("test1", func(body []byte) error { + c.Subscribe("test1", func(e messaging.Event) error { defer func() { myActionTimesCalled++ }() return fmt.Errorf("Error.") - }, 300*time.Millisecond, true, 4) + }, &messaging.SubscribeOptions{ + RetryDelay: 300 * time.Millisecond, + DelayedRetry: true, + MaxRetries: 4, + }) - c.Subscribe("test2", func(body []byte) error { + c.Subscribe("test2", func(e messaging.Event) error { defer func() { myAction2TimesCalled++ }() return nil - }) + }, nil) go c.Consume() @@ -575,11 +604,11 @@ func TestConsumePrefetch(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("my_action", func(body []byte) error { + c.Subscribe("my_action", func(e messaging.Event) error { timesCalled++ <-wait return nil - }) + }, nil) go c.Consume() diff --git a/amqp/producer_test.go b/amqp/producer_test.go index 2a89988..252e350 100644 --- a/amqp/producer_test.go +++ b/amqp/producer_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/eventials/goevents/messaging" "github.com/stretchr/testify/assert" ) @@ -26,10 +27,10 @@ func TestPublish(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("action.name", func(body []byte) error { + c.Subscribe("action.name", func(e messaging.Event) error { defer func() { timesCalled++ }() return nil - }) + }, nil) go c.Consume() @@ -64,10 +65,10 @@ func TestPublishMultipleTimes(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("action.name", func(body []byte) error { + c.Subscribe("action.name", func(e messaging.Event) error { defer func() { timesCalled++ }() return nil - }) + }, nil) go c.Consume() diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go index c5b5bcd..b62a1ea 100644 --- a/examples/consumer/consumer.go +++ b/examples/consumer/consumer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/eventials/goevents/amqp" + "github.com/eventials/goevents/messaging" ) func main() { @@ -28,41 +29,49 @@ func main() { panic(err) } - consumerA.Subscribe("object.eventA", func(body []byte) error { - fmt.Println("object.eventA:", string(body)) + consumerA.Subscribe("object.eventA", func(e messaging.Event) error { + fmt.Println("object.eventA:", string(e.Body)) return nil + }, nil) + + consumerA.Subscribe("object.eventB", func(e messaging.Event) error { + fmt.Println("object.eventB:", string(e.Body)) + return nil + }, nil) + + consumerA.Subscribe("object.eventToRetryDelay", func(e messaging.Event) error { + fmt.Println("object.eventToRetryDelay:", string(e.Body)) + return fmt.Errorf("Try again.") + }, &messaging.SubscribeOptions{ + RetryDelay: 10 * time.Second, + DelayedRetry: true, + MaxRetries: 30, }) - consumerA.Subscribe("object.eventB", func(body []byte) error { - fmt.Println("object.eventB:", string(body)) - return nil + consumerA.Subscribe("object.eventToRetry", func(e messaging.Event) error { + fmt.Println("object.eventToRetry:", string(e.Body)) + return fmt.Errorf("Try again.") + }, &messaging.SubscribeOptions{ + RetryDelay: 1 * time.Second, + DelayedRetry: false, + MaxRetries: 10, }) - consumerA.SubscribeWithOptions("object.eventToRetryDelay", func(body []byte) error { - fmt.Println("object.eventToRetryDelay:", string(body)) - return fmt.Errorf("Try again.") - }, 10*time.Second, true, 30) - - consumerA.SubscribeWithOptions("object.eventToRetry", func(body []byte) error { - fmt.Println("object.eventToRetry:", string(body)) - return fmt.Errorf("Try again.") - }, 1*time.Second, false, 10) - consumerB, err := conn.Consumer(false, "events-exchange", "events-queue-b") if err != nil { panic(err) } - consumerB.Subscribe("object.eventC", func(body []byte) error { - fmt.Println("object.eventC:", string(body)) + consumerB.Subscribe("object.eventC", func(e messaging.Event) error { + fmt.Println("object.eventC:", string(e.Body)) return nil - }) + }, nil) - consumerB.Subscribe("object.eventD", func(body []byte) error { - fmt.Println("object.eventD:", string(body)) + consumerB.Subscribe("object.eventD", func(e messaging.Event) error { + fmt.Println("object.eventD:", string(e.Body)) return nil - }) + }, nil) var wg sync.WaitGroup diff --git a/messaging/consumer.go b/messaging/consumer.go index bfe8b69..26b3a99 100644 --- a/messaging/consumer.go +++ b/messaging/consumer.go @@ -1,15 +1,36 @@ package messaging import ( + "context" "time" ) -type EventHandler func(body []byte) error +const ( + MaxInt32 = 1<<31 - 1 + MaxRetries = MaxInt32 +) + +type SubscribeOptions struct { + // The time to retry after it fails. + RetryDelay time.Duration + // If enable the retry time it will be incresed in power of two. + // This means if your retry delay is 1s, the first retry will be after 1s, + // the sencond 2s, the third 4s and so on. + DelayedRetry bool + // Max attempts to retry. + MaxRetries int32 +} + +type Event struct { + Action string + Body []byte + Context context.Context +} + +type EventHandler func(Event) error type Consumer interface { - Subscribe(action string, handler EventHandler) error - SubscribeWithOptions(action string, handlerFn EventHandler, - retryDelay time.Duration, delayPow2 bool, maxRetries int32) error + Subscribe(action string, handler EventHandler, options *SubscribeOptions) error Unsubscribe(action string) error Consume() Close() diff --git a/mock/connection.go b/mock/connection.go index 7179216..58ef5c0 100644 --- a/mock/connection.go +++ b/mock/connection.go @@ -9,16 +9,34 @@ type Connection struct { mock.Mock } -func (c *Connection) Consumer(autoAck bool) (messaging.Consumer, error) { - args := c.Called(autoAck) +func NewMockConnection() messaging.Connection { + return &Connection{} +} + +func (c *Connection) Consumer(autoAck bool, exchange, queue string) (messaging.Consumer, error) { + args := c.Called(autoAck, exchange, queue) return args.Get(0).(messaging.Consumer), args.Error(1) } -func (c *Connection) Producer() (messaging.Producer, error) { - args := c.Called() - return args.Get(0).(*Producer), args.Error(1) +func (c *Connection) Producer(exchange string) (messaging.Producer, error) { + args := c.Called(exchange) + return args.Get(0).(messaging.Producer), args.Error(1) } func (c *Connection) Close() { c.Called() } + +func (c *Connection) NotifyConnectionClose() <-chan error { + args := c.Called() + return args.Get(0).(chan error) +} + +func (c *Connection) NotifyReestablish() <-chan bool { + args := c.Called() + return args.Get(0).(chan bool) +} + +func (c *Connection) WaitUntilConnectionCloses() { + c.Called() +} diff --git a/mock/consumer.go b/mock/consumer.go index 4023ac1..f4e4c87 100644 --- a/mock/consumer.go +++ b/mock/consumer.go @@ -9,22 +9,24 @@ type Consumer struct { mock.Mock } -func (c *Consumer) Subscribe(action string, handler messaging.EventHandler) error { - args := c.Called(action, handler) - return args.Error(1) +func NewMockConsumer() messaging.Consumer { + return &Consumer{} +} + +func (c *Consumer) Subscribe(action string, handler messaging.EventHandler, options *messaging.SubscribeOptions) error { + args := c.Called(action, handler, options) + return args.Error(0) } func (c *Consumer) Unsubscribe(action string) error { args := c.Called(action) - return args.Error(1) + return args.Error(0) } -func (c *Consumer) Listen() error { - args := c.Called() - return args.Error(1) +func (c *Consumer) Consume() { + c.Called() } -func (c *Consumer) ListenForever() error { - args := c.Called() - return args.Error(1) +func (c *Consumer) Close() { + c.Called() } diff --git a/mock/producer.go b/mock/producer.go index 64603f2..4d74cf0 100644 --- a/mock/producer.go +++ b/mock/producer.go @@ -1,6 +1,7 @@ package mock import ( + "github.com/eventials/goevents/messaging" "github.com/stretchr/testify/mock" ) @@ -8,6 +9,10 @@ type Producer struct { mock.Mock } +func NewMockProducer() messaging.Producer { + return &Producer{} +} + func (p *Producer) Publish(action string, data []byte) { p.Called(action, data) } @@ -17,6 +22,6 @@ func (p *Producer) Close() { } func (p *Producer) NotifyClose() <-chan bool { - p.Called() - return make(chan bool) + args := p.Called() + return args.Get(0).(chan bool) }