mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-26 13:48:53 +08:00
MF-139 - Expose metrics from all services (#213)
* Expose metrics from normalizer service Add service interface in order to expose method to be monitored. Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Refactor normalizer Add metrics to normalizer service without exporting its internals. Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com>
This commit is contained in:
parent
523716b090
commit
7a193829f4
@ -11,6 +11,9 @@ import (
|
||||
"github.com/mainflux/mainflux"
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -55,6 +58,20 @@ func main() {
|
||||
errs <- fmt.Errorf("%s", <-c)
|
||||
}()
|
||||
|
||||
normalizer.Subscribe(nc, logger)
|
||||
counter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: "normalizer",
|
||||
Subsystem: "api",
|
||||
Name: "request_count",
|
||||
Help: "Number of requests received.",
|
||||
}, []string{"method"})
|
||||
|
||||
latency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
|
||||
Namespace: "normalizer",
|
||||
Subsystem: "api",
|
||||
Name: "request_latency_microseconds",
|
||||
Help: "Total duration of requests in microseconds.",
|
||||
}, []string{"method"})
|
||||
|
||||
normalizer.Subscribe(nc, logger, counter, latency)
|
||||
logger.Log("terminated", <-errs)
|
||||
}
|
||||
|
30
normalizer/metrics.go
Normal file
30
normalizer/metrics.go
Normal file
@ -0,0 +1,30 @@
|
||||
package normalizer
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/metrics"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
type metricsMiddleware struct {
|
||||
counter metrics.Counter
|
||||
latency metrics.Histogram
|
||||
ef eventFlow
|
||||
}
|
||||
|
||||
func newMetricsMiddleware(ef eventFlow, counter metrics.Counter, latency metrics.Histogram) *metricsMiddleware {
|
||||
return &metricsMiddleware{
|
||||
counter: counter,
|
||||
latency: latency,
|
||||
ef: ef,
|
||||
}
|
||||
}
|
||||
|
||||
func (mm *metricsMiddleware) handleMessage(msg *nats.Msg) {
|
||||
defer func(begin time.Time) {
|
||||
mm.counter.With("method", "handleMessage").Add(1)
|
||||
mm.latency.With("method", "handleMessage").Observe(time.Since(begin).Seconds())
|
||||
}(time.Now())
|
||||
mm.ef.handleMsg(msg)
|
||||
}
|
@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/cisco/senml"
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/mainflux/mainflux"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
@ -22,25 +23,24 @@ type eventFlow struct {
|
||||
}
|
||||
|
||||
// Subscribe instantiates and starts a new NATS message flow.
|
||||
func Subscribe(nc *nats.Conn, logger log.Logger) {
|
||||
func Subscribe(nc *nats.Conn, logger log.Logger, counter metrics.Counter, latency metrics.Histogram) {
|
||||
flow := eventFlow{nc, logger}
|
||||
flow.start()
|
||||
mm := newMetricsMiddleware(flow, counter, latency)
|
||||
flow.nc.QueueSubscribe(subject, queue, mm.handleMessage)
|
||||
}
|
||||
|
||||
func (ef eventFlow) start() {
|
||||
ef.nc.QueueSubscribe(subject, queue, func(m *nats.Msg) {
|
||||
msg := mainflux.RawMessage{}
|
||||
func (ef eventFlow) handleMsg(m *nats.Msg) {
|
||||
msg := mainflux.RawMessage{}
|
||||
|
||||
if err := proto.Unmarshal(m.Data, &msg); err != nil {
|
||||
ef.logger.Log("error", fmt.Sprintf("Unmarshalling failed: %s", err))
|
||||
return
|
||||
}
|
||||
if err := proto.Unmarshal(m.Data, &msg); err != nil {
|
||||
ef.logger.Log("error", fmt.Sprintf("Unmarshalling failed: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := ef.publish(msg); err != nil {
|
||||
ef.logger.Log("error", fmt.Sprintf("Publishing failed: %s", err))
|
||||
return
|
||||
}
|
||||
})
|
||||
if err := ef.publish(msg); err != nil {
|
||||
ef.logger.Log("error", fmt.Sprintf("Publishing failed: %s", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (ef eventFlow) publish(msg mainflux.RawMessage) error {
|
||||
|
Loading…
x
Reference in New Issue
Block a user