"use strict"; /** * Created by Martin on 10/02/2016. */ var url = require('url'), util = require('util'), errno = require('errno'), log4js = require('log4js'), logger = log4js.getLogger(), mqttws = require('mqtt-ws'); function getErrnoDescription(err) { if (!err.errno) return undefined; if (typeof err.errno == 'number') { var e = errno.errno[err.errno]; if (e) { return e.description; } else { return undefined; } } else if (typeof err.errno == 'string') { for (var e in errno.errno) { if (errno.errno[e].code == err.code) { return errno.errno[e].description; } } return undefined; } } function logError(err, message) { if (err.syscall != undefined) { var description = getErrnoDescription(err) || err.code; logger.error("%s on %s: %s", message, err.syscall, description); } else { logger.error("%s: %s", message, err); } } exports.run = function(config) { if (config.log4js) { log4js.configure(config.log4js); } // Create our bridge var bridge = mqttws.createBridge(config); logger.info("Listening for incoming WebSocket connections on port %d", bridge.port); // Set up error handling bridge.on('error', function(err) { logError(err, "WebSocket Error"); }); // Handle incoming WS connection bridge.on('connection', function(ws) { // URL-decode the URL, and use the URI part as the subscription topic logger.info("WebSocket connection from %s received", ws.connectString); var self = this; ws.on('error', function(err) { logError(err, util.format("WebSocket error in client %s", ws.connectString)); }); // Parse the URL var parsed = url.parse(ws.upgradeReq.url, true); // Connect to the MQTT server using the URL query as options var mqtt = bridge.connectMqtt(parsed.query); mqtt.topic = decodeURIComponent(parsed.pathname.substring(1)); mqtt.isWildcardTopic = (mqtt.topic.match(/[\+#]/) != null); ws.on('close', function() { logger.info("WebSocket client %s closed", ws.connectString); mqtt.end(); }); ws.on('message', function(message) { logger.info("WebSocket client %s publishing '%s' to %s", ws.connectString, message, mqtt.topic); mqtt.publish(mqtt.topic, message, mqtt.options); }); mqtt.on('error', function(err) { logError(err, "MQTT error"); }); mqtt.on('connect', function() { logger.info("Connected to MQTT server at %s:%d", mqtt.host, mqtt.port); logger.info("WebSocket client %s subscribing to '%s'", ws.connectString, mqtt.topic); mqtt.subscribe(mqtt.topic); }); mqtt.on('close', function() { logger.info("MQTT connection for client %s closed", ws.connectString); ws.terminate(); }); mqtt.on('message', function(topic, message, packet) { if (mqtt.isWildcardTopic) { ws.send(util.format("%s: %s", topic, message), self.options); } else { ws.send(message, self.options); } }); }); };