1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-28 13:48:49 +08:00

Merge pull request #14 from alexandrevicenzi/master

Fixes
This commit is contained in:
Germano Fronza 2017-10-05 16:38:23 -03:00 committed by GitHub
commit 40f79589ca
9 changed files with 197 additions and 113 deletions

View File

@ -113,7 +113,6 @@ func (c *Connection) reestablish() error {
c.connection = conn c.connection = conn
return err return err
} }
func (c *Connection) handleConnectionClose() { func (c *Connection) handleConnectionClose() {

View File

@ -1,6 +1,7 @@
package amqp package amqp
import ( import (
"context"
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
@ -12,11 +13,6 @@ import (
amqplib "github.com/streadway/amqp" amqplib "github.com/streadway/amqp"
) )
const (
MaxInt32 = 1<<31 - 1
MaxRetries = MaxInt32
)
var ( var (
logger = log.WithFields(log.Fields{ logger = log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
@ -25,12 +21,12 @@ var (
) )
type handler struct { type handler struct {
action string action string
fn messaging.EventHandler fn messaging.EventHandler
re *regexp.Regexp re *regexp.Regexp
maxRetries int32 maxRetries int32
retryDelay time.Duration retryDelay time.Duration
delayProgression bool delayedRetry bool
} }
type Consumer struct { type Consumer struct {
@ -229,7 +225,11 @@ func (c *Consumer) dispatch(msg amqplib.Delivery) {
} }
func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
err := h.fn(msg.Body) err := h.fn(messaging.Event{
Action: h.action,
Body: msg.Body,
Context: context.Background(),
})
if err != nil { if err != nil {
if h.maxRetries > 0 { if h.maxRetries > 0 {
@ -264,7 +264,7 @@ func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32
func (c *Consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) { func (c *Consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
delayNs := delay.Nanoseconds() delayNs := delay.Nanoseconds()
if h.delayProgression { if h.delayedRetry {
delayNs *= 2 delayNs *= 2
} }
@ -335,15 +335,7 @@ func (c *Consumer) getHandler(msg amqplib.Delivery) (*handler, bool) {
} }
// Subscribe allows to subscribe an action handler. // Subscribe allows to subscribe an action handler.
// By default it won't retry any failed event. func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler, options *messaging.SubscribeOptions) error {
func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) error {
return c.SubscribeWithOptions(action, handlerFn, time.Duration(0), false, 0)
}
// SubscribeWithOptions allows to subscribe an action handler with retry options.
func (c *Consumer) SubscribeWithOptions(action string, handlerFn messaging.EventHandler,
retryDelay time.Duration, delayProgression bool, maxRetries int32) error {
// TODO: Replace # pattern too. // TODO: Replace # pattern too.
pattern := strings.Replace(action, "*", "(.*)", 0) pattern := strings.Replace(action, "*", "(.*)", 0)
re, err := regexp.Compile(pattern) re, err := regexp.Compile(pattern)
@ -364,13 +356,21 @@ func (c *Consumer) SubscribeWithOptions(action string, handlerFn messaging.Event
return err return err
} }
if options == nil {
options = &messaging.SubscribeOptions{
RetryDelay: time.Duration(0),
DelayedRetry: false,
MaxRetries: 0,
}
}
c.handlers = append(c.handlers, handler{ c.handlers = append(c.handlers, handler{
action: action, action: action,
fn: handlerFn, fn: handlerFn,
re: re, re: re,
maxRetries: maxRetries, maxRetries: options.MaxRetries,
retryDelay: retryDelay, retryDelay: options.RetryDelay,
delayProgression: delayProgression, delayedRetry: options.DelayedRetry,
}) })
return nil return nil

View File

@ -5,6 +5,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/eventials/goevents/messaging"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -28,15 +29,15 @@ func TestSubscribeActions(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action_1", func(body []byte) error { c.Subscribe("my_action_1", func(e messaging.Event) error {
func1 <- true func1 <- true
return nil return nil
}) }, nil)
c.Subscribe("my_action_2", func(body []byte) error { c.Subscribe("my_action_2", func(e messaging.Event) error {
func2 <- true func2 <- true
return nil return nil
}) }, nil)
go c.Consume() go c.Consume()
@ -75,15 +76,15 @@ func TestSubscribeWildcardActions(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) error { c.Subscribe("webinar.*", func(e messaging.Event) error {
func1 <- true func1 <- true
return nil return nil
}) }, nil)
c.Subscribe("foobar.*", func(body []byte) error { c.Subscribe("foobar.*", func(e messaging.Event) error {
func2 <- true func2 <- true
return nil return nil
}) }, nil)
go c.Consume() go c.Consume()
@ -122,15 +123,15 @@ func TestSubscribeWildcardActionOrder1(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) error { c.Subscribe("webinar.*", func(e messaging.Event) error {
func1 <- true func1 <- true
return nil return nil
}) }, nil)
c.Subscribe("webinar.state_changed", func(body []byte) error { c.Subscribe("webinar.state_changed", func(e messaging.Event) error {
func2 <- true func2 <- true
return nil return nil
}) }, nil)
go c.Consume() go c.Consume()
@ -169,15 +170,15 @@ func TestSubscribeWildcardActionOrder2(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.state_changed", func(body []byte) error { c.Subscribe("webinar.state_changed", func(e messaging.Event) error {
func1 <- true func1 <- true
return nil return nil
}) }, nil)
c.Subscribe("webinar.*", func(body []byte) error { c.Subscribe("webinar.*", func(e messaging.Event) error {
func2 <- true func2 <- true
return nil return nil
}) }, nil)
go c.Consume() go c.Consume()
@ -215,7 +216,7 @@ func TestDontRetryMessageIfFailsToProcess(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) error { c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
if timesCalled == 0 { if timesCalled == 0 {
@ -223,7 +224,7 @@ func TestDontRetryMessageIfFailsToProcess(t *testing.T) {
} }
return nil return nil
}) }, nil)
go c.Consume() go c.Consume()
@ -258,7 +259,7 @@ func TestRetryMessageIfFailsToProcess(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.SubscribeWithOptions("my_action", func(body []byte) error { c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
if timesCalled == 0 { if timesCalled == 0 {
@ -266,7 +267,11 @@ func TestRetryMessageIfFailsToProcess(t *testing.T) {
} }
return nil return nil
}, 100*time.Millisecond, false, 5) }, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: false,
MaxRetries: 5,
})
go c.Consume() go c.Consume()
@ -301,7 +306,7 @@ func TestRetryMessageIfPanicsToProcess(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.SubscribeWithOptions("my_action", func(body []byte) error { c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
if timesCalled == 0 { if timesCalled == 0 {
@ -309,7 +314,11 @@ func TestRetryMessageIfPanicsToProcess(t *testing.T) {
} }
return nil return nil
}, 100*time.Millisecond, false, 5) }, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: false,
MaxRetries: 5,
})
go c.Consume() go c.Consume()
@ -351,12 +360,12 @@ func TestRetryMessageToTheSameQueue(t *testing.T) {
consumer2 := c2.(*Consumer) consumer2 := c2.(*Consumer)
consumer2.channel.QueuePurge(consumer2.queueName, false) consumer2.channel.QueuePurge(consumer2.queueName, false)
c1.Subscribe("my_action", func(body []byte) error { c1.Subscribe("my_action", func(e messaging.Event) error {
timesCalled2++ timesCalled2++
return nil return nil
}) }, nil)
c2.SubscribeWithOptions("my_action", func(body []byte) error { c2.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled1++ }() defer func() { timesCalled1++ }()
if timesCalled1 == 0 { if timesCalled1 == 0 {
@ -364,7 +373,11 @@ func TestRetryMessageToTheSameQueue(t *testing.T) {
} else { } else {
return nil return nil
} }
}, 100*time.Millisecond, false, 5) }, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: false,
MaxRetries: 5,
})
go c1.Consume() go c1.Consume()
go c2.Consume() go c2.Consume()
@ -401,10 +414,14 @@ func TestActionExitsMaxRetries(t *testing.T) {
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
// It runs once and get an error, it will try five times more until it stops. // It runs once and get an error, it will try five times more until it stops.
c.SubscribeWithOptions("my_action", func(body []byte) error { c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return fmt.Errorf("Error.") return fmt.Errorf("Error.")
}, 100*time.Millisecond, false, 5) }, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: false,
MaxRetries: 5,
})
go c.Consume() go c.Consume()
@ -439,10 +456,14 @@ func TestActionExitsMaxRetriesWhenDelayed(t *testing.T) {
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
// It runs once and get an error, it will try three times more until it stops. // It runs once and get an error, it will try three times more until it stops.
c.SubscribeWithOptions("my_action", func(body []byte) error { c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return fmt.Errorf("Error.") return fmt.Errorf("Error.")
}, 100*time.Millisecond, true, 3) }, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: true,
MaxRetries: 3,
})
go c.Consume() go c.Consume()
@ -477,10 +498,14 @@ func TestActionExitsMaxRetriesWhenDelayedWindow(t *testing.T) {
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
// It runs once and get an error, it will try three times more until it stops. // It runs once and get an error, it will try three times more until it stops.
c.SubscribeWithOptions("my_action", func(body []byte) error { c.Subscribe("my_action", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return fmt.Errorf("Error.") return fmt.Errorf("Error.")
}, 100*time.Millisecond, true, 5) }, &messaging.SubscribeOptions{
RetryDelay: 100 * time.Millisecond,
DelayedRetry: true,
MaxRetries: 5,
})
go c.Consume() go c.Consume()
@ -520,19 +545,23 @@ func TestActionRetryTimeout(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.SubscribeWithOptions("test1", func(body []byte) error { c.Subscribe("test1", func(e messaging.Event) error {
defer func() { defer func() {
myActionTimesCalled++ myActionTimesCalled++
}() }()
return fmt.Errorf("Error.") return fmt.Errorf("Error.")
}, 300*time.Millisecond, true, 4) }, &messaging.SubscribeOptions{
RetryDelay: 300 * time.Millisecond,
DelayedRetry: true,
MaxRetries: 4,
})
c.Subscribe("test2", func(body []byte) error { c.Subscribe("test2", func(e messaging.Event) error {
defer func() { defer func() {
myAction2TimesCalled++ myAction2TimesCalled++
}() }()
return nil return nil
}) }, nil)
go c.Consume() go c.Consume()
@ -575,11 +604,11 @@ func TestConsumePrefetch(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) error { c.Subscribe("my_action", func(e messaging.Event) error {
timesCalled++ timesCalled++
<-wait <-wait
return nil return nil
}) }, nil)
go c.Consume() go c.Consume()

