1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Fix producer to no block when publish fail

This commit is contained in:
skrater 2019-03-15 11:27:45 -03:00
parent dce9bd51db
commit 4d54df7868

View File

@ -23,14 +23,13 @@ type message struct {
// producer holds a amqp connection and channel to publish messages to.
type producer struct {
m sync.Mutex
wg sync.WaitGroup
conn *connection
channel *amqplib.Channel
notifyConfirm chan amqplib.Confirmation
connectionClosed <-chan error
closeQueue chan bool
config ProducerConfig
m sync.Mutex
wg sync.WaitGroup
conn *connection
channel *amqplib.Channel
notifyConfirm chan amqplib.Confirmation
closeQueue chan bool
config ProducerConfig
internalQueue chan message
@ -55,16 +54,13 @@ func NewProducer(c messaging.Connection, exchange string) (*producer, error) {
// NewProducerConfig returns a new AMQP Producer.
func NewProducerConfig(c messaging.Connection, exchange string, config ProducerConfig) (*producer, error) {
conn := c.(*connection)
producer := &producer{
conn: c.(*connection),
config: config,
internalQueue: make(chan message, 2),
exchangeName: exchange,
notifyConfirm: make(chan amqplib.Confirmation),
closeQueue: make(chan bool),
connectionClosed: conn.NotifyConnectionClose(),
conn: c.(*connection),
config: config,
internalQueue: make(chan message, 2),
exchangeName: exchange,
notifyConfirm: make(chan amqplib.Confirmation),
closeQueue: make(chan bool),
}
err := producer.setupTopology()
@ -73,6 +69,7 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC
return nil, err
}
go producer.drainInternalQueue()
go producer.handleReestablishedConnnection()
return producer, err
@ -134,6 +131,10 @@ func (p *producer) Close() {
close(p.closeQueue)
p.channel.Close()
for _, c := range p.closes {
c <- true
}
}
// changeChannel takes a new channel to the queue,
@ -191,9 +192,7 @@ func (p *producer) setupTopology() error {
log.WithFields(log.Fields{
"type": "goevents",
"sub_type": "producer",
}).Debug("Topology ready. Draining internal queue.")
go p.drainInternalQueue()
}).Debug("Topology ready.")
return nil
}
@ -217,6 +216,11 @@ func (p *producer) handleReestablishedConnnection() {
}
func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err error) {
if !p.conn.IsConnected() {
err = errors.New("connection is not open")
return
}
log.WithFields(log.Fields{
"action": queue,
"body": msg.Body,
@ -241,11 +245,6 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
}
}()
if !p.conn.IsConnected() {
err = errors.New("connection is not open")
return
}
err = p.channel.Publish(p.exchangeName, queue, false, false, msg)
if err != nil {
@ -257,6 +256,9 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
if confirm.Ack {
return
}
err = ErrNotAcked
return
case <-time.After(p.config.publishInterval):
err = ErrNotAcked
return
@ -277,30 +279,29 @@ func (p *producer) drainInternalQueue() {
select {
case <-p.closeQueue:
return
case <-p.connectionClosed:
return
case m := <-p.internalQueue:
// block until confirmation
err := p.publishMessage(m.msg, m.action)
retry := true
if err != nil {
log.WithFields(log.Fields{
"action": m.action,
"body": m.msg.Body,
"message_id": m.msg.MessageId,
"error": err,
"type": "goevents",
"sub_type": "producer",
}).Error("Error publishing message to the exchange. Retrying...")
for retry {
// block until confirmation
err := p.publishMessage(m.msg, m.action)
p.internalQueue <- m
} else {
p.wg.Done()
if err != nil {
log.WithFields(log.Fields{
"action": m.action,
"body": m.msg.Body,
"message_id": m.msg.MessageId,
"error": err,
"type": "goevents",
"sub_type": "producer",
}).Error("Error publishing message to the exchange. Retrying...")
time.Sleep(p.config.publishInterval)
} else {
p.wg.Done()
retry = false
}
}
}
}
for _, c := range p.closes {
c <- true
}
}