mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-27 13:48:49 +08:00
MF-429 -Enabled MQTT subtopic's (#554)
* Enabled MQTT subtopic Signed-off-by: nmarcetic <n.marcetic86@gmail.com> * Removed debug logs Signed-off-by: nmarcetic <n.marcetic86@gmail.com> * Resolved remarks Signed-off-by: nmarcetic <n.marcetic86@gmail.com> * Resolved remarks Signed-off-by: nmarcetic <n.marcetic86@gmail.com>
This commit is contained in:
parent
6bc287e028
commit
c8e7a97122
27
mqtt/mqtt.js
27
mqtt/mqtt.js
@ -98,7 +98,8 @@ nats.subscribe('channel.*', function (msg) {
|
|||||||
|
|
||||||
aedes.authorizePublish = function (client, packet, publish) {
|
aedes.authorizePublish = function (client, packet, publish) {
|
||||||
// Topics are in the form `channels/<channel_id>/messages`
|
// Topics are in the form `channels/<channel_id>/messages`
|
||||||
var channel = /^channels\/(.+?)\/messages$/.exec(packet.topic);
|
// Subtopic's are in the form `channels/<channel_id>/messages/<subtopic>`
|
||||||
|
var channel = /^channels\/(.+?)\/messages\/?(.+?)?$/.exec(packet.topic);
|
||||||
if (!channel) {
|
if (!channel) {
|
||||||
logger.warn('unknown topic');
|
logger.warn('unknown topic');
|
||||||
publish(4); // Bad username or password
|
publish(4); // Bad username or password
|
||||||
@ -109,6 +110,18 @@ aedes.authorizePublish = function (client, packet, publish) {
|
|||||||
token: client.password,
|
token: client.password,
|
||||||
chanID: channelId
|
chanID: channelId
|
||||||
},
|
},
|
||||||
|
// Parse unlimited subtopics
|
||||||
|
baseLength = 3, // First 3 elements which represents the base part of topic.
|
||||||
|
elements = packet.topic.split('/').slice(baseLength),
|
||||||
|
baseTopic = 'channel.' + channelId;
|
||||||
|
// Remove empty elements
|
||||||
|
for (var i = 0; i < elements.length; i++) {
|
||||||
|
if (elements[i] === '') {
|
||||||
|
elements.pop(i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var channelTopic = elements.length ? baseTopic + '.' + elements.join('.') : baseTopic,
|
||||||
|
|
||||||
onAuthorize = function (err, res) {
|
onAuthorize = function (err, res) {
|
||||||
var rawMsg;
|
var rawMsg;
|
||||||
if (!err) {
|
if (!err) {
|
||||||
@ -120,7 +133,7 @@ aedes.authorizePublish = function (client, packet, publish) {
|
|||||||
Protocol: 'mqtt',
|
Protocol: 'mqtt',
|
||||||
Payload: packet.payload
|
Payload: packet.payload
|
||||||
});
|
});
|
||||||
nats.publish('channel.' + channelId, rawMsg);
|
nats.publish(channelTopic, rawMsg);
|
||||||
|
|
||||||
publish(0);
|
publish(0);
|
||||||
} else {
|
} else {
|
||||||
@ -135,8 +148,14 @@ aedes.authorizePublish = function (client, packet, publish) {
|
|||||||
|
|
||||||
aedes.authorizeSubscribe = function (client, packet, subscribe) {
|
aedes.authorizeSubscribe = function (client, packet, subscribe) {
|
||||||
// Topics are in the form `channels/<channel_id>/messages`
|
// Topics are in the form `channels/<channel_id>/messages`
|
||||||
var channel = /^channels\/(.+?)\/messages$/.exec(packet.topic),
|
// Subtopic's are in the form `channels/<channel_id>/messages/<subtopic>`
|
||||||
channelId = channel[1],
|
var channel = /^channels\/(.+?)\/messages\/?(.+?)?$/.exec(packet.topic);
|
||||||
|
if (!channel) {
|
||||||
|
logger.warn('unknown topic');
|
||||||
|
subscribe(4, packet); // Bad username or password
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
var channelId = channel[1],
|
||||||
accessReq = {
|
accessReq = {
|
||||||
token: client.password,
|
token: client.password,
|
||||||
chanID: channelId
|
chanID: channelId
|
||||||
|
@ -19,7 +19,7 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
queue = "normalizers"
|
queue = "normalizers"
|
||||||
input = "channel.*"
|
input = "channel.>"
|
||||||
outputUnknown = "out.unknown"
|
outputUnknown = "out.unknown"
|
||||||
senML = "application/senml+json"
|
senML = "application/senml+json"
|
||||||
)
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user