1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Merge pull request #45 from skrater/master

AMQP Fixes
This commit is contained in:
Guilherme Emilio Raduenz 2019-03-12 18:23:29 -03:00 committed by GitHub
commit cad296e0e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 216 additions and 126 deletions

View File

@ -1,13 +1,8 @@
language: go
go:
- "1.9"
- "1.10"
- "1.11"
- "1.12"
- "tip"
install:
- go get github.com/streadway/amqp
- go get github.com/stretchr/testify
- go get github.com/sirupsen/logrus
- go get github.com/aws/aws-sdk-go/service/sqs
services:
- rabbitmq
addons:

View File

@ -1,28 +1,12 @@
FROM golang:1.10
FROM golang:1.12
ARG PLATFORM=linux
ENV PLATFORM $PLATFORM
RUN DOCKERIZE_VERSION=v0.6.1 \
&& wget --no-check-certificate https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-alpine-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
&& tar -C /usr/local/bin -xzvf dockerize-alpine-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
&& rm dockerize-alpine-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
&& mkdir -p /goevents
ARG ARCH=amd64
ENV ARCH $ARCH
ARG DOCKERIZE_VERSION=v0.6.1
ENV DOCKERIZE_VERSION $DOCKERIZE_VERSION
ADD https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-$PLATFORM-$ARCH-$DOCKERIZE_VERSION.tar.gz /usr/local/bin
RUN cd /usr/local/bin \
&& tar -xzf ./dockerize-$PLATFORM-$ARCH-$DOCKERIZE_VERSION.tar.gz \
&& rm -f ./dockerize-$PLATFORM-$ARCH-$DOCKERIZE_VERSION.tar.gz
RUN mkdir -p /go/src/github.com/eventials/goevents
WORKDIR /go/src/github.com/eventials/goevents
RUN go get \
github.com/streadway/amqp \
github.com/sirupsen/logrus \
github.com/stretchr/testify \
github.com/aws/aws-sdk-go/service/sqs
WORKDIR /goevents
ENTRYPOINT ["dockerize"]

View File

