diff --git a/amqp/producer.go b/amqp/producer.go index f5cf4d1..f7c0fa8 100644 --- a/amqp/producer.go +++ b/amqp/producer.go @@ -24,7 +24,7 @@ type message struct { // producer holds a amqp connection and channel to publish messages to. type producer struct { - m sync.Mutex + m sync.RWMutex wg sync.WaitGroup conn *connection channel *amqplib.Channel @@ -43,14 +43,16 @@ type producer struct { // ProducerConfig to be used when creating a new producer. type ProducerConfig struct { - publishInterval time.Duration + PublishInterval time.Duration + ConfirmTimeout time.Duration } // NewProducer returns a new AMQP Producer. // Uses a default ProducerConfig with 2 second of publish interval. func NewProducer(c messaging.Connection, exchange string) (*producer, error) { return NewProducerConfig(c, exchange, ProducerConfig{ - publishInterval: 2 * time.Second, + PublishInterval: 2 * time.Second, + ConfirmTimeout: 10 * time.Second, }) } @@ -59,7 +61,7 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC producer := &producer{ conn: c.(*connection), config: config, - internalQueue: make(chan message, 2), + internalQueue: make(chan message), exchangeName: exchange, } @@ -95,6 +97,15 @@ func (p *producer) Publish(action string, data []byte) { func (p *producer) publishAmqMessage(queue string, msg amqplib.Publishing) { p.wg.Add(1) + log.WithFields(log.Fields{ + "action": queue, + "message_id": msg.MessageId, + "type": "goevents", + "sub_type": "producer", + "exchange": p.exchangeName, + "length": len(p.internalQueue), + }).Debug("Publishing message to internal queue.") + p.internalQueue <- message{ action: queue, msg: msg, @@ -119,6 +130,15 @@ func (p *producer) setClosed() { p.closed = true } +func (p *producer) notifyProducerClosed() { + p.m.RLock() + defer p.m.RUnlock() + + for _, c := range p.closes { + c <- true + } +} + // Close the producer's internal queue. func (p *producer) Close() { p.setClosed() @@ -129,9 +149,7 @@ func (p *producer) Close() { p.channel.Close() - for _, c := range p.closes { - c <- true - } + p.notifyProducerClosed() } // changeChannel takes a new channel to the queue, @@ -208,8 +226,8 @@ func (p *producer) setChannelReady(ready bool) { } func (p *producer) isChannelReady() bool { - p.m.Lock() - defer p.m.Unlock() + p.m.RLock() + defer p.m.RUnlock() return p.channelReady } @@ -269,13 +287,16 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err return } - log.WithFields(log.Fields{ + logMessage := log.WithFields(log.Fields{ "action": queue, - "body": msg.Body, "message_id": msg.MessageId, "type": "goevents", "sub_type": "producer", "exchange": p.exchangeName, + }) + + logMessage.WithFields(log.Fields{ + "body": msg.Body, }).Debug("Publishing message to the exchange.") defer func() { @@ -304,6 +325,8 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err return } + logMessage.Debug("Waiting message to be acked or timed out.") + select { case confirm := <-p.notifyConfirm: if confirm.Ack { @@ -312,7 +335,7 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err err = ErrNotAcked return - case <-time.After(p.config.publishInterval): + case <-time.After(p.config.ConfirmTimeout): err = ErrTimedout return } @@ -321,8 +344,8 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err } func (p *producer) isClosed() bool { - p.m.Lock() - defer p.m.Unlock() + p.m.RLock() + defer p.m.RUnlock() return p.closed } @@ -345,7 +368,7 @@ func (p *producer) drainInternalQueue() { "sub_type": "producer", }).Error("Error publishing message to the exchange. Retrying...") - time.Sleep(p.config.publishInterval) + time.Sleep(p.config.PublishInterval) } else { p.wg.Done() retry = false diff --git a/examples/producer/amqp/producer.go b/examples/producer/amqp/producer.go index 9e71ee9..1037f65 100644 --- a/examples/producer/amqp/producer.go +++ b/examples/producer/amqp/producer.go @@ -1,12 +1,12 @@ package main import ( + "fmt" "os" "os/signal" "sync" "syscall" - - "fmt" + "time" "github.com/eventials/goevents/amqp" ) @@ -58,14 +58,29 @@ func main() { }() sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) fmt.Println("Waiting CTRL+C") <-sigc - fmt.Println("Closing producerA") - producerA.Close() - fmt.Println("Closing producerB") - producerB.Close() + closed := make(chan bool) + + go func() { + fmt.Println("Closing producerA") + producerA.Close() + + fmt.Println("Closing producerB") + producerB.Close() + + closed <- true + }() + + select { + case <-closed: + fmt.Println("Successfully closed.") + case <-time.After(20 * time.Second): + fmt.Println("Close timeout.") + } }