mirror of
https://github.com/eventials/goevents.git
synced 2025-04-24 13:48:53 +08:00
Receive notification when channel is closed
This commit is contained in:
parent
4d54df7868
commit
8231c14ac1
@ -14,7 +14,8 @@ import (
|
||||
)
|
||||
|
||||
// ErrNotAcked indicated that published messages was not acked by RabbitMQ
|
||||
var ErrNotAcked = errors.New("messge was not acked")
|
||||
var ErrNotAcked = errors.New("message was not acked")
|
||||
var ErrTimedout = errors.New("message was timed out")
|
||||
|
||||
type message struct {
|
||||
action string
|
||||
@ -23,20 +24,22 @@ type message struct {
|
||||
|
||||
// producer holds a amqp connection and channel to publish messages to.
|
||||
type producer struct {
|
||||
m sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
conn *connection
|
||||
channel *amqplib.Channel
|
||||
notifyConfirm chan amqplib.Confirmation
|
||||
closeQueue chan bool
|
||||
config ProducerConfig
|
||||
m sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
conn *connection
|
||||
channel *amqplib.Channel
|
||||
notifyConfirm chan amqplib.Confirmation
|
||||
notifyChanClose chan *amqplib.Error
|
||||
closeQueue chan bool
|
||||
config ProducerConfig
|
||||
|
||||
internalQueue chan message
|
||||
|
||||
exchangeName string
|
||||
|
||||
closed bool
|
||||
closes []chan bool
|
||||
closed bool
|
||||
channelReady bool
|
||||
closes []chan bool
|
||||
}
|
||||
|
||||
// ProducerConfig to be used when creating a new producer.
|
||||
@ -59,7 +62,6 @@ func NewProducerConfig(c messaging.Connection, exchange string, config ProducerC
|
||||
config: config,
|
||||
internalQueue: make(chan message, 2),
|
||||
exchangeName: exchange,
|
||||
notifyConfirm: make(chan amqplib.Confirmation),
|
||||
closeQueue: make(chan bool),
|
||||
}
|
||||
|
||||
@ -141,8 +143,14 @@ func (p *producer) Close() {
|
||||
// and updates the channel listeners to reflect this.
|
||||
func (p *producer) changeChannel(channel *amqplib.Channel) {
|
||||
p.channel = channel
|
||||
|
||||
p.notifyChanClose = make(chan *amqplib.Error)
|
||||
p.channel.NotifyClose(p.notifyChanClose)
|
||||
|
||||
p.notifyConfirm = make(chan amqplib.Confirmation)
|
||||
p.channel.NotifyPublish(p.notifyConfirm)
|
||||
|
||||
p.channelReady = true
|
||||
}
|
||||
|
||||
func (p *producer) setupTopology() error {
|
||||
@ -197,14 +205,59 @@ func (p *producer) setupTopology() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *producer) setChannelReady(ready bool) {
|
||||
p.m.Lock()
|
||||
defer p.m.Unlock()
|
||||
|
||||
p.channelReady = ready
|
||||
}
|
||||
|
||||
func (p *producer) isChannelReady() bool {
|
||||
p.m.Lock()
|
||||
defer p.m.Unlock()
|
||||
|
||||
return p.channelReady
|
||||
}
|
||||
|
||||
func (p *producer) isConnected() bool {
|
||||
if !p.conn.IsConnected() {
|
||||
return false
|
||||
}
|
||||
|
||||
return p.isChannelReady()
|
||||
}
|
||||
|
||||
func (p *producer) waitConnectionLost() bool {
|
||||
if !p.isConnected() {
|
||||
return true
|
||||
}
|
||||
|
||||
defer p.setChannelReady(false)
|
||||
|
||||
select {
|
||||
case <-p.conn.NotifyConnectionClose():
|
||||
log.Warn("Producer connection closed")
|
||||
return true
|
||||
case <-p.notifyChanClose:
|
||||
log.Warn("Producer channel closed")
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (p *producer) handleReestablishedConnnection() {
|
||||
rs := p.conn.NotifyReestablish()
|
||||
|
||||
for !p.isClosed() {
|
||||
<-rs
|
||||
// true if connection is lot
|
||||
// false if channel connection is lost
|
||||
connectionLost := p.waitConnectionLost()
|
||||
|
||||
if connectionLost {
|
||||
// Wait reconnect
|
||||
<-rs
|
||||
}
|
||||
|
||||
err := p.setupTopology()
|
||||
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"type": "goevents",
|
||||
@ -216,8 +269,8 @@ func (p *producer) handleReestablishedConnnection() {
|
||||
}
|
||||
|
||||
func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err error) {
|
||||
if !p.conn.IsConnected() {
|
||||
err = errors.New("connection is not open")
|
||||
if !p.isConnected() {
|
||||
err = errors.New("connection/channel is not open")
|
||||
return
|
||||
}
|
||||
|
||||
@ -245,7 +298,12 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
|
||||
}
|
||||
}()
|
||||
|
||||
err = p.channel.Publish(p.exchangeName, queue, false, false, msg)
|
||||
err = p.channel.Publish(
|
||||
p.exchangeName, // Exchange
|
||||
queue, // Routing key
|
||||
false, // Mandatory
|
||||
false, // Immediate
|
||||
msg)
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
@ -260,7 +318,7 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err
|
||||
err = ErrNotAcked
|
||||
return
|
||||
case <-time.After(p.config.publishInterval):
|
||||
err = ErrNotAcked
|
||||
err = ErrTimedout
|
||||
return
|
||||
}
|
||||
|
||||
|
8
go.mod
8
go.mod
@ -3,9 +3,9 @@ module github.com/eventials/goevents
|
||||
go 1.12
|
||||
|
||||
require (
|
||||
github.com/aws/aws-sdk-go v1.18.0
|
||||
github.com/sirupsen/logrus v1.4.0
|
||||
github.com/streadway/amqp v0.0.0-20190312002841-61ee40d2027b
|
||||
github.com/aws/aws-sdk-go v1.19.21
|
||||
github.com/sirupsen/logrus v1.4.1
|
||||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
|
||||
github.com/stretchr/testify v1.3.0
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect
|
||||
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09 // indirect
|
||||
)
|
||||
|
19
go.sum
19
go.sum
@ -1,5 +1,5 @@
|
||||
github.com/aws/aws-sdk-go v1.18.0 h1:CXoiktHzavv3gwZsVPGuyTXpHg6dd1ssrGKMQuQNrdY=
|
||||
github.com/aws/aws-sdk-go v1.18.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/aws/aws-sdk-go v1.19.21 h1:xLaPxl8gy0ZSXbc13jsCKIaHD6NiX+2tAQodPSEL5r8=
|
||||
github.com/aws/aws-sdk-go v1.19.21/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
@ -9,22 +9,19 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGi
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE=
|
||||
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/streadway/amqp v0.0.0-20190312002841-61ee40d2027b h1:VPo/aUrW0PUdrSwI0UWPY/zXBoLg78fEXLPSsOqngk4=
|
||||
github.com/streadway/amqp v0.0.0-20190312002841-61ee40d2027b/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY=
|
||||
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
|
||||
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
|
||||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k=
|
||||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09 h1:KaQtG+aDELoNmXYas3TVkGNYRuq8JQ1aa7LJt8EXVyo=
|
||||
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
|
||||
|
Loading…
x
Reference in New Issue
Block a user