From bf9ff86a03332ae5cfd3389d505a5c6f57c1ace6 Mon Sep 17 00:00:00 2001 From: skrater Date: Wed, 1 May 2019 14:08:44 -0300 Subject: [PATCH] Remove useless code --- amqp/producer.go | 46 ++++++++++++++++++---------------------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/amqp/producer.go b/amqp/producer.go index 8a38c2c..f5cf4d1 100644 --- a/amqp/producer.go +++ b/amqp/producer.go @@ -30,7 +30,6 @@ type producer struct { channel *amqplib.Channel notifyConfirm chan amqplib.Confirmation notifyChanClose chan *amqplib.Error - closeQueue chan bool config ProducerConfig internalQueue chan message @@ -62,7 +61,6 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC config: config, internalQueue: make(chan message, 2), exchangeName: exchange, - closeQueue: make(chan bool), } err := producer.setupTopology() @@ -127,10 +125,7 @@ func (p *producer) Close() { p.wg.Wait() - p.closeQueue <- true - close(p.internalQueue) - close(p.closeQueue) p.channel.Close() @@ -333,32 +328,27 @@ func (p *producer) isClosed() bool { } func (p *producer) drainInternalQueue() { - for { - select { - case <-p.closeQueue: - return - case m := <-p.internalQueue: - retry := true + for m := range p.internalQueue { + retry := true - for retry { - // block until confirmation - err := p.publishMessage(m.msg, m.action) + for retry { + // block until confirmation + err := p.publishMessage(m.msg, m.action) - if err != nil { - log.WithFields(log.Fields{ - "action": m.action, - "body": m.msg.Body, - "message_id": m.msg.MessageId, - "error": err, - "type": "goevents", - "sub_type": "producer", - }).Error("Error publishing message to the exchange. Retrying...") + if err != nil { + log.WithFields(log.Fields{ + "action": m.action, + "body": m.msg.Body, + "message_id": m.msg.MessageId, + "error": err, + "type": "goevents", + "sub_type": "producer", + }).Error("Error publishing message to the exchange. Retrying...") - time.Sleep(p.config.publishInterval) - } else { - p.wg.Done() - retry = false - } + time.Sleep(p.config.publishInterval) + } else { + p.wg.Done() + retry = false } } }