mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-28 13:48:49 +08:00
Use interface
Signed-off-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
This commit is contained in:
parent
3494c7a5e1
commit
2554adcc1e
@ -8,15 +8,15 @@ import (
|
||||
"syscall"
|
||||
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
"github.com/mainflux/mainflux/normalizer/nats"
|
||||
broker "github.com/nats-io/go-nats"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
subSubject string = "adapter.*"
|
||||
pubSubject string = "normalizer.senml"
|
||||
subject string = "adapter.*"
|
||||
queue string = "normalizers"
|
||||
defNatsURL string = nats.DefaultURL
|
||||
defNatsURL string = broker.DefaultURL
|
||||
envNatsURL string = "MESSAGE_WRITER_NATS_URL"
|
||||
)
|
||||
|
||||
@ -35,7 +35,10 @@ func main() {
|
||||
nc := connectToNats(cfg)
|
||||
defer nc.Close()
|
||||
|
||||
nc.QueueSubscribe(subSubject, queue, func(m *nats.Msg) {
|
||||
repo := nats.NewMessageRepository(nc)
|
||||
svc := normalizer.NewService(repo)
|
||||
|
||||
nc.QueueSubscribe(subject, queue, func(m *broker.Msg) {
|
||||
msg := normalizer.Message{}
|
||||
|
||||
if err := json.Unmarshal(m.Data, &msg); err != nil {
|
||||
@ -43,16 +46,10 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
if msgs, err := normalizer.Normalize(logger, msg); err != nil {
|
||||
if msgs, err := normalizer.Normalize(msg); err != nil {
|
||||
logger.Error("Failed to normalize message", zap.Error(err))
|
||||
} else {
|
||||
for _, v := range msgs {
|
||||
if b, err := json.Marshal(v); err != nil {
|
||||
logger.Error("Failed to marshal writer message", zap.Error(err))
|
||||
} else {
|
||||
nc.Publish(pubSubject, b)
|
||||
}
|
||||
}
|
||||
svc.Send(msgs)
|
||||
}
|
||||
})
|
||||
|
||||
@ -76,8 +73,8 @@ func env(key, fallback string) string {
|
||||
return value
|
||||
}
|
||||
|
||||
func connectToNats(cfg *config) *nats.Conn {
|
||||
nc, err := nats.Connect(cfg.NatsURL)
|
||||
func connectToNats(cfg *config) *broker.Conn {
|
||||
nc, err := broker.Connect(cfg.NatsURL)
|
||||
if err != nil {
|
||||
logger.Error("Failed to connect to NATS", zap.Error(err))
|
||||
os.Exit(1)
|
||||
|
34
normalizer/nats/messages.go
Normal file
34
normalizer/nats/messages.go
Normal file
@ -0,0 +1,34 @@
|
||||
// Package nats contains NATS-specific message repository implementation.
|
||||
package nats
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/mainflux/mainflux/writer"
|
||||
broker "github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
const topic string = "normalizer.senml"
|
||||
|
||||
var _ writer.MessageRepository = (*natsRepository)(nil)
|
||||
|
||||
type natsRepository struct {
|
||||
nc *broker.Conn
|
||||
}
|
||||
|
||||
// NewMessageRepository instantiates NATS message repository. Note that the
|
||||
// repository will not truly persist messages, but instead they will be
|
||||
// published to the topic and made available for persisting by all interested
|
||||
// parties, i.e. the message-writer service.
|
||||
func NewMessageRepository(nc *broker.Conn) writer.MessageRepository {
|
||||
return &natsRepository{nc}
|
||||
}
|
||||
|
||||
func (repo *natsRepository) Save(msg writer.Message) error {
|
||||
b, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return repo.nc.Publish(topic, b)
|
||||
}
|
@ -4,9 +4,10 @@ package normalizer
|
||||
import (
|
||||
"github.com/cisco/senml"
|
||||
"github.com/mainflux/mainflux/writer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var _ Service = (*normalizerService)(nil)
|
||||
|
||||
// Message represents a message emitted by the mainflux adapters layer.
|
||||
type Message struct {
|
||||
Channel string `json:"channel"`
|
||||
@ -16,11 +17,25 @@ type Message struct {
|
||||
Payload []byte `json:"payload"`
|
||||
}
|
||||
|
||||
func Normalize(logger *zap.Logger, msg Message) (msgs []writer.Message, err error) {
|
||||
type normalizerService struct {
|
||||
mr writer.MessageRepository
|
||||
}
|
||||
|
||||
// NewService instantiates the domain service implementation.
|
||||
func NewService(mr writer.MessageRepository) Service {
|
||||
return &normalizerService{mr}
|
||||
}
|
||||
|
||||
func (ns *normalizerService) Send(msgs []writer.Message) {
|
||||
for _, msg := range msgs {
|
||||
ns.mr.Save(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func Normalize(msg Message) (msgs []writer.Message, err error) {
|
||||
var s, n senml.SenML
|
||||
|
||||
if s, err = senml.Decode(msg.Payload, senml.JSON); err != nil {
|
||||
logger.Error("Failed to decode SenML", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
9
normalizer/service.go
Normal file
9
normalizer/service.go
Normal file
@ -0,0 +1,9 @@
|
||||
package normalizer
|
||||
|
||||
import "github.com/mainflux/mainflux/writer"
|
||||
|
||||
// Service specifies an API that must be fullfiled by the domain service
|
||||
// implementation, and all of its decorators (e.g. logging & metrics).
|
||||
type Service interface {
|
||||
Send([]writer.Message)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user