mirror of
https://github.com/eventials/goevents.git
synced 2025-04-24 13:48:53 +08:00
Merge pull request #37 from skrater/master
removed Wait methods, due channels leak
This commit is contained in:
commit
9692e82e24
@ -1,4 +1,4 @@
|
||||
FROM golang:1.9
|
||||
FROM golang:1.10
|
||||
|
||||
RUN apt-get update && apt-get install -y wget
|
||||
RUN wget https://github.com/jwilder/dockerize/releases/download/v0.2.0/dockerize-linux-amd64-v0.2.0.tar.gz
|
||||
|
@ -2,6 +2,7 @@ package amqp
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -16,8 +17,9 @@ type connection struct {
|
||||
m sync.Mutex
|
||||
url string
|
||||
connection *amqplib.Connection
|
||||
closed bool
|
||||
connected bool
|
||||
|
||||
closed bool
|
||||
connected int32
|
||||
|
||||
reestablishs []chan bool
|
||||
}
|
||||
@ -48,7 +50,7 @@ func NewConnectionConfig(url string, config ConnectionConfig) (*connection, erro
|
||||
return nil, err
|
||||
}
|
||||
|
||||
connection.setConnected(true)
|
||||
atomic.StoreInt32(&connection.connected, 1)
|
||||
|
||||
go connection.handleConnectionClose()
|
||||
|
||||
@ -68,8 +70,11 @@ func (c *connection) NotifyConnectionClose() <-chan error {
|
||||
|
||||
// NotifyReestablish returns a channel to notify when the connection is restablished.
|
||||
func (c *connection) NotifyReestablish() <-chan bool {
|
||||
receiver := make(chan bool)
|
||||
receiver := make(chan bool, 1)
|
||||
|
||||
c.m.Lock()
|
||||
c.reestablishs = append(c.reestablishs, receiver)
|
||||
c.m.Unlock()
|
||||
|
||||
return receiver
|
||||
}
|
||||
@ -97,55 +102,37 @@ func (c *connection) Close() {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
c.closed = true
|
||||
c.connection.Close()
|
||||
if !c.closed {
|
||||
c.closed = true
|
||||
c.connection.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *connection) IsConnected() bool {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
return c.connected
|
||||
}
|
||||
|
||||
// WaitUntilConnectionCloses holds the execution until the connection closes.
|
||||
func (c *connection) WaitUntilConnectionCloses() {
|
||||
<-c.NotifyConnectionClose()
|
||||
}
|
||||
|
||||
// WaitUntilConnectionReestablished holds the execution until the connection reestablished.
|
||||
func (c *connection) WaitUntilConnectionReestablished() {
|
||||
<-c.NotifyReestablish()
|
||||
return atomic.LoadInt32(&c.connected) > 0
|
||||
}
|
||||
|
||||
func (c *connection) dial() error {
|
||||
conn, err := amqplib.Dial(c.url)
|
||||
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
c.connection = conn
|
||||
c.m.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *connection) setConnected(connected bool) {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
c.connected = connected
|
||||
}
|
||||
|
||||
func (c *connection) handleConnectionClose() {
|
||||
for !c.closed {
|
||||
c.WaitUntilConnectionCloses()
|
||||
c.setConnected(false)
|
||||
<-c.NotifyConnectionClose()
|
||||
|
||||
atomic.StoreInt32(&c.connected, 0)
|
||||
|
||||
for i := 0; !c.closed; i++ {
|
||||
err := c.dial()
|
||||
|
||||
if err == nil {
|
||||
c.setConnected(true)
|
||||
atomic.StoreInt32(&c.connected, 1)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"type": "goevents",
|
||||
|
@ -84,8 +84,6 @@ func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue str
|
||||
queueName: queue,
|
||||
}
|
||||
|
||||
go consumer.handleReestablishedConnnection()
|
||||
|
||||
return consumer, nil
|
||||
|
||||
}
|
||||
@ -109,12 +107,6 @@ func (c *consumer) uniqueNameWithPrefix() string {
|
||||
return fmt.Sprintf("%s%d", c.config.PrefixName, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func (c *consumer) handleReestablishedConnnection() {
|
||||
for !c.closed {
|
||||
c.conn.WaitUntilConnectionReestablished()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *consumer) dispatch(msg amqplib.Delivery) {
|
||||
if h, ok := c.getHandler(msg); ok {
|
||||
delay, isRetry := getXRetryDelayHeader(msg)
|
||||
@ -519,11 +511,13 @@ func (c *consumer) Consume() {
|
||||
logger.Infof(" %s", handler.action)
|
||||
}
|
||||
|
||||
rs := c.conn.NotifyReestablish()
|
||||
|
||||
for !c.closed {
|
||||
if !c.conn.IsConnected() {
|
||||
logger.Info("Connection not established. Waiting connection to be reestablished.")
|
||||
|
||||
c.conn.WaitUntilConnectionReestablished()
|
||||
<-rs
|
||||
|
||||
continue
|
||||
}
|
||||
|
@ -94,8 +94,11 @@ func (p *producer) publishAmqMessage(queue string, msg amqplib.Publishing) {
|
||||
|
||||
// NotifyClose returns a channel to be notified then this producer closes.
|
||||
func (p *producer) NotifyClose() <-chan bool {
|
||||
receiver := make(chan bool)
|
||||
receiver := make(chan bool, 1)
|
||||
|
||||
p.m.Lock()
|
||||
p.closes = append(p.closes, receiver)
|
||||
p.m.Unlock()
|
||||
|
||||
return receiver
|
||||
}
|
||||
@ -151,8 +154,10 @@ func (p *producer) setupTopology() error {
|
||||
}
|
||||
|
||||
func (p *producer) handleReestablishedConnnection() {
|
||||
rs := p.conn.NotifyReestablish()
|
||||
|
||||
for !p.closed {
|
||||
p.conn.WaitUntilConnectionReestablished()
|
||||
<-rs
|
||||
|
||||
err := p.setupTopology()
|
||||
|
||||
@ -165,17 +170,38 @@ func (p *producer) handleReestablishedConnnection() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *producer) publishMessage(msg amqplib.Publishing, queue string) error {
|
||||
func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
switch x := r.(type) {
|
||||
case string:
|
||||
err = errors.New(x)
|
||||
case error:
|
||||
err = x
|
||||
default:
|
||||
err = errors.New("Unknown panic")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if !p.conn.IsConnected() {
|
||||
err = errors.New("Connection is not open.")
|
||||
return
|
||||
}
|
||||
|
||||
channel, err := p.conn.openChannel()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
defer channel.Close()
|
||||
|
||||
if err := channel.Confirm(false); err != nil {
|
||||
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
|
||||
err = channel.Confirm(false)
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Channel could not be put into confirm mode: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
confirms := channel.NotifyPublish(make(chan amqplib.Confirmation, 1))
|
||||
@ -183,14 +209,15 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) error {
|
||||
err = channel.Publish(p.exchangeName, queue, false, false, msg)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return
|
||||
} else {
|
||||
if confirmed := <-confirms; !confirmed.Ack {
|
||||
return ErrNotAcked
|
||||
err = ErrNotAcked
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
func (p *producer) isClosed() bool {
|
||||
|
@ -6,7 +6,5 @@ type Connection interface {
|
||||
Close()
|
||||
NotifyConnectionClose() <-chan error
|
||||
NotifyReestablish() <-chan bool
|
||||
WaitUntilConnectionCloses()
|
||||
WaitUntilConnectionReestablished()
|
||||
IsConnected() bool
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user