From 144d3870f6f9c68a4279f9ce05e45e6b33ba82e3 Mon Sep 17 00:00:00 2001 From: Alexandre Vicenzi Date: Thu, 1 Dec 2016 10:52:22 -0200 Subject: [PATCH] Use interface to allow mocking. --- .gitignore | 1 + amqp/connection.go | 77 ++++++++ amqp/consumer.go | 165 ++++++++++++++++++ .../integration_test.go | 68 ++++++-- amqp/producer.go | 30 ++++ connection.go | 77 +------- consumer.go | 165 +----------------- producer.go | 26 +-- 8 files changed, 334 insertions(+), 275 deletions(-) create mode 100644 amqp/connection.go create mode 100644 amqp/consumer.go rename integration_test.go => amqp/integration_test.go (77%) create mode 100644 amqp/producer.go diff --git a/.gitignore b/.gitignore index daf913b..eb1369f 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ _testmain.go *.exe *.test *.prof +.DS_Store diff --git a/amqp/connection.go b/amqp/connection.go new file mode 100644 index 0000000..0efd6a4 --- /dev/null +++ b/amqp/connection.go @@ -0,0 +1,77 @@ +package amqp + +import ( + base "github.com/eventials/goevents" + amqplib "github.com/streadway/amqp" +) + +type Connection struct { + connection *amqplib.Connection + channel *amqplib.Channel + queue *amqplib.Queue + + exchangeName string + queueName string +} + +func NewConnection(url, exchange, queue string) (base.Connection, error) { + conn, err := amqplib.Dial(url) + + if err != nil { + return nil, err + } + + ch, err := conn.Channel() + + if err != nil { + return nil, err + } + + err = ch.ExchangeDeclare( + exchange, // name + "topic", // type + true, // durable + false, // auto-delete + false, // internal + false, // no-wait + nil, // arguments + ) + + if err != nil { + return nil, err + } + + q, err := ch.QueueDeclare( + queue, // name + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + return nil, err + } + + return &Connection{ + conn, + ch, + &q, + exchange, + queue, + }, nil +} + +func (c *Connection) Consumer(autoAck bool) (base.Consumer, error) { + return NewConsumer(c, autoAck) +} + +func (c *Connection) Producer() (base.Producer, error) { + return NewProducer(c) +} + +func (c *Connection) Close() { + c.channel.Close() + c.connection.Close() +} diff --git a/amqp/consumer.go b/amqp/consumer.go new file mode 100644 index 0000000..0c32897 --- /dev/null +++ b/amqp/consumer.go @@ -0,0 +1,165 @@ +package amqp + +import ( + "regexp" + "strings" + + base "github.com/eventials/goevents" + amqplib "github.com/streadway/amqp" +) + +type handler struct { + action string + handler base.EventHandler + re *regexp.Regexp +} + +type Consumer struct { + conn *Connection + autoAck bool + handlers []handler +} + +func NewConsumer(c base.Connection, autoAck bool) (base.Consumer, error) { + amqpConn := c.(*Connection) + + return &Consumer{ + amqpConn, + autoAck, + make([]handler, 0), + }, nil +} + +func (c *Consumer) dispatch(msg amqplib.Delivery) { + if fn, ok := c.getHandler(msg.RoutingKey); ok { + defer func() { + if err := recover(); err != nil { + if !c.autoAck { + msg.Nack(false, true) + } + } + }() + + ok := fn(msg.Body) + + if !c.autoAck { + if ok { + msg.Ack(false) + } else { + msg.Nack(false, true) + } + } + } else { + // got a message from wrong exchange? + // ignore and don't requeue. + msg.Nack(false, false) + } +} + +func (c *Consumer) getHandler(action string) (base.EventHandler, bool) { + for _, h := range c.handlers { + if h.re.MatchString(action) { + return h.handler, true + } + } + + return nil, false +} + +func (c *Consumer) Subscribe(action string, handlerFn base.EventHandler) error { + // TODO: Replace # pattern too. + pattern := strings.Replace(action, "*", "(.*)", 0) + re, err := regexp.Compile(pattern) + + if err != nil { + return err + } + + for _, h := range c.handlers { + if h.action == action { + // return fmt.Errorf("Action '%s' already registered.", action) + } + } + + err = c.conn.channel.QueueBind( + c.conn.queueName, // queue name + action, // routing key + c.conn.exchangeName, // exchange + false, // no-wait + nil, // arguments + ) + + if err != nil { + return err + } + + c.handlers = append(c.handlers, handler{ + action, + handlerFn, + re, + }) + + return nil +} + +func (c *Consumer) Unsubscribe(action string) error { + err := c.conn.channel.QueueUnbind( + c.conn.queueName, // queue name + action, // routing key + c.conn.exchangeName, // exchange + nil, // arguments + ) + + if err != nil { + return err + } + + idx := -1 + + for i, h := range c.handlers { + if h.action == action { + idx = i + break + } + } + + if idx != -1 { + c.handlers = append(c.handlers[:idx], c.handlers[idx+1:]...) + } + + return nil +} + +func (c *Consumer) Listen() error { + msgs, err := c.conn.channel.Consume( + c.conn.queueName, // queue + "", // consumer + c.autoAck, // auto ack + false, // exclusive + false, // no local + false, // no wait + nil, // args + ) + + if err != nil { + return err + } + + go func() { + for m := range msgs { + c.dispatch(m) + } + }() + + return nil +} + +func (c *Consumer) ListenForever() error { + err := c.Listen() + + if err != nil { + return err + } + + select {} +} diff --git a/integration_test.go b/amqp/integration_test.go similarity index 77% rename from integration_test.go rename to amqp/integration_test.go index 16b6235..13f117f 100644 --- a/integration_test.go +++ b/amqp/integration_test.go @@ -1,4 +1,4 @@ -package events +package amqp import ( "testing" @@ -18,9 +18,12 @@ func TestPublishConsume(t *testing.T) { defer conn.Close() // Clean all messages if any... - conn.channel.QueuePurge(conn.queueName, false) + amqpConn := conn.(*Connection) + amqpConn.channel.QueuePurge(amqpConn.queueName, false) - c := NewConsumer(conn, false) + c, err := NewConsumer(conn, false) + + assert.Nil(t, err) c.Subscribe("my_action_1", func(body []byte) bool { func1 <- true @@ -34,7 +37,9 @@ func TestPublishConsume(t *testing.T) { c.Listen() - p := NewProducer(conn) + p, err := NewProducer(conn) + + assert.Nil(t, err) err = p.Publish("my_action_1", []byte("")) @@ -60,9 +65,12 @@ func TestPublishConsumeWildcardAction(t *testing.T) { defer conn.Close() // Clean all messages if any... - conn.channel.QueuePurge(conn.queueName, false) + amqpConn := conn.(*Connection) + amqpConn.channel.QueuePurge(amqpConn.queueName, false) - c := NewConsumer(conn, false) + c, err := NewConsumer(conn, false) + + assert.Nil(t, err) c.Subscribe("webinar.*", func(body []byte) bool { func1 <- true @@ -76,7 +84,9 @@ func TestPublishConsumeWildcardAction(t *testing.T) { c.Listen() - p := NewProducer(conn) + p, err := NewProducer(conn) + + assert.Nil(t, err) err = p.Publish("webinar.state_changed", []byte("")) @@ -102,9 +112,12 @@ func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) { defer conn.Close() // Clean all messages if any... - conn.channel.QueuePurge(conn.queueName, false) + amqpConn := conn.(*Connection) + amqpConn.channel.QueuePurge(amqpConn.queueName, false) - c := NewConsumer(conn, false) + c, err := NewConsumer(conn, false) + + assert.Nil(t, err) c.Subscribe("webinar.*", func(body []byte) bool { func1 <- true @@ -118,7 +131,9 @@ func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) { c.Listen() - p := NewProducer(conn) + p, err := NewProducer(conn) + + assert.Nil(t, err) err = p.Publish("webinar.state_changed", []byte("")) @@ -144,9 +159,12 @@ func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) { defer conn.Close() // Clean all messages if any... - conn.channel.QueuePurge(conn.queueName, false) + amqpConn := conn.(*Connection) + amqpConn.channel.QueuePurge(amqpConn.queueName, false) - c := NewConsumer(conn, false) + c, err := NewConsumer(conn, false) + + assert.Nil(t, err) c.Subscribe("webinar.state_changed", func(body []byte) bool { func1 <- true @@ -160,7 +178,9 @@ func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) { c.Listen() - p := NewProducer(conn) + p, err := NewProducer(conn) + + assert.Nil(t, err) err = p.Publish("webinar.state_changed", []byte("")) @@ -186,9 +206,12 @@ func TestPublishConsumeRequeueIfFail(t *testing.T) { defer conn.Close() // Clean all messages if any... - conn.channel.QueuePurge(conn.queueName, false) + amqpConn := conn.(*Connection) + amqpConn.channel.QueuePurge(amqpConn.queueName, false) - c := NewConsumer(conn, false) + c, err := NewConsumer(conn, false) + + assert.Nil(t, err) c.Subscribe("my_action", func(body []byte) bool { if calledOnce { @@ -202,7 +225,9 @@ func TestPublishConsumeRequeueIfFail(t *testing.T) { c.Listen() - p := NewProducer(conn) + p, err := NewProducer(conn) + + assert.Nil(t, err) err = p.Publish("my_action", []byte("")) @@ -226,9 +251,12 @@ func TestPublishConsumeRequeueIfPanic(t *testing.T) { defer conn.Close() // Clean all messages if any... - conn.channel.QueuePurge(conn.queueName, false) + amqpConn := conn.(*Connection) + amqpConn.channel.QueuePurge(amqpConn.queueName, false) - c := NewConsumer(conn, false) + c, err := NewConsumer(conn, false) + + assert.Nil(t, err) c.Subscribe("my_action", func(body []byte) bool { if calledOnce { @@ -242,7 +270,9 @@ func TestPublishConsumeRequeueIfPanic(t *testing.T) { c.Listen() - p := NewProducer(conn) + p, err := NewProducer(conn) + + assert.Nil(t, err) err = p.Publish("my_action", []byte("")) diff --git a/amqp/producer.go b/amqp/producer.go new file mode 100644 index 0000000..58f7c1e --- /dev/null +++ b/amqp/producer.go @@ -0,0 +1,30 @@ +package amqp + +import ( + "time" + + base "github.com/eventials/goevents" + amqplib "github.com/streadway/amqp" +) + +type Producer struct { + conn *Connection +} + +func NewProducer(c base.Connection) (base.Producer, error) { + amqpConn := c.(*Connection) + + return &Producer{ + amqpConn, + }, nil +} + +func (p *Producer) Publish(action string, data []byte) error { + msg := amqplib.Publishing{ + DeliveryMode: amqplib.Persistent, + Timestamp: time.Now(), + Body: data, + } + + return p.conn.channel.Publish(p.conn.exchangeName, action, false, false, msg) +} diff --git a/connection.go b/connection.go index 6dbc3cc..1b48718 100644 --- a/connection.go +++ b/connection.go @@ -1,76 +1,7 @@ package events -import ( - "github.com/streadway/amqp" -) - -type Connection struct { - connection *amqp.Connection - channel *amqp.Channel - queue *amqp.Queue - - exchangeName string - queueName string -} - -func NewConnection(url, exchange, queue string) (*Connection, error) { - conn, err := amqp.Dial(url) - - if err != nil { - return nil, err - } - - ch, err := conn.Channel() - - if err != nil { - return nil, err - } - - err = ch.ExchangeDeclare( - exchange, // name - "topic", // type - true, // durable - false, // auto-delete - false, // internal - false, // no-wait - nil, // arguments - ) - - if err != nil { - return nil, err - } - - q, err := ch.QueueDeclare( - queue, // name - true, // durable - false, // auto-delete - false, // exclusive - false, // no-wait - nil, // arguments - ) - - if err != nil { - return nil, err - } - - return &Connection{ - conn, - ch, - &q, - exchange, - queue, - }, nil -} - -func (c *Connection) Consumer(autoAck bool) *Consumer { - return NewConsumer(c, autoAck) -} - -func (c *Connection) Producer() *Producer { - return NewProducer(c) -} - -func (c *Connection) Close() { - c.channel.Close() - c.connection.Close() +type Connection interface { + Consumer(autoAck bool) (Consumer, error) + Producer() (Producer, error) + Close() } diff --git a/consumer.go b/consumer.go index 88cfacc..69c83e6 100644 --- a/consumer.go +++ b/consumer.go @@ -1,163 +1,10 @@ package events -import ( - "github.com/streadway/amqp" - "regexp" - "strings" -) +type EventHandler func(body []byte) bool -type eventHandler func(body []byte) bool - -type handler struct { - action string - handler eventHandler - re *regexp.Regexp -} - -type Consumer struct { - conn *Connection - autoAck bool - handlers []handler -} - -func NewConsumer(c *Connection, autoAck bool) *Consumer { - return &Consumer{ - c, - autoAck, - make([]handler, 0), - } -} - -func (c *Consumer) dispatch(msg amqp.Delivery) { - if fn, ok := c.getHandler(msg.RoutingKey); ok { - defer func() { - if err := recover(); err != nil { - if !c.autoAck { - msg.Nack(false, true) - } - } - }() - - ok := fn(msg.Body) - - if !c.autoAck { - if ok { - msg.Ack(false) - } else { - msg.Nack(false, true) - } - } - } else { - // got a message from wrong exchange? - // ignore and don't requeue. - msg.Nack(false, false) - } -} - -func (c *Consumer) getHandler(action string) (eventHandler, bool) { - for _, h := range c.handlers { - if h.re.MatchString(action) { - return h.handler, true - } - } - - return nil, false -} - -func (c *Consumer) Subscribe(action string, handlerFn eventHandler) error { - // TODO: Replace # pattern too. - pattern := strings.Replace(action, "*", "(.*)", 0) - re, err := regexp.Compile(pattern) - - if err != nil { - return err - } - - for _, h := range c.handlers { - if h.action == action { - // return fmt.Errorf("Action '%s' already registered.", action) - } - } - - err = c.conn.channel.QueueBind( - c.conn.queueName, // queue name - action, // routing key - c.conn.exchangeName, // exchange - false, // no-wait - nil, // arguments - ) - - if err != nil { - return err - } - - c.handlers = append(c.handlers, handler{ - action, - handlerFn, - re, - }) - - return nil -} - -func (c *Consumer) Unsubscribe(action string) error { - err := c.conn.channel.QueueUnbind( - c.conn.queueName, // queue name - action, // routing key - c.conn.exchangeName, // exchange - nil, // arguments - ) - - if err != nil { - return err - } - - idx := -1 - - for i, h := range c.handlers { - if h.action == action { - idx = i - break - } - } - - if idx != -1 { - c.handlers = append(c.handlers[:idx], c.handlers[idx+1:]...) - } - - return nil -} - -func (c *Consumer) Listen() error { - msgs, err := c.conn.channel.Consume( - c.conn.queueName, // queue - "", // consumer - c.autoAck, // auto ack - false, // exclusive - false, // no local - false, // no wait - nil, // args - ) - - if err != nil { - return err - } - - go func() { - for m := range msgs { - c.dispatch(m) - } - }() - - return nil -} - -func (c *Consumer) ListenForever() error { - err := c.Listen() - - if err != nil { - return err - } - - select {} +type Consumer interface { + Subscribe(action string, handler EventHandler) error + Unsubscribe(action string) error + Listen() error + ListenForever() error } diff --git a/producer.go b/producer.go index d343216..a041b48 100644 --- a/producer.go +++ b/producer.go @@ -1,27 +1,5 @@ package events -import ( - "time" - - "github.com/streadway/amqp" -) - -type Producer struct { - conn *Connection -} - -func NewProducer(c *Connection) *Producer { - return &Producer{ - c, - } -} - -func (p *Producer) Publish(action string, data []byte) error { - msg := amqp.Publishing{ - DeliveryMode: amqp.Persistent, - Timestamp: time.Now(), - Body: data, - } - - return p.conn.channel.Publish(p.conn.exchangeName, action, false, false, msg) +type Producer interface { + Publish(action string, data []byte) error }