@ -26,12 +26,16 @@ if err != nil {
panic(err)
}
defer conn.Close()
c, err := NewConsumer(conn, false, "events-exchange", "events-queue")
if err != nil {
panic(err)
}
defer c.Close()
c.Subscribe("object.*", func(body []byte) bool {
fmt.Println(body)
return true
@ -39,7 +43,7 @@ c.Subscribe("object.*", func(body []byte) bool {
go c.Consume()
conn.WaitUntilConnectionClose()
select{}
```
**The producer**
@ -51,15 +55,24 @@ if err != nil {
panic(err)
}
defer conn.Close()
p, err := NewProducer(conn, "events-exchange", "events-queue")
if err != nil {
panic(err)
}
defer p.Close()
err = p.Publish("object.my_action", []byte("message"))
if err != nil {
panic(err)
}
```
## Important
When using `producer`, always close all your producers (things who call the producer.Publish) before closing the producer itself (producer.Close).
In this way, you have more garanties that your messages is delivered to RabbitMQ.

View File

@ -5,6 +5,7 @@ import (
"fmt"
"os"
"regexp"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
@ -88,16 +89,18 @@ func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue str
}
func (c *consumer) closeAndClearHandlers() {
c.m.Lock()
defer c.m.Unlock()
// Unsubscribe all handlers
c.handlers = make([]handler, 0)
c.closed = true
}
func (c *consumer) Close() {
func() {
c.m.Lock()
defer c.m.Unlock()
// Unsubscribe all handlers
c.handlers = make([]handler, 0)
c.closed = true
}()
c.closeAndClearHandlers()
// Wait all go routine finish.
c.wg.Wait()
@ -140,6 +143,8 @@ func (c *consumer) dispatch(msg amqplib.Delivery) {
func (c *consumer) callAndHandlePanic(msg amqplib.Delivery, h *handler) (err error) {
defer func() {
if r := recover(); r != nil {
debug.PrintStack()
switch x := r.(type) {
case string:
err = errors.New(x)
@ -403,6 +408,9 @@ func (c *consumer) bindActionToQueue(channel *amqplib.Channel, queueName string,
}
func (c *consumer) bindAllActionsQueue(channel *amqplib.Channel, queueName string) error {
c.m.RLock()
defer c.m.RUnlock()
for _, h := range c.handlers {
err := c.bindActionToQueue(channel, queueName, h.action)
if err != nil {
@ -508,6 +516,13 @@ func (c *consumer) doConsume() error {
return nil
}
func (c *consumer) isClosed() bool {
c.m.Lock()
defer c.m.Unlock()
return c.closed
}
// Listen start to listen for new messages.
func (c *consumer) Consume() {
logger.Info("Registered handlers:")
@ -518,7 +533,7 @@ func (c *consumer) Consume() {
rs := c.conn.NotifyReestablish()
for !c.closed {
for !c.isClosed() {
if !c.conn.IsConnected() {
logger.Info("Connection not established. Waiting connection to be reestablished.")

View File

@ -3,6 +3,7 @@ package amqp
import (
"errors"
"fmt"
"runtime/debug"
"sync"
"time"
@ -12,9 +13,8 @@ import (
amqplib "github.com/streadway/amqp"
)
var (
ErrNotAcked = errors.New("Messge was not acked")
)
// ErrNotAcked indicated that published messages was not acked by RabbitMQ
var ErrNotAcked = errors.New("messge was not acked")
type message struct {
action string
@ -23,16 +23,17 @@ type message struct {
// producer holds a amqp connection and channel to publish messages to.
type producer struct {
m sync.Mutex
conn *connection
config ProducerConfig
m sync.Mutex
wg sync.WaitGroup
conn *connection
channel *amqplib.Channel
notifyConfirm chan amqplib.Confirmation
connectionClosed <-chan error
closeQueue chan bool
config ProducerConfig
internalQueue chan message
ackChannel chan uint64
nackChannel chan uint64
exchangeName string
closed bool
@ -54,31 +55,40 @@ func NewProducer(c messaging.Connection, exchange string) (*producer, error) {
// NewProducerConfig returns a new AMQP Producer.
func NewProducerConfig(c messaging.Connection, exchange string, config ProducerConfig) (*producer, error) {
conn := c.(*connection)
producer := &producer{
conn: c.(*connection),
config: config,
internalQueue: make(chan message),
exchangeName: exchange,
conn: c.(*connection),
config: config,
internalQueue: make(chan message, 2),
exchangeName: exchange,
notifyConfirm: make(chan amqplib.Confirmation),
closeQueue: make(chan bool),
connectionClosed: conn.NotifyConnectionClose(),
}
err := producer.setupTopology()
if err != nil {
return nil, err
} else {
go producer.handleReestablishedConnnection()
go producer.drainInternalQueue()
return producer, err
}
go producer.handleReestablishedConnnection()
return producer, err
}
// Publish publishes an action.
func (p *producer) Publish(action string, data []byte) {
messageId, _ := NewUUIDv4()
// ignore messages published to a closed producer
if p.isClosed() {
return
}
messageID, _ := NewUUIDv4()
p.publishAmqMessage(action, amqplib.Publishing{
MessageId: messageId,
MessageId: messageID,
DeliveryMode: amqplib.Persistent,
Timestamp: time.Now().UTC(),
Body: data,
@ -86,6 +96,8 @@ func (p *producer) Publish(action string, data []byte) {
}
func (p *producer) publishAmqMessage(queue string, msg amqplib.Publishing) {
p.wg.Add(1)
p.internalQueue <- message{
action: queue,
msg: msg,
@ -103,13 +115,33 @@ func (p *producer) NotifyClose() <-chan bool {
return receiver
}
// Close the producer's internal queue.
func (p *producer) Close() {
func (p *producer) setClosed() {
p.m.Lock()
defer p.m.Unlock()
p.closed = true
}
// Close the producer's internal queue.
func (p *producer) Close() {
p.setClosed()
p.wg.Wait()
p.closeQueue <- true
close(p.internalQueue)
close(p.closeQueue)
p.channel.Close()
}
// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (p *producer) changeChannel(channel *amqplib.Channel) {
p.channel = channel
p.notifyConfirm = make(chan amqplib.Confirmation)
p.channel.NotifyPublish(p.notifyConfirm)
}
func (p *producer) setupTopology() error {
@ -121,15 +153,17 @@ func (p *producer) setupTopology() error {
p.m.Lock()
defer p.m.Unlock()
if p.exchangeName != "" {
channel, err := p.conn.openChannel()
channel, err := p.conn.openChannel()
if err != nil {
return err
}
if p.exchangeName != "" {
if err != nil {
return err
}
defer channel.Close()
err = channel.ExchangeDeclare(
p.exchangeName, // name
"topic", // type
@ -145,10 +179,21 @@ func (p *producer) setupTopology() error {
}
}
err = channel.Confirm(false)
if err != nil {
err = fmt.Errorf("Channel could not be put into confirm mode: %s", err)
return err
}
p.changeChannel(channel)
log.WithFields(log.Fields{
"type": "goevents",
"sub_type": "producer",
}).Debug("Topology ready.")
}).Debug("Topology ready. Draining internal queue.")
go p.drainInternalQueue()
return nil
}
@ -156,23 +201,35 @@ func (p *producer) setupTopology() error {
func (p *producer) handleReestablishedConnnection() {
rs := p.conn.NotifyReestablish()
for !p.closed {
for !p.isClosed() {
<-rs
err := p.setupTopology()
if err != nil {
log.WithFields(log.Fields{
"type": "amqp",
"error": err,
"type": "goevents",
"sub_type": "producer",
"error": err,
}).Error("Error setting up topology after reconnection.")
}
}
}
func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err error) {
log.WithFields(log.Fields{
"action": queue,
"body": msg.Body,
"message_id": msg.MessageId,
"type": "goevents",
"sub_type": "producer",
"exchange": p.exchangeName,
}).Debug("Publishing message to the exchange.")
defer func() {
if r := recover(); r != nil {
debug.PrintStack()
switch x := r.(type) {
case string:
err = errors.New(x)
@ -185,36 +242,24 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
}()
if !p.conn.IsConnected() {
err = errors.New("Connection is not open.")
err = errors.New("connection is not open")
return
}
channel, err := p.conn.openChannel()
err = p.channel.Publish(p.exchangeName, queue, false, false, msg)
if err != nil {
return
}
defer channel.Close()
err = channel.Confirm(false)
if err != nil {
err = fmt.Errorf("Channel could not be put into confirm mode: %s", err)
return err
}
confirms := channel.NotifyPublish(make(chan amqplib.Confirmation, 1))
err = channel.Publish(p.exchangeName, queue, false, false, msg)
if err != nil {
return
} else {
if confirmed := <-confirms; !confirmed.Ack {
err = ErrNotAcked
select {
case confirm := <-p.notifyConfirm:
if confirm.Ack {
return
}
case <-time.After(p.config.publishInterval):
err = ErrNotAcked
return
}
return
@ -228,19 +273,14 @@ func (p *producer) isClosed() bool {
}
func (p *producer) drainInternalQueue() {
for m := range p.internalQueue {
var retry = true
for retry && !p.isClosed() {
log.WithFields(log.Fields{
"action": m.action,
"body": m.msg.Body,
"message_id": m.msg.MessageId,
"type": "goevents",
"sub_type": "producer",
"exchange": p.exchangeName,
}).Debug("Publishing message to the exchange.")
for {
select {
case <-p.closeQueue:
return
case <-p.connectionClosed:
return
case m := <-p.internalQueue:
// block until confirmation
err := p.publishMessage(m.msg, m.action)
if err != nil {
@ -253,10 +293,9 @@ func (p *producer) drainInternalQueue() {
"sub_type": "producer",
}).Error("Error publishing message to the exchange. Retrying...")
time.Sleep(p.config.publishInterval)
continue
p.internalQueue <- m
} else {
retry = false
p.wg.Done()
}
}
}

View File

@ -7,7 +7,7 @@ services:
depends_on:
- broker
volumes:
- .:/go/src/github.com/eventials/goevents
- .:/goevents
broker:
image: rabbitmq:3.6-management

View File

@ -5,9 +5,9 @@ import (
"os/signal"
"sync"
"syscall"
"time"
"fmt"
"github.com/eventials/goevents/amqp"
)
@ -28,7 +28,6 @@ func main() {
}
producerB, err := amqp.NewProducer(conn, "events-exchange")
wg.Add(1)
if err != nil {
panic(err)
@ -36,23 +35,26 @@ func main() {
go func() {
for {
producerA.Publish("object.eventA", []byte("some data"))
producerB.Publish("object.eventC", []byte("some data"))
time.Sleep(1 * time.Second)
select {
case <-producerA.NotifyClose():
fmt.Println("ProducerA closed for good")
return
default:
producerA.Publish("object.eventA", []byte("some data"))
}
}
}()
go func() {
<-producerA.NotifyClose()
fmt.Println("Producer closed for good")
wg.Done()
}()
go func() {
<-producerB.NotifyClose()
fmt.Println("Producer closed for good")
wg.Done()
for {
select {
case <-producerB.NotifyClose():
fmt.Println("ProducerB closed for good")
return
default:
producerB.Publish("object.eventC", []byte("some data"))
}
}
}()
sigc := make(chan os.Signal, 1)
@ -66,6 +68,4 @@ func main() {
fmt.Println("Closing producerB")
producerB.Close()
wg.Wait()
}

11
go.mod Normal file
View File

@ -0,0 +1,11 @@
module github.com/eventials/goevents
go 1.12
require (
github.com/aws/aws-sdk-go v1.18.0
github.com/sirupsen/logrus v1.4.0
github.com/streadway/amqp v0.0.0-20190312002841-61ee40d2027b
github.com/stretchr/testify v1.3.0
golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect
)

33
go.sum Normal file
View File

@ -0,0 +1,33 @@
github.com/aws/aws-sdk-go v1.18.0 h1:CXoiktHzavv3gwZsVPGuyTXpHg6dd1ssrGKMQuQNrdY=
github.com/aws/aws-sdk-go v1.18.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/streadway/amqp v0.0.0-20190312002841-61ee40d2027b h1:VPo/aUrW0PUdrSwI0UWPY/zXBoLg78fEXLPSsOqngk4=
github.com/streadway/amqp v0.0.0-20190312002841-61ee40d2027b/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=