1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-05-02 22:17:10 +08:00

Fix MQTT raw message deserialization (#753)

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
This commit is contained in:
Aleksandar Novaković 2019-05-21 11:27:26 +02:00 committed by Nikola Marčetić
parent 066057d914
commit 6d6f3cc858
2 changed files with 6 additions and 3 deletions

View File

@ -53,7 +53,10 @@ var config = {
thingsSchema = protoDescriptor.mainflux, thingsSchema = protoDescriptor.mainflux,
messagesSchema = new protobuf.Root().loadSync(config.schema_dir + '/message.proto'), messagesSchema = new protobuf.Root().loadSync(config.schema_dir + '/message.proto'),
RawMessage = messagesSchema.lookupType('mainflux.RawMessage'), RawMessage = messagesSchema.lookupType('mainflux.RawMessage'),
nats = require('nats').connect(config.nats_url), nats = require('nats').connect({
servers: [config.nats_url],
preserveBuffers: true,
}),
aedesRedis = require('aedes-persistence-redis')({ aedesRedis = require('aedes-persistence-redis')({
port: config.redis_port, port: config.redis_port,
host: config.redis_host, host: config.redis_host,
@ -116,7 +119,7 @@ function startMqtt() {
} }
nats.subscribe('channel.>', {'queue':'mqtts'}, function (msg) { nats.subscribe('channel.>', {'queue':'mqtts'}, function (msg) {
var m = RawMessage.decode(Uint8Array.from(msg, c => c.charCodeAt(0))), var m = RawMessage.decode(msg),
packet, subtopic; packet, subtopic;
if (m && m.protocol !== 'mqtt') { if (m && m.protocol !== 'mqtt') {
subtopic = m.subtopic !== '' ? '/' + m.subtopic.replace(/\./g, '/') : ''; subtopic = m.subtopic !== '' ? '/' + m.subtopic.replace(/\./g, '/') : '';

View File

@ -23,7 +23,7 @@
"grpc": "^1.20.3", "grpc": "^1.20.3",
"lodash": "^4.17.10", "lodash": "^4.17.10",
"mqemitter-redis": "^3.0.0", "mqemitter-redis": "^3.0.0",
"nats": "^0.6.8", "nats": "^1.2.10",
"protobufjs": "^6.8.8", "protobufjs": "^6.8.8",
"redis": "^2.8.0", "redis": "^2.8.0",
"request": "^2.81.0", "request": "^2.81.0",