mirror of
https://github.com/eventials/goevents.git
synced 2025-05-01 13:48:56 +08:00
(feat): Refactor and improve PriorityConsume function
This commit is contained in:
parent
82c88a052a
commit
f1b607259f
@ -383,14 +383,23 @@ func (c *consumer) Consume() {
|
|||||||
// PriorityConsume function takes a list of consumer objects and starts consuming them in a loop.
|
// PriorityConsume function takes a list of consumer objects and starts consuming them in a loop.
|
||||||
// The consumers are processed according to their priority. The process is stopped if the stop signal is received.
|
// The consumers are processed according to their priority. The process is stopped if the stop signal is received.
|
||||||
// The function exits when all consumers are stopped.
|
// The function exits when all consumers are stopped.
|
||||||
func PriorityConsume(consumers []*consumer) {
|
func PriorityConsume(consumers []messaging.Consumer) {
|
||||||
logrus.Info("Registered handlers:")
|
logrus.Info("Registered handlers:")
|
||||||
|
|
||||||
|
// consumersQueue holds the actual *consumer types cast from the messaging.Consumer interface
|
||||||
|
consumersQueue := make([]*consumer, len(consumers))
|
||||||
|
|
||||||
// Logging the registered handlers for each consumer
|
// Logging the registered handlers for each consumer
|
||||||
for priority, consumer := range consumers {
|
for priority, c := range consumers {
|
||||||
for _, handler := range consumer.handlers {
|
actualConsumer, ok := c.(*consumer) // Casting to *consumer
|
||||||
|
if !ok {
|
||||||
|
logrus.Error("Failed to cast consumer to consumer type")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, handler := range actualConsumer.handlers {
|
||||||
logrus.Infof(" %s (priority %d)", handler.action, priority)
|
logrus.Infof(" %s (priority %d)", handler.action, priority)
|
||||||
}
|
}
|
||||||
|
consumersQueue[priority] = actualConsumer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Counter for tracking the number of stopped consumers
|
// Counter for tracking the number of stopped consumers
|
||||||
@ -399,9 +408,9 @@ func PriorityConsume(consumers []*consumer) {
|
|||||||
// Main loop to consume messages
|
// Main loop to consume messages
|
||||||
for {
|
for {
|
||||||
// Iterate over each consumer
|
// Iterate over each consumer
|
||||||
for _, consumer := range consumers {
|
for _, consumer := range consumersQueue {
|
||||||
// Determine if a consumer should consume messages
|
// Determine if a consumer should consume messages
|
||||||
shouldConsume := !consumer.stopped && checkPriorityMessages(consumers, consumer)
|
shouldConsume := !consumer.stopped && checkPriorityMessages(consumersQueue, consumer)
|
||||||
|
|
||||||
if !shouldConsume {
|
if !shouldConsume {
|
||||||
continue
|
continue
|
||||||
|
Loading…
x
Reference in New Issue
Block a user