mirror of
https://github.com/eventials/goevents.git
synced 2025-04-24 13:48:53 +08:00
customize publish timeout
This commit is contained in:
parent
bf9ff86a03
commit
5187fa6213
@ -24,7 +24,7 @@ type message struct {
|
||||
|
||||
// producer holds a amqp connection and channel to publish messages to.
|
||||
type producer struct {
|
||||
m sync.Mutex
|
||||
m sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
conn *connection
|
||||
channel *amqplib.Channel
|
||||
@ -43,14 +43,16 @@ type producer struct {
|
||||
|
||||
// ProducerConfig to be used when creating a new producer.
|
||||
type ProducerConfig struct {
|
||||
publishInterval time.Duration
|
||||
PublishInterval time.Duration
|
||||
ConfirmTimeout time.Duration
|
||||
}
|
||||
|
||||
// NewProducer returns a new AMQP Producer.
|
||||
// Uses a default ProducerConfig with 2 second of publish interval.
|
||||
func NewProducer(c messaging.Connection, exchange string) (*producer, error) {
|
||||
return NewProducerConfig(c, exchange, ProducerConfig{
|
||||
publishInterval: 2 * time.Second,
|
||||
PublishInterval: 2 * time.Second,
|
||||
ConfirmTimeout: 10 * time.Second,
|
||||
})
|
||||
}
|
||||
|
||||
@ -59,7 +61,7 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC
|
||||
producer := &producer{
|
||||
conn: c.(*connection),
|
||||
config: config,
|
||||
internalQueue: make(chan message, 2),
|
||||
internalQueue: make(chan message),
|
||||
exchangeName: exchange,
|
||||
}
|
||||
|
||||
@ -95,6 +97,15 @@ func (p *producer) Publish(action string, data []byte) {
|
||||
func (p *producer) publishAmqMessage(queue string, msg amqplib.Publishing) {
|
||||
p.wg.Add(1)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"action": queue,
|
||||
"message_id": msg.MessageId,
|
||||
"type": "goevents",
|
||||
"sub_type": "producer",
|
||||
"exchange": p.exchangeName,
|
||||
"length": len(p.internalQueue),
|
||||
}).Debug("Publishing message to internal queue.")
|
||||
|
||||
p.internalQueue <- message{
|
||||
action: queue,
|
||||
msg: msg,
|
||||
@ -119,6 +130,15 @@ func (p *producer) setClosed() {
|
||||
p.closed = true
|
||||
}
|
||||
|
||||
func (p *producer) notifyProducerClosed() {
|
||||
p.m.RLock()
|
||||
defer p.m.RUnlock()
|
||||
|
||||
for _, c := range p.closes {
|
||||
c <- true
|
||||
}
|
||||
}
|
||||
|
||||
// Close the producer's internal queue.
|
||||
func (p *producer) Close() {
|
||||
p.setClosed()
|
||||
@ -129,9 +149,7 @@ func (p *producer) Close() {
|
||||
|
||||
p.channel.Close()
|
||||
|
||||
for _, c := range p.closes {
|
||||
c <- true
|
||||
}
|
||||
p.notifyProducerClosed()
|
||||
}
|
||||
|
||||
// changeChannel takes a new channel to the queue,
|
||||
@ -208,8 +226,8 @@ func (p *producer) setChannelReady(ready bool) {
|
||||
}
|
||||
|
||||
func (p *producer) isChannelReady() bool {
|
||||
p.m.Lock()
|
||||
defer p.m.Unlock()
|
||||
p.m.RLock()
|
||||
defer p.m.RUnlock()
|
||||
|
||||
return p.channelReady
|
||||
}
|
||||
@ -269,13 +287,16 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
|
||||
return
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
logMessage := log.WithFields(log.Fields{
|
||||
"action": queue,
|
||||
"body": msg.Body,
|
||||
"message_id": msg.MessageId,
|
||||
"type": "goevents",
|
||||
"sub_type": "producer",
|
||||
"exchange": p.exchangeName,
|
||||
})
|
||||
|
||||
logMessage.WithFields(log.Fields{
|
||||
"body": msg.Body,
|
||||
}).Debug("Publishing message to the exchange.")
|
||||
|
||||
defer func() {
|
||||
@ -304,6 +325,8 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
|
||||
return
|
||||
}
|
||||
|
||||
logMessage.Debug("Waiting message to be acked or timed out.")
|
||||
|
||||
select {
|
||||
case confirm := <-p.notifyConfirm:
|
||||
if confirm.Ack {
|
||||
@ -312,7 +335,7 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
|
||||
|
||||
err = ErrNotAcked
|
||||
return
|
||||
case <-time.After(p.config.publishInterval):
|
||||
case <-time.After(p.config.ConfirmTimeout):
|
||||
err = ErrTimedout
|
||||
return
|
||||
}
|
||||
@ -321,8 +344,8 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
|
||||
}
|
||||
|
||||
func (p *producer) isClosed() bool {
|
||||
p.m.Lock()
|
||||
defer p.m.Unlock()
|
||||
p.m.RLock()
|
||||
defer p.m.RUnlock()
|
||||
|
||||
return p.closed
|
||||
}
|
||||
@ -345,7 +368,7 @@ func (p *producer) drainInternalQueue() {
|
||||
"sub_type": "producer",
|
||||
}).Error("Error publishing message to the exchange. Retrying...")
|
||||
|
||||
time.Sleep(p.config.publishInterval)
|
||||
time.Sleep(p.config.PublishInterval)
|
||||
} else {
|
||||
p.wg.Done()
|
||||
retry = false
|
||||
|
@ -1,12 +1,12 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/eventials/goevents/amqp"
|
||||
)
|
||||
@ -58,14 +58,29 @@ func main() {
|
||||
}()
|
||||
|
||||
sigc := make(chan os.Signal, 1)
|
||||
|
||||
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
fmt.Println("Waiting CTRL+C")
|
||||
|
||||
<-sigc
|
||||
|
||||
closed := make(chan bool)
|
||||
|
||||
go func() {
|
||||
fmt.Println("Closing producerA")
|
||||
producerA.Close()
|
||||
|
||||
fmt.Println("Closing producerB")
|
||||
producerB.Close()
|
||||
|
||||
closed <- true
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-closed:
|
||||
fmt.Println("Successfully closed.")
|
||||
case <-time.After(20 * time.Second):
|
||||
fmt.Println("Close timeout.")
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user