1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-26 13:48:59 +08:00

Merge pull request #46 from skrater/master

Fix producer to no block when publish fail
This commit is contained in:
Guilherme Emilio Raduenz 2019-03-15 11:35:52 -03:00 committed by GitHub
commit cb1bd81da2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -23,14 +23,13 @@ type message struct {
// producer holds a amqp connection and channel to publish messages to. // producer holds a amqp connection and channel to publish messages to.
type producer struct { type producer struct {
m sync.Mutex m sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
conn *connection conn *connection
channel *amqplib.Channel channel *amqplib.Channel
notifyConfirm chan amqplib.Confirmation notifyConfirm chan amqplib.Confirmation
connectionClosed <-chan error closeQueue chan bool
closeQueue chan bool config ProducerConfig
config ProducerConfig
internalQueue chan message internalQueue chan message
@ -55,16 +54,13 @@ func NewProducer(c messaging.Connection, exchange string) (*producer, error) {
// NewProducerConfig returns a new AMQP Producer. // NewProducerConfig returns a new AMQP Producer.
func NewProducerConfig(c messaging.Connection, exchange string, config ProducerConfig) (*producer, error) { func NewProducerConfig(c messaging.Connection, exchange string, config ProducerConfig) (*producer, error) {
conn := c.(*connection)
producer := &producer{ producer := &producer{
conn: c.(*connection), conn: c.(*connection),
config: config, config: config,
internalQueue: make(chan message, 2), internalQueue: make(chan message, 2),
exchangeName: exchange, exchangeName: exchange,
notifyConfirm: make(chan amqplib.Confirmation), notifyConfirm: make(chan amqplib.Confirmation),
closeQueue: make(chan bool), closeQueue: make(chan bool),
connectionClosed: conn.NotifyConnectionClose(),
} }
err := producer.setupTopology() err := producer.setupTopology()
@ -73,6 +69,7 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC
return nil, err return nil, err
} }
go producer.drainInternalQueue()
go producer.handleReestablishedConnnection() go producer.handleReestablishedConnnection()
return producer, err return producer, err
@ -134,6 +131,10 @@ func (p *producer) Close() {
close(p.closeQueue) close(p.closeQueue)
p.channel.Close() p.channel.Close()
for _, c := range p.closes {
c <- true
}
} }
// changeChannel takes a new channel to the queue, // changeChannel takes a new channel to the queue,
@ -191,9 +192,7 @@ func (p *producer) setupTopology() error {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"type": "goevents", "type": "goevents",
"sub_type": "producer", "sub_type": "producer",
}).Debug("Topology ready. Draining internal queue.") }).Debug("Topology ready.")
go p.drainInternalQueue()
return nil return nil
} }
@ -217,6 +216,11 @@ func (p *producer) handleReestablishedConnnection() {
} }
func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err error) { func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err error) {
if !p.conn.IsConnected() {
err = errors.New("connection is not open")
return
}
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"action": queue, "action": queue,
"body": msg.Body, "body": msg.Body,
@ -241,11 +245,6 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
} }
}() }()
if !p.conn.IsConnected() {
err = errors.New("connection is not open")
return
}
err = p.channel.Publish(p.exchangeName, queue, false, false, msg) err = p.channel.Publish(p.exchangeName, queue, false, false, msg)
if err != nil { if err != nil {
@ -257,6 +256,9 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
if confirm.Ack { if confirm.Ack {
return return
} }
err = ErrNotAcked
return
case <-time.After(p.config.publishInterval): case <-time.After(p.config.publishInterval):
err = ErrNotAcked err = ErrNotAcked
return return
@ -277,30 +279,29 @@ func (p *producer) drainInternalQueue() {
select { select {
case <-p.closeQueue: case <-p.closeQueue:
return return
case <-p.connectionClosed:
return
case m := <-p.internalQueue: case m := <-p.internalQueue:
// block until confirmation retry := true
err := p.publishMessage(m.msg, m.action)
if err != nil { for retry {
log.WithFields(log.Fields{ // block until confirmation
"action": m.action, err := p.publishMessage(m.msg, m.action)
"body": m.msg.Body,
"message_id": m.msg.MessageId,
"error": err,
"type": "goevents",
"sub_type": "producer",
}).Error("Error publishing message to the exchange. Retrying...")
p.internalQueue <- m if err != nil {
} else { log.WithFields(log.Fields{
p.wg.Done() "action": m.action,
"body": m.msg.Body,
"message_id": m.msg.MessageId,
"error": err,
"type": "goevents",
"sub_type": "producer",
}).Error("Error publishing message to the exchange. Retrying...")
time.Sleep(p.config.publishInterval)
} else {
p.wg.Done()
retry = false
}
} }
} }
} }
for _, c := range p.closes {
c <- true
}
} }