1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00
This commit is contained in:
skrater 2018-08-16 14:54:29 -03:00
parent a06c4a72a1
commit 22549a8742
2 changed files with 85 additions and 14 deletions

View File

@ -2,22 +2,32 @@ package main
import ( import (
"fmt" "fmt"
"time"
"github.com/eventials/goevents/messaging" "github.com/eventials/goevents/messaging"
"github.com/eventials/goevents/sns" "github.com/eventials/goevents/sns"
"github.com/sirupsen/logrus"
) )
func main() { func main() {
logrus.SetLevel(logrus.DebugLevel)
consumer := sns.MustNewConsumer(&sns.ConsumerConfig{ consumer := sns.MustNewConsumer(&sns.ConsumerConfig{
AccessKey: "", AccessKey: "",
SecretKey: "", SecretKey: "0",
Region: "us-east-1", Region: "us-east-1",
QueueUrl: "https://sqs.us-east-1.amazonaws.com/0000000000/vlab-exams-mp4-dev", QueueUrl: "https://sqs.us-east-1.amazonaws.com/0000000000/vlab-exams-mp4-dev",
MaxNumberOfMessages: 5,
}) })
defer consumer.Close()
consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test", func(e messaging.Event) error { consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test", func(e messaging.Event) error {
fmt.Println("Action:\t", e.Action) fmt.Println("Action:\t", e.Action)
fmt.Println("Body:\t", string(e.Body)) fmt.Println("Body:\t", string(e.Body))
time.Sleep(time.Minute)
return nil return nil
}, nil) }, nil)

View File

