mirror of
https://github.com/eventials/goevents.git
synced 2025-05-08 19:29:15 +08:00
Merge pull request #2 from alexandrevicenzi/master
Use interface to allow mocking.
This commit is contained in:
commit
33c6190d23
1
.gitignore
vendored
1
.gitignore
vendored
@ -22,3 +22,4 @@ _testmain.go
|
||||
*.exe
|
||||
*.test
|
||||
*.prof
|
||||
.DS_Store
|
||||
|
@ -1,20 +1,22 @@
|
||||
package events
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"github.com/streadway/amqp"
|
||||
"github.com/eventials/goevents/messaging"
|
||||
|
||||
amqplib "github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type Connection struct {
|
||||
connection *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
queue *amqp.Queue
|
||||
connection *amqplib.Connection
|
||||
channel *amqplib.Channel
|
||||
queue *amqplib.Queue
|
||||
|
||||
exchangeName string
|
||||
queueName string
|
||||
}
|
||||
|
||||
func NewConnection(url, exchange, queue string) (*Connection, error) {
|
||||
conn, err := amqp.Dial(url)
|
||||
func NewConnection(url, exchange, queue string) (messaging.Connection, error) {
|
||||
conn, err := amqplib.Dial(url)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -62,11 +64,11 @@ func NewConnection(url, exchange, queue string) (*Connection, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Connection) Consumer(autoAck bool) *Consumer {
|
||||
func (c *Connection) Consumer(autoAck bool) (messaging.Consumer, error) {
|
||||
return NewConsumer(c, autoAck)
|
||||
}
|
||||
|
||||
func (c *Connection) Producer() *Producer {
|
||||
func (c *Connection) Producer() (messaging.Producer, error) {
|
||||
return NewProducer(c)
|
||||
}
|
||||
|
@ -1,16 +1,17 @@
|
||||
package events
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"github.com/streadway/amqp"
|
||||
"regexp"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type eventHandler func(body []byte) bool
|
||||
"github.com/eventials/goevents/messaging"
|
||||
|
||||
amqplib "github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
action string
|
||||
handler eventHandler
|
||||
handler messaging.EventHandler
|
||||
re *regexp.Regexp
|
||||
}
|
||||
|
||||
@ -20,15 +21,17 @@ type Consumer struct {
|
||||
handlers []handler
|
||||
}
|
||||
|
||||
func NewConsumer(c *Connection, autoAck bool) *Consumer {
|
||||
func NewConsumer(c messaging.Connection, autoAck bool) (messaging.Consumer, error) {
|
||||
amqpConn := c.(*Connection)
|
||||
|
||||
return &Consumer{
|
||||
c,
|
||||
amqpConn,
|
||||
autoAck,
|
||||
make([]handler, 0),
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Consumer) dispatch(msg amqp.Delivery) {
|
||||
func (c *Consumer) dispatch(msg amqplib.Delivery) {
|
||||
if fn, ok := c.getHandler(msg.RoutingKey); ok {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
@ -54,7 +57,7 @@ func (c *Consumer) dispatch(msg amqp.Delivery) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) getHandler(action string) (eventHandler, bool) {
|
||||
func (c *Consumer) getHandler(action string) (messaging.EventHandler, bool) {
|
||||
for _, h := range c.handlers {
|
||||
if h.re.MatchString(action) {
|
||||
return h.handler, true
|
||||
@ -64,7 +67,7 @@ func (c *Consumer) getHandler(action string) (eventHandler, bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (c *Consumer) Subscribe(action string, handlerFn eventHandler) error {
|
||||
func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) error {
|
||||
// TODO: Replace # pattern too.
|
||||
pattern := strings.Replace(action, "*", "(.*)", 0)
|
||||
re, err := regexp.Compile(pattern)
|
@ -1,4 +1,4 @@
|
||||
package events
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"testing"
|
||||
@ -18,9 +18,12 @@ func TestPublishConsume(t *testing.T) {
|
||||
defer conn.Close()
|
||||
|
||||
// Clean all messages if any...
|
||||
conn.channel.QueuePurge(conn.queueName, false)
|
||||
amqpConn := conn.(*Connection)
|
||||
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
|
||||
|
||||
c := NewConsumer(conn, false)
|
||||
c, err := NewConsumer(conn, false)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
c.Subscribe("my_action_1", func(body []byte) bool {
|
||||
func1 <- true
|
||||
@ -34,7 +37,9 @@ func TestPublishConsume(t *testing.T) {
|
||||
|
||||
c.Listen()
|
||||
|
||||
p := NewProducer(conn)
|
||||
p, err := NewProducer(conn)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = p.Publish("my_action_1", []byte(""))
|
||||
|
||||
@ -60,9 +65,12 @@ func TestPublishConsumeWildcardAction(t *testing.T) {
|
||||
defer conn.Close()
|
||||
|
||||
// Clean all messages if any...
|
||||
conn.channel.QueuePurge(conn.queueName, false)
|
||||
amqpConn := conn.(*Connection)
|
||||
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
|
||||
|
||||
c := NewConsumer(conn, false)
|
||||
c, err := NewConsumer(conn, false)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
c.Subscribe("webinar.*", func(body []byte) bool {
|
||||
func1 <- true
|
||||
@ -76,7 +84,9 @@ func TestPublishConsumeWildcardAction(t *testing.T) {
|
||||
|
||||
c.Listen()
|
||||
|
||||
p := NewProducer(conn)
|
||||
p, err := NewProducer(conn)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = p.Publish("webinar.state_changed", []byte(""))
|
||||
|
||||
@ -102,9 +112,12 @@ func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) {
|
||||
defer conn.Close()
|
||||
|
||||
// Clean all messages if any...
|
||||
conn.channel.QueuePurge(conn.queueName, false)
|
||||
amqpConn := conn.(*Connection)
|
||||
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
|
||||
|
||||
c := NewConsumer(conn, false)
|
||||
c, err := NewConsumer(conn, false)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
c.Subscribe("webinar.*", func(body []byte) bool {
|
||||
func1 <- true
|
||||
@ -118,7 +131,9 @@ func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) {
|
||||
|
||||
c.Listen()
|
||||
|
||||
p := NewProducer(conn)
|
||||
p, err := NewProducer(conn)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = p.Publish("webinar.state_changed", []byte(""))
|
||||
|
||||
@ -144,9 +159,12 @@ func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) {
|
||||
defer conn.Close()
|
||||
|
||||
// Clean all messages if any...
|
||||
conn.channel.QueuePurge(conn.queueName, false)
|
||||
amqpConn := conn.(*Connection)
|
||||
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
|
||||
|
||||
c := NewConsumer(conn, false)
|
||||
c, err := NewConsumer(conn, false)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
c.Subscribe("webinar.state_changed", func(body []byte) bool {
|
||||
func1 <- true
|
||||
@ -160,7 +178,9 @@ func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) {
|
||||
|
||||
c.Listen()
|
||||
|
||||
p := NewProducer(conn)
|
||||
p, err := NewProducer(conn)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = p.Publish("webinar.state_changed", []byte(""))
|
||||
|
||||
@ -186,9 +206,12 @@ func TestPublishConsumeRequeueIfFail(t *testing.T) {
|
||||
defer conn.Close()
|
||||
|
||||
// Clean all messages if any...
|
||||
conn.channel.QueuePurge(conn.queueName, false)
|
||||
amqpConn := conn.(*Connection)
|
||||
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
|
||||
|
||||
c := NewConsumer(conn, false)
|
||||
c, err := NewConsumer(conn, false)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
c.Subscribe("my_action", func(body []byte) bool {
|
||||
if calledOnce {
|
||||
@ -202,7 +225,9 @@ func TestPublishConsumeRequeueIfFail(t *testing.T) {
|
||||
|
||||
c.Listen()
|
||||
|
||||
p := NewProducer(conn)
|
||||
p, err := NewProducer(conn)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = p.Publish("my_action", []byte(""))
|
||||
|
||||
@ -226,9 +251,12 @@ func TestPublishConsumeRequeueIfPanic(t *testing.T) {
|
||||
defer conn.Close()
|
||||
|
||||
// Clean all messages if any...
|
||||
conn.channel.QueuePurge(conn.queueName, false)
|
||||
amqpConn := conn.(*Connection)
|
||||
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
|
||||
|
||||
c := NewConsumer(conn, false)
|
||||
c, err := NewConsumer(conn, false)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
c.Subscribe("my_action", func(body []byte) bool {
|
||||
if calledOnce {
|
||||
@ -242,7 +270,9 @@ func TestPublishConsumeRequeueIfPanic(t *testing.T) {
|
||||
|
||||
c.Listen()
|
||||
|
||||
p := NewProducer(conn)
|
||||
p, err := NewProducer(conn)
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = p.Publish("my_action", []byte(""))
|
||||
|
31
amqp/producer.go
Normal file
31
amqp/producer.go
Normal file
@ -0,0 +1,31 @@
|
||||
package amqp
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/eventials/goevents/messaging"
|
||||
|
||||
amqplib "github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type Producer struct {
|
||||
conn *Connection
|
||||
}
|
||||
|
||||
func NewProducer(c messaging.Connection) (messaging.Producer, error) {
|
||||
amqpConn := c.(*Connection)
|
||||
|
||||
return &Producer{
|
||||
amqpConn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *Producer) Publish(action string, data []byte) error {
|
||||
msg := amqplib.Publishing{
|
||||
DeliveryMode: amqplib.Persistent,
|
||||
Timestamp: time.Now(),
|
||||
Body: data,
|
||||
}
|
||||
|
||||
return p.conn.channel.Publish(p.conn.exchangeName, action, false, false, msg)
|
||||
}
|
7
messaging/connection.go
Normal file
7
messaging/connection.go
Normal file
@ -0,0 +1,7 @@
|
||||
package messaging
|
||||
|
||||
type Connection interface {
|
||||
Consumer(autoAck bool) (Consumer, error)
|
||||
Producer() (Producer, error)
|
||||
Close()
|
||||
}
|
10
messaging/consumer.go
Normal file
10
messaging/consumer.go
Normal file
@ -0,0 +1,10 @@
|
||||
package messaging
|
||||
|
||||
type EventHandler func(body []byte) bool
|
||||
|
||||
type Consumer interface {
|
||||
Subscribe(action string, handler EventHandler) error
|
||||
Unsubscribe(action string) error
|
||||
Listen() error
|
||||
ListenForever() error
|
||||
}
|
5
messaging/producer.go
Normal file
5
messaging/producer.go
Normal file
@ -0,0 +1,5 @@
|
||||
package messaging
|
||||
|
||||
type Producer interface {
|
||||
Publish(action string, data []byte) error
|
||||
}
|
24
mock/connection.go
Normal file
24
mock/connection.go
Normal file
@ -0,0 +1,24 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"github.com/eventials/goevents/messaging"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
type Connection struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (c *Connection) Consumer(autoAck bool) (messaging.Consumer, error) {
|
||||
args := c.Called(autoAck)
|
||||
return args.Get(0).(messaging.Consumer), args.Error(1)
|
||||
}
|
||||
|
||||
func (c *Connection) Producer() (messaging.Producer, error) {
|
||||
args := c.Called()
|
||||
return args.Get(0).(*Producer), args.Error(1)
|
||||
}
|
||||
|
||||
func (c *Connection) Close() {
|
||||
c.Called()
|
||||
}
|
30
mock/consumer.go
Normal file
30
mock/consumer.go
Normal file
@ -0,0 +1,30 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"github.com/eventials/goevents/messaging"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
type Consumer struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (c *Consumer) Subscribe(action string, handler messaging.EventHandler) error {
|
||||
args := c.Called(action, handler)
|
||||
return args.Error(1)
|
||||
}
|
||||
|
||||
func (c *Consumer) Unsubscribe(action string) error {
|
||||
args := c.Called(action)
|
||||
return args.Error(1)
|
||||
}
|
||||
|
||||
func (c *Consumer) Listen() error {
|
||||
args := c.Called()
|
||||
return args.Error(1)
|
||||
}
|
||||
|
||||
func (c *Consumer) ListenForever() error {
|
||||
args := c.Called()
|
||||
return args.Error(1)
|
||||
}
|
14
mock/producer.go
Normal file
14
mock/producer.go
Normal file
@ -0,0 +1,14 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
type Producer struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (p *Producer) Publish(action string, data []byte) error {
|
||||
args := p.Called(action, data)
|
||||
return args.Error(1)
|
||||
}
|
27
producer.go
27
producer.go
@ -1,27 +0,0 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
type Producer struct {
|
||||
conn *Connection
|
||||
}
|
||||
|
||||
func NewProducer(c *Connection) *Producer {
|
||||
return &Producer{
|
||||
c,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Producer) Publish(action string, data []byte) error {
|
||||
msg := amqp.Publishing{
|
||||
DeliveryMode: amqp.Persistent,
|
||||
Timestamp: time.Now(),
|
||||
Body: data,
|
||||
}
|
||||
|
||||
return p.conn.channel.Publish(p.conn.exchangeName, action, false, false, msg)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user