1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Better log messages

This commit is contained in:
skrater 2018-08-15 17:42:10 -03:00
parent 53cd7ebcd0
commit 31b724fcc7
2 changed files with 70 additions and 32 deletions

View File

@ -8,7 +8,7 @@ import (
)
func main() {
consumer := sns.NewConsumer(sns.ConsumerConfig{
consumer := sns.MustNewConsumer(&sns.ConsumerConfig{
AccessKey: "",
SecretKey: "",
Region: "us-east-1",

View File

@ -15,6 +15,12 @@ import (
"github.com/sirupsen/logrus"
)
var (
ErrEmptyConig = errors.New("empty config")
ErrEmptyAccessKey = errors.New("empty access key")
ErrEmptySecretKey = errors.New("empty secret key")
)
type snsMessagePayload struct {
Message string `json:"Message"`
MessageID string `json:"MessageId"`
@ -38,6 +44,36 @@ type ConsumerConfig struct {
QueueUrl string
}
func (c *ConsumerConfig) setDefaults() {
if c.VisibilityTimeout == 0 {
c.VisibilityTimeout = 20
}
if c.WaitTimeSeconds == 0 {
c.WaitTimeSeconds = 20
}
if c.MaxNumberOfMessages == 0 {
c.MaxNumberOfMessages = 5
}
}
func (c *ConsumerConfig) isValid() error {
if c == nil {
return ErrEmptyConig
}
if c.AccessKey == "" {
return ErrEmptyAccessKey
}
if c.SecretKey == "" {
return ErrEmptySecretKey
}
return nil
}
type handler struct {
action string
fn messaging.EventHandler
@ -47,30 +83,28 @@ type handler struct {
type consumer struct {
sqs *sqs.SQS
stop chan bool
config ConsumerConfig
config *ConsumerConfig
receiveMessageInput *sqs.ReceiveMessageInput
m sync.RWMutex
handlers map[string]handler
}
func NewConsumer(config ConsumerConfig) messaging.Consumer {
func NewConsumer(config *ConsumerConfig) (messaging.Consumer, error) {
if err := config.isValid(); err != nil {
return nil, err
}
config.setDefaults()
creds := credentials.NewStaticCredentials(config.AccessKey, config.SecretKey, "")
sess := session.Must(session.NewSession(&aws.Config{
sess, err := session.NewSession(&aws.Config{
Region: aws.String(config.Region),
Credentials: creds,
}))
})
if config.VisibilityTimeout == 0 {
config.VisibilityTimeout = 20
}
if config.WaitTimeSeconds == 0 {
config.WaitTimeSeconds = 20
}
if config.MaxNumberOfMessages == 0 {
config.MaxNumberOfMessages = 5
if err != nil {
return nil, err
}
c := &consumer{
@ -93,7 +127,17 @@ func NewConsumer(config ConsumerConfig) messaging.Consumer {
WaitTimeSeconds: aws.Int64(c.config.WaitTimeSeconds),
}
return c
return c, nil
}
func MustNewConsumer(config *ConsumerConfig) messaging.Consumer {
consumer, err := NewConsumer(config)
if err != nil {
panic(err)
}
return consumer
}
func (c *consumer) getHandler(action string) (handler, bool) {
@ -190,13 +234,16 @@ func (c *consumer) handleMessage(message *sqs.Message) {
return
}
log := logrus.WithFields(logrus.Fields{
"action": sns.TopicArn,
"message_id": message.MessageId,
"body": sns.Message,
})
handler, ok := c.getHandler(sns.TopicArn)
if !ok {
logrus.WithFields(logrus.Fields{
"error": err,
"action": sns.TopicArn,
}).Error("Action not found.")
log.WithError(err).Error("Action not found.")
return
}
@ -208,12 +255,7 @@ func (c *consumer) handleMessage(message *sqs.Message) {
}, handler.fn)
if err != nil {
logrus.WithFields(logrus.Fields{
"action": handler.action,
"message_id": message.MessageId,
"body": sns.Message,
"error": err,
}).Debug("Failed to process event.")
log.WithError(err).Debug("Failed to process event.")
return
}
@ -223,15 +265,11 @@ func (c *consumer) handleMessage(message *sqs.Message) {
})
if err != nil {
logrus.WithError(err).Error("Failed to delete message.")
log.WithError(err).Error("Failed to delete message.")
return
}
logrus.WithFields(logrus.Fields{
"action": handler.action,
"body": sns.Message,
"message_id": message.MessageId,
}).Debug("Message handled successfully.")
log.Debug("Message handled successfully.")
}
func (c *consumer) doConsume() {