From 65b5375e65f6847878f07cbdf26eb5f0ab82f21f Mon Sep 17 00:00:00 2001 From: Guilherme Date: Mon, 9 Oct 2017 19:33:27 -0300 Subject: [PATCH] wait for rabbitmq ack --- amqp/consumer.go | 32 +++++++++++++-- amqp/consumer_test.go | 5 +-- amqp/producer.go | 95 ++++++++++++++++++++++++++----------------- 3 files changed, 88 insertions(+), 44 deletions(-) diff --git a/amqp/consumer.go b/amqp/consumer.go index 06465e2..da26074 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -83,6 +83,12 @@ func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue str } func (c *Consumer) Close() { + c.m.Lock() + defer c.m.Unlock() + + // Unsubscribe all handlers + c.handlers = make([]handler, 0) + c.closed = true c.channel.Close() } @@ -215,10 +221,15 @@ func (c *Consumer) dispatch(msg amqplib.Delivery) { c.doDispatch(msg, h, retryCount, delay) } else { - // got wrong message? - // ignore and don't requeue. if !c.autoAck { - msg.Nack(false, false) + err := msg.Nack(false, true) + + if err != nil { + logger.WithFields(log.Fields{ + "error": err, + "message_id": msg.MessageId, + }).Error("Failed to nack message.") + } } } } @@ -322,6 +333,9 @@ func (c *Consumer) requeueMessage(msg amqplib.Delivery) { } func (c *Consumer) getHandler(msg amqplib.Delivery) (*handler, bool) { + c.m.Lock() + defer c.m.Unlock() + action := getAction(msg) for _, h := range c.handlers { @@ -442,10 +456,20 @@ func (c *Consumer) Consume() { "queue": c.queueName, }).Info("Consuming messages...") + wg := &sync.WaitGroup{} + for m := range msgs { - go c.dispatch(m) + logger.Info("Received from channel.") + wg.Add(1) + go func(msg amqplib.Delivery) { + c.dispatch(msg) + wg.Done() + }(m) } + // Wait all go routine finish. + wg.Wait() + logger.WithFields(log.Fields{ "queue": c.queueName, "closed": c.closed, diff --git a/amqp/consumer_test.go b/amqp/consumer_test.go index 3db8529..e1934f3 100644 --- a/amqp/consumer_test.go +++ b/amqp/consumer_test.go @@ -2,11 +2,10 @@ package amqp import ( "fmt" - "testing" - "time" - "github.com/eventials/goevents/messaging" "github.com/stretchr/testify/assert" + "testing" + "time" ) func TestSubscribeActions(t *testing.T) { diff --git a/amqp/producer.go b/amqp/producer.go index 07ab058..fdc40af 100644 --- a/amqp/producer.go +++ b/amqp/producer.go @@ -1,15 +1,20 @@ package amqp import ( + "errors" + "fmt" + "github.com/eventials/goevents/messaging" "sync" "time" - "github.com/eventials/goevents/messaging" - log "github.com/sirupsen/logrus" amqplib "github.com/streadway/amqp" ) +var ( + ErrNotAcked = errors.New("Messge was not acked") +) + type message struct { action string data []byte @@ -27,8 +32,6 @@ type Producer struct { ackChannel chan uint64 nackChannel chan uint64 - channel *amqplib.Channel - exchangeName string closed bool @@ -80,6 +83,9 @@ func (p *Producer) NotifyClose() <-chan bool { // Close the producer's internal queue. func (p *Producer) Close() { + p.m.Lock() + defer p.m.Unlock() + p.closed = true close(p.internalQueue) } @@ -95,17 +101,15 @@ func (p *Producer) setupTopology() error { var err error - p.channel, err = p.conn.OpenChannel() - p.ackChannel, p.nackChannel = p.channel.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1)) - - // put the channel in confirm mode. - p.channel.Confirm(false) + channel, err := p.conn.OpenChannel() if err != nil { return err } - err = p.channel.ExchangeDeclare( + defer channel.Close() + + err = channel.ExchangeDeclare( p.exchangeName, // name "topic", // type true, // durable @@ -148,9 +152,45 @@ func (p *Producer) handleReestablishedConnnection() { } } +func (p *Producer) publishMessage(msg amqplib.Publishing, action string) error { + channel, err := p.conn.OpenChannel() + + if err != nil { + return err + } + + defer channel.Close() + + if err := channel.Confirm(false); err != nil { + return fmt.Errorf("Channel could not be put into confirm mode: %s", err) + } + + confirms := channel.NotifyPublish(make(chan amqplib.Confirmation, 1)) + + err = channel.Publish(p.exchangeName, action, false, false, msg) + + if err != nil { + return err + } else { + if confirmed := <-confirms; !confirmed.Ack { + return ErrNotAcked + } + } + + return nil +} + +func (p *Producer) isClosed() bool { + p.m.Lock() + defer p.m.Unlock() + + return p.closed +} + func (p *Producer) drainInternalQueue() { for m := range p.internalQueue { - for i := 0; !p.closed; i++ { + var retry = true + for retry && !p.isClosed() { messageId, _ := NewUUIDv4() msg := amqplib.Publishing{ @@ -160,45 +200,26 @@ func (p *Producer) drainInternalQueue() { Body: m.data, } - err := func() error { - p.m.Lock() - defer p.m.Unlock() + log.WithFields(log.Fields{ + "type": "goevents", + "sub_type": "producer", + }).Info("Publishing message to the exchange.") - log.WithFields(log.Fields{ - "type": "goevents", - "sub_type": "producer", - "attempt": i, - }).Debug("Publishing message to the exchange.") - - return p.channel.Publish(p.exchangeName, m.action, false, false, msg) - }() + err := p.publishMessage(msg, m.action) if err != nil { log.WithFields(log.Fields{ "type": "goevents", "sub_type": "producer", "error": err, - "attempt": i, }).Error("Error publishing message to the exchange. Retrying...") time.Sleep(p.config.publishInterval) continue - } - - select { - case <-p.ackChannel: - goto outer // 😈 - case <-p.nackChannel: - log.WithFields(log.Fields{ - "type": "goevents", - "sub_type": "producer", - "attempt": i, - }).Error("Error publishing message to the exchange. Retrying...") - - time.Sleep(p.config.publishInterval) + } else { + retry = false } } - outer: } for _, c := range p.closes {