From 07f407b2dda8256a3fb856b734d772552821f417 Mon Sep 17 00:00:00 2001 From: Alexandre Vicenzi Date: Thu, 5 Oct 2017 15:19:54 -0300 Subject: [PATCH] Change parameter type. --- amqp/consumer.go | 38 +++++++++++++++++------------------ amqp/consumer_test.go | 38 +++++++++++++++++------------------ amqp/producer_test.go | 5 +++-- examples/consumer/consumer.go | 24 +++++++++++----------- messaging/consumer.go | 14 ++++++++++++- 5 files changed, 66 insertions(+), 53 deletions(-) diff --git a/amqp/consumer.go b/amqp/consumer.go index 6d357c1..1d2726e 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -1,6 +1,7 @@ package amqp import ( + "context" "regexp" "strings" "sync" @@ -12,11 +13,6 @@ import ( amqplib "github.com/streadway/amqp" ) -const ( - MaxInt32 = 1<<31 - 1 - MaxRetries = MaxInt32 -) - var ( logger = log.WithFields(log.Fields{ "type": "goevents", @@ -25,12 +21,12 @@ var ( ) type handler struct { - action string - fn messaging.EventHandler - re *regexp.Regexp - maxRetries int32 - retryDelay time.Duration - delayProgression bool + action string + fn messaging.EventHandler + re *regexp.Regexp + maxRetries int32 + retryDelay time.Duration + delayedRetry bool } type Consumer struct { @@ -229,7 +225,11 @@ func (c *Consumer) dispatch(msg amqplib.Delivery) { } func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { - err := h.fn(msg.Body) + err := h.fn(messaging.Event{ + Action: h.action, + Body: msg.Body, + Context: context.Background(), + }) if err != nil { if h.maxRetries > 0 { @@ -264,7 +264,7 @@ func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32 func (c *Consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { delayNs := delay.Nanoseconds() - if h.delayProgression { + if h.delayedRetry { delayNs *= 2 } @@ -370,12 +370,12 @@ func (c *Consumer) SubscribeWithOptions(options messaging.SubscribeOptions) erro } c.handlers = append(c.handlers, handler{ - action: options.Action, - fn: options.Handler, - re: re, - maxRetries: options.MaxRetries, - retryDelay: options.RetryDelay, - delayProgression: options.DelayedRetry, + action: options.Action, + fn: options.Handler, + re: re, + maxRetries: options.MaxRetries, + retryDelay: options.RetryDelay, + delayedRetry: options.DelayedRetry, }) return nil diff --git a/amqp/consumer_test.go b/amqp/consumer_test.go index 46b9bf7..e49b4e1 100644 --- a/amqp/consumer_test.go +++ b/amqp/consumer_test.go @@ -29,12 +29,12 @@ func TestSubscribeActions(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("my_action_1", func(body []byte) error { + c.Subscribe("my_action_1", func(e messaging.Event) error { func1 <- true return nil }) - c.Subscribe("my_action_2", func(body []byte) error { + c.Subscribe("my_action_2", func(e messaging.Event) error { func2 <- true return nil }) @@ -76,12 +76,12 @@ func TestSubscribeWildcardActions(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("webinar.*", func(body []byte) error { + c.Subscribe("webinar.*", func(e messaging.Event) error { func1 <- true return nil }) - c.Subscribe("foobar.*", func(body []byte) error { + c.Subscribe("foobar.*", func(e messaging.Event) error { func2 <- true return nil }) @@ -123,12 +123,12 @@ func TestSubscribeWildcardActionOrder1(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("webinar.*", func(body []byte) error { + c.Subscribe("webinar.*", func(e messaging.Event) error { func1 <- true return nil }) - c.Subscribe("webinar.state_changed", func(body []byte) error { + c.Subscribe("webinar.state_changed", func(e messaging.Event) error { func2 <- true return nil }) @@ -170,12 +170,12 @@ func TestSubscribeWildcardActionOrder2(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("webinar.state_changed", func(body []byte) error { + c.Subscribe("webinar.state_changed", func(e messaging.Event) error { func1 <- true return nil }) - c.Subscribe("webinar.*", func(body []byte) error { + c.Subscribe("webinar.*", func(e messaging.Event) error { func2 <- true return nil }) @@ -216,7 +216,7 @@ func TestDontRetryMessageIfFailsToProcess(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("my_action", func(body []byte) error { + c.Subscribe("my_action", func(e messaging.Event) error { defer func() { timesCalled++ }() if timesCalled == 0 { @@ -261,7 +261,7 @@ func TestRetryMessageIfFailsToProcess(t *testing.T) { c.SubscribeWithOptions(messaging.SubscribeOptions{ Action: "my_action", - Handler: func(body []byte) error { + Handler: func(e messaging.Event) error { defer func() { timesCalled++ }() if timesCalled == 0 { @@ -310,7 +310,7 @@ func TestRetryMessageIfPanicsToProcess(t *testing.T) { c.SubscribeWithOptions(messaging.SubscribeOptions{ Action: "my_action", - Handler: func(body []byte) error { + Handler: func(e messaging.Event) error { defer func() { timesCalled++ }() if timesCalled == 0 { @@ -364,14 +364,14 @@ func TestRetryMessageToTheSameQueue(t *testing.T) { consumer2 := c2.(*Consumer) consumer2.channel.QueuePurge(consumer2.queueName, false) - c1.Subscribe("my_action", func(body []byte) error { + c1.Subscribe("my_action", func(e messaging.Event) error { timesCalled2++ return nil }) c2.SubscribeWithOptions(messaging.SubscribeOptions{ Action: "my_action", - Handler: func(body []byte) error { + Handler: func(e messaging.Event) error { defer func() { timesCalled1++ }() if timesCalled1 == 0 { @@ -422,7 +422,7 @@ func TestActionExitsMaxRetries(t *testing.T) { // It runs once and get an error, it will try five times more until it stops. c.SubscribeWithOptions(messaging.SubscribeOptions{ Action: "my_action", - Handler: func(body []byte) error { + Handler: func(e messaging.Event) error { defer func() { timesCalled++ }() return fmt.Errorf("Error.") }, @@ -466,7 +466,7 @@ func TestActionExitsMaxRetriesWhenDelayed(t *testing.T) { // It runs once and get an error, it will try three times more until it stops. c.SubscribeWithOptions(messaging.SubscribeOptions{ Action: "my_action", - Handler: func(body []byte) error { + Handler: func(e messaging.Event) error { defer func() { timesCalled++ }() return fmt.Errorf("Error.") }, @@ -510,7 +510,7 @@ func TestActionExitsMaxRetriesWhenDelayedWindow(t *testing.T) { // It runs once and get an error, it will try three times more until it stops. c.SubscribeWithOptions(messaging.SubscribeOptions{ Action: "my_action", - Handler: func(body []byte) error { + Handler: func(e messaging.Event) error { defer func() { timesCalled++ }() return fmt.Errorf("Error.") }, @@ -559,7 +559,7 @@ func TestActionRetryTimeout(t *testing.T) { c.SubscribeWithOptions(messaging.SubscribeOptions{ Action: "test1", - Handler: func(body []byte) error { + Handler: func(e messaging.Event) error { defer func() { myActionTimesCalled++ }() @@ -570,7 +570,7 @@ func TestActionRetryTimeout(t *testing.T) { MaxRetries: 4, }) - c.Subscribe("test2", func(body []byte) error { + c.Subscribe("test2", func(e messaging.Event) error { defer func() { myAction2TimesCalled++ }() @@ -618,7 +618,7 @@ func TestConsumePrefetch(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("my_action", func(body []byte) error { + c.Subscribe("my_action", func(e messaging.Event) error { timesCalled++ <-wait return nil diff --git a/amqp/producer_test.go b/amqp/producer_test.go index 2a89988..34c3134 100644 --- a/amqp/producer_test.go +++ b/amqp/producer_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/eventials/goevents/messaging" "github.com/stretchr/testify/assert" ) @@ -26,7 +27,7 @@ func TestPublish(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("action.name", func(body []byte) error { + c.Subscribe("action.name", func(e messaging.Event) error { defer func() { timesCalled++ }() return nil }) @@ -64,7 +65,7 @@ func TestPublishMultipleTimes(t *testing.T) { consumer := c.(*Consumer) consumer.channel.QueuePurge(consumer.queueName, false) - c.Subscribe("action.name", func(body []byte) error { + c.Subscribe("action.name", func(e messaging.Event) error { defer func() { timesCalled++ }() return nil }) diff --git a/examples/consumer/consumer.go b/examples/consumer/consumer.go index 913371f..05a10b7 100644 --- a/examples/consumer/consumer.go +++ b/examples/consumer/consumer.go @@ -29,20 +29,20 @@ func main() { panic(err) } - consumerA.Subscribe("object.eventA", func(body []byte) error { - fmt.Println("object.eventA:", string(body)) + consumerA.Subscribe("object.eventA", func(e messaging.Event) error { + fmt.Println("object.eventA:", string(e.Body)) return nil }) - consumerA.Subscribe("object.eventB", func(body []byte) error { - fmt.Println("object.eventB:", string(body)) + consumerA.Subscribe("object.eventB", func(e messaging.Event) error { + fmt.Println("object.eventB:", string(e.Body)) return nil }) consumerA.SubscribeWithOptions(messaging.SubscribeOptions{ Action: "object.eventToRetryDelay", - Handler: func(body []byte) error { - fmt.Println("object.eventToRetryDelay:", string(body)) + Handler: func(e messaging.Event) error { + fmt.Println("object.eventToRetryDelay:", string(e.Body)) return fmt.Errorf("Try again.") }, RetryDelay: 10 * time.Second, @@ -52,8 +52,8 @@ func main() { consumerA.SubscribeWithOptions(messaging.SubscribeOptions{ Action: "object.eventToRetry", - Handler: func(body []byte) error { - fmt.Println("object.eventToRetry:", string(body)) + Handler: func(e messaging.Event) error { + fmt.Println("object.eventToRetry:", string(e.Body)) return fmt.Errorf("Try again.") }, RetryDelay: 1 * time.Second, @@ -67,13 +67,13 @@ func main() { panic(err) } - consumerB.Subscribe("object.eventC", func(body []byte) error { - fmt.Println("object.eventC:", string(body)) + consumerB.Subscribe("object.eventC", func(e messaging.Event) error { + fmt.Println("object.eventC:", string(e.Body)) return nil }) - consumerB.Subscribe("object.eventD", func(body []byte) error { - fmt.Println("object.eventD:", string(body)) + consumerB.Subscribe("object.eventD", func(e messaging.Event) error { + fmt.Println("object.eventD:", string(e.Body)) return nil }) diff --git a/messaging/consumer.go b/messaging/consumer.go index 41d670e..35e1a66 100644 --- a/messaging/consumer.go +++ b/messaging/consumer.go @@ -1,10 +1,14 @@ package messaging import ( + "context" "time" ) -type EventHandler func(body []byte) error +const ( + MaxInt32 = 1<<31 - 1 + MaxRetries = MaxInt32 +) type SubscribeOptions struct { // The action name. @@ -21,6 +25,14 @@ type SubscribeOptions struct { MaxRetries int32 } +type Event struct { + Action string + Body []byte + Context context.Context +} + +type EventHandler func(Event) error + type Consumer interface { Subscribe(action string, handler EventHandler) error SubscribeWithOptions(options SubscribeOptions) error