diff --git a/.travis.yml b/.travis.yml index 836e386..4ece792 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,8 @@ language: go go: - - "1.9" - - "1.10" + - "1.11" + - "1.12" - "tip" -install: - - go get github.com/streadway/amqp - - go get github.com/stretchr/testify - - go get github.com/sirupsen/logrus - - go get github.com/aws/aws-sdk-go/service/sqs services: - rabbitmq addons: diff --git a/Dockerfile b/Dockerfile index f673e28..62fba49 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,24 +1,12 @@ FROM golang:1.12 -ARG PLATFORM=linux -ENV PLATFORM $PLATFORM_VERSION +RUN DOCKERIZE_VERSION=v0.6.1 \ + && wget --no-check-certificate https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-alpine-linux-amd64-$DOCKERIZE_VERSION.tar.gz \ + && tar -C /usr/local/bin -xzvf dockerize-alpine-linux-amd64-$DOCKERIZE_VERSION.tar.gz \ + && rm dockerize-alpine-linux-amd64-$DOCKERIZE_VERSION.tar.gz \ + && mkdir -p /goevents -ARG ARCH=amd64 -ENV ARCH $ARCH_VERSION - -ARG DOCKERIZE_VERSION=v0.6.1 -ENV DOCKERIZE_VERSION $DOCKERIZE_VERSION - -ADD https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-$PLATFORM-$ARCH-$DOCKERIZE_VERSION.tar.gz /usr/local/bin - -RUN cd /usr/local/bin \ - && tar -xzf ./dockerize-$PLATFORM-$ARCH-$DOCKERIZE_VERSION.tar.gz \ - && rm -f ./dockerize-$PLATFORM-$ARCH-$DOCKERIZE_VERSION.tar.gz - -RUN mkdir -p /go/src/github.com/eventials/goevents -WORKDIR /go/src/github.com/eventials/goevents - -RUN go mod download +WORKDIR /goevents ENTRYPOINT ["dockerize"] diff --git a/README.md b/README.md index 3a57c9c..3bd87c4 100644 --- a/README.md +++ b/README.md @@ -26,12 +26,16 @@ if err != nil { panic(err) } +defer conn.Close() + c, err := NewConsumer(conn, false, "events-exchange", "events-queue") if err != nil { panic(err) } +defer c.Close() + c.Subscribe("object.*", func(body []byte) bool { fmt.Println(body) return true @@ -39,7 +43,7 @@ c.Subscribe("object.*", func(body []byte) bool { go c.Consume() -conn.WaitUntilConnectionClose() +select{} ``` **The producer** @@ -51,15 +55,24 @@ if err != nil { panic(err) } +defer conn.Close() + p, err := NewProducer(conn, "events-exchange", "events-queue") if err != nil { panic(err) } +defer p.Close() + err = p.Publish("object.my_action", []byte("message")) if err != nil { panic(err) } ``` + +## Important + +When using `producer`, always close all your producers (things who call the producer.Publish) before closing the producer itself (producer.Close). +In this way, you have more garanties that your messages is delivered to RabbitMQ. diff --git a/amqp/consumer.go b/amqp/consumer.go index 4a3cb0c..697a954 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -89,16 +89,18 @@ func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue str } +func (c *consumer) closeAndClearHandlers() { + c.m.Lock() + defer c.m.Unlock() + + // Unsubscribe all handlers + c.handlers = make([]handler, 0) + + c.closed = true +} + func (c *consumer) Close() { - func() { - c.m.Lock() - defer c.m.Unlock() - - // Unsubscribe all handlers - c.handlers = make([]handler, 0) - - c.closed = true - }() + c.closeAndClearHandlers() // Wait all go routine finish. c.wg.Wait() @@ -401,6 +403,9 @@ func (c *consumer) bindActionToQueue(channel *amqplib.Channel, queueName string, } func (c *consumer) bindAllActionsQueue(channel *amqplib.Channel, queueName string) error { + c.m.RLock() + defer c.m.RUnlock() + for _, h := range c.handlers { err := c.bindActionToQueue(channel, queueName, h.action) if err != nil { @@ -506,6 +511,13 @@ func (c *consumer) doConsume() error { return nil } +func (c *consumer) isClosed() bool { + c.m.Lock() + defer c.m.Unlock() + + return c.closed +} + // Listen start to listen for new messages. func (c *consumer) Consume() { logger.Info("Registered handlers:") @@ -516,7 +528,7 @@ func (c *consumer) Consume() { rs := c.conn.NotifyReestablish() - for !c.closed { + for !c.isClosed() { if !c.conn.IsConnected() { logger.Info("Connection not established. Waiting connection to be reestablished.") diff --git a/amqp/producer.go b/amqp/producer.go index 45aac00..c5a1f69 100644 --- a/amqp/producer.go +++ b/amqp/producer.go @@ -23,18 +23,17 @@ type message struct { // producer holds a amqp connection and channel to publish messages to. type producer struct { - m sync.Mutex - conn *connection - channel *amqp.Channel - notifyConfirm chan amqp.Confirmation - - config ProducerConfig + m sync.Mutex + wg sync.WaitGroup + conn *connection + channel *amqplib.Channel + notifyConfirm chan amqplib.Confirmation + connectionClosed <-chan error + closeQueue chan bool + config ProducerConfig internalQueue chan message - ackChannel chan uint64 - nackChannel chan uint64 - exchangeName string closed bool @@ -56,12 +55,16 @@ func NewProducer(c messaging.Connection, exchange string) (*producer, error) { // NewProducerConfig returns a new AMQP Producer. func NewProducerConfig(c messaging.Connection, exchange string, config ProducerConfig) (*producer, error) { + conn := c.(*connection) + producer := &producer{ - conn: c.(*connection), - config: config, - internalQueue: make(chan message), - exchangeName: exchange, - notifyConfirm: make(chan amqp.Confirmation), + conn: c.(*connection), + config: config, + internalQueue: make(chan message, 2), + exchangeName: exchange, + notifyConfirm: make(chan amqplib.Confirmation), + closeQueue: make(chan bool), + connectionClosed: conn.NotifyConnectionClose(), } err := producer.setupTopology() @@ -71,13 +74,17 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC } go producer.handleReestablishedConnnection() - go producer.drainInternalQueue() return producer, err } // Publish publishes an action. func (p *producer) Publish(action string, data []byte) { + // ignore messages published to a closed producer + if p.isClosed() { + return + } + messageID, _ := NewUUIDv4() p.publishAmqMessage(action, amqplib.Publishing{ @@ -89,6 +96,8 @@ func (p *producer) Publish(action string, data []byte) { } func (p *producer) publishAmqMessage(queue string, msg amqplib.Publishing) { + p.wg.Add(1) + p.internalQueue <- message{ action: queue, msg: msg, @@ -106,22 +115,32 @@ func (p *producer) NotifyClose() <-chan bool { return receiver } -// Close the producer's internal queue. -func (p *producer) Close() { +func (p *producer) setClosed() { p.m.Lock() defer p.m.Unlock() p.closed = true +} + +// Close the producer's internal queue. +func (p *producer) Close() { + p.setClosed() + + p.wg.Wait() + + p.closeQueue <- true + close(p.internalQueue) + close(p.closeQueue) p.channel.Close() } // changeChannel takes a new channel to the queue, // and updates the channel listeners to reflect this. -func (p *producer) changeChannel(channel *amqp.Channel) { +func (p *producer) changeChannel(channel *amqplib.Channel) { p.channel = channel - p.notifyConfirm = make(chan amqp.Confirmation) + p.notifyConfirm = make(chan amqplib.Confirmation) p.channel.NotifyPublish(p.notifyConfirm) } @@ -172,7 +191,9 @@ func (p *producer) setupTopology() error { log.WithFields(log.Fields{ "type": "goevents", "sub_type": "producer", - }).Debug("Topology ready.") + }).Debug("Topology ready. Draining internal queue.") + + go p.drainInternalQueue() return nil } @@ -180,7 +201,7 @@ func (p *producer) setupTopology() error { func (p *producer) handleReestablishedConnnection() { rs := p.conn.NotifyReestablish() - for !p.closed { + for !p.isClosed() { <-rs err := p.setupTopology() @@ -191,11 +212,22 @@ func (p *producer) handleReestablishedConnnection() { "sub_type": "producer", "error": err, }).Error("Error setting up topology after reconnection.") + + continue } } } func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err error) { + log.WithFields(log.Fields{ + "action": queue, + "body": msg.Body, + "message_id": msg.MessageId, + "type": "goevents", + "sub_type": "producer", + "exchange": p.exchangeName, + }).Debug("Publishing message to the exchange.") + defer func() { if r := recover(); r != nil { debug.PrintStack() @@ -243,22 +275,15 @@ func (p *producer) isClosed() bool { } func (p *producer) drainInternalQueue() { - for m := range p.internalQueue { - var retry = true - - for retry && !p.isClosed() { - log.WithFields(log.Fields{ - "action": m.action, - "body": m.msg.Body, - "message_id": m.msg.MessageId, - "type": "goevents", - "sub_type": "producer", - "exchange": p.exchangeName, - }).Debug("Publishing message to the exchange.") - + for { + select { + case <-p.closeQueue: + return + case <-p.connectionClosed: + return + case m := <-p.internalQueue: // block until confirmation err := p.publishMessage(m.msg, m.action) - retry = err != nil if err != nil { log.WithFields(log.Fields{ @@ -269,6 +294,10 @@ func (p *producer) drainInternalQueue() { "type": "goevents", "sub_type": "producer", }).Error("Error publishing message to the exchange. Retrying...") + + p.internalQueue <- m + } else { + p.wg.Done() } } } diff --git a/docker-compose.yml b/docker-compose.yml index 253f3fb..d68328c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,7 @@ services: depends_on: - broker volumes: - - .:/go/src/github.com/eventials/goevents + - .:/goevents broker: image: rabbitmq:3.6-management diff --git a/examples/producer/amqp/producer.go b/examples/producer/amqp/producer.go index 366c0ba..9e71ee9 100644 --- a/examples/producer/amqp/producer.go +++ b/examples/producer/amqp/producer.go @@ -5,9 +5,9 @@ import ( "os/signal" "sync" "syscall" - "time" "fmt" + "github.com/eventials/goevents/amqp" ) @@ -28,7 +28,6 @@ func main() { } producerB, err := amqp.NewProducer(conn, "events-exchange") - wg.Add(1) if err != nil { panic(err) @@ -36,23 +35,26 @@ func main() { go func() { for { - producerA.Publish("object.eventA", []byte("some data")) - producerB.Publish("object.eventC", []byte("some data")) - - time.Sleep(1 * time.Second) + select { + case <-producerA.NotifyClose(): + fmt.Println("ProducerA closed for good") + return + default: + producerA.Publish("object.eventA", []byte("some data")) + } } }() go func() { - <-producerA.NotifyClose() - fmt.Println("Producer closed for good") - wg.Done() - }() - - go func() { - <-producerB.NotifyClose() - fmt.Println("Producer closed for good") - wg.Done() + for { + select { + case <-producerB.NotifyClose(): + fmt.Println("ProducerB closed for good") + return + default: + producerB.Publish("object.eventC", []byte("some data")) + } + } }() sigc := make(chan os.Signal, 1) @@ -66,6 +68,4 @@ func main() { fmt.Println("Closing producerB") producerB.Close() - - wg.Wait() } diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d7f5f17 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/eventials/goevents + +go 1.12 + +require ( + github.com/aws/aws-sdk-go v1.18.0 + github.com/sirupsen/logrus v1.4.0 + github.com/streadway/amqp v0.0.0-20190312002841-61ee40d2027b + github.com/stretchr/testify v1.3.0 + golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..fcb87b0 --- /dev/null +++ b/go.sum @@ -0,0 +1,33 @@ +github.com/aws/aws-sdk-go v1.18.0 h1:CXoiktHzavv3gwZsVPGuyTXpHg6dd1ssrGKMQuQNrdY= +github.com/aws/aws-sdk-go v1.18.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE= +github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/streadway/amqp v0.0.0-20190312002841-61ee40d2027b h1:VPo/aUrW0PUdrSwI0UWPY/zXBoLg78fEXLPSsOqngk4= +github.com/streadway/amqp v0.0.0-20190312002841-61ee40d2027b/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=