From ceb630c782f0e232797b953c443ec56f94165af6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Novakovi=C4=87?= Date: Mon, 14 May 2018 22:15:12 +0200 Subject: [PATCH] NOISSUE - Emit non-SenML messages (#279) --- normalizer/normalizer.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/normalizer/normalizer.go b/normalizer/normalizer.go index d04a4db9..5f819e11 100644 --- a/normalizer/normalizer.go +++ b/normalizer/normalizer.go @@ -2,6 +2,7 @@ package normalizer import ( "fmt" + "strings" "github.com/cisco/senml" "github.com/go-kit/kit/metrics" @@ -12,9 +13,11 @@ import ( ) const ( - queue string = "normalizers" - subject string = "channel.*" - output string = "normalized" + queue = "normalizers" + input = "channel.*" + outputSenML = "out.senml" + outputUnknown = "out.unknown" + senML = "application/senml+json" ) type eventFlow struct { @@ -26,7 +29,7 @@ type eventFlow struct { func Subscribe(nc *nats.Conn, logger log.Logger, counter metrics.Counter, latency metrics.Histogram) { flow := eventFlow{nc, logger} mm := newMetricsMiddleware(flow, counter, latency) - flow.nc.QueueSubscribe(subject, queue, mm.handleMessage) + flow.nc.QueueSubscribe(input, queue, mm.handleMessage) } func (ef eventFlow) handleMsg(m *nats.Msg) { @@ -44,10 +47,18 @@ func (ef eventFlow) handleMsg(m *nats.Msg) { } func (ef eventFlow) publish(msg mainflux.RawMessage) error { + output := outputSenML normalized, err := ef.normalize(msg) if err != nil { ef.logger.Warn(fmt.Sprintf("Normalization failed: %s", err)) - return err + switch ct := strings.ToLower(msg.ContentType); ct { + case senML: + return err + case "": + output = outputUnknown + default: + output = fmt.Sprintf("out.%s", ct) + } } for _, v := range normalized {