From 2554adcc1e8850bd02b2cbcfba27edca75a9b6be Mon Sep 17 00:00:00 2001 From: Drasko DRASKOVIC Date: Sun, 24 Sep 2017 17:19:42 +0200 Subject: [PATCH] Use interface Signed-off-by: Drasko DRASKOVIC --- cmd/normalizer/main.go | 27 ++++++++---------- normalizer/nats/messages.go | 34 +++++++++++++++++++++++ normalizer/{messages.go => normalizer.go} | 21 ++++++++++++-- normalizer/service.go | 9 ++++++ 4 files changed, 73 insertions(+), 18 deletions(-) create mode 100644 normalizer/nats/messages.go rename normalizer/{messages.go => normalizer.go} (72%) create mode 100644 normalizer/service.go diff --git a/cmd/normalizer/main.go b/cmd/normalizer/main.go index fe2416ca..5211f642 100644 --- a/cmd/normalizer/main.go +++ b/cmd/normalizer/main.go @@ -8,15 +8,15 @@ import ( "syscall" "github.com/mainflux/mainflux/normalizer" - nats "github.com/nats-io/go-nats" + "github.com/mainflux/mainflux/normalizer/nats" + broker "github.com/nats-io/go-nats" "go.uber.org/zap" ) const ( - subSubject string = "adapter.*" - pubSubject string = "normalizer.senml" + subject string = "adapter.*" queue string = "normalizers" - defNatsURL string = nats.DefaultURL + defNatsURL string = broker.DefaultURL envNatsURL string = "MESSAGE_WRITER_NATS_URL" ) @@ -35,7 +35,10 @@ func main() { nc := connectToNats(cfg) defer nc.Close() - nc.QueueSubscribe(subSubject, queue, func(m *nats.Msg) { + repo := nats.NewMessageRepository(nc) + svc := normalizer.NewService(repo) + + nc.QueueSubscribe(subject, queue, func(m *broker.Msg) { msg := normalizer.Message{} if err := json.Unmarshal(m.Data, &msg); err != nil { @@ -43,16 +46,10 @@ func main() { return } - if msgs, err := normalizer.Normalize(logger, msg); err != nil { + if msgs, err := normalizer.Normalize(msg); err != nil { logger.Error("Failed to normalize message", zap.Error(err)) } else { - for _, v := range msgs { - if b, err := json.Marshal(v); err != nil { - logger.Error("Failed to marshal writer message", zap.Error(err)) - } else { - nc.Publish(pubSubject, b) - } - } + svc.Send(msgs) } }) @@ -76,8 +73,8 @@ func env(key, fallback string) string { return value } -func connectToNats(cfg *config) *nats.Conn { - nc, err := nats.Connect(cfg.NatsURL) +func connectToNats(cfg *config) *broker.Conn { + nc, err := broker.Connect(cfg.NatsURL) if err != nil { logger.Error("Failed to connect to NATS", zap.Error(err)) os.Exit(1) diff --git a/normalizer/nats/messages.go b/normalizer/nats/messages.go new file mode 100644 index 00000000..9f318ba3 --- /dev/null +++ b/normalizer/nats/messages.go @@ -0,0 +1,34 @@ +// Package nats contains NATS-specific message repository implementation. +package nats + +import ( + "encoding/json" + + "github.com/mainflux/mainflux/writer" + broker "github.com/nats-io/go-nats" +) + +const topic string = "normalizer.senml" + +var _ writer.MessageRepository = (*natsRepository)(nil) + +type natsRepository struct { + nc *broker.Conn +} + +// NewMessageRepository instantiates NATS message repository. Note that the +// repository will not truly persist messages, but instead they will be +// published to the topic and made available for persisting by all interested +// parties, i.e. the message-writer service. +func NewMessageRepository(nc *broker.Conn) writer.MessageRepository { + return &natsRepository{nc} +} + +func (repo *natsRepository) Save(msg writer.Message) error { + b, err := json.Marshal(msg) + if err != nil { + return err + } + + return repo.nc.Publish(topic, b) +} diff --git a/normalizer/messages.go b/normalizer/normalizer.go similarity index 72% rename from normalizer/messages.go rename to normalizer/normalizer.go index 24560596..74ec6c02 100644 --- a/normalizer/messages.go +++ b/normalizer/normalizer.go @@ -4,9 +4,10 @@ package normalizer import ( "github.com/cisco/senml" "github.com/mainflux/mainflux/writer" - "go.uber.org/zap" ) +var _ Service = (*normalizerService)(nil) + // Message represents a message emitted by the mainflux adapters layer. type Message struct { Channel string `json:"channel"` @@ -16,11 +17,25 @@ type Message struct { Payload []byte `json:"payload"` } -func Normalize(logger *zap.Logger, msg Message) (msgs []writer.Message, err error) { +type normalizerService struct { + mr writer.MessageRepository +} + +// NewService instantiates the domain service implementation. +func NewService(mr writer.MessageRepository) Service { + return &normalizerService{mr} +} + +func (ns *normalizerService) Send(msgs []writer.Message) { + for _, msg := range msgs { + ns.mr.Save(msg) + } +} + +func Normalize(msg Message) (msgs []writer.Message, err error) { var s, n senml.SenML if s, err = senml.Decode(msg.Payload, senml.JSON); err != nil { - logger.Error("Failed to decode SenML", zap.Error(err)) return nil, err } diff --git a/normalizer/service.go b/normalizer/service.go new file mode 100644 index 00000000..8e374ec5 --- /dev/null +++ b/normalizer/service.go @@ -0,0 +1,9 @@ +package normalizer + +import "github.com/mainflux/mainflux/writer" + +// Service specifies an API that must be fullfiled by the domain service +// implementation, and all of its decorators (e.g. logging & metrics). +type Service interface { + Send([]writer.Message) +}