@ -46,7 +46,7 @@ type ConsumerConfig struct {
func (c *ConsumerConfig) setDefaults() { func (c *ConsumerConfig) setDefaults() {
if c.VisibilityTimeout == 0 { if c.VisibilityTimeout == 0 {
c.VisibilityTimeout = 20 c.VisibilityTimeout = 45
} }
if c.WaitTimeSeconds == 0 { if c.WaitTimeSeconds == 0 {
@ -83,11 +83,14 @@ type handler struct {
type consumer struct { type consumer struct {
sqs *sqs.SQS sqs *sqs.SQS
stop chan bool stop chan bool
qos chan bool
config *ConsumerConfig config *ConsumerConfig
receiveMessageInput *sqs.ReceiveMessageInput receiveMessageInput *sqs.ReceiveMessageInput
m sync.RWMutex m sync.RWMutex
wg sync.WaitGroup wg sync.WaitGroup
handlers map[string]handler handlers map[string]handler
processingMessages map[string]bool
mProcessingMessages sync.RWMutex
} }
func NewConsumer(config *ConsumerConfig) (messaging.Consumer, error) { func NewConsumer(config *ConsumerConfig) (messaging.Consumer, error) {
@ -109,10 +112,12 @@ func NewConsumer(config *ConsumerConfig) (messaging.Consumer, error) {
} }
c := &consumer{ c := &consumer{
sqs: sqs.New(sess), sqs: sqs.New(sess),
config: config, config: config,
stop: make(chan bool), stop: make(chan bool),
handlers: make(map[string]handler), qos: make(chan bool, config.MaxNumberOfMessages),
handlers: make(map[string]handler),
processingMessages: make(map[string]bool),
} }
c.receiveMessageInput = &sqs.ReceiveMessageInput{ c.receiveMessageInput = &sqs.ReceiveMessageInput{
@ -223,6 +228,28 @@ func (c *consumer) callAndHandlePanic(event messaging.Event, fn messaging.EventH
return return
} }
func (c *consumer) isMessageProcessing(id string) bool {
c.mProcessingMessages.RLock()
defer c.mProcessingMessages.RUnlock()
_, ok := c.processingMessages[id]
return ok
}
func (c *consumer) addMessageProcessing(id string) {
c.mProcessingMessages.Lock()
defer c.mProcessingMessages.Unlock()
c.processingMessages[id] = true
}
func (c *consumer) deleteMessageProcessing(id string) {
c.mProcessingMessages.Lock()
defer c.mProcessingMessages.Unlock()
delete(c.processingMessages, id)
}
func (c *consumer) handleMessage(message *sqs.Message) { func (c *consumer) handleMessage(message *sqs.Message) {
sns := &snsMessagePayload{} sns := &snsMessagePayload{}
err := json.Unmarshal([]byte(*message.Body), sns) err := json.Unmarshal([]byte(*message.Body), sns)
@ -235,9 +262,12 @@ func (c *consumer) handleMessage(message *sqs.Message) {
return return
} }
id := *message.MessageId
receiptHandle := *message.ReceiptHandle
log := logrus.WithFields(logrus.Fields{ log := logrus.WithFields(logrus.Fields{
"action": sns.TopicArn, "action": sns.TopicArn,
"message_id": message.MessageId, "message_id": id,
"body": sns.Message, "body": sns.Message,
}) })
@ -248,9 +278,27 @@ func (c *consumer) handleMessage(message *sqs.Message) {
return return
} }
// Check if message is already processing in goroutine.
// This will occur if consumer is slower than VisibilityTimeout.
if c.isMessageProcessing(id) {
log.Debug("Message is already processing.")
return
}
// QOS: do not consume while prior messages are in goroutines.
c.qos <- true
c.addMessageProcessing(id)
c.wg.Add(1) c.wg.Add(1)
go func(event messaging.Event, fn messaging.EventHandler, receiptHandle string) { go func(event messaging.Event, fn messaging.EventHandler, receiptHandle string) {
defer c.wg.Done() defer func() {
c.wg.Done()
c.deleteMessageProcessing(event.Id)
<-c.qos
}()
err := c.callAndHandlePanic(event, fn) err := c.callAndHandlePanic(event, fn)
@ -259,9 +307,11 @@ func (c *consumer) handleMessage(message *sqs.Message) {
return return
} }
log.Debug("Deleting message.")
_, err = c.sqs.DeleteMessage(&sqs.DeleteMessageInput{ _, err = c.sqs.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(c.config.QueueUrl), QueueUrl: aws.String(c.config.QueueUrl),
ReceiptHandle: &receiptHandle, ReceiptHandle: aws.String(receiptHandle),
}) })
if err != nil { if err != nil {
@ -271,14 +321,24 @@ func (c *consumer) handleMessage(message *sqs.Message) {
log.Debug("Message handled successfully.") log.Debug("Message handled successfully.")
}(messaging.Event{ }(messaging.Event{
Id: *message.MessageId, Id: id,
Action: handler.action, Action: handler.action,
Body: []byte(sns.Message), Body: []byte(sns.Message),
Timestamp: stringToTime(*message.Attributes["SentTimestamp"]), Timestamp: stringToTime(*message.Attributes["SentTimestamp"]),
}, handler.fn, *message.ReceiptHandle) }, handler.fn, receiptHandle)
} }
func (c *consumer) doConsume() { func (c *consumer) doConsume() {
var nextMessages int64 = c.config.MaxNumberOfMessages - int64(len(c.qos))
if nextMessages == 0 {
logrus.Debugf("QOS full with %d.", c.config.MaxNumberOfMessages)
time.Sleep(5 * time.Second)
return
}
c.receiveMessageInput.MaxNumberOfMessages = aws.Int64(nextMessages)
result, err := c.sqs.ReceiveMessage(c.receiveMessageInput) result, err := c.sqs.ReceiveMessage(c.receiveMessageInput)
if err != nil { if err != nil {
@ -296,6 +356,7 @@ func (c *consumer) doConsume() {
// Messages successfully handled will be deleted from SQS. // Messages successfully handled will be deleted from SQS.
// Messages who failed to delete from SQS will be received again, and application needs to handle // Messages who failed to delete from SQS will be received again, and application needs to handle
// by using MessageId. // by using MessageId.
// Receiving duplicate messages may happen using more than one consumer if not processing in VisibilityTimeout.
func (c *consumer) Consume() { func (c *consumer) Consume() {
logrus.Info("Registered handlers:") logrus.Info("Registered handlers:")