package amqp

import (
	"errors"
	"fmt"
	"runtime/debug"
	"sync"
	"time"

	"github.com/eventials/goevents/messaging"
	log "github.com/sirupsen/logrus"
	"github.com/streadway/amqp"
	amqplib "github.com/streadway/amqp"
)

// ErrNotAcked indicates that a published message was not acked by RabbitMQ.
var ErrNotAcked = errors.New("message was not acked")

// ErrTimedout indicates that no confirmation for a published message arrived
// within the producer's ConfirmTimeout.
var ErrTimedout = errors.New("message was timed out")

// message is one entry of the producer's internal queue: the payload and the
// action it is published under.
type message struct {
	action string
	msg    amqplib.Publishing
}

// producer holds an amqp connection and channel to publish messages to.
type producer struct {
	// m guards the mutable state below (closed, channelReady, closes) and is
	// held by setupTopology while the channel and its listeners are replaced.
	m sync.RWMutex
	// wg counts messages handed to the internal queue that have not been
	// published successfully yet; Close waits on it.
	wg sync.WaitGroup

	conn            *connection
	channel         *amqplib.Channel
	notifyConfirm   chan amqplib.Confirmation
	notifyChanClose chan *amqplib.Error

	config ProducerConfig

	internalQueue chan message

	exchangeName string

	closed       bool
	channelReady bool
	closes       []chan bool
}

// ProducerConfig to be used when creating a new producer.
type ProducerConfig struct {
	// PublishInterval is how long the producer sleeps before retrying a
	// message whose publication failed.
	PublishInterval time.Duration
	// ConfirmTimeout is how long the producer waits for the broker to confirm
	// a published message before giving up with ErrTimedout.
	ConfirmTimeout time.Duration
}

// NewProducer returns a new AMQP Producer.
// Uses a default ProducerConfig with a 2 second publish interval and a
// 10 second confirm timeout.
func NewProducer(c messaging.Connection, exchange string) (*producer, error) {
	return NewProducerConfig(c, exchange, ProducerConfig{
		PublishInterval: 2 * time.Second,
		ConfirmTimeout:  10 * time.Second,
	})
}

// NewProducerConfig returns a new AMQP Producer that publishes to the given
// exchange using the supplied configuration.
//
// c must be a connection created by this package; any other implementation of
// messaging.Connection is rejected with an error. An error is also returned
// when the initial topology setup fails. On success two background goroutines
// are started: one drains the internal queue, the other re-creates the channel
// after the connection or channel is lost.
func NewProducerConfig(c messaging.Connection, exchange string, config ProducerConfig) (*producer, error) {
	// Checked assertion: a foreign messaging.Connection must surface as an
	// error to the caller rather than as a panic inside the library.
	conn, ok := c.(*connection)
	if !ok {
		return nil, fmt.Errorf("unsupported connection type %T: expected an AMQP connection", c)
	}

	p := &producer{
		conn:          conn,
		config:        config,
		internalQueue: make(chan message),
		exchangeName:  exchange,
	}

	if err := p.setupTopology(); err != nil {
		return nil, err
	}

	go p.drainInternalQueue()
	go p.handleReestablishedConnnection()

	return p, nil
}

