From a06c4a72a1b45d5e403797d22e80f2a513a63c75 Mon Sep 17 00:00:00 2001 From: skrater Date: Wed, 15 Aug 2018 18:00:19 -0300 Subject: [PATCH] Handle events async --- examples/consumer/sns/consumer.go | 6 ++-- sns/consumer.go | 47 ++++++++++++++++++------------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/examples/consumer/sns/consumer.go b/examples/consumer/sns/consumer.go index 2ccb45b..4128f73 100644 --- a/examples/consumer/sns/consumer.go +++ b/examples/consumer/sns/consumer.go @@ -16,12 +16,14 @@ func main() { }) consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test", func(e messaging.Event) error { - fmt.Println(e) + fmt.Println("Action:\t", e.Action) + fmt.Println("Body:\t", string(e.Body)) return nil }, nil) consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test2", func(e messaging.Event) error { - fmt.Println(e) + fmt.Println("Action:\t", e.Action) + fmt.Println("Body:\t", string(e.Body)) return nil }, nil) diff --git a/sns/consumer.go b/sns/consumer.go index bf59937..9f7b93a 100644 --- a/sns/consumer.go +++ b/sns/consumer.go @@ -86,6 +86,7 @@ type consumer struct { config *ConsumerConfig receiveMessageInput *sqs.ReceiveMessageInput m sync.RWMutex + wg sync.WaitGroup handlers map[string]handler } @@ -243,33 +244,38 @@ func (c *consumer) handleMessage(message *sqs.Message) { handler, ok := c.getHandler(sns.TopicArn) if !ok { - log.WithError(err).Error("Action not found.") + log.Error("Action not found.") return } - err = c.callAndHandlePanic(messaging.Event{ + c.wg.Add(1) + go func(event messaging.Event, fn messaging.EventHandler, receiptHandle string) { + defer c.wg.Done() + + err := c.callAndHandlePanic(event, fn) + + if err != nil { + log.WithError(err).Debug("Failed to process event.") + return + } + + _, err = c.sqs.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: aws.String(c.config.QueueUrl), + ReceiptHandle: &receiptHandle, + }) + + if err != nil { + log.WithError(err).Error("Failed to delete message.") + return + } + + log.Debug("Message handled successfully.") + }(messaging.Event{ Id: *message.MessageId, Action: handler.action, Body: []byte(sns.Message), Timestamp: stringToTime(*message.Attributes["SentTimestamp"]), - }, handler.fn) - - if err != nil { - log.WithError(err).Debug("Failed to process event.") - return - } - - _, err = c.sqs.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: aws.String(c.config.QueueUrl), - ReceiptHandle: message.ReceiptHandle, - }) - - if err != nil { - log.WithError(err).Error("Failed to delete message.") - return - } - - log.Debug("Message handled successfully.") + }, handler.fn, *message.ReceiptHandle) } func (c *consumer) doConsume() { @@ -313,4 +319,5 @@ func (c *consumer) Consume() { func (c *consumer) Close() { c.stop <- true + c.wg.Wait() }