SODashServer/lib/mqtt-ws/mqtt-ws.js
Martin Donnelly d93ebb4466 init
2016-04-13 10:01:28 +01:00

109 lines
3.3 KiB
JavaScript

"use strict";
/**
* Created by Martin on 10/02/2016.
*/
var url = require('url'),
util = require('util'),
errno = require('errno'),
log4js = require('log4js'),
logger = log4js.getLogger(),
mqttws = require('mqtt-ws');
function getErrnoDescription(err) {
if (!err.errno) return undefined;
if (typeof err.errno == 'number') {
var e = errno.errno[err.errno];
if (e) {
return e.description;
} else {
return undefined;
}
} else if (typeof err.errno == 'string') {
for (var e in errno.errno) {
if (errno.errno[e].code == err.code) {
return errno.errno[e].description;
}
}
return undefined;
}
}
function logError(err, message) {
if (err.syscall != undefined) {
var description = getErrnoDescription(err) || err.code;
logger.error("%s on %s: %s", message, err.syscall, description);
} else {
logger.error("%s: %s", message, err);
}
}
exports.run = function(config) {
if (config.log4js) {
log4js.configure(config.log4js);
}
// Create our bridge
var bridge = mqttws.createBridge(config);
logger.info("Listening for incoming WebSocket connections on port %d",
bridge.port);
// Set up error handling
bridge.on('error', function(err) {
logError(err, "WebSocket Error");
});
// Handle incoming WS connection
bridge.on('connection', function(ws) {
// URL-decode the URL, and use the URI part as the subscription topic
logger.info("WebSocket connection from %s received", ws.connectString);
var self = this;
ws.on('error', function(err) {
logError(err, util.format("WebSocket error in client %s", ws.connectString));
});
// Parse the URL
var parsed = url.parse(ws.upgradeReq.url, true);
// Connect to the MQTT server using the URL query as options
var mqtt = bridge.connectMqtt(parsed.query);
mqtt.topic = decodeURIComponent(parsed.pathname.substring(1));
mqtt.isWildcardTopic = (mqtt.topic.match(/[\+#]/) != null);
ws.on('close', function() {
logger.info("WebSocket client %s closed", ws.connectString);
mqtt.end();
});
ws.on('message', function(message) {
logger.info("WebSocket client %s publishing '%s' to %s",
ws.connectString, message, mqtt.topic);
mqtt.publish(mqtt.topic, message, mqtt.options);
});
mqtt.on('error', function(err) {
logError(err, "MQTT error");
});
mqtt.on('connect', function() {
logger.info("Connected to MQTT server at %s:%d", mqtt.host, mqtt.port);
logger.info("WebSocket client %s subscribing to '%s'", ws.connectString, mqtt.topic);
mqtt.subscribe(mqtt.topic);
});
mqtt.on('close', function() {
logger.info("MQTT connection for client %s closed",
ws.connectString);
ws.terminate();
});
mqtt.on('message', function(topic, message, packet) {
if (mqtt.isWildcardTopic) {
ws.send(util.format("%s: %s", topic, message), self.options);
} else {
ws.send(message, self.options);
}
});
});
};