From 22549a87425614589b509cd1b82ef0099491c00b Mon Sep 17 00:00:00 2001 From: skrater Date: Thu, 16 Aug 2018 14:54:29 -0300 Subject: [PATCH] SNS QOS --- examples/consumer/sns/consumer.go | 18 +++++-- sns/consumer.go | 81 +++++++++++++++++++++++++++---- 2 files changed, 85 insertions(+), 14 deletions(-) diff --git a/examples/consumer/sns/consumer.go b/examples/consumer/sns/consumer.go index 4128f73..26bd9d2 100644 --- a/examples/consumer/sns/consumer.go +++ b/examples/consumer/sns/consumer.go @@ -2,22 +2,32 @@ package main import ( "fmt" + "time" "github.com/eventials/goevents/messaging" "github.com/eventials/goevents/sns" + "github.com/sirupsen/logrus" ) func main() { + logrus.SetLevel(logrus.DebugLevel) + consumer := sns.MustNewConsumer(&sns.ConsumerConfig{ - AccessKey: "", - SecretKey: "", - Region: "us-east-1", - QueueUrl: "https://sqs.us-east-1.amazonaws.com/0000000000/vlab-exams-mp4-dev", + AccessKey: "", + SecretKey: "0", + Region: "us-east-1", + QueueUrl: "https://sqs.us-east-1.amazonaws.com/0000000000/vlab-exams-mp4-dev", + MaxNumberOfMessages: 5, }) + defer consumer.Close() + consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test", func(e messaging.Event) error { fmt.Println("Action:\t", e.Action) fmt.Println("Body:\t", string(e.Body)) + + time.Sleep(time.Minute) + return nil }, nil) diff --git a/sns/consumer.go b/sns/consumer.go index 9f7b93a..15bbf07 100644 --- a/sns/consumer.go +++ b/sns/consumer.go @@ -46,7 +46,7 @@ type ConsumerConfig struct { func (c *ConsumerConfig) setDefaults() { if c.VisibilityTimeout == 0 { - c.VisibilityTimeout = 20 + c.VisibilityTimeout = 45 } if c.WaitTimeSeconds == 0 { @@ -83,11 +83,14 @@ type handler struct { type consumer struct { sqs *sqs.SQS stop chan bool + qos chan bool config *ConsumerConfig receiveMessageInput *sqs.ReceiveMessageInput m sync.RWMutex wg sync.WaitGroup handlers map[string]handler + processingMessages map[string]bool + mProcessingMessages sync.RWMutex } func NewConsumer(config *ConsumerConfig) (messaging.Consumer, error) { @@ -109,10 +112,12 @@ func NewConsumer(config *ConsumerConfig) (messaging.Consumer, error) { } c := &consumer{ - sqs: sqs.New(sess), - config: config, - stop: make(chan bool), - handlers: make(map[string]handler), + sqs: sqs.New(sess), + config: config, + stop: make(chan bool), + qos: make(chan bool, config.MaxNumberOfMessages), + handlers: make(map[string]handler), + processingMessages: make(map[string]bool), } c.receiveMessageInput = &sqs.ReceiveMessageInput{ @@ -223,6 +228,28 @@ func (c *consumer) callAndHandlePanic(event messaging.Event, fn messaging.EventH return } +func (c *consumer) isMessageProcessing(id string) bool { + c.mProcessingMessages.RLock() + defer c.mProcessingMessages.RUnlock() + + _, ok := c.processingMessages[id] + return ok +} + +func (c *consumer) addMessageProcessing(id string) { + c.mProcessingMessages.Lock() + defer c.mProcessingMessages.Unlock() + + c.processingMessages[id] = true +} + +func (c *consumer) deleteMessageProcessing(id string) { + c.mProcessingMessages.Lock() + defer c.mProcessingMessages.Unlock() + + delete(c.processingMessages, id) +} + func (c *consumer) handleMessage(message *sqs.Message) { sns := &snsMessagePayload{} err := json.Unmarshal([]byte(*message.Body), sns) @@ -235,9 +262,12 @@ func (c *consumer) handleMessage(message *sqs.Message) { return } + id := *message.MessageId + receiptHandle := *message.ReceiptHandle + log := logrus.WithFields(logrus.Fields{ "action": sns.TopicArn, - "message_id": message.MessageId, + "message_id": id, "body": sns.Message, }) @@ -248,9 +278,27 @@ func (c *consumer) handleMessage(message *sqs.Message) { return } + // Check if message is already processing in goroutine. + // This will occur if consumer is slower than VisibilityTimeout. + if c.isMessageProcessing(id) { + log.Debug("Message is already processing.") + return + } + + // QOS: do not consume while prior messages are in goroutines. + c.qos <- true + + c.addMessageProcessing(id) + c.wg.Add(1) go func(event messaging.Event, fn messaging.EventHandler, receiptHandle string) { - defer c.wg.Done() + defer func() { + c.wg.Done() + + c.deleteMessageProcessing(event.Id) + + <-c.qos + }() err := c.callAndHandlePanic(event, fn) @@ -259,9 +307,11 @@ func (c *consumer) handleMessage(message *sqs.Message) { return } + log.Debug("Deleting message.") + _, err = c.sqs.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: aws.String(c.config.QueueUrl), - ReceiptHandle: &receiptHandle, + ReceiptHandle: aws.String(receiptHandle), }) if err != nil { @@ -271,14 +321,24 @@ func (c *consumer) handleMessage(message *sqs.Message) { log.Debug("Message handled successfully.") }(messaging.Event{ - Id: *message.MessageId, + Id: id, Action: handler.action, Body: []byte(sns.Message), Timestamp: stringToTime(*message.Attributes["SentTimestamp"]), - }, handler.fn, *message.ReceiptHandle) + }, handler.fn, receiptHandle) } func (c *consumer) doConsume() { + var nextMessages int64 = c.config.MaxNumberOfMessages - int64(len(c.qos)) + + if nextMessages == 0 { + logrus.Debugf("QOS full with %d.", c.config.MaxNumberOfMessages) + time.Sleep(5 * time.Second) + return + } + + c.receiveMessageInput.MaxNumberOfMessages = aws.Int64(nextMessages) + result, err := c.sqs.ReceiveMessage(c.receiveMessageInput) if err != nil { @@ -296,6 +356,7 @@ func (c *consumer) doConsume() { // Messages successfully handled will be deleted from SQS. // Messages who failed to delete from SQS will be received again, and application needs to handle // by using MessageId. +// Receiving duplicate messages may happen using more than one consumer if not processing in VisibilityTimeout. func (c *consumer) Consume() { logrus.Info("Registered handlers:")