1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Handle events async

This commit is contained in:
skrater 2018-08-15 18:00:19 -03:00
parent 15e7e8fbc3
commit a06c4a72a1
2 changed files with 31 additions and 22 deletions

View File

@ -16,12 +16,14 @@ func main() {
})
consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test", func(e messaging.Event) error {
fmt.Println(e)
fmt.Println("Action:\t", e.Action)
fmt.Println("Body:\t", string(e.Body))
return nil
}, nil)
consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test2", func(e messaging.Event) error {
fmt.Println(e)
fmt.Println("Action:\t", e.Action)
fmt.Println("Body:\t", string(e.Body))
return nil
}, nil)

View File

@ -86,6 +86,7 @@ type consumer struct {
config *ConsumerConfig
receiveMessageInput *sqs.ReceiveMessageInput
m sync.RWMutex
wg sync.WaitGroup
handlers map[string]handler
}
@ -243,33 +244,38 @@ func (c *consumer) handleMessage(message *sqs.Message) {
handler, ok := c.getHandler(sns.TopicArn)
if !ok {
log.WithError(err).Error("Action not found.")
log.Error("Action not found.")
return
}
err = c.callAndHandlePanic(messaging.Event{
c.wg.Add(1)
go func(event messaging.Event, fn messaging.EventHandler, receiptHandle string) {
defer c.wg.Done()
err := c.callAndHandlePanic(event, fn)
if err != nil {
log.WithError(err).Debug("Failed to process event.")
return
}
_, err = c.sqs.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(c.config.QueueUrl),
ReceiptHandle: &receiptHandle,
})
if err != nil {
log.WithError(err).Error("Failed to delete message.")
return
}
log.Debug("Message handled successfully.")
}(messaging.Event{
Id: *message.MessageId,
Action: handler.action,
Body: []byte(sns.Message),
Timestamp: stringToTime(*message.Attributes["SentTimestamp"]),
}, handler.fn)
if err != nil {
log.WithError(err).Debug("Failed to process event.")
return
}
_, err = c.sqs.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(c.config.QueueUrl),
ReceiptHandle: message.ReceiptHandle,
})
if err != nil {
log.WithError(err).Error("Failed to delete message.")
return
}
log.Debug("Message handled successfully.")
}, handler.fn, *message.ReceiptHandle)
}
func (c *consumer) doConsume() {
@ -313,4 +319,5 @@ func (c *consumer) Consume() {
func (c *consumer) Close() {
c.stop <- true
c.wg.Wait()
}