1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

not use producer in consumer

This commit is contained in:
Guilherme 2017-10-10 18:06:42 -03:00
parent ef3afa56c1
commit 3d6ee55fa8

View File

@ -2,14 +2,14 @@ package amqp
import (
"errors"
"fmt"
"github.com/eventials/goevents/messaging"
log "github.com/sirupsen/logrus"
amqplib "github.com/streadway/amqp"
"regexp"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
amqplib "github.com/streadway/amqp"
)
var (
@ -46,8 +46,6 @@ type Consumer struct {
queueName string
closed bool
producer *producer
}
// ConsumerConfig to be used when creating a new producer.
@ -69,12 +67,6 @@ func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (
// NewConsumerConfig returns a new AMQP Consumer.
func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue string, config ConsumerConfig) (messaging.Consumer, error) {
producer, err := NewProducer(c, "")
if err != nil {
return nil, err
}
consumer := &Consumer{
config: config,
conn: c.(*Connection),
@ -82,14 +74,18 @@ func NewConsumerConfig(c messaging.Connection, autoAck bool, exchange, queue str
handlers: make([]handler, 0),
exchangeName: exchange,
queueName: queue,
producer: producer,
}
err = consumer.setupTopology()
err := consumer.setupTopology()
if err != nil {
return nil, err
}
go consumer.handleReestablishedConnnection()
return consumer, err
}
func (c *Consumer) Close() {
@ -106,8 +102,6 @@ func (c *Consumer) Close() {
// Wait all go routine finish.
c.wg.Wait()
c.producer.Close()
}
func (c *Consumer) setupTopology() error {
@ -255,6 +249,34 @@ func (c *Consumer) doDispatch(msg amqplib.Delivery, h *handler, retryCount int32
}
}
func (c *Consumer) publishMessage(msg amqplib.Publishing, queue string) error {
channel, err := c.conn.OpenChannel()
if err != nil {
return err
}
defer channel.Close()
if err := channel.Confirm(false); err != nil {
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
}
confirms := channel.NotifyPublish(make(chan amqplib.Confirmation, 1))
err = channel.Publish("", queue, false, false, msg)
if err != nil {
return err
} else {
if confirmed := <-confirms; !confirmed.Ack {
return ErrNotAcked
}
}
return nil
}
func (c *Consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int32, delay time.Duration) {
delayNs := delay.Nanoseconds()
@ -276,9 +298,17 @@ func (c *Consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int
MessageId: msg.MessageId,
}
c.producer.publishAmqMessage(c.queueName, retryMsg)
err := c.publishMessage(retryMsg, c.queueName)
if !c.autoAck {
if err != nil {
logger.WithFields(log.Fields{
"error": err,
}).Error("Failed to retry.")
if !c.autoAck {
msg.Nack(false, true)
}
} else if !c.autoAck {
msg.Ack(false)
}
}