// Publish publishes an action.
func (p *producer) Publish(action string, data []byte) { // ignore messages published to a closed producer if p.isClosed() { return } messageID, _ := NewUUIDv4() now := time.Now().UTC() p.publishAmqMessage(action, amqplib.Publishing{ MessageId: messageID, DeliveryMode: amqplib.Persistent, Timestamp: now, Body: data, Headers: amqp.Table{ "x-epoch-milli": int64(now.UnixNano()/int64(time.Nanosecond)) / int64(time.Millisecond), }, }) } func (p *producer) publishAmqMessage(queue string, msg amqplib.Publishing) { p.wg.Add(1) log.WithFields(log.Fields{ "action": queue, "message_id": msg.MessageId, "type": "goevents", "sub_type": "producer", "exchange": p.exchangeName, "length": len(p.internalQueue), }).Debug("Publishing message to internal queue.") p.internalQueue <- message{ action: queue, msg: msg, } } // NotifyClose returns a channel to be notified then this producer closes. func (p *producer) NotifyClose() <-chan bool { receiver := make(chan bool, 1) p.m.Lock() p.closes = append(p.closes, receiver) p.m.Unlock() return receiver } func (p *producer) setClosed() { p.m.Lock() defer p.m.Unlock() p.closed = true } func (p *producer) notifyProducerClosed() { p.m.RLock() defer p.m.RUnlock() for _, c := range p.closes { c <- true } } // Close the producer's internal queue. func (p *producer) Close() { p.setClosed() p.wg.Wait() close(p.internalQueue) p.channel.Close() p.notifyProducerClosed() } // changeChannel takes a new channel to the queue, // and updates the channel listeners to reflect this. 
func (p *producer) changeChannel(channel *amqplib.Channel) { p.channel = channel p.notifyChanClose = make(chan *amqplib.Error) p.channel.NotifyClose(p.notifyChanClose) p.notifyConfirm = make(chan amqplib.Confirmation, 1) p.channel.NotifyPublish(p.notifyConfirm) p.channelReady = true } func (p *producer) setupTopology() error { log.WithFields(log.Fields{ "type": "goevents", "sub_type": "producer", }).Debug("Setting up topology...") p.m.Lock() defer p.m.Unlock() channel, err := p.conn.openChannel() if err != nil { return err } if p.exchangeName != "" { if err != nil { return err } err = channel.ExchangeDeclare( p.exchangeName, // name "topic", // type true, // durable false, // auto-delete false, // internal false, // no-wait nil, // arguments ) if err != nil { return err } } err = channel.Confirm(false) if err != nil { err = fmt.Errorf("Channel could not be put into confirm mode: %s", err) return err } p.changeChannel(channel) log.WithFields(log.Fields{ "type": "goevents", "sub_type": "producer", }).Debug("Topology ready.") return nil } func (p *producer) setChannelReady(ready bool) { p.m.Lock() defer p.m.Unlock() p.channelReady = ready } func (p *producer) isChannelReady() bool { p.m.RLock() defer p.m.RUnlock() return p.channelReady } func (p *producer) isConnected() bool { if !p.conn.IsConnected() { return false } return p.isChannelReady() } func (p *producer) waitConnectionLost() bool { if !p.isConnected() { return true } defer p.setChannelReady(false) select { case <-p.conn.NotifyConnectionClose(): log.Warn("Producer connection closed") return true case <-p.notifyChanClose: log.Warn("Producer channel closed") return false } } func (p *producer) handleReestablishedConnnection() { rs := p.conn.NotifyReestablish() for !p.isClosed() { // true if connection is lot // false if channel connection is lost connectionLost := p.waitConnectionLost() if connectionLost { // Wait reconnect <-rs } err := p.setupTopology() if err != nil { log.WithFields(log.Fields{ "type": 
"goevents", "sub_type": "producer", "error": err, }).Error("Error setting up topology after reconnection.") } } } func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err error) { if !p.isConnected() { err = errors.New("connection/channel is not open") return } logMessage := log.WithFields(log.Fields{ "action": queue, "message_id": msg.MessageId, "type": "goevents", "sub_type": "producer", "exchange": p.exchangeName, }) logMessage.WithFields(log.Fields{ "body": msg.Body, }).Debug("Publishing message to the exchange.") defer func() { if r := recover(); r != nil { debug.PrintStack() switch x := r.(type) { case string: err = errors.New(x) case error: err = x default: err = errors.New("Unknown panic") } } }() err = p.channel.Publish( p.exchangeName, // Exchange queue, // Routing key false, // Mandatory false, // Immediate msg) if err != nil { return } logMessage.Debug("Waiting message to be acked or timed out.") select { case confirm := <-p.notifyConfirm: if confirm.Ack { return } err = ErrNotAcked return case <-time.After(p.config.ConfirmTimeout): err = ErrTimedout return } return } func (p *producer) isClosed() bool { p.m.RLock() defer p.m.RUnlock() return p.closed } func (p *producer) drainInternalQueue() { for m := range p.internalQueue { retry := true for retry { // block until confirmation err := p.publishMessage(m.msg, m.action) if err != nil { log.WithFields(log.Fields{ "action": m.action, "body": m.msg.Body, "message_id": m.msg.MessageId, "error": err, "type": "goevents", "sub_type": "producer", }).Error("Error publishing message to the exchange. Retrying...") if err == ErrTimedout { log.Warn("Closing producer channel due timeout wating msg confirmation") // force close to run setupTopology p.setChannelReady(false) p.channel.Close() } time.Sleep(p.config.PublishInterval) } else { p.wg.Done() retry = false } } } }