1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Consume messages from AWS SQS and SNS

This commit is contained in:
skrater 2018-08-15 17:00:24 -03:00
parent 14d32d7f49
commit 4e8a4889d6
5 changed files with 309 additions and 17 deletions

View File

@ -21,7 +21,8 @@ WORKDIR /go/src/github.com/eventials/goevents
RUN go get \
github.com/streadway/amqp \
github.com/sirupsen/logrus \
github.com/stretchr/testify
github.com/stretchr/testify \
github.com/aws/aws-sdk-go/service/sqs
ENTRYPOINT ["dockerize"]

View File

@ -63,16 +63,3 @@ if err != nil {
panic(err)
}
```
### Actions
The action can be a full word, a wildcard (`*`) or multiple words or wildcards delimited by dots (`.`)
Look the examples below:
- The action handler `my_action` will match only `my_action` event.
- The action handler `my_action.foo` will match only `my_action.foo` event.
- The action handler `my_action.*` will match `my_action.foo`, `my_action.bar` and all `my_action.*` events.
- The action handler `my_action.foo.bar` will match only `my_action.foo.bar` event.
- The action handler `my_action.*.bar` will match `my_action.foo.bar`, `my_action.bar.bar` and all `my_action.*.bar` events.
- The action handler `*` will match all events.

View File

@ -0,0 +1,29 @@
package main
import (
"fmt"
"github.com/eventials/goevents/messaging"
"github.com/eventials/goevents/sns"
)
func main() {
consumer := sns.NewConsumer(sns.ConsumerConfig{
AccessKey: "",
SecretKey: "",
Region: "us-east-1",
QueueUrl: "https://sqs.us-east-1.amazonaws.com/0000000000/vlab-exams-mp4-dev",
})
consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test", func(e messaging.Event) error {
fmt.Println(e)
return nil
}, nil)
consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test2", func(e messaging.Event) error {
fmt.Println(e)
return nil
}, nil)
consumer.Consume()
}

View File

@ -4,7 +4,4 @@ type Connection interface {
Consumer(autoAck bool, exchange, queue string) (Consumer, error)
Producer(exchange string) (Producer, error)
Close()
NotifyConnectionClose() <-chan error
NotifyReestablish() <-chan bool
IsConnected() bool
}

278
sns/consumer.go Normal file
View File

@ -0,0 +1,278 @@
package sns
import (
"encoding/json"
"errors"
"strconv"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/eventials/goevents/messaging"
"github.com/sirupsen/logrus"
)
type snsMessagePayload struct {
Message string `json:"Message"`
MessageID string `json:"MessageId"`
Signature string `json:"Signature"`
SignatureVersion string `json:"SignatureVersion"`
SigningCertURL string `json:"SigningCertURL"`
Subject string `json:"Subject"`
Timestamp time.Time `json:"Timestamp"`
TopicArn string `json:"TopicArn"`
Type string `json:"Type"`
UnsubscribeURL string `json:"UnsubscribeURL"`
}
type ConsumerConfig struct {
AccessKey string
SecretKey string
Region string
VisibilityTimeout int64
WaitTimeSeconds int64
MaxNumberOfMessages int64
QueueUrl string
}
type handler struct {
action string
fn messaging.EventHandler
options *messaging.SubscribeOptions
}
type consumer struct {
sqs *sqs.SQS
stop chan bool
config ConsumerConfig
receiveMessageInput *sqs.ReceiveMessageInput
m sync.RWMutex
handlers map[string]handler
}
func NewConsumer(config ConsumerConfig) messaging.Consumer {
creds := credentials.NewStaticCredentials(config.AccessKey, config.SecretKey, "")
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(config.Region),
Credentials: creds,
}))
if config.VisibilityTimeout == 0 {
config.VisibilityTimeout = 20
}
if config.WaitTimeSeconds == 0 {
config.WaitTimeSeconds = 20
}
if config.MaxNumberOfMessages == 0 {
config.MaxNumberOfMessages = 5
}
c := &consumer{
sqs: sqs.New(sess),
config: config,
stop: make(chan bool),
handlers: make(map[string]handler),
}
c.receiveMessageInput = &sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(c.config.QueueUrl),
MaxNumberOfMessages: aws.Int64(c.config.MaxNumberOfMessages),
VisibilityTimeout: aws.Int64(c.config.VisibilityTimeout),
WaitTimeSeconds: aws.Int64(c.config.WaitTimeSeconds),
}
return c
}
func (c *consumer) getHandler(action string) (handler, bool) {
c.m.RLock()
defer c.m.RUnlock()
hnd, ok := c.handlers[action]
return hnd, ok
}
// Subscribe subscribes an handler to a action. Action must be ARN URL.
func (c *consumer) Subscribe(action string, handlerFn messaging.EventHandler, options *messaging.SubscribeOptions) error {
c.m.Lock()
defer c.m.Unlock()
c.handlers[action] = handler{
action: action,
options: options,
fn: handlerFn,
}
return nil
}
func (c *consumer) Unsubscribe(action string) error {
c.m.Lock()
defer c.m.Unlock()
delete(c.handlers, action)
return nil
}
// TODO: needs to bind SNS Topic (actions) to SQS Queue in AWS.
func (c *consumer) BindActions(actions ...string) error {
return nil
}
// TODO: reverts anything done by BindActions
func (c *consumer) UnbindActions(actions ...string) error {
c.m.Lock()
defer c.m.Unlock()
for _, action := range actions {
delete(c.handlers, action)
}
return nil
}
func stringToTime(t string) time.Time {
if t == "" {
return time.Time{}
}
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return time.Time{}
}
return time.Unix(0, i*1000000)
}
func (c *consumer) callAndHandlePanic(event messaging.Event, fn messaging.EventHandler) (err error) {
defer func() {
if r := recover(); r != nil {
switch x := r.(type) {
case string:
err = errors.New(x)
case error:
err = x
default:
err = errors.New("Unknown panic")
}
}
}()
err = fn(event)
return
}
func (c *consumer) handleMessage(message *sqs.Message) {
sns := &snsMessagePayload{}
err := json.Unmarshal([]byte(*message.Body), sns)
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"body": []byte(*message.Body),
}).Error("Failed to unmarshall sns message.")
return
}
handler, ok := c.getHandler(sns.TopicArn)
if !ok {
logrus.WithFields(logrus.Fields{
"error": err,
"action": sns.TopicArn,
}).Error("Action not found.")
return
}
err = c.callAndHandlePanic(messaging.Event{
Id: *message.MessageId,
Action: handler.action,
Body: []byte(sns.Message),
Timestamp: stringToTime(*message.Attributes["SentTimestamp"]),
}, handler.fn)
if err != nil {
logrus.WithFields(logrus.Fields{
"action": handler.action,
"message_id": message.MessageId,
"body": sns.Message,
"error": err,
}).Debug("Failed to process event.")
return
}
_, err = c.sqs.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(c.config.QueueUrl),
ReceiptHandle: message.ReceiptHandle,
})
if err != nil {
logrus.WithError(err).Error("Failed to delete message.")
return
}
logrus.WithFields(logrus.Fields{
"action": handler.action,
"body": sns.Message,
"message_id": message.MessageId,
}).Debug("Message handled successfully.")
}
func (c *consumer) doConsume() {
result, err := c.sqs.ReceiveMessage(c.receiveMessageInput)
if err != nil {
logrus.WithError(err).Error("Failed to get messages.")
return
}
for _, message := range result.Messages {
c.handleMessage(message)
}
}
// Consume consumes with long-poll, messages from AWS SQS and dispatch to handlers.
// Polling time is configured by WaitTimeSeconds.
// Messages successfully handled will be deleted from SQS.
// Messages who failed to delete from SQS will be received again, and application needs to handle
// by using MessageId.
func (c *consumer) Consume() {
logrus.Info("Registered handlers:")
for _, handler := range c.handlers {
logrus.Infof(" %s", handler.action)
}
logrus.WithFields(logrus.Fields{
"queue": c.config.QueueUrl,
}).Info("Consuming messages...")
for {
select {
case <-c.stop:
return
default:
c.doConsume()
}
}
}
func (c *consumer) Close() {
c.stop <- true
}