mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-27 13:48:49 +08:00
NOISSUE - Emit non-SenML messages (#279)
This commit is contained in:
parent
570ebc1c72
commit
ceb630c782
@ -2,6 +2,7 @@ package normalizer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/cisco/senml"
|
||||
"github.com/go-kit/kit/metrics"
|
||||
@ -12,9 +13,11 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
queue string = "normalizers"
|
||||
subject string = "channel.*"
|
||||
output string = "normalized"
|
||||
queue = "normalizers"
|
||||
input = "channel.*"
|
||||
outputSenML = "out.senml"
|
||||
outputUnknown = "out.unknown"
|
||||
senML = "application/senml+json"
|
||||
)
|
||||
|
||||
type eventFlow struct {
|
||||
@ -26,7 +29,7 @@ type eventFlow struct {
|
||||
func Subscribe(nc *nats.Conn, logger log.Logger, counter metrics.Counter, latency metrics.Histogram) {
|
||||
flow := eventFlow{nc, logger}
|
||||
mm := newMetricsMiddleware(flow, counter, latency)
|
||||
flow.nc.QueueSubscribe(subject, queue, mm.handleMessage)
|
||||
flow.nc.QueueSubscribe(input, queue, mm.handleMessage)
|
||||
}
|
||||
|
||||
func (ef eventFlow) handleMsg(m *nats.Msg) {
|
||||
@ -44,10 +47,18 @@ func (ef eventFlow) handleMsg(m *nats.Msg) {
|
||||
}
|
||||
|
||||
func (ef eventFlow) publish(msg mainflux.RawMessage) error {
|
||||
output := outputSenML
|
||||
normalized, err := ef.normalize(msg)
|
||||
if err != nil {
|
||||
ef.logger.Warn(fmt.Sprintf("Normalization failed: %s", err))
|
||||
return err
|
||||
switch ct := strings.ToLower(msg.ContentType); ct {
|
||||
case senML:
|
||||
return err
|
||||
case "":
|
||||
output = outputUnknown
|
||||
default:
|
||||
output = fmt.Sprintf("out.%s", ct)
|
||||
}
|
||||
}
|
||||
|
||||
for _, v := range normalized {
|
||||
|
Loading…
x
Reference in New Issue
Block a user