mirror of
https://github.com/eventials/goevents.git
synced 2025-04-24 13:48:53 +08:00
(feat): implement PriorityConsume function
The `PriorityConsume` function consumes messages based on the priority of consumers. The function registers and logs the handlers of each consumer. It then starts consuming messages in a loop, prioritizing the consumers accordingly. The consumption stops if a stop signal is received from a consumer. The function also checks and handles the scenario when all consumers are stopped. The `checkPriorityMessages` function is also introduced which determines if a consumer is eligible to consume a message, considering the messages in higher priority consumers. The code has been refactored for improved readability, and detailed comments have been added for better understanding.
This commit is contained in:
parent
68652d356d
commit
82c88a052a
@ -92,6 +92,7 @@ type consumer struct {
|
||||
processingMessages map[string]bool
|
||||
mProcessingMessages sync.RWMutex
|
||||
closeOnce sync.Once
|
||||
stopped bool
|
||||
}
|
||||
|
||||
func NewConsumer(config *ConsumerConfig) (messaging.Consumer, error) {
|
||||
@ -379,6 +380,79 @@ func (c *consumer) Consume() {
|
||||
}
|
||||
}
|
||||
|
||||
// PriorityConsume function takes a list of consumer objects and starts consuming them in a loop.
|
||||
// The consumers are processed according to their priority. The process is stopped if the stop signal is received.
|
||||
// The function exits when all consumers are stopped.
|
||||
func PriorityConsume(consumers []*consumer) {
|
||||
logrus.Info("Registered handlers:")
|
||||
|
||||
// Logging the registered handlers for each consumer
|
||||
for priority, consumer := range consumers {
|
||||
for _, handler := range consumer.handlers {
|
||||
logrus.Infof(" %s (priority %d)", handler.action, priority)
|
||||
}
|
||||
}
|
||||
|
||||
// Counter for tracking the number of stopped consumers
|
||||
var consumersStopped int
|
||||
|
||||
// Main loop to consume messages
|
||||
for {
|
||||
// Iterate over each consumer
|
||||
for _, consumer := range consumers {
|
||||
// Determine if a consumer should consume messages
|
||||
shouldConsume := !consumer.stopped && checkPriorityMessages(consumers, consumer)
|
||||
|
||||
if !shouldConsume {
|
||||
continue
|
||||
}
|
||||
|
||||
// If the consumer is still active, try to consume a message
|
||||
select {
|
||||
case <-consumer.stop:
|
||||
// If a stop signal is received, stop the consumer and increase the count of stopped consumers
|
||||
consumer.stopped = true
|
||||
consumersStopped++
|
||||
default:
|
||||
// If no stop signal, consume the message
|
||||
consumer.doConsume()
|
||||
}
|
||||
}
|
||||
|
||||
// If all consumers are stopped, exit the function
|
||||
if consumersStopped == len(consumers) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkPriorityMessages checks whether the given consumer can consume a message.
|
||||
// It checks all other consumers and allows a consumer to consume only if there are no higher priority consumers with messages to consume.
|
||||
func checkPriorityMessages(consumers []*consumer, currentConsumer *consumer) bool {
|
||||
// Iterate over each consumer
|
||||
for _, consumer := range consumers {
|
||||
|
||||
// Skip stopped consumers
|
||||
if consumer.stopped {
|
||||
continue
|
||||
}
|
||||
|
||||
// If we reached the currentConsumer in the iteration, then all higher priority consumers were checked and they don't have any messages to consume.
|
||||
// So, the currentConsumer is allowed to consume.
|
||||
if consumer == currentConsumer {
|
||||
return true
|
||||
}
|
||||
|
||||
// If any higher priority consumer has messages to consume, then the currentConsumer should not consume.
|
||||
if len(consumer.qos) > 0 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// If no consumers were found to have messages, the currentConsumer can consume.
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *consumer) doClose() {
|
||||
logrus.Info("Closing SNS consumer...")
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user