1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Merge pull request #41 from skrater/master

SNS producer
This commit is contained in:
Guilherme Emilio Raduenz 2018-08-21 11:38:33 -03:00 committed by GitHub
commit 74b5d89d4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 247 additions and 4 deletions

View File

@ -2,6 +2,9 @@ package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/eventials/goevents/messaging"
@ -14,12 +17,13 @@ func main() {
consumer := sns.MustNewConsumer(&sns.ConsumerConfig{
AccessKey: "",
SecretKey: "0",
SecretKey: "",
Region: "us-east-1",
QueueUrl: "https://sqs.us-east-1.amazonaws.com/0000000000/vlab-exams-mp4-dev",
QueueUrl: "https://sqs.us-east-1.amazonaws.com/0000000000/test-queue",
MaxNumberOfMessages: 5,
})
defer consumer.Close()
defer consumer.Close()
consumer.Subscribe("arn:aws:sns:us-east-1:0000000000:test", func(e messaging.Event) error {
@ -37,5 +41,12 @@ func main() {
return nil
}, nil)
consumer.Consume()
go consumer.Consume()
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
fmt.Println("Waiting CTRL+C")
<-sigc
}

View File

@ -0,0 +1,50 @@
package main
import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/eventials/goevents/sns"
)
func main() {
wg := sync.WaitGroup{}
producer := sns.MustNewProducer(sns.ProducerConfig{
AccessKey: "",
SecretKey: "",
Region: "us-east-1",
PublishInterval: 2 * time.Second,
})
go func() {
for {
producer.Publish("arn:aws:sns:us-east-1:0000000000:test2", []byte("some data"))
time.Sleep(20 * time.Second)
}
}()
wg.Add(1)
go func() {
<-producer.NotifyClose()
fmt.Println("Producer closed for good")
wg.Done()
}()
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
fmt.Println("Waiting CTRL+C")
<-sigc
fmt.Println("Closing producer")
producer.Close()
wg.Wait()
}

View File

@ -91,6 +91,7 @@ type consumer struct {
handlers map[string]handler
processingMessages map[string]bool
mProcessingMessages sync.RWMutex
closeOnce sync.Once
}
func NewConsumer(config *ConsumerConfig) (messaging.Consumer, error) {
@ -378,7 +379,19 @@ func (c *consumer) Consume() {
}
}
func (c *consumer) Close() {
func (c *consumer) doClose() {
logrus.Info("Closing SNS consumer...")
c.stop <- true
logrus.Info("SNS consumer closed. Waiting remaining handlers...")
c.wg.Wait()
logrus.Info("SNS consumer closed.")
close(c.stop)
}
func (c *consumer) Close() {
c.closeOnce.Do(c.doClose)
}

169
sns/producer.go Normal file
View File

@ -0,0 +1,169 @@
package sns
import (
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/eventials/goevents/messaging"
"github.com/sirupsen/logrus"
)
type message struct {
action string
data []byte
}
type ProducerConfig struct {
AccessKey string
SecretKey string
Region string
PublishInterval time.Duration
}
func (p *ProducerConfig) setDefaults() {
if p.PublishInterval == 0 {
p.PublishInterval = 2 * time.Second
}
}
func (p *ProducerConfig) isValid() error {
if p == nil {
return ErrEmptyConfig
}
if p.AccessKey == "" {
return ErrEmptyAccessKey
}
if p.SecretKey == "" {
return ErrEmptySecretKey
}
return nil
}
type producer struct {
config ProducerConfig
m sync.Mutex
sns *sns.SNS
internalQueue chan message
closes []chan bool
closed bool
closeOnce sync.Once
}
func NewProducer(config ProducerConfig) (messaging.Producer, error) {
if err := config.isValid(); err != nil {
return nil, err
}
config.setDefaults()
creds := credentials.NewStaticCredentials(config.AccessKey, config.SecretKey, "")
sess, err := session.NewSession(&aws.Config{
Region: aws.String(config.Region),
Credentials: creds,
})
if err != nil {
return nil, err
}
p := &producer{
sns: sns.New(sess),
internalQueue: make(chan message),
config: config,
closed: false,
}
go p.drainInternalQueue()
return p, nil
}
func MustNewProducer(config ProducerConfig) messaging.Producer {
p, err := NewProducer(config)
if err != nil {
panic(err)
}
return p
}
func (p *producer) Publish(action string, data []byte) {
p.internalQueue <- message{
action: action,
data: data,
}
}
func (p *producer) isClosed() bool {
p.m.Lock()
defer p.m.Unlock()
return p.closed
}
func (p *producer) drainInternalQueue() {
for m := range p.internalQueue {
retry := true
for retry && !p.isClosed() {
output, err := p.sns.Publish(&sns.PublishInput{
Message: aws.String(string(m.data)),
TopicArn: aws.String(m.action),
})
if err != nil {
logrus.WithFields(logrus.Fields{
"message": m,
"error": err,
}).Error("Failed to publish message.")
time.Sleep(p.config.PublishInterval)
} else {
logrus.WithFields(logrus.Fields{
"message_id": output.MessageId,
}).Debug("Successfully published message.")
retry = false
}
}
}
p.m.Lock()
for _, c := range p.closes {
c <- true
}
p.m.Unlock()
}
func (p *producer) NotifyClose() <-chan bool {
receiver := make(chan bool, 1)
p.m.Lock()
p.closes = append(p.closes, receiver)
p.m.Unlock()
return receiver
}
func (p *producer) doClose() {
p.m.Lock()
defer p.m.Unlock()
p.closed = true
close(p.internalQueue)
}
func (p *producer) Close() {
p.closeOnce.Do(p.doClose)
}