1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Merge pull request #1 from alexandrevicenzi/master

Go messaging library.
This commit is contained in:
Germano Fronza 2016-11-28 15:37:57 -02:00 committed by GitHub
commit 4477dc24e8
6 changed files with 548 additions and 0 deletions

12
Dockerfile Normal file
View File

@ -0,0 +1,12 @@
FROM golang:1.7
RUN apt-get update && apt-get install -y wget
RUN wget https://github.com/jwilder/dockerize/releases/download/v0.2.0/dockerize-linux-amd64-v0.2.0.tar.gz
RUN tar -C /usr/local/bin -xzvf dockerize-linux-amd64-v0.2.0.tar.gz
RUN mkdir -p /go/src/github.com/eventials/goevents
WORKDIR /go/src/github.com/eventials/goevents
RUN go get github.com/eventials/golog
RUN go get github.com/streadway/amqp
RUN go get github.com/stretchr/testify

76
connection.go Normal file
View File

@ -0,0 +1,76 @@
package events
import (
"github.com/streadway/amqp"
)
type Connection struct {
connection *amqp.Connection
channel *amqp.Channel
queue *amqp.Queue
exchangeName string
queueName string
}
func NewConnection(url, exchange, queue string) (*Connection, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, err
}
err = ch.ExchangeDeclare(
exchange, // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
q, err := ch.QueueDeclare(
queue, // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
return &Connection{
conn,
ch,
&q,
exchange,
queue,
}, nil
}
func (c *Connection) Consumer(autoAck bool) *Consumer {
return NewConsumer(c, autoAck)
}
func (c *Connection) Producer() *Producer {
return NewProducer(c)
}
func (c *Connection) Close() {
c.channel.Close()
c.connection.Close()
}

163
consumer.go Normal file
View File

@ -0,0 +1,163 @@
package events
import (
"github.com/streadway/amqp"
"regexp"
"strings"
)
type eventHandler func(body []byte) bool
type handler struct {
action string
handler eventHandler
re *regexp.Regexp
}
type Consumer struct {
conn *Connection
autoAck bool
handlers []handler
}
func NewConsumer(c *Connection, autoAck bool) *Consumer {
return &Consumer{
c,
autoAck,
make([]handler, 0),
}
}
func (c *Consumer) dispatch(msg amqp.Delivery) {
if fn, ok := c.getHandler(msg.RoutingKey); ok {
defer func() {
if err := recover(); err != nil {
if !c.autoAck {
msg.Nack(false, true)
}
}
}()
ok := fn(msg.Body)
if !c.autoAck {
if ok {
msg.Ack(false)
} else {
msg.Nack(false, true)
}
}
} else {
// got a message from wrong exchange?
// ignore and don't requeue.
msg.Nack(false, false)
}
}
func (c *Consumer) getHandler(action string) (eventHandler, bool) {
for _, h := range c.handlers {
if h.re.MatchString(action) {
return h.handler, true
}
}
return nil, false
}
func (c *Consumer) Subscribe(action string, handlerFn eventHandler) error {
// TODO: Replace # pattern too.
pattern := strings.Replace(action, "*", "(.*)", 0)
re, err := regexp.Compile(pattern)
if err != nil {
return err
}
for _, h := range c.handlers {
if h.action == action {
// return fmt.Errorf("Action '%s' already registered.", action)
}
}
err = c.conn.channel.QueueBind(
c.conn.queueName, // queue name
action, // routing key
c.conn.exchangeName, // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
c.handlers = append(c.handlers, handler{
action,
handlerFn,
re,
})
return nil
}
func (c *Consumer) Unsubscribe(action string) error {
err := c.conn.channel.QueueUnbind(
c.conn.queueName, // queue name
action, // routing key
c.conn.exchangeName, // exchange
nil, // arguments
)
if err != nil {
return err
}
idx := -1
for i, h := range c.handlers {
if h.action == action {
idx = i
break
}
}
if idx != -1 {
c.handlers = append(c.handlers[:idx], c.handlers[idx+1:]...)
}
return nil
}
func (c *Consumer) Listen() error {
msgs, err := c.conn.channel.Consume(
c.conn.queueName, // queue
"", // consumer
c.autoAck, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
return err
}
go func() {
for m := range msgs {
c.dispatch(m)
}
}()
return nil
}
func (c *Consumer) ListenForever() error {
err := c.Listen()
if err != nil {
return err
}
select {}
}

14
docker-compose.yml Normal file
View File

@ -0,0 +1,14 @@
version: '2'
services:
app:
build: .
working_dir: /go/src/github.com/eventials/goevents
command: dockerize -wait tcp://broker:5672 -timeout 60s go run main.go
links:
- broker
volumes:
- .:/go/src/github.com/eventials/goevents
broker:
image: rabbitmq:3.6-management
# ports:
# - "15672:15672"

256
integration_test.go Normal file
View File

@ -0,0 +1,256 @@
package events
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestPublishConsume(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
conn.channel.QueuePurge(conn.queueName, false)
c := NewConsumer(conn, false)
c.Subscribe("my_action_1", func(body []byte) bool {
func1 <- true
return true
})
c.Subscribe("my_action_2", func(body []byte) bool {
func2 <- true
return true
})
c.Listen()
p := NewProducer(conn)
err = p.Publish("my_action_1", []byte(""))
assert.Nil(t, err)
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeWildcardAction(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
conn.channel.QueuePurge(conn.queueName, false)
c := NewConsumer(conn, false)
c.Subscribe("webinar.*", func(body []byte) bool {
func1 <- true
return true
})
c.Subscribe("foobar.*", func(body []byte) bool {
func2 <- true
return true
})
c.Listen()
p := NewProducer(conn)
err = p.Publish("webinar.state_changed", []byte(""))
assert.Nil(t, err)
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
conn.channel.QueuePurge(conn.queueName, false)
c := NewConsumer(conn, false)
c.Subscribe("webinar.*", func(body []byte) bool {
func1 <- true
return true
})
c.Subscribe("webinar.state_changed", func(body []byte) bool {
func2 <- true
return true
})
c.Listen()
p := NewProducer(conn)
err = p.Publish("webinar.state_changed", []byte(""))
assert.Nil(t, err)
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
conn.channel.QueuePurge(conn.queueName, false)
c := NewConsumer(conn, false)
c.Subscribe("webinar.state_changed", func(body []byte) bool {
func1 <- true
return true
})
c.Subscribe("webinar.*", func(body []byte) bool {
func2 <- true
return true
})
c.Listen()
p := NewProducer(conn)
err = p.Publish("webinar.state_changed", []byte(""))
assert.Nil(t, err)
select {
case <-func1:
case <-func2:
assert.Fail(t, "called wrong action")
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeRequeueIfFail(t *testing.T) {
calledOnce := false
called := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
conn.channel.QueuePurge(conn.queueName, false)
c := NewConsumer(conn, false)
c.Subscribe("my_action", func(body []byte) bool {
if calledOnce {
called <- true
return true
} else {
calledOnce = true
return false
}
})
c.Listen()
p := NewProducer(conn)
err = p.Publish("my_action", []byte(""))
assert.Nil(t, err)
select {
case <-called:
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}
func TestPublishConsumeRequeueIfPanic(t *testing.T) {
calledOnce := false
called := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
conn.channel.QueuePurge(conn.queueName, false)
c := NewConsumer(conn, false)
c.Subscribe("my_action", func(body []byte) bool {
if calledOnce {
called <- true
return true
} else {
calledOnce = true
panic("this is a panic!")
}
})
c.Listen()
p := NewProducer(conn)
err = p.Publish("my_action", []byte(""))
assert.Nil(t, err)
select {
case <-called:
case <-time.After(5 * time.Second):
assert.Fail(t, "timed out")
}
}

27
producer.go Normal file
View File

@ -0,0 +1,27 @@
package events
import (
"time"
"github.com/streadway/amqp"
)
type Producer struct {
conn *Connection
}
func NewProducer(c *Connection) *Producer {
return &Producer{
c,
}
}
func (p *Producer) Publish(action string, data []byte) error {
msg := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Body: data,
}
return p.conn.channel.Publish(p.conn.exchangeName, action, false, false, msg)
}