From 82c88a052a69acd92c060bd677f99770557edf0f Mon Sep 17 00:00:00 2001 From: Eddy Santos Date: Fri, 2 Jun 2023 16:12:26 -0300 Subject: [PATCH 1/4] (feat): implement PriorityConsume function The `PriorityConsume` function consumes messages based on the priority of consumers. The function registers and logs the handlers of each consumer. It then starts consuming messages in a loop, prioritizing the consumers accordingly. The consumption stops if a stop signal is received from a consumer. The function also checks and handles the scenario when all consumers are stopped. The `checkPriorityMessages` function is also introduced which determines if a consumer is eligible to consume a message, considering the messages in higher priority consumers. The code has been refactored for improved readability, and detailed comments have been added for better understanding. --- sns/consumer.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/sns/consumer.go b/sns/consumer.go index 1acf6c6..1360ed2 100644 --- a/sns/consumer.go +++ b/sns/consumer.go @@ -92,6 +92,7 @@ type consumer struct { processingMessages map[string]bool mProcessingMessages sync.RWMutex closeOnce sync.Once + stopped bool } func NewConsumer(config *ConsumerConfig) (messaging.Consumer, error) { @@ -379,6 +380,79 @@ func (c *consumer) Consume() { } } +// PriorityConsume function takes a list of consumer objects and starts consuming them in a loop. +// The consumers are processed according to their priority. The process is stopped if the stop signal is received. +// The function exits when all consumers are stopped. +func PriorityConsume(consumers []*consumer) { + logrus.Info("Registered handlers:") + + // Logging the registered handlers for each consumer + for priority, consumer := range consumers { + for _, handler := range consumer.handlers { + logrus.Infof(" %s (priority %d)", handler.action, priority) + } + } + + // Counter for tracking the number of stopped consumers + var consumersStopped int + + // Main loop to consume messages + for { + // Iterate over each consumer + for _, consumer := range consumers { + // Determine if a consumer should consume messages + shouldConsume := !consumer.stopped && checkPriorityMessages(consumers, consumer) + + if !shouldConsume { + continue + } + + // If the consumer is still active, try to consume a message + select { + case <-consumer.stop: + // If a stop signal is received, stop the consumer and increase the count of stopped consumers + consumer.stopped = true + consumersStopped++ + default: + // If no stop signal, consume the message + consumer.doConsume() + } + } + + // If all consumers are stopped, exit the function + if consumersStopped == len(consumers) { + return + } + } +} + +// checkPriorityMessages checks whether the given consumer can consume a message. +// It checks all other consumers and allows a consumer to consume only if there are no higher priority consumers with messages to consume. +func checkPriorityMessages(consumers []*consumer, currentConsumer *consumer) bool { + // Iterate over each consumer + for _, consumer := range consumers { + + // Skip stopped consumers + if consumer.stopped { + continue + } + + // If we reached the currentConsumer in the iteration, then all higher priority consumers were checked and they don't have any messages to consume. + // So, the currentConsumer is allowed to consume. + if consumer == currentConsumer { + return true + } + + // If any higher priority consumer has messages to consume, then the currentConsumer should not consume. + if len(consumer.qos) > 0 { + return false + } + } + + // If no consumers were found to have messages, the currentConsumer can consume. + return true +} + func (c *consumer) doClose() { logrus.Info("Closing SNS consumer...") From f1b607259f7019fa2627dc0315312c60a79d6406 Mon Sep 17 00:00:00 2001 From: Eddy Santos Date: Fri, 2 Jun 2023 17:03:27 -0300 Subject: [PATCH 2/4] (feat): Refactor and improve PriorityConsume function --- sns/consumer.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/sns/consumer.go b/sns/consumer.go index 1360ed2..d32b07d 100644 --- a/sns/consumer.go +++ b/sns/consumer.go @@ -383,14 +383,23 @@ func (c *consumer) Consume() { // PriorityConsume function takes a list of consumer objects and starts consuming them in a loop. // The consumers are processed according to their priority. The process is stopped if the stop signal is received. // The function exits when all consumers are stopped. -func PriorityConsume(consumers []*consumer) { +func PriorityConsume(consumers []messaging.Consumer) { logrus.Info("Registered handlers:") + // consumersQueue holds the actual *consumer types cast from the messaging.Consumer interface + consumersQueue := make([]*consumer, len(consumers)) + // Logging the registered handlers for each consumer - for priority, consumer := range consumers { - for _, handler := range consumer.handlers { + for priority, c := range consumers { + actualConsumer, ok := c.(*consumer) // Casting to *consumer + if !ok { + logrus.Error("Failed to cast consumer to consumer type") + return + } + for _, handler := range actualConsumer.handlers { logrus.Infof(" %s (priority %d)", handler.action, priority) } + consumersQueue[priority] = actualConsumer } // Counter for tracking the number of stopped consumers @@ -399,9 +408,9 @@ func PriorityConsume(consumers []*consumer) { // Main loop to consume messages for { // Iterate over each consumer - for _, consumer := range consumers { + for _, consumer := range consumersQueue { // Determine if a consumer should consume messages - shouldConsume := !consumer.stopped && checkPriorityMessages(consumers, consumer) + shouldConsume := !consumer.stopped && checkPriorityMessages(consumersQueue, consumer) if !shouldConsume { continue From 3ea7599c2cc08bb570f2be7afe71d27b65b0d4bd Mon Sep 17 00:00:00 2001 From: Eddy Santos Date: Mon, 5 Jun 2023 13:59:55 -0300 Subject: [PATCH 3/4] (debug): Add debug information for priority check - Added a debug log statement in the checkPriorityMessages function - The debug log outputs when a higher priority consumer has messages in queue, skipping the current consumer --- sns/consumer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sns/consumer.go b/sns/consumer.go index d32b07d..afdc112 100644 --- a/sns/consumer.go +++ b/sns/consumer.go @@ -454,6 +454,7 @@ func checkPriorityMessages(consumers []*consumer, currentConsumer *consumer) boo // If any higher priority consumer has messages to consume, then the currentConsumer should not consume. if len(consumer.qos) > 0 { + logrus.Debugf("Higher priority consumer %s has messages to consume, skipping current consumer %s", consumer.config.QueueUrl, currentConsumer.config.QueueUrl) return false } } From da936f49005360685755438978a76a6e5624f6dc Mon Sep 17 00:00:00 2001 From: Eddy Santos Date: Mon, 5 Jun 2023 15:19:10 -0300 Subject: [PATCH 4/4] (feat): Implement func to estimate messages queue - Add approximateNumberOfMessages method to the consumer type - This function utilizes AWS SQS's GetQueueAttributes API to fetch an estimated count of the messages currently in the queue --- sns/consumer.go | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/sns/consumer.go b/sns/consumer.go index afdc112..24e87df 100644 --- a/sns/consumer.go +++ b/sns/consumer.go @@ -452,8 +452,15 @@ func checkPriorityMessages(consumers []*consumer, currentConsumer *consumer) boo return true } + // Get the approximate number of messages in the queue for the consumer + qtMessages, err := consumer.approximateNumberOfMessages() + if err != nil { + logrus.WithError(err).Errorf("Failed to get approximate number of messages for consumer %s", consumer.config.QueueUrl) + continue + } + // If any higher priority consumer has messages to consume, then the currentConsumer should not consume. - if len(consumer.qos) > 0 { + if len(consumer.qos)+qtMessages > 0 { logrus.Debugf("Higher priority consumer %s has messages to consume, skipping current consumer %s", consumer.config.QueueUrl, currentConsumer.config.QueueUrl) return false } @@ -463,6 +470,25 @@ func checkPriorityMessages(consumers []*consumer, currentConsumer *consumer) boo return true } +// approximateNumberOfMessages is a method on the consumer type that gets the approximate number of messages in the queue. +// It uses AWS SQS's GetQueueAttributes API to fetch the ApproximateNumberOfMessages attribute, +// which gives an estimate of the number of visible messages in the queue. +func (c *consumer) approximateNumberOfMessages() (int, error) { + // Request to get queue attributes from AWS SQS + result, err := c.sqs.GetQueueAttributes(&sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(c.config.QueueUrl), // Specify the queue URL from consumer config + AttributeNames: []*string{aws.String(sqs.QueueAttributeNameApproximateNumberOfMessages)}, // Specify the attribute we want to fetch + }) + + // If there's an error in fetching the queue attributes, return 0 and the error + if err != nil { + return 0, err + } + + // If the fetch was successful, convert the returned attribute (which is a string) to an integer + return strconv.Atoi(*result.Attributes[sqs.QueueAttributeNameApproximateNumberOfMessages]) +} + func (c *consumer) doClose() { logrus.Info("Closing SNS consumer...")