From 4d54df7868f3a431642a6b9acd695501927a51ad Mon Sep 17 00:00:00 2001 From: skrater Date: Fri, 15 Mar 2019 11:27:45 -0300 Subject: [PATCH] Fix producer to no block when publish fail --- amqp/producer.go | 91 ++++++++++++++++++++++++------------------------ 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/amqp/producer.go b/amqp/producer.go index 0cb10e0..7e8a6d1 100644 --- a/amqp/producer.go +++ b/amqp/producer.go @@ -23,14 +23,13 @@ type message struct { // producer holds a amqp connection and channel to publish messages to. type producer struct { - m sync.Mutex - wg sync.WaitGroup - conn *connection - channel *amqplib.Channel - notifyConfirm chan amqplib.Confirmation - connectionClosed <-chan error - closeQueue chan bool - config ProducerConfig + m sync.Mutex + wg sync.WaitGroup + conn *connection + channel *amqplib.Channel + notifyConfirm chan amqplib.Confirmation + closeQueue chan bool + config ProducerConfig internalQueue chan message @@ -55,16 +54,13 @@ func NewProducer(c messaging.Connection, exchange string) (*producer, error) { // NewProducerConfig returns a new AMQP Producer. func NewProducerConfig(c messaging.Connection, exchange string, config ProducerConfig) (*producer, error) { - conn := c.(*connection) - producer := &producer{ - conn: c.(*connection), - config: config, - internalQueue: make(chan message, 2), - exchangeName: exchange, - notifyConfirm: make(chan amqplib.Confirmation), - closeQueue: make(chan bool), - connectionClosed: conn.NotifyConnectionClose(), + conn: c.(*connection), + config: config, + internalQueue: make(chan message, 2), + exchangeName: exchange, + notifyConfirm: make(chan amqplib.Confirmation), + closeQueue: make(chan bool), } err := producer.setupTopology() @@ -73,6 +69,7 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC return nil, err } + go producer.drainInternalQueue() go producer.handleReestablishedConnnection() return producer, err @@ -134,6 +131,10 @@ func (p *producer) Close() { close(p.closeQueue) p.channel.Close() + + for _, c := range p.closes { + c <- true + } } // changeChannel takes a new channel to the queue, @@ -191,9 +192,7 @@ func (p *producer) setupTopology() error { log.WithFields(log.Fields{ "type": "goevents", "sub_type": "producer", - }).Debug("Topology ready. Draining internal queue.") - - go p.drainInternalQueue() + }).Debug("Topology ready.") return nil } @@ -217,6 +216,11 @@ func (p *producer) handleReestablishedConnnection() { } func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err error) { + if !p.conn.IsConnected() { + err = errors.New("connection is not open") + return + } + log.WithFields(log.Fields{ "action": queue, "body": msg.Body, @@ -241,11 +245,6 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err } }() - if !p.conn.IsConnected() { - err = errors.New("connection is not open") - return - } - err = p.channel.Publish(p.exchangeName, queue, false, false, msg) if err != nil { @@ -257,6 +256,9 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err if confirm.Ack { return } + + err = ErrNotAcked + return case <-time.After(p.config.publishInterval): err = ErrNotAcked return @@ -277,30 +279,29 @@ func (p *producer) drainInternalQueue() { select { case <-p.closeQueue: return - case <-p.connectionClosed: - return case m := <-p.internalQueue: - // block until confirmation - err := p.publishMessage(m.msg, m.action) + retry := true - if err != nil { - log.WithFields(log.Fields{ - "action": m.action, - "body": m.msg.Body, - "message_id": m.msg.MessageId, - "error": err, - "type": "goevents", - "sub_type": "producer", - }).Error("Error publishing message to the exchange. Retrying...") + for retry { + // block until confirmation + err := p.publishMessage(m.msg, m.action) - p.internalQueue <- m - } else { - p.wg.Done() + if err != nil { + log.WithFields(log.Fields{ + "action": m.action, + "body": m.msg.Body, + "message_id": m.msg.MessageId, + "error": err, + "type": "goevents", + "sub_type": "producer", + }).Error("Error publishing message to the exchange. Retrying...") + + time.Sleep(p.config.publishInterval) + } else { + p.wg.Done() + retry = false + } } } } - - for _, c := range p.closes { - c <- true - } }