1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-29 13:49:26 +08:00

Change parameter type.

This commit is contained in:
Alexandre Vicenzi 2017-10-05 15:19:54 -03:00
parent 3e5e24f212
commit 07f407b2dd
5 changed files with 66 additions and 53 deletions

View File

@ -1,6 +1,7 @@
package amqp package amqp
import ( import (
"context"
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
@ -12,11 +13,6 @@ import (
amqplib "github.com/streadway/amqp" amqplib "github.com/streadway/amqp"
) )
const (
MaxInt32 = 1<<31 - 1
MaxRetries = MaxInt32
)
var ( var (
logger = log.WithFields(log.Fields{ logger = log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
@ -25,12 +21,12 @@ var (
) )
type handler struct { type handler struct {
action string action string
fn messaging.EventHandler fn messaging.EventHandler
re *regexp.Regexp re *regexp.Regexp
maxRetries int32 maxRetries int32
retryDelay time.Duration retryDelay time.Duration
delayProgression bool delayedRetry bool
} }
type Consumer struct { type Consumer struct {
@ -229,7 +225,11 @@ func (c *Consumer) dispatch(msg amqplib.Delivery) {
} }
func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
err := h.fn(msg.Body) err := h.fn(messaging.Event{
Action: h.action,
Body: msg.Body,
Context: context.Background(),
})
if err != nil { if err != nil {
if h.maxRetries > 0 { if h.maxRetries > 0 {
@ -264,7 +264,7 @@ func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32
func (c *Consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { func (c *Consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
delayNs := delay.Nanoseconds() delayNs := delay.Nanoseconds()
if h.delayProgression { if h.delayedRetry {
delayNs *= 2 delayNs *= 2
} }
@ -370,12 +370,12 @@ func (c *Consumer) SubscribeWithOptions(options messaging.SubscribeOptions) erro
} }
c.handlers = append(c.handlers, handler{ c.handlers = append(c.handlers, handler{
action: options.Action, action: options.Action,
fn: options.Handler, fn: options.Handler,
re: re, re: re,
maxRetries: options.MaxRetries, maxRetries: options.MaxRetries,
retryDelay: options.RetryDelay, retryDelay: options.RetryDelay,
delayProgression: options.DelayedRetry, delayedRetry: options.DelayedRetry,
}) })
return nil return nil

View File

@ -29,12 +29,12 @@ func TestSubscribeActions(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action_1", func(body []byte) error { c.Subscribe("my_action_1", func(e messaging.Event) error {
func1 <- true func1 <- true
return nil return nil
}) })
c.Subscribe("my_action_2", func(body []byte) error { c.Subscribe("my_action_2", func(e messaging.Event) error {
func2 <- true func2 <- true
return nil return nil
}) })
@ -76,12 +76,12 @@ func TestSubscribeWildcardActions(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) error { c.Subscribe("webinar.*", func(e messaging.Event) error {
func1 <- true func1 <- true
return nil return nil
}) })
c.Subscribe("foobar.*", func(body []byte) error { c.Subscribe("foobar.*", func(e messaging.Event) error {
func2 <- true func2 <- true
return nil return nil
}) })
@ -123,12 +123,12 @@ func TestSubscribeWildcardActionOrder1(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) error { c.Subscribe("webinar.*", func(e messaging.Event) error {
func1 <- true func1 <- true
return nil return nil
}) })
c.Subscribe("webinar.state_changed", func(body []byte) error { c.Subscribe("webinar.state_changed", func(e messaging.Event) error {
func2 <- true func2 <- true
return nil return nil
}) })
@ -170,12 +170,12 @@ func TestSubscribeWildcardActionOrder2(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.state_changed", func(body []byte) error { c.Subscribe("webinar.state_changed", func(e messaging.Event) error {
func1 <- true func1 <- true
return nil return nil
}) })
c.Subscribe("webinar.*", func(body []byte) error { c.Subscribe("webinar.*", func(e messaging.Event) error {
func2 <- true func2 <- true
return nil return nil
}) })
@ -216,7 +216,7 @@ func TestDontRetryMessageIfFailsToProcess(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) error { c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
if timesCalled == 0 { if timesCalled == 0 {
@ -261,7 +261,7 @@ func TestRetryMessageIfFailsToProcess(t *testing.T) {
c.SubscribeWithOptions(messaging.SubscribeOptions{ c.SubscribeWithOptions(messaging.SubscribeOptions{
Action: "my_action", Action: "my_action",
Handler: func(body []byte) error { Handler: func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
if timesCalled == 0 { if timesCalled == 0 {
@ -310,7 +310,7 @@ func TestRetryMessageIfPanicsToProcess(t *testing.T) {
c.SubscribeWithOptions(messaging.SubscribeOptions{ c.SubscribeWithOptions(messaging.SubscribeOptions{
Action: "my_action", Action: "my_action",
Handler: func(body []byte) error { Handler: func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
if timesCalled == 0 { if timesCalled == 0 {
@ -364,14 +364,14 @@ func TestRetryMessageToTheSameQueue(t *testing.T) {
consumer2 := c2.(*Consumer) consumer2 := c2.(*Consumer)
consumer2.channel.QueuePurge(consumer2.queueName, false) consumer2.channel.QueuePurge(consumer2.queueName, false)
c1.Subscribe("my_action", func(body []byte) error { c1.Subscribe("my_action", func(e messaging.Event) error {
timesCalled2++ timesCalled2++
return nil return nil
}) })
c2.SubscribeWithOptions(messaging.SubscribeOptions{ c2.SubscribeWithOptions(messaging.SubscribeOptions{
Action: "my_action", Action: "my_action",
Handler: func(body []byte) error { Handler: func(e messaging.Event) error {
defer func() { timesCalled1++ }() defer func() { timesCalled1++ }()
if timesCalled1 == 0 { if timesCalled1 == 0 {
@ -422,7 +422,7 @@ func TestActionExitsMaxRetries(t *testing.T) {
// It runs once and get an error, it will try five times more until it stops. // It runs once and get an error, it will try five times more until it stops.
c.SubscribeWithOptions(messaging.SubscribeOptions{ c.SubscribeWithOptions(messaging.SubscribeOptions{
Action: "my_action", Action: "my_action",
Handler: func(body []byte) error { Handler: func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return fmt.Errorf("Error.") return fmt.Errorf("Error.")
}, },
@ -466,7 +466,7 @@ func TestActionExitsMaxRetriesWhenDelayed(t *testing.T) {
// It runs once and get an error, it will try three times more until it stops. // It runs once and get an error, it will try three times more until it stops.
c.SubscribeWithOptions(messaging.SubscribeOptions{ c.SubscribeWithOptions(messaging.SubscribeOptions{
Action: "my_action", Action: "my_action",
Handler: func(body []byte) error { Handler: func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return fmt.Errorf("Error.") return fmt.Errorf("Error.")
}, },
@ -510,7 +510,7 @@ func TestActionExitsMaxRetriesWhenDelayedWindow(t *testing.T) {
// It runs once and get an error, it will try three times more until it stops. // It runs once and get an error, it will try three times more until it stops.
c.SubscribeWithOptions(messaging.SubscribeOptions{ c.SubscribeWithOptions(messaging.SubscribeOptions{
Action: "my_action", Action: "my_action",
Handler: func(body []byte) error { Handler: func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return fmt.Errorf("Error.") return fmt.Errorf("Error.")
}, },
@ -559,7 +559,7 @@ func TestActionRetryTimeout(t *testing.T) {
c.SubscribeWithOptions(messaging.SubscribeOptions{ c.SubscribeWithOptions(messaging.SubscribeOptions{
Action: "test1", Action: "test1",
Handler: func(body []byte) error { Handler: func(e messaging.Event) error {
defer func() { defer func() {
myActionTimesCalled++ myActionTimesCalled++
}() }()
@ -570,7 +570,7 @@ func TestActionRetryTimeout(t *testing.T) {
MaxRetries: 4, MaxRetries: 4,
}) })
c.Subscribe("test2", func(body []byte) error { c.Subscribe("test2", func(e messaging.Event) error {
defer func() { defer func() {
myAction2TimesCalled++ myAction2TimesCalled++
}() }()
@ -618,7 +618,7 @@ func TestConsumePrefetch(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) error { c.Subscribe("my_action", func(e messaging.Event) error {
timesCalled++ timesCalled++
<-wait <-wait
return nil return nil

View File

@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/eventials/goevents/messaging"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -26,7 +27,7 @@ func TestPublish(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("action.name", func(body []byte) error { c.Subscribe("action.name", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return nil return nil
}) })
@ -64,7 +65,7 @@ func TestPublishMultipleTimes(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("action.name", func(body []byte) error { c.Subscribe("action.name", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return nil return nil
}) })

View File

@ -29,20 +29,20 @@ func main() {
panic(err) panic(err)
} }
consumerA.Subscribe("object.eventA", func(body []byte) error { consumerA.Subscribe("object.eventA", func(e messaging.Event) error {
fmt.Println("object.eventA:", string(body)) fmt.Println("object.eventA:", string(e.Body))
return nil return nil
}) })
consumerA.Subscribe("object.eventB", func(body []byte) error { consumerA.Subscribe("object.eventB", func(e messaging.Event) error {
fmt.Println("object.eventB:", string(body)) fmt.Println("object.eventB:", string(e.Body))
return nil return nil
}) })
consumerA.SubscribeWithOptions(messaging.SubscribeOptions{ consumerA.SubscribeWithOptions(messaging.SubscribeOptions{
Action: "object.eventToRetryDelay", Action: "object.eventToRetryDelay",
Handler: func(body []byte) error { Handler: func(e messaging.Event) error {
fmt.Println("object.eventToRetryDelay:", string(body)) fmt.Println("object.eventToRetryDelay:", string(e.Body))
return fmt.Errorf("Try again.") return fmt.Errorf("Try again.")
}, },
RetryDelay: 10 * time.Second, RetryDelay: 10 * time.Second,
@ -52,8 +52,8 @@ func main() {
consumerA.SubscribeWithOptions(messaging.SubscribeOptions{ consumerA.SubscribeWithOptions(messaging.SubscribeOptions{
Action: "object.eventToRetry", Action: "object.eventToRetry",
Handler: func(body []byte) error { Handler: func(e messaging.Event) error {
fmt.Println("object.eventToRetry:", string(body)) fmt.Println("object.eventToRetry:", string(e.Body))
return fmt.Errorf("Try again.") return fmt.Errorf("Try again.")
}, },
RetryDelay: 1 * time.Second, RetryDelay: 1 * time.Second,
@ -67,13 +67,13 @@ func main() {
panic(err) panic(err)
} }
consumerB.Subscribe("object.eventC", func(body []byte) error { consumerB.Subscribe("object.eventC", func(e messaging.Event) error {
fmt.Println("object.eventC:", string(body)) fmt.Println("object.eventC:", string(e.Body))
return nil return nil
}) })
consumerB.Subscribe("object.eventD", func(body []byte) error { consumerB.Subscribe("object.eventD", func(e messaging.Event) error {
fmt.Println("object.eventD:", string(body)) fmt.Println("object.eventD:", string(e.Body))
return nil return nil
}) })

View File

@ -1,10 +1,14 @@
package messaging package messaging
import ( import (
"context"
"time" "time"
) )
type EventHandler func(body []byte) error const (
MaxInt32 = 1<<31 - 1
MaxRetries = MaxInt32
)
type SubscribeOptions struct { type SubscribeOptions struct {
// The action name. // The action name.
@ -21,6 +25,14 @@ type SubscribeOptions struct {
MaxRetries int32 MaxRetries int32
} }
type Event struct {
Action string
Body []byte
Context context.Context
}
type EventHandler func(Event) error
type Consumer interface { type Consumer interface {
Subscribe(action string, handler EventHandler) error Subscribe(action string, handler EventHandler) error
SubscribeWithOptions(options SubscribeOptions) error SubscribeWithOptions(options SubscribeOptions) error