From dc97af32c1051dec1950fc4a996fb3917e3110b9 Mon Sep 17 00:00:00 2001 From: Eddy Santos Date: Wed, 3 Apr 2024 16:43:46 -0300 Subject: [PATCH] (feat): Refactor producer to support publishing messages to a queue --- amqp/producer.go | 18 +++++------ mock/producer.go | 4 +++ sns/producer.go | 80 +++++++++++++++++++++++++++++++++++++----------- 3 files changed, 74 insertions(+), 28 deletions(-) diff --git a/amqp/producer.go b/amqp/producer.go index 9f500c5..c3caf9d 100644 --- a/amqp/producer.go +++ b/amqp/producer.go @@ -10,7 +10,6 @@ import ( "github.com/eventials/goevents/messaging" log "github.com/sirupsen/logrus" - "github.com/streadway/amqp" amqplib "github.com/streadway/amqp" ) @@ -94,12 +93,17 @@ func (p *producer) Publish(action string, data []byte) { DeliveryMode: amqplib.Persistent, Timestamp: now, Body: data, - Headers: amqp.Table{ + Headers: amqplib.Table{ "x-epoch-milli": int64(now.UnixNano()/int64(time.Nanosecond)) / int64(time.Millisecond), }, }) } +// PublishToQueue publishes a message to a queue. +func (p *producer) PublishToQueue(queue string, data []byte) { + // not implemented yet +} + func (p *producer) publishAmqMessage(queue string, msg amqplib.Publishing) { p.wg.Add(1) @@ -188,10 +192,6 @@ func (p *producer) setupTopology() error { } if p.exchangeName != "" { - if err != nil { - return err - } - err = channel.ExchangeDeclare( p.exchangeName, // name "topic", // type @@ -210,7 +210,7 @@ func (p *producer) setupTopology() error { err = channel.Confirm(false) if err != nil { - err = fmt.Errorf("Channel could not be put into confirm mode: %s", err) + err = fmt.Errorf("channel could not be put into confirm mode: %s", err) return err } @@ -315,7 +315,7 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err case error: err = x default: - err = errors.New("Unknown panic") + err = errors.New("unknown panic") } } }() @@ -345,8 +345,6 @@ func (p *producer) publishMessage(msg amqplib.Publishing, queue string) (err err err = ErrTimedout return } - - return } func (p *producer) isClosed() bool { diff --git a/mock/producer.go b/mock/producer.go index 4d74cf0..ccf81c5 100644 --- a/mock/producer.go +++ b/mock/producer.go @@ -17,6 +17,10 @@ func (p *Producer) Publish(action string, data []byte) { p.Called(action, data) } +func (p *Producer) PublishToQueue(queue string, data []byte) { + p.Called(queue, data) +} + func (p *Producer) Close() { p.Called() } diff --git a/sns/producer.go b/sns/producer.go index 44ed603..38ddb9e 100644 --- a/sns/producer.go +++ b/sns/producer.go @@ -8,13 +8,23 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" + "github.com/aws/aws-sdk-go/service/sqs" "github.com/eventials/goevents/messaging" "github.com/sirupsen/logrus" ) +type SendMessageType int + +const ( + SendWithAction SendMessageType = iota + SendWithQueue +) + type message struct { - action string - data []byte + action string + queue string + messageType SendMessageType + data []byte } type ProducerConfig struct { @@ -50,6 +60,7 @@ type producer struct { config ProducerConfig m sync.Mutex sns *sns.SNS + sqs *sqs.SQS internalQueue chan message closes []chan bool closed bool @@ -76,6 +87,7 @@ func NewProducer(config ProducerConfig) (messaging.Producer, error) { p := &producer{ sns: sns.New(sess), + sqs: sqs.New(sess), internalQueue: make(chan message), config: config, closed: false, @@ -98,12 +110,21 @@ func MustNewProducer(config ProducerConfig) messaging.Producer { func (p *producer) Publish(action string, data []byte) { p.internalQueue <- message{ - action: action, - data: data, + action: action, + messageType: SendWithAction, + data: data, } } +func (p *producer) PublishToQueue(queue string, data []byte) { + p.internalQueue <- message{ + queue: queue, + messageType: SendWithQueue, + data: data, + } +} + func (p *producer) isClosed() bool { p.m.Lock() defer p.m.Unlock() @@ -116,24 +137,47 @@ func (p *producer) drainInternalQueue() { retry := true for retry && !p.isClosed() { - output, err := p.sns.Publish(&sns.PublishInput{ - Message: aws.String(string(m.data)), - TopicArn: aws.String(m.action), - }) + if m.messageType == SendWithAction { + output, err := p.sns.Publish(&sns.PublishInput{ + Message: aws.String(string(m.data)), + TopicArn: aws.String(m.action), + }) - if err != nil { - logrus.WithFields(logrus.Fields{ - "message": m, - "error": err, - }).Error("Failed to publish message.") + if err != nil { + logrus.WithFields(logrus.Fields{ + "message": m, + "error": err, + }).Error("Failed to publish message.") - time.Sleep(p.config.PublishInterval) + time.Sleep(p.config.PublishInterval) + } else { + logrus.WithFields(logrus.Fields{ + "message_id": output.MessageId, + }).Debug("Successfully published message.") + + retry = false + } } else { - logrus.WithFields(logrus.Fields{ - "message_id": output.MessageId, - }).Debug("Successfully published message.") + output, err := p.sqs.SendMessage(&sqs.SendMessageInput{ + MessageBody: aws.String(string(m.data)), + QueueUrl: aws.String(m.queue), + DelaySeconds: aws.Int64(0), + }) - retry = false + if err != nil { + logrus.WithFields(logrus.Fields{ + "message": m, + "error": err, + }).Error("Failed to publish message.") + + time.Sleep(p.config.PublishInterval) + } else { + logrus.WithFields(logrus.Fields{ + "message_id": output.MessageId, + }).Debug("Successfully published message.") + + retry = false + } } } }