From 7a193829f4dc4ae5d8e09413c3041ce0f66bb2bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Borov=C4=8Danin?= Date: Tue, 3 Apr 2018 20:25:49 +0200 Subject: [PATCH] MF-139 - Expose metrics from all services (#213) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Expose metrics from normalizer service Add service interface in order to expose method to be monitored. Signed-off-by: Dušan Borovčanin * Refactor normalizer Add metrics to normalizer service without exporting its internals. Signed-off-by: Dušan Borovčanin --- cmd/normalizer/main.go | 19 ++++++++++++++++++- normalizer/metrics.go | 30 ++++++++++++++++++++++++++++++ normalizer/normalizer.go | 28 ++++++++++++++-------------- 3 files changed, 62 insertions(+), 15 deletions(-) create mode 100644 normalizer/metrics.go diff --git a/cmd/normalizer/main.go b/cmd/normalizer/main.go index 83b12033..4a0b3313 100644 --- a/cmd/normalizer/main.go +++ b/cmd/normalizer/main.go @@ -11,6 +11,9 @@ import ( "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/normalizer" nats "github.com/nats-io/go-nats" + + kitprometheus "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" ) const ( @@ -55,6 +58,20 @@ func main() { errs <- fmt.Errorf("%s", <-c) }() - normalizer.Subscribe(nc, logger) + counter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: "normalizer", + Subsystem: "api", + Name: "request_count", + Help: "Number of requests received.", + }, []string{"method"}) + + latency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "normalizer", + Subsystem: "api", + Name: "request_latency_microseconds", + Help: "Total duration of requests in microseconds.", + }, []string{"method"}) + + normalizer.Subscribe(nc, logger, counter, latency) logger.Log("terminated", <-errs) } diff --git a/normalizer/metrics.go b/normalizer/metrics.go new file mode 100644 index 00000000..c1a44c0a --- /dev/null +++ b/normalizer/metrics.go @@ -0,0 +1,30 @@ +package normalizer + +import ( + "time" + + "github.com/go-kit/kit/metrics" + nats "github.com/nats-io/go-nats" +) + +type metricsMiddleware struct { + counter metrics.Counter + latency metrics.Histogram + ef eventFlow +} + +func newMetricsMiddleware(ef eventFlow, counter metrics.Counter, latency metrics.Histogram) *metricsMiddleware { + return &metricsMiddleware{ + counter: counter, + latency: latency, + ef: ef, + } +} + +func (mm *metricsMiddleware) handleMessage(msg *nats.Msg) { + defer func(begin time.Time) { + mm.counter.With("method", "handleMessage").Add(1) + mm.latency.With("method", "handleMessage").Observe(time.Since(begin).Seconds()) + }(time.Now()) + mm.ef.handleMsg(msg) +} diff --git a/normalizer/normalizer.go b/normalizer/normalizer.go index f5310800..a40b386d 100644 --- a/normalizer/normalizer.go +++ b/normalizer/normalizer.go @@ -5,6 +5,7 @@ import ( "github.com/cisco/senml" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" "github.com/golang/protobuf/proto" "github.com/mainflux/mainflux" nats "github.com/nats-io/go-nats" @@ -22,25 +23,24 @@ type eventFlow struct { } // Subscribe instantiates and starts a new NATS message flow. -func Subscribe(nc *nats.Conn, logger log.Logger) { +func Subscribe(nc *nats.Conn, logger log.Logger, counter metrics.Counter, latency metrics.Histogram) { flow := eventFlow{nc, logger} - flow.start() + mm := newMetricsMiddleware(flow, counter, latency) + flow.nc.QueueSubscribe(subject, queue, mm.handleMessage) } -func (ef eventFlow) start() { - ef.nc.QueueSubscribe(subject, queue, func(m *nats.Msg) { - msg := mainflux.RawMessage{} +func (ef eventFlow) handleMsg(m *nats.Msg) { + msg := mainflux.RawMessage{} - if err := proto.Unmarshal(m.Data, &msg); err != nil { - ef.logger.Log("error", fmt.Sprintf("Unmarshalling failed: %s", err)) - return - } + if err := proto.Unmarshal(m.Data, &msg); err != nil { + ef.logger.Log("error", fmt.Sprintf("Unmarshalling failed: %s", err)) + return + } - if err := ef.publish(msg); err != nil { - ef.logger.Log("error", fmt.Sprintf("Publishing failed: %s", err)) - return - } - }) + if err := ef.publish(msg); err != nil { + ef.logger.Log("error", fmt.Sprintf("Publishing failed: %s", err)) + return + } } func (ef eventFlow) publish(msg mainflux.RawMessage) error {