mirror of
https://github.com/eventials/goevents.git
synced 2025-04-26 13:48:59 +08:00
commit
bdc4f9f129
@ -26,14 +26,14 @@ type connection struct {
|
|||||||
|
|
||||||
// ConnectionConfig to be used when creating a new connection.
|
// ConnectionConfig to be used when creating a new connection.
|
||||||
type ConnectionConfig struct {
|
type ConnectionConfig struct {
|
||||||
reconnectInterval time.Duration
|
ReconnectInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConnection returns an AMQP Connection.
|
// NewConnection returns an AMQP Connection.
|
||||||
// Uses a default ConnectionConfig with 2 second of reconnect interval.
|
// Uses a default ConnectionConfig with 2 second of reconnect interval.
|
||||||
func NewConnection(url string) (*connection, error) {
|
func NewConnection(url string) (*connection, error) {
|
||||||
return NewConnectionConfig(url, ConnectionConfig{
|
return NewConnectionConfig(url, ConnectionConfig{
|
||||||
reconnectInterval: 2 * time.Second,
|
ReconnectInterval: 2 * time.Second,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -153,7 +153,7 @@ func (c *connection) handleConnectionClose() {
|
|||||||
"attempt": i,
|
"attempt": i,
|
||||||
}).Error("Error reestablishing connection. Retrying...")
|
}).Error("Error reestablishing connection. Retrying...")
|
||||||
|
|
||||||
time.Sleep(c.config.reconnectInterval)
|
time.Sleep(c.config.ReconnectInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,6 +57,7 @@ var consumerTagSeq uint64
|
|||||||
// ConsumerConfig to be used when creating a new producer.
|
// ConsumerConfig to be used when creating a new producer.
|
||||||
type ConsumerConfig struct {
|
type ConsumerConfig struct {
|
||||||
ConsumeRetryInterval time.Duration
|
ConsumeRetryInterval time.Duration
|
||||||
|
MaxRetryDelay time.Duration
|
||||||
PrefetchCount int
|
PrefetchCount int
|
||||||
PrefixName string
|
PrefixName string
|
||||||
}
|
}
|
||||||
@ -66,6 +67,7 @@ type ConsumerConfig struct {
|
|||||||
func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (*consumer, error) {
|
func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (*consumer, error) {
|
||||||
return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{
|
return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{
|
||||||
ConsumeRetryInterval: 2 * time.Second,
|
ConsumeRetryInterval: 2 * time.Second,
|
||||||
|
MaxRetryDelay: 5 * time.Minute,
|
||||||
PrefetchCount: 0,
|
PrefetchCount: 0,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -120,6 +122,10 @@ func (c *consumer) dispatch(msg amqplib.Delivery) {
|
|||||||
delay, isRetry := getXRetryDelayHeader(msg)
|
delay, isRetry := getXRetryDelayHeader(msg)
|
||||||
|
|
||||||
if isRetry {
|
if isRetry {
|
||||||
|
if delay > c.config.MaxRetryDelay {
|
||||||
|
delay = c.config.MaxRetryDelay
|
||||||
|
}
|
||||||
|
|
||||||
logger.WithFields(logrus.Fields{
|
logger.WithFields(logrus.Fields{
|
||||||
"delay": delay.String(),
|
"delay": delay.String(),
|
||||||
"message_id": msg.MessageId,
|
"message_id": msg.MessageId,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user