1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Merge pull request #5 from skrater/master

Refactor to deal connection close
This commit is contained in:
Guilherme Emilio Raduenz 2016-12-28 10:25:50 -02:00 committed by GitHub
commit 9e17b09215
9 changed files with 265 additions and 153 deletions

View File

@ -20,13 +20,13 @@ Another application consume these events and maybe create new events.
**The consumer**
```go
conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange")
conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/")
if err != nil {
panic(err)
}
c, err := NewConsumer(conn, false)
c, err := NewConsumer(conn, false, "events-exchange", "events-queue")
if err != nil {
panic(err)
@ -36,18 +36,22 @@ c.Subscribe("object.*", func(body []byte) bool {
fmt.Println(body)
return true
})
go c.Consume()
conn.WaitUntilConnectionClose()
```
**The producer**
```go
conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange")
conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/")
if err != nil {
panic(err)
}
p, err := NewProducer(conn)
p, err := NewProducer(conn, "events-exchange", "events-queue")
if err != nil {
panic(err)

View File

@ -1,82 +1,53 @@
package amqp
import (
"errors"
"github.com/eventials/goevents/messaging"
amqplib "github.com/streadway/amqp"
)
type Connection struct {
connection *amqplib.Connection
channel *amqplib.Channel
queue *amqplib.Queue
exchangeName string
queueName string
}
// NewConnection returns an AMQP Connection.
func NewConnection(url, exchange, queue string) (messaging.Connection, error) {
func NewConnection(url string) (messaging.Connection, error) {
conn, err := amqplib.Dial(url)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, err
}
err = ch.ExchangeDeclare(
exchange, // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
q, err := ch.QueueDeclare(
queue, // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
return &Connection{
conn,
ch,
&q,
exchange,
queue,
}, nil
}
func (c *Connection) NotifyConnectionClose() <-chan error {
ch := make(chan error)
go func() {
ch <- errors.New((<-c.connection.NotifyClose(make(chan *amqplib.Error))).Error())
}()
return ch
}
// Consumer returns an AMQP Consumer.
func (c *Connection) Consumer(autoAck bool) (messaging.Consumer, error) {
return NewConsumer(c, autoAck)
func (c *Connection) Consumer(autoAck bool, exchange, queue string) (messaging.Consumer, error) {
return NewConsumer(c, autoAck, exchange, queue)
}
// Producer returns an AMQP Producer.
func (c *Connection) Producer() (messaging.Producer, error) {
return NewProducer(c)
func (c *Connection) Producer(exchange, queue string) (messaging.Producer, error) {
return NewProducer(c, exchange, queue)
}
// Close closes the AMQP connection.
func (c *Connection) Close() {
c.channel.Close()
c.connection.Close()
}
func (c *Connection) WaitUntilConnectionCloses() {
<-c.NotifyConnectionClose()
}

View File

@ -19,19 +19,66 @@ type Consumer struct {
conn *Connection
autoAck bool
handlers []handler
channel *amqplib.Channel
queue *amqplib.Queue
exchangeName string
queueName string
}
// NewConsumer returns a new AMQP Consumer.
func NewConsumer(c messaging.Connection, autoAck bool) (messaging.Consumer, error) {
func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (messaging.Consumer, error) {
amqpConn := c.(*Connection)
ch, err := amqpConn.connection.Channel()
if err != nil {
return nil, err
}
err = ch.ExchangeDeclare(
exchange, // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
q, err := ch.QueueDeclare(
queue, // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
return &Consumer{
amqpConn,
autoAck,
make([]handler, 0),
ch,
&q,
exchange,
queue,
}, nil
}
func (c *Consumer) Close() {
c.channel.Close()
}
func (c *Consumer) dispatch(msg amqplib.Delivery) {
if fn, ok := c.getHandler(msg.RoutingKey); ok {
defer func() {
@ -78,18 +125,12 @@ func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) er
return err
}
for _, h := range c.handlers {
if h.action == action {
// return fmt.Errorf("Action '%s' already registered.", action)
}
}
err = c.conn.channel.QueueBind(
c.conn.queueName, // queue name
action, // routing key
c.conn.exchangeName, // exchange
false, // no-wait
nil, // arguments
err = c.channel.QueueBind(
c.queueName, // queue name
action, // routing key
c.exchangeName, // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
@ -107,11 +148,11 @@ func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) er
// Unsubscribe allows to unsubscribe an action handler.
func (c *Consumer) Unsubscribe(action string) error {
err := c.conn.channel.QueueUnbind(
c.conn.queueName, // queue name
action, // routing key
c.conn.exchangeName, // exchange
nil, // arguments
err := c.channel.QueueUnbind(
c.queueName, // queue name
action, // routing key
c.exchangeName, // exchange
nil, // arguments
)
if err != nil {
@ -135,37 +176,24 @@ func (c *Consumer) Unsubscribe(action string) error {
}
// Listen start to listen for new messages.
func (c *Consumer) Listen() error {
msgs, err := c.conn.channel.Consume(
c.conn.queueName, // queue
"", // consumer
c.autoAck, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
func (c *Consumer) Consume() error {
msgs, err := c.channel.Consume(
c.queueName, // queue
"", // consumer
c.autoAck, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
return err
}
go func() {
for m := range msgs {
c.dispatch(m)
}
}()
for m := range msgs {
c.dispatch(m)
}
return nil
}
// ListenForever start to listen for new messages and locks the current thread.
func (c *Consumer) ListenForever() error {
err := c.Listen()
if err != nil {
return err
}
select {}
}

View File

@ -11,20 +11,22 @@ func TestPublishConsume(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "events-queue", "events-exchange")
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
amqpConn := conn.(*Connection)
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
c, err := NewConsumer(conn, false)
c, err := NewConsumer(conn, false, "events-exchange", "events-queue")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action_1", func(body []byte) bool {
func1 <- true
return true
@ -35,9 +37,9 @@ func TestPublishConsume(t *testing.T) {
return true
})
c.Listen()
go c.Consume()
p, err := NewProducer(conn)
p, err := NewProducer(conn, "events-exchange", "events-queue")
assert.Nil(t, err)
@ -58,20 +60,22 @@ func TestPublishConsumeWildcardAction(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
amqpConn := conn.(*Connection)
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
c, err := NewConsumer(conn, false)
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) bool {
func1 <- true
return true
@ -82,9 +86,9 @@ func TestPublishConsumeWildcardAction(t *testing.T) {
return true
})
c.Listen()
go c.Consume()
p, err := NewProducer(conn)
p, err := NewProducer(conn, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
@ -105,20 +109,22 @@ func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
amqpConn := conn.(*Connection)
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
c, err := NewConsumer(conn, false)
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.*", func(body []byte) bool {
func1 <- true
return true
@ -129,9 +135,9 @@ func TestPublishConsumeWildcardActionOrderMatters1(t *testing.T) {
return true
})
c.Listen()
go c.Consume()
p, err := NewProducer(conn)
p, err := NewProducer(conn, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
@ -152,20 +158,22 @@ func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
amqpConn := conn.(*Connection)
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
c, err := NewConsumer(conn, false)
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("webinar.state_changed", func(body []byte) bool {
func1 <- true
return true
@ -176,9 +184,9 @@ func TestPublishConsumeWildcardActionOrderMatters2(t *testing.T) {
return true
})
c.Listen()
go c.Consume()
p, err := NewProducer(conn)
p, err := NewProducer(conn, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
@ -199,20 +207,22 @@ func TestPublishConsumeRequeueIfFail(t *testing.T) {
calledOnce := false
called := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
amqpConn := conn.(*Connection)
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
c, err := NewConsumer(conn, false)
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) bool {
if calledOnce {
called <- true
@ -223,9 +233,9 @@ func TestPublishConsumeRequeueIfFail(t *testing.T) {
}
})
c.Listen()
go c.Consume()
p, err := NewProducer(conn)
p, err := NewProducer(conn, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
@ -244,20 +254,22 @@ func TestPublishConsumeRequeueIfPanic(t *testing.T) {
calledOnce := false
called := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
assert.Nil(t, err)
defer conn.Close()
// Clean all messages if any...
amqpConn := conn.(*Connection)
amqpConn.channel.QueuePurge(amqpConn.queueName, false)
c, err := NewConsumer(conn, false)
c, err := NewConsumer(conn, false, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)
defer c.Close()
// Clean all messages if any...
consumer := c.(*Consumer)
consumer.channel.QueuePurge(consumer.queueName, false)
c.Subscribe("my_action", func(body []byte) bool {
if calledOnce {
called <- true
@ -268,9 +280,9 @@ func TestPublishConsumeRequeueIfPanic(t *testing.T) {
}
})
c.Listen()
go c.Consume()
p, err := NewProducer(conn)
p, err := NewProducer(conn, "webhooks", "event_PublishConsumeer")
assert.Nil(t, err)

View File

@ -10,14 +10,57 @@ import (
type Producer struct {
conn *Connection
channel *amqplib.Channel
queue *amqplib.Queue
exchangeName string
queueName string
}
// NewProducer returns a new AMQP Producer.
func NewProducer(c messaging.Connection) (messaging.Producer, error) {
func NewProducer(c messaging.Connection, exchange, queue string) (messaging.Producer, error) {
amqpConn := c.(*Connection)
ch, err := amqpConn.connection.Channel()
if err != nil {
return nil, err
}
err = ch.ExchangeDeclare(
exchange, // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
q, err := ch.QueueDeclare(
queue, // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
return &Producer{
amqpConn,
ch,
&q,
exchange,
queue,
}, nil
}
@ -29,5 +72,5 @@ func (p *Producer) Publish(action string, data []byte) error {
Body: data,
}
return p.conn.channel.Publish(p.conn.exchangeName, action, false, false, msg)
return p.channel.Publish(p.exchangeName, action, false, false, msg)
}

View File

@ -3,12 +3,12 @@ services:
app:
build: .
working_dir: /go/src/github.com/eventials/goevents
command: dockerize -wait tcp://broker:5672 -timeout 60s go run main.go
command: dockerize -wait tcp://broker:5672 -timeout 60s go run example/consumer.go
links:
- broker
volumes:
- .:/go/src/github.com/eventials/goevents
broker:
image: rabbitmq:3.6-management
# ports:
# - "15672:15672"
ports:
- "15672:15672"

52
example/consumer.go Normal file
View File

@ -0,0 +1,52 @@
package main
import (
"fmt"
"github.com/eventials/goevents/amqp"
)
func main() {
conn, err := amqp.NewConnection("amqp://guest:guest@broker:5672/")
if err != nil {
panic(err)
}
consumerA, err := conn.Consumer(false, "events-exchange", "events-queue-a")
if err != nil {
panic(err)
}
consumerA.Subscribe("object.eventA", func(body []byte) bool {
fmt.Println("object.eventA:", string(body))
return true
})
consumerA.Subscribe("object.eventB", func(body []byte) bool {
fmt.Println("object.eventB:", string(body))
return true
})
consumerB, err := conn.Consumer(false, "events-exchange", "events-queue-b")
if err != nil {
panic(err)
}
consumerB.Subscribe("object.eventC", func(body []byte) bool {
fmt.Println("object.eventC:", string(body))
return true
})
consumerB.Subscribe("object.eventD", func(body []byte) bool {
fmt.Println("object.eventD:", string(body))
return true
})
go consumerA.Consume()
go consumerB.Consume()
fmt.Println("Waiting messages")
conn.WaitUntilConnectionCloses()
}

View File

@ -1,7 +1,9 @@
package messaging
type Connection interface {
Consumer(autoAck bool) (Consumer, error)
Producer() (Producer, error)
Consumer(autoAck bool, exchange, queue string) (Consumer, error)
Producer(exchange, queue string) (Producer, error)
Close()
NotifyConnectionClose() <-chan error
WaitUntilConnectionCloses()
}

View File

@ -5,6 +5,6 @@ type EventHandler func(body []byte) bool
type Consumer interface {
Subscribe(action string, handler EventHandler) error
Unsubscribe(action string) error
Listen() error
ListenForever() error
Consume() error
Close()
}