View File

@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/eventials/goevents/messaging"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -26,10 +27,10 @@ func TestPublish(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("action.name", func(body []byte) error { c.Subscribe("action.name", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return nil return nil
}) }, nil)
go c.Consume() go c.Consume()
@ -64,10 +65,10 @@ func TestPublishMultipleTimes(t *testing.T) {
consumer := c.(*Consumer) consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false) consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("action.name", func(body []byte) error { c.Subscribe("action.name", func(e messaging.Event) error {
defer func() { timesCalled++ }() defer func() { timesCalled++ }()
return nil return nil
}) }, nil)
go c.Consume() go c.Consume()

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/eventials/goevents/amqp" "github.com/eventials/goevents/amqp"
"github.com/eventials/goevents/messaging"
) )
func main() { func main() {
@ -28,41 +29,49 @@ func main() {
panic(err) panic(err)
} }
consumerA.Subscribe("object.eventA", func(body []byte) error { consumerA.Subscribe("object.eventA", func(e messaging.Event) error {
fmt.Println("object.eventA:", string(body)) fmt.Println("object.eventA:", string(e.Body))
return nil return nil
}, nil)
consumerA.Subscribe("object.eventB", func(e messaging.Event) error {
fmt.Println("object.eventB:", string(e.Body))
return nil
}, nil)
consumerA.Subscribe("object.eventToRetryDelay", func(e messaging.Event) error {
fmt.Println("object.eventToRetryDelay:", string(e.Body))
return fmt.Errorf("Try again.")
}, &messaging.SubscribeOptions{
RetryDelay: 10 * time.Second,
DelayedRetry: true,
MaxRetries: 30,
}) })
consumerA.Subscribe("object.eventB", func(body []byte) error { consumerA.Subscribe("object.eventToRetry", func(e messaging.Event) error {
fmt.Println("object.eventB:", string(body)) fmt.Println("object.eventToRetry:", string(e.Body))
return nil return fmt.Errorf("Try again.")
}, &messaging.SubscribeOptions{
RetryDelay: 1 * time.Second,
DelayedRetry: false,
MaxRetries: 10,
}) })
consumerA.SubscribeWithOptions("object.eventToRetryDelay", func(body []byte) error {
fmt.Println("object.eventToRetryDelay:", string(body))
return fmt.Errorf("Try again.")
}, 10*time.Second, true, 30)
consumerA.SubscribeWithOptions("object.eventToRetry", func(body []byte) error {
fmt.Println("object.eventToRetry:", string(body))
return fmt.Errorf("Try again.")
}, 1*time.Second, false, 10)
consumerB, err := conn.Consumer(false, "events-exchange", "events-queue-b") consumerB, err := conn.Consumer(false, "events-exchange", "events-queue-b")
if err != nil { if err != nil {
panic(err) panic(err)
} }
consumerB.Subscribe("object.eventC", func(body []byte) error { consumerB.Subscribe("object.eventC", func(e messaging.Event) error {
fmt.Println("object.eventC:", string(body)) fmt.Println("object.eventC:", string(e.Body))
return nil return nil
}) }, nil)
consumerB.Subscribe("object.eventD", func(body []byte) error { consumerB.Subscribe("object.eventD", func(e messaging.Event) error {
fmt.Println("object.eventD:", string(body)) fmt.Println("object.eventD:", string(e.Body))
return nil return nil
}) }, nil)
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -1,15 +1,36 @@
package messaging package messaging
import ( import (
"context"
"time" "time"
) )
type EventHandler func(body []byte) error const (
MaxInt32 = 1<<31 - 1
MaxRetries = MaxInt32
)
type SubscribeOptions struct {
// The time to retry after it fails.
RetryDelay time.Duration
// If enable the retry time it will be incresed in power of two.
// This means if your retry delay is 1s, the first retry will be after 1s,
// the sencond 2s, the third 4s and so on.
DelayedRetry bool
// Max attempts to retry.
MaxRetries int32
}
type Event struct {
Action string
Body []byte
Context context.Context
}
type EventHandler func(Event) error
type Consumer interface { type Consumer interface {
Subscribe(action string, handler EventHandler) error Subscribe(action string, handler EventHandler, options *SubscribeOptions) error
SubscribeWithOptions(action string, handlerFn EventHandler,
retryDelay time.Duration, delayPow2 bool, maxRetries int32) error
Unsubscribe(action string) error Unsubscribe(action string) error
Consume() Consume()
Close() Close()

View File

@ -9,16 +9,34 @@ type Connection struct {
mock.Mock mock.Mock
} }
func (c *Connection) Consumer(autoAck bool) (messaging.Consumer, error) { func NewMockConnection() messaging.Connection {
args := c.Called(autoAck) return &Connection{}
}
func (c *Connection) Consumer(autoAck bool, exchange, queue string) (messaging.Consumer, error) {
args := c.Called(autoAck, exchange, queue)
return args.Get(0).(messaging.Consumer), args.Error(1) return args.Get(0).(messaging.Consumer), args.Error(1)
} }
func (c *Connection) Producer() (messaging.Producer, error) { func (c *Connection) Producer(exchange string) (messaging.Producer, error) {
args := c.Called() args := c.Called(exchange)
return args.Get(0).(*Producer), args.Error(1) return args.Get(0).(messaging.Producer), args.Error(1)
} }
func (c *Connection) Close() { func (c *Connection) Close() {
c.Called() c.Called()
} }
func (c *Connection) NotifyConnectionClose() <-chan error {
args := c.Called()
return args.Get(0).(chan error)
}
func (c *Connection) NotifyReestablish() <-chan bool {
args := c.Called()
return args.Get(0).(chan bool)
}
func (c *Connection) WaitUntilConnectionCloses() {
c.Called()
}

View File

@ -9,22 +9,24 @@ type Consumer struct {
mock.Mock mock.Mock
} }
func (c *Consumer) Subscribe(action string, handler messaging.EventHandler) error { func NewMockConsumer() messaging.Consumer {
args := c.Called(action, handler) return &Consumer{}
return args.Error(1) }
func (c *Consumer) Subscribe(action string, handler messaging.EventHandler, options *messaging.SubscribeOptions) error {
args := c.Called(action, handler, options)
return args.Error(0)
} }
func (c *Consumer) Unsubscribe(action string) error { func (c *Consumer) Unsubscribe(action string) error {
args := c.Called(action) args := c.Called(action)
return args.Error(1) return args.Error(0)
} }
func (c *Consumer) Listen() error { func (c *Consumer) Consume() {
args := c.Called() c.Called()
return args.Error(1)
} }
func (c *Consumer) ListenForever() error { func (c *Consumer) Close() {
args := c.Called() c.Called()
return args.Error(1)
} }

View File

@ -1,6 +1,7 @@
package mock package mock
import ( import (
"github.com/eventials/goevents/messaging"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
) )
@ -8,6 +9,10 @@ type Producer struct {
mock.Mock mock.Mock
} }
func NewMockProducer() messaging.Producer {
return &Producer{}
}
func (p *Producer) Publish(action string, data []byte) { func (p *Producer) Publish(action string, data []byte) {
p.Called(action, data) p.Called(action, data)
} }
@ -17,6 +22,6 @@ func (p *Producer) Close() {
} }
func (p *Producer) NotifyClose() <-chan bool { func (p *Producer) NotifyClose() <-chan bool {
p.Called() args := p.Called()
return make(chan bool) return args.Get(0).(chan bool)
} }