From 166f0110f6f725e7e78719cb87bcd15ba120c400 Mon Sep 17 00:00:00 2001 From: Alexandre Vicenzi Date: Fri, 25 Nov 2016 18:51:34 -0200 Subject: [PATCH 1/4] Go messaging library. --- Dockerfile | 12 ++++ connection.go | 76 +++++++++++++++++++++ consumer.go | 163 +++++++++++++++++++++++++++++++++++++++++++++ docker-compose.yml | 19 ++++++ main.go | 5 ++ producer.go | 29 ++++++++ 6 files changed, 304 insertions(+) create mode 100644 Dockerfile create mode 100644 connection.go create mode 100644 consumer.go create mode 100644 docker-compose.yml create mode 100644 main.go create mode 100644 producer.go diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..dcab353 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.7 + +RUN apt-get update && apt-get install -y wget +RUN wget https://github.com/jwilder/dockerize/releases/download/v0.2.0/dockerize-linux-amd64-v0.2.0.tar.gz +RUN tar -C /usr/local/bin -xzvf dockerize-linux-amd64-v0.2.0.tar.gz + +RUN mkdir -p /go/src/github.com/eventials/goevents +WORKDIR /go/src/github.com/eventials/goevents + +RUN go get github.com/eventials/golog +RUN go get github.com/streadway/amqp +RUN go get github.com/stretchr/testify diff --git a/connection.go b/connection.go new file mode 100644 index 0000000..b4454d1 --- /dev/null +++ b/connection.go @@ -0,0 +1,76 @@ +package main + +import ( + "github.com/streadway/amqp" +) + +type Connection struct { + connection *amqp.Connection + channel *amqp.Channel + queue *amqp.Queue + + exchangeName string + queueName string +} + +func NewConnection(url, exchange, queue string) (*Connection, error) { + conn, err := amqp.Dial(url) + + if err != nil { + return nil, err + } + + ch, err := conn.Channel() + + if err != nil { + return nil, err + } + + err = ch.ExchangeDeclare( + exchange, // name + "topic", // type + true, // durable + false, // auto-delete + false, // internal + false, // no-wait + nil, // arguments + ) + + if err != nil { + return nil, err + } + + q, err := ch.QueueDeclare( + queue, // name + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + return nil, err + } + + return &Connection{ + conn, + ch, + &q, + exchange, + queue, + }, nil +} + +func (c *Connection) Consumer(autoAck bool) *Consumer { + return NewConsumer(c, autoAck) +} + +func (c *Connection) Producer() *Producer { + return NewProducer(c) +} + +func (c *Connection) Close() { + c.channel.Close() + c.connection.Close() +} diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..19c3e8a --- /dev/null +++ b/consumer.go @@ -0,0 +1,163 @@ +package main + +import ( + "github.com/streadway/amqp" + "regexp" + "strings" +) + +type eventHandler func(body []byte) bool + +type handler struct { + action string + handler eventHandler + re *regexp.Regexp +} + +type Consumer struct { + conn *Connection + autoAck bool + handlers []handler +} + +func NewConsumer(c *Connection, autoAck bool) *Consumer { + return &Consumer{ + c, + autoAck, + make([]handler, 0), + } +} + +func (c *Consumer) dispatch(msg amqp.Delivery) { + if fn, ok := c.getHandler(msg.RoutingKey); ok { + defer func() { + if err := recover(); err != nil { + if !c.autoAck { + msg.Nack(false, true) + } + } + }() + + ok := fn(msg.Body) + + if !c.autoAck { + if ok { + msg.Ack(false) + } else { + msg.Nack(false, true) + } + } + } else { + // got a message from wrong exchange? + // ignore and don't requeue. + msg.Nack(false, false) + } +} + +func (c *Consumer) getHandler(action string) (eventHandler, bool) { + for _, h := range c.handlers { + if h.re.MatchString(action) { + return h.handler, true + } + } + + return nil, false +} + +func (c *Consumer) Subscribe(action string, handlerFn eventHandler) error { + // TODO: Replace # pattern too. + pattern := strings.Replace(action, "*", "(.*)", 0) + re, err := regexp.Compile(pattern) + + if err != nil { + return err + } + + for _, h := range c.handlers { + if h.action == action { + // return fmt.Errorf("Action '%s' already registered.", action) + } + } + + err = c.conn.channel.QueueBind( + c.conn.queueName, // queue name + action, // routing key + c.conn.exchangeName, // exchange + false, // no-wait + nil, // arguments + ) + + if err != nil { + return err + } + + c.handlers = append(c.handlers, handler{ + action, + handlerFn, + re, + }) + + return nil +} + +func (c *Consumer) Unsubscribe(action string) error { + err := c.conn.channel.QueueUnbind( + c.conn.queueName, // queue name + action, // routing key + c.conn.exchangeName, // exchange + nil, // arguments + ) + + if err != nil { + return err + } + + idx := -1 + + for i, h := range c.handlers { + if h.action == action { + idx = i + break + } + } + + if idx != -1 { + c.handlers = append(c.handlers[:idx], c.handlers[idx+1:]...) + } + + return nil +} + +func (c *Consumer) Listen() error { + msgs, err := c.conn.channel.Consume( + c.conn.queueName, // queue + "", // consumer + c.autoAck, // auto ack + false, // exclusive + false, // no local + false, // no wait + nil, // args + ) + + if err != nil { + return err + } + + go func() { + for m := range msgs { + c.dispatch(m) + } + }() + + return nil +} + +func (c *Consumer) ListenForever() error { + err := c.Listen() + + if err != nil { + return err + } + + select {} +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..c441208 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,19 @@ +version: '2' +services: + app: + build: . + working_dir: /go/src/github.com/eventials/goevents + command: dockerize -wait tcp://broker:5672 -timeout 60s go run main.go + environment: + APP_AMQP_URL: 'amqp://guest:guest@broker:5672/' + APP_AMQP_AUTOACK: 'false' + APP_AMQP_EXCHANGE: event_dispatcher + APP_AMQP_QUEUE: webhooks + links: + - broker + volumes: + - .:/go/src/github.com/eventials/goevents + broker: + image: rabbitmq:3.6-management + # ports: + # - "15672:15672" diff --git a/main.go b/main.go new file mode 100644 index 0000000..7905807 --- /dev/null +++ b/main.go @@ -0,0 +1,5 @@ +package main + +func main() { + +} diff --git a/producer.go b/producer.go new file mode 100644 index 0000000..caa701d --- /dev/null +++ b/producer.go @@ -0,0 +1,29 @@ +package main + +import ( + "time" + + "github.com/streadway/amqp" +) + +type Producer struct { + conn *Connection +} + +func NewProducer(c *Connection) *Producer { + return &Producer{ + c, + } +} + +func (p *Producer) Publish(action string, data interface{}) error { + msg := amqp.Publishing{ + DeliveryMode: amqp.Persistent, + Timestamp: time.Now(), + ContentType: "application/json", + ContentEncoding: "utf-8", + Body: data, + } + + return c.channel.Publish(c.exchangeName, routingKey, false, false, msg) +} From 3bae7283b06e407360e89e89ff6bfb5b4b0cf8f5 Mon Sep 17 00:00:00 2001 From: Alexandre Vicenzi Date: Mon, 28 Nov 2016 15:08:56 -0200 Subject: [PATCH 2/4] Fix errors. --- producer.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/producer.go b/producer.go index caa701d..01d1576 100644 --- a/producer.go +++ b/producer.go @@ -16,14 +16,12 @@ func NewProducer(c *Connection) *Producer { } } -func (p *Producer) Publish(action string, data interface{}) error { +func (p *Producer) Publish(action string, data []byte) error { msg := amqp.Publishing{ DeliveryMode: amqp.Persistent, Timestamp: time.Now(), - ContentType: "application/json", - ContentEncoding: "utf-8", Body: data, } - return c.channel.Publish(c.exchangeName, routingKey, false, false, msg) + return p.conn.channel.Publish(p.conn.exchangeName, action, false, false, msg) } From 9736a24867b2a624095de0a14b5fefb1c65fee3a Mon Sep 17 00:00:00 2001 From: Alexandre Vicenzi Date: Mon, 28 Nov 2016 15:35:19 -0200 Subject: [PATCH 3/4] Add tests. --- connection.go | 2 +- consumer.go | 2 +- integration_test.go | 256 ++++++++++++++++++++++++++++++++++++++++++++ main.go | 5 - producer.go | 2 +- 5 files changed, 259 insertions(+), 8 deletions(-) create mode 100644 integration_test.go delete mode 100644 main.go diff --git a/connection.go b/connection.go index b4454d1..6dbc3cc 100644 --- a/connection.go +++ b/connection.go @@ -1,4 +1,4 @@ -package main +package events import ( "github.com/streadway/amqp" diff --git a/consumer.go b/consumer.go index 19c3e8a..88cfacc 100644 --- a/consumer.go +++ b/consumer.go @@ -1,4 +1,4 @@ -package main +package events import ( "github.com/streadway/amqp" diff --git a/integration_test.go b/integration_test.go new file mode 100644 index 0000000..16b6235 --- /dev/null +++ b/integration_test.go @@ -0,0 +1,256 @@ +package events + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPublishConsume(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks") + + assert.Nil(t, err) + + defer conn.Close() + + // Clean all messages if any... + conn.channel.QueuePurge(conn.queueName, false) + + c := NewConsumer(conn, false) + + c.Subscribe("my_action_1", func(body []byte) bool { + func1 <- true + return true + }) + + c.Subscribe("my_action_2", func(body []byte) bool { + func2 <- true + return true + }) + + c.Listen() + + p := NewProducer(conn) + + err = p.Publish("my_action_1", []byte("")) + + assert.Nil(t, err) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(5 * time.Second): + assert.Fail(t, "timed out") + } +} + +func TestPublishConsumeWildcardAction(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks") + + assert.Nil(t, err) + + defer conn.Close() + + // Clean all messages if any... + conn.channel.QueuePurge(conn.queueName, false) + + c := NewConsumer(conn, false) + + c.Subscribe("webinar.*", func(body []byte) bool { + func1 <- true + return true + }) + + c.Subscribe("foobar.*", func(body []byte) bool { + func2 <- true + return true + }) + + c.Listen() + + p := NewProducer(conn) + + err = p.Publish("webinar.state_changed", []byte("")) + + assert.Nil(t, err) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(5 * time.Second): + assert.Fail(t, "timed out") + } +} + +func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks") + + assert.Nil(t, err) + + defer conn.Close() + + // Clean all messages if any... + conn.channel.QueuePurge(conn.queueName, false) + + c := NewConsumer(conn, false) + + c.Subscribe("webinar.*", func(body []byte) bool { + func1 <- true + return true + }) + + c.Subscribe("webinar.state_changed", func(body []byte) bool { + func2 <- true + return true + }) + + c.Listen() + + p := NewProducer(conn) + + err = p.Publish("webinar.state_changed", []byte("")) + + assert.Nil(t, err) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(5 * time.Second): + assert.Fail(t, "timed out") + } +} + +func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) { + func1 := make(chan bool) + func2 := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks") + + assert.Nil(t, err) + + defer conn.Close() + + // Clean all messages if any... + conn.channel.QueuePurge(conn.queueName, false) + + c := NewConsumer(conn, false) + + c.Subscribe("webinar.state_changed", func(body []byte) bool { + func1 <- true + return true + }) + + c.Subscribe("webinar.*", func(body []byte) bool { + func2 <- true + return true + }) + + c.Listen() + + p := NewProducer(conn) + + err = p.Publish("webinar.state_changed", []byte("")) + + assert.Nil(t, err) + + select { + case <-func1: + case <-func2: + assert.Fail(t, "called wrong action") + case <-time.After(5 * time.Second): + assert.Fail(t, "timed out") + } +} + +func TestPublishConsumeRequeueIfFail(t *testing.T) { + calledOnce := false + called := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks") + + assert.Nil(t, err) + + defer conn.Close() + + // Clean all messages if any... + conn.channel.QueuePurge(conn.queueName, false) + + c := NewConsumer(conn, false) + + c.Subscribe("my_action", func(body []byte) bool { + if calledOnce { + called <- true + return true + } else { + calledOnce = true + return false + } + }) + + c.Listen() + + p := NewProducer(conn) + + err = p.Publish("my_action", []byte("")) + + assert.Nil(t, err) + + select { + case <-called: + case <-time.After(5 * time.Second): + assert.Fail(t, "timed out") + } +} + +func TestPublishConsumeRequeueIfPanic(t *testing.T) { + calledOnce := false + called := make(chan bool) + + conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks") + + assert.Nil(t, err) + + defer conn.Close() + + // Clean all messages if any... + conn.channel.QueuePurge(conn.queueName, false) + + c := NewConsumer(conn, false) + + c.Subscribe("my_action", func(body []byte) bool { + if calledOnce { + called <- true + return true + } else { + calledOnce = true + panic("this is a panic!") + } + }) + + c.Listen() + + p := NewProducer(conn) + + err = p.Publish("my_action", []byte("")) + + assert.Nil(t, err) + + select { + case <-called: + case <-time.After(5 * time.Second): + assert.Fail(t, "timed out") + } +} diff --git a/main.go b/main.go deleted file mode 100644 index 7905807..0000000 --- a/main.go +++ /dev/null @@ -1,5 +0,0 @@ -package main - -func main() { - -} diff --git a/producer.go b/producer.go index 01d1576..d343216 100644 --- a/producer.go +++ b/producer.go @@ -1,4 +1,4 @@ -package main +package events import ( "time" From 3e3bb7a7def1ceeb204df9a8008e0e5d4d05072e Mon Sep 17 00:00:00 2001 From: Alexandre Vicenzi Date: Mon, 28 Nov 2016 15:36:24 -0200 Subject: [PATCH 4/4] Remove env vars. --- docker-compose.yml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c441208..ffde7a7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,11 +4,6 @@ services: build: . working_dir: /go/src/github.com/eventials/goevents command: dockerize -wait tcp://broker:5672 -timeout 60s go run main.go - environment: - APP_AMQP_URL: 'amqp://guest:guest@broker:5672/' - APP_AMQP_AUTOACK: 'false' - APP_AMQP_EXCHANGE: event_dispatcher - APP_AMQP_QUEUE: webhooks links: - broker volumes: