1
0
mirror of https://github.com/eventials/goevents.git synced 2025-05-08 19:29:15 +08:00
eventials.goevents/amqp/producer.go

213 lines
4.7 KiB
Go
Raw Normal View History

2016-12-01 10:52:22 -02:00
package amqp
import (
2017-03-09 17:32:05 -03:00
"sync"
2016-12-01 10:52:22 -02:00
"time"
2016-12-01 11:40:58 -02:00
"github.com/eventials/goevents/messaging"
2017-03-09 17:32:05 -03:00
log "github.com/Sirupsen/logrus"
2016-12-01 10:52:22 -02:00
amqplib "github.com/streadway/amqp"
)
2017-03-09 17:32:05 -03:00
type message struct {
action string
data []byte
}
// Producer holds a amqp connection and channel to publish messages to.
2016-12-01 10:52:22 -02:00
type Producer struct {
2017-03-09 17:32:05 -03:00
m sync.Mutex
2016-12-01 10:52:22 -02:00
conn *Connection
2016-12-28 09:20:29 -02:00
2017-03-09 17:32:05 -03:00
config ProducerConfig
internalQueue chan message
ackChannel chan uint64
nackChannel chan uint64
2016-12-28 09:20:29 -02:00
channel *amqplib.Channel
exchangeName string
2017-03-09 17:32:05 -03:00
closed bool
closes []chan bool
}
// ProducerConfig to be used when creating a new producer.
type ProducerConfig struct {
publishInterval time.Duration
publishRetries int
2016-12-01 10:52:22 -02:00
}
2016-12-01 16:17:55 -02:00
// NewProducer returns a new AMQP Producer.
2017-03-09 17:32:05 -03:00
// Uses a default ProducerConfig with 2 second of publish interval and 10 publish retries.
func NewProducer(c messaging.Connection, exchange string) (messaging.Producer, error) {
return NewProducerConfig(c, exchange, ProducerConfig{
2017-03-09 17:32:05 -03:00
publishInterval: 2 * time.Second,
publishRetries: 10,
})
}
// NewProducerConfig returns a new AMQP Producer.
func NewProducerConfig(c messaging.Connection, exchange string, config ProducerConfig) (messaging.Producer, error) {
2017-03-09 17:32:05 -03:00
producer := &Producer{
conn: c.(*Connection),
config: config,
internalQueue: make(chan message),
exchangeName: exchange,
}
err := producer.setupTopology()
go producer.handleReestablishedConnnection()
go producer.drainInternalQueue()
return producer, err
}
// Publish publishes an action.
func (p *Producer) Publish(action string, data []byte) {
p.internalQueue <- message{action, data}
}
2016-12-01 10:52:22 -02:00
2017-03-09 17:32:05 -03:00
// NotifyClose returns a channel to be notified then this producer closes.
func (p *Producer) NotifyClose() <-chan bool {
receiver := make(chan bool)
p.closes = append(p.closes, receiver)
return receiver
}
// Close the producer's internal queue.
func (p *Producer) Close() {
p.closed = true
close(p.internalQueue)
}
func (p *Producer) setupTopology() error {
p.m.Lock()
defer p.m.Unlock()
var err error
p.channel, err = p.conn.OpenChannel()
p.ackChannel, p.nackChannel = p.channel.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
// put the channel in confirm mode.
p.channel.Confirm(false)
2016-12-28 09:20:29 -02:00
if err != nil {
2017-03-09 17:32:05 -03:00
return err
2016-12-28 09:20:29 -02:00
}
2017-03-09 17:32:05 -03:00
err = p.channel.ExchangeDeclare(
p.exchangeName, // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // arguments
2016-12-28 09:20:29 -02:00
)
if err != nil {
2017-03-09 17:32:05 -03:00
return err
2016-12-28 09:20:29 -02:00
}
if err != nil {
2017-03-09 17:32:05 -03:00
return err
2016-12-28 09:20:29 -02:00
}
2017-03-09 17:32:05 -03:00
return nil
2016-12-01 10:52:22 -02:00
}
2017-03-09 17:32:05 -03:00
func (p *Producer) handleReestablishedConnnection() {
for !p.closed {
<-p.conn.NotifyReestablish()
err := p.setupTopology()
if err != nil {
log.WithFields(log.Fields{
"type": "amqp",
"error": err,
}).Error("Error setting up topology after reconnection")
}
}
}
func (p *Producer) drainInternalQueue() {
for m := range p.internalQueue {
// try to publish in N attempts.
for i := 1; i <= p.config.publishRetries; i++ {
msg := amqplib.Publishing{
DeliveryMode: amqplib.Persistent,
Timestamp: time.Now(),
Body: m.data,
}
err := func() error {
p.m.Lock()
defer p.m.Unlock()
log.WithFields(log.Fields{
"type": "amqp",
"attempt": i,
"max_retries": p.config.publishRetries,
}).Debug("Publishing message to the exchange")
return p.channel.Publish(p.exchangeName, m.action, false, false, msg)
}()
if err != nil {
if i < p.config.publishRetries {
log.WithFields(log.Fields{
"type": "amqp",
"error": err,
"attempt": i,
"max_retries": p.config.publishRetries,
}).Error("Error publishing message to the exchange. Retrying...")
time.Sleep(p.config.publishInterval)
continue
} else {
log.WithFields(log.Fields{
"type": "amqp",
"error": err,
"attempt": i,
"max_retries": p.config.publishRetries,
}).Error("Error publishing message to the exchange. Max retries reached, giving up...")
}
}
select {
case <-p.ackChannel:
goto outer // 😈
case <-p.nackChannel:
if i < p.config.publishRetries {
log.WithFields(log.Fields{
"type": "amqp",
"attempt": i,
"max_retries": p.config.publishRetries,
}).Error("Error publishing message to the exchange. Retrying...")
time.Sleep(p.config.publishInterval)
} else {
log.WithFields(log.Fields{
"type": "amqp",
"attempt": i,
"max_retries": p.config.publishRetries,
}).Error("Error publishing message to the exchange. Max retries reached, giving up...")
}
}
}
outer:
2016-12-01 10:52:22 -02:00
}
2017-03-09 17:32:05 -03:00
for _, c := range p.closes {
c <- true
}
2016-12-01 10:52:22 -02:00
}