mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-26 13:48:53 +08:00
Add logging middleware to normalizer service (#360)
Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>
This commit is contained in:
parent
ec03d877f0
commit
6461761985
@ -10,14 +10,16 @@ import (
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
"github.com/mainflux/mainflux/normalizer/api"
|
||||
"github.com/mainflux/mainflux/normalizer/nats"
|
||||
broker "github.com/nats-io/go-nats"
|
||||
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
defNatsURL string = nats.DefaultURL
|
||||
defNatsURL string = broker.DefaultURL
|
||||
defPort string = "8180"
|
||||
envNatsURL string = "MF_NATS_URL"
|
||||
envPort string = "MF_NORMALIZER_PORT"
|
||||
@ -36,19 +38,37 @@ func main() {
|
||||
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
nc, err := nats.Connect(cfg.NatsURL)
|
||||
nc, err := broker.Connect(cfg.NatsURL)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
svc := normalizer.New()
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
svc = api.MetricsMiddleware(
|
||||
svc,
|
||||
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: "normalizer",
|
||||
Subsystem: "api",
|
||||
Name: "request_count",
|
||||
Help: "Number of requests received.",
|
||||
}, []string{"method"}),
|
||||
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
|
||||
Namespace: "normalizer",
|
||||
Subsystem: "api",
|
||||
Name: "request_latency_microseconds",
|
||||
Help: "Total duration of requests in microseconds.",
|
||||
}, []string{"method"}),
|
||||
)
|
||||
|
||||
errs := make(chan error, 2)
|
||||
|
||||
go func() {
|
||||
p := fmt.Sprintf(":%s", cfg.Port)
|
||||
logger.Info(fmt.Sprintf("Normalizer service started, exposed port %s", cfg.Port))
|
||||
errs <- http.ListenAndServe(p, normalizer.MakeHandler())
|
||||
errs <- http.ListenAndServe(p, api.MakeHandler())
|
||||
}()
|
||||
|
||||
go func() {
|
||||
@ -57,21 +77,7 @@ func main() {
|
||||
errs <- fmt.Errorf("%s", <-c)
|
||||
}()
|
||||
|
||||
counter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: "normalizer",
|
||||
Subsystem: "api",
|
||||
Name: "request_count",
|
||||
Help: "Number of requests received.",
|
||||
}, []string{"method"})
|
||||
|
||||
latency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
|
||||
Namespace: "normalizer",
|
||||
Subsystem: "api",
|
||||
Name: "request_latency_microseconds",
|
||||
Help: "Total duration of requests in microseconds.",
|
||||
}, []string{"method"})
|
||||
|
||||
normalizer.Subscribe(nc, logger, counter, latency)
|
||||
nats.Subscribe(svc, nc, logger)
|
||||
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("Normalizer service terminated: %s", err))
|
||||
|
@ -1,4 +1,4 @@
|
||||
package normalizer
|
||||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
38
normalizer/api/logging.go
Normal file
38
normalizer/api/logging.go
Normal file
@ -0,0 +1,38 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
)
|
||||
|
||||
var _ normalizer.Service = (*loggingMiddleware)(nil)
|
||||
|
||||
type loggingMiddleware struct {
|
||||
logger log.Logger
|
||||
svc normalizer.Service
|
||||
}
|
||||
|
||||
// LoggingMiddleware adds logging facilities to the core service.
|
||||
func LoggingMiddleware(svc normalizer.Service, logger log.Logger) normalizer.Service {
|
||||
return &loggingMiddleware{
|
||||
logger: logger,
|
||||
svc: svc,
|
||||
}
|
||||
}
|
||||
|
||||
func (lm loggingMiddleware) Normalize(msg mainflux.RawMessage) (nd normalizer.NormalizedData, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method normalize took %s to complete", time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.Normalize(msg)
|
||||
}
|
36
normalizer/api/metrics.go
Normal file
36
normalizer/api/metrics.go
Normal file
@ -0,0 +1,36 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/mainflux/mainflux"
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
)
|
||||
|
||||
var _ normalizer.Service = (*metricsMiddleware)(nil)
|
||||
|
||||
type metricsMiddleware struct {
|
||||
counter metrics.Counter
|
||||
latency metrics.Histogram
|
||||
svc normalizer.Service
|
||||
}
|
||||
|
||||
// MetricsMiddleware instruments core service by tracking request count and
|
||||
// latency.
|
||||
func MetricsMiddleware(svc normalizer.Service, counter metrics.Counter, latency metrics.Histogram) normalizer.Service {
|
||||
return &metricsMiddleware{
|
||||
counter: counter,
|
||||
latency: latency,
|
||||
svc: svc,
|
||||
}
|
||||
}
|
||||
|
||||
func (mm *metricsMiddleware) Normalize(msg mainflux.RawMessage) (normalizer.NormalizedData, error) {
|
||||
defer func(begin time.Time) {
|
||||
mm.counter.With("method", "normalize").Add(1)
|
||||
mm.latency.With("method", "normalize").Observe(time.Since(begin).Seconds())
|
||||
}(time.Now())
|
||||
|
||||
return mm.svc.Normalize(msg)
|
||||
}
|
@ -1,30 +0,0 @@
|
||||
package normalizer
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/metrics"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
type metricsMiddleware struct {
|
||||
counter metrics.Counter
|
||||
latency metrics.Histogram
|
||||
ef eventFlow
|
||||
}
|
||||
|
||||
func newMetricsMiddleware(ef eventFlow, counter metrics.Counter, latency metrics.Histogram) *metricsMiddleware {
|
||||
return &metricsMiddleware{
|
||||
counter: counter,
|
||||
latency: latency,
|
||||
ef: ef,
|
||||
}
|
||||
}
|
||||
|
||||
func (mm *metricsMiddleware) handleMessage(msg *nats.Msg) {
|
||||
defer func(begin time.Time) {
|
||||
mm.counter.With("method", "handleMessage").Add(1)
|
||||
mm.latency.With("method", "handleMessage").Observe(time.Since(begin).Seconds())
|
||||
}(time.Now())
|
||||
mm.ef.handleMsg(msg)
|
||||
}
|
77
normalizer/nats/pubsub.go
Normal file
77
normalizer/nats/pubsub.go
Normal file
@ -0,0 +1,77 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
const (
|
||||
queue = "normalizers"
|
||||
input = "channel.*"
|
||||
outputUnknown = "out.unknown"
|
||||
senML = "application/senml+json"
|
||||
)
|
||||
|
||||
type pubsub struct {
|
||||
nc *nats.Conn
|
||||
svc normalizer.Service
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// Subscribe to appropriate NATS topic and normalizes received messages.
|
||||
func Subscribe(svc normalizer.Service, nc *nats.Conn, logger log.Logger) {
|
||||
ps := pubsub{
|
||||
nc: nc,
|
||||
svc: svc,
|
||||
logger: logger,
|
||||
}
|
||||
ps.nc.QueueSubscribe(input, queue, ps.handleMsg)
|
||||
}
|
||||
|
||||
func (ps pubsub) handleMsg(m *nats.Msg) {
|
||||
var msg mainflux.RawMessage
|
||||
if err := proto.Unmarshal(m.Data, &msg); err != nil {
|
||||
ps.logger.Warn(fmt.Sprintf("Unmarshalling failed: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := ps.publish(msg); err != nil {
|
||||
ps.logger.Warn(fmt.Sprintf("Publishing failed: %s", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (ps pubsub) publish(msg mainflux.RawMessage) error {
|
||||
output := mainflux.OutputSenML
|
||||
normalized, err := ps.svc.Normalize(msg)
|
||||
if err != nil {
|
||||
switch ct := normalized.ContentType; ct {
|
||||
case senML:
|
||||
return err
|
||||
case "":
|
||||
output = outputUnknown
|
||||
default:
|
||||
output = fmt.Sprintf("out.%s", ct)
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range normalized.Messages {
|
||||
data, err := proto.Marshal(&v)
|
||||
if err != nil {
|
||||
ps.logger.Warn(fmt.Sprintf("Marshalling failed: %s", err))
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ps.nc.Publish(output, data); err != nil {
|
||||
ps.logger.Warn(fmt.Sprintf("Publishing failed: %s", err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -1,92 +1,26 @@
|
||||
package normalizer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/cisco/senml"
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
const (
|
||||
queue = "normalizers"
|
||||
input = "channel.*"
|
||||
outputUnknown = "out.unknown"
|
||||
senML = "application/senml+json"
|
||||
)
|
||||
type normalizer struct{}
|
||||
|
||||
type eventFlow struct {
|
||||
nc *nats.Conn
|
||||
logger log.Logger
|
||||
// New returns normalizer service implementation.
|
||||
func New() Service {
|
||||
return normalizer{}
|
||||
}
|
||||
|
||||
// Subscribe instantiates and starts a new NATS message flow.
|
||||
func Subscribe(nc *nats.Conn, logger log.Logger, counter metrics.Counter, latency metrics.Histogram) {
|
||||
flow := eventFlow{nc, logger}
|
||||
mm := newMetricsMiddleware(flow, counter, latency)
|
||||
flow.nc.QueueSubscribe(input, queue, mm.handleMessage)
|
||||
}
|
||||
|
||||
func (ef eventFlow) handleMsg(m *nats.Msg) {
|
||||
msg := mainflux.RawMessage{}
|
||||
|
||||
if err := proto.Unmarshal(m.Data, &msg); err != nil {
|
||||
ef.logger.Warn(fmt.Sprintf("Unmarshalling failed: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := ef.publish(msg); err != nil {
|
||||
ef.logger.Warn(fmt.Sprintf("Publishing failed: %s", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (ef eventFlow) publish(msg mainflux.RawMessage) error {
|
||||
output := mainflux.OutputSenML
|
||||
normalized, err := ef.normalize(msg)
|
||||
func (n normalizer) Normalize(msg mainflux.RawMessage) (NormalizedData, error) {
|
||||
raw, err := senml.Decode(msg.Payload, senml.JSON)
|
||||
if err != nil {
|
||||
ef.logger.Warn(fmt.Sprintf("Normalization failed: %s", err))
|
||||
switch ct := strings.ToLower(msg.ContentType); ct {
|
||||
case senML:
|
||||
return err
|
||||
case "":
|
||||
output = outputUnknown
|
||||
default:
|
||||
output = fmt.Sprintf("out.%s", ct)
|
||||
}
|
||||
return NormalizedData{}, err
|
||||
}
|
||||
|
||||
for _, v := range normalized {
|
||||
data, err := proto.Marshal(&v)
|
||||
if err != nil {
|
||||
ef.logger.Warn(fmt.Sprintf("Marshalling failed: %s", err))
|
||||
return err
|
||||
}
|
||||
|
||||
if err = ef.nc.Publish(output, data); err != nil {
|
||||
ef.logger.Warn(fmt.Sprintf("Publishing failed: %s", err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ef eventFlow) normalize(msg mainflux.RawMessage) ([]mainflux.Message, error) {
|
||||
var (
|
||||
raw, normalized senml.SenML
|
||||
err error
|
||||
)
|
||||
|
||||
if raw, err = senml.Decode(msg.Payload, senml.JSON); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
normalized = senml.Normalize(raw)
|
||||
normalized := senml.Normalize(raw)
|
||||
|
||||
msgs := make([]mainflux.Message, len(normalized.Records))
|
||||
for k, v := range normalized.Records {
|
||||
@ -118,5 +52,10 @@ func (ef eventFlow) normalize(msg mainflux.RawMessage) ([]mainflux.Message, erro
|
||||
msgs[k] = m
|
||||
}
|
||||
|
||||
return msgs, nil
|
||||
output := strings.ToLower(msg.ContentType)
|
||||
|
||||
return NormalizedData{
|
||||
ContentType: output,
|
||||
Messages: msgs,
|
||||
}, nil
|
||||
}
|
||||
|
15
normalizer/service.go
Normal file
15
normalizer/service.go
Normal file
@ -0,0 +1,15 @@
|
||||
package normalizer
|
||||
|
||||
import "github.com/mainflux/mainflux"
|
||||
|
||||
// Service specifies API for normalizing messages.
|
||||
type Service interface {
|
||||
// Normalizes raw message to array of standard SenML messages.
|
||||
Normalize(mainflux.RawMessage) (NormalizedData, error)
|
||||
}
|
||||
|
||||
// NormalizedData contains normalized messages and their content type.
|
||||
type NormalizedData struct {
|
||||
ContentType string
|
||||
Messages []mainflux.Message
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user