'use strict';

const mqtt = require('mqtt');
const logger = require('log4js').getLogger();
const EventEmitter = require('events');
const requestify = require('requestify');
const db = require('../server/db-connector').dbConnection;
const dbSave = require('../server/db-save')(db);

// Module-private bus that decouples MQTT reception from persistence
// ('saveData') and from forwarding to the remote HTTP endpoint ('sendMessage').
const busEmitter = new EventEmitter();

// Endpoint that stored mDot events are forwarded to.
const remoteHost = process.env.NODE_ENV === 'production' ?
  'http://ec2-52-211-111-57.eu-west-1.compute.amazonaws.com/apiv2/message' :
  'http://localhost:3011/apiv2/message';

// The watchdog fires when no qualifying message arrives within this window
// (20 minutes).
const WATCHDOG_TIMEOUT_MS = 1200000;

/**
 * Wraps a received message in an 'mDot' update envelope.
 * Currently not called anywhere in this file.
 *
 * @param {Object} obj - Message object; `obj.topic` must be a '/'-separated
 *   string whose fifth segment is used as the device id.
 * @returns {Object} Envelope with device_type, evt_type, timestamp, evt and
 *   device_id fields.
 */
function dataBuilder(obj) {
  const now = new Date();
  const newObj = {
    device_type: 'mDot',
    evt_type: 'update',
    timestamp: {},
    evt: {}
  };
  newObj.device_id = obj.topic.split('/')[4];
  newObj.evt = obj;
  newObj.timestamp['$date'] = now.getTime();
  return newObj;
}

/**
 * Inserts a document through `dbCouch`.
 *
 * FIXME: `dbCouch` is not declared anywhere in this file, so calling this
 * function throws a ReferenceError. It is currently not called; wire it to
 * the intended database handle before using it.
 *
 * @param {Object} data - Document to insert.
 */
function saveToDB(data) {
  dbCouch.insert(data, function(err, body, header) {
    if (err) {
      logger.error('Error inserting into couch');
      logger.error(err);
      return;
    }
  });
}

/**
 * Forwards an event to the remote HTTP endpoint. Failures are logged and
 * otherwise ignored (best effort).
 *
 * @param {Object} obj - Event payload to POST.
 */
const doSendMessage = (obj) => {
  requestify.post(remoteHost, obj)
    .catch((e) => {
      logger.error('doSendMessage', e);
    });
};

/**
 * Persists an event and, once stored, requests forwarding for 'mDot' events.
 *
 * @param {Object} obj - Parsed message annotated with topic/type/device/event.
 */
const doInsertEntry = (obj) => {
  dbSave.addNewEvent(obj)
    .then(() => {
      if (obj.type === 'mDot') {
        busEmitter.emit('sendMessage', obj);
      }
    })
    .catch((e) => {
      logger.error(e);
    });
};

// Registered once at module load. Registering these inside the constructor
// would add a duplicate listener pair for every mqttClient instance and
// multiply the inserts/forwards performed per message.
busEmitter.on('saveData', doInsertEntry);
busEmitter.on('sendMessage', doSendMessage);

// Watchdog state, shared by every client created from this module.
let lastReceived;
let watchDog = 0;
let wdTimedOut = false;

/** Logs that the watchdog window elapsed and marks the timed-out state. */
const watchDogTimeout = () => {
  const now = new Date();
  logger.warn(
    'Watchdog timeout. Message has not been received for over 20 minutes.');
  logger.warn('Last received:' + lastReceived + 'Now:' + now);
  wdTimedOut = true;
};

/** Cancels the pending watchdog timer, if any, and arms a fresh one. */
function resetWatchdog() {
  clearTimeout(watchDog);
  watchDog = setTimeout(watchDogTimeout, WATCHDOG_TIMEOUT_MS);
}

/**
 * MQTT client constructor (call with `new`). Connects to the broker,
 * subscribes to all device events and publishes every received message on the
 * internal bus for persistence.
 *
 * Instance members:
 *  - client:    the underlying mqtt client
 *  - connected: true between the 'connect' and 'close'/'offline' events
 *  - getCount:  returns the number of messages accepted so far
 */
const mqttClient = function() {
  let count = 0;

  // NOTE(review): organisation id and API credentials are hard-coded here;
  // they should be moved to configuration/environment and rotated.
  const orgId = 'qz0da4';
  const userName = 'a-qz0da4-dfwwdkmkzr';
  const address = '.messaging.internetofthings.ibmcloud.com';
  const appKey = '9txJEf3Cjy7hkSOvkv';
  const connection = 'mqtt://' + orgId + address;

  this.connected = false;

  const options = {
    keepalive: 10,
    clientId: 'a:' + orgId + ':' + Date.now(),
    username: userName,
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    password: Buffer.from(appKey),
    reconnectPeriod: 1000,
    connectTimeout: 30 * 1000
  };

  logger.info('Connecting to ', connection);
  this.client = mqtt.connect(connection, options);

  this.client.on('connect', () => {
    this.connected = true;
    logger.info('Connected to ', address);
    // Start the silence window at connection time.
    resetWatchdog();
    lastReceived = new Date();
  });

  // NOTE(review): 'connected' does not appear among the events documented for
  // the mqtt client ('connect' is handled above) - confirm whether this
  // listener can ever fire.
  this.client.on('connected', () => {
    logger.debug('mqttConnect - doConnection - Connected');
  });

  this.client.on('close', () => {
    this.connected = false;
    logger.warn('mqttConnect - Connection closed');
  });

  this.client.on('offline', () => {
    this.connected = false;
    logger.warn('mqttConnect - OFFLINE!');
  });

  this.client.on('error', (e) => {
    logger.error('mqttConnect - error');
    logger.error(e);
  });

  this.client.on('reconnect', () => {
    logger.warn('mqttConnect - Attempting to reconnect...');
  });

  const subscribeTopic = 'iot-2/type/+/id/+/evt/+/fmt/json';
  logger.info('Subscribing:', subscribeTopic);
  this.client.subscribe(subscribeTopic);

  this.client.on('message', (topic, message) => {
    const raw = message.toString();

    // A malformed payload must not throw out of the event handler: log it and
    // drop the message instead of crashing the process.
    let json;
    try {
      json = JSON.parse(raw);
    } catch (e) {
      logger.error('mqttConnect - invalid JSON payload on topic', topic);
      logger.error(e);
      return;
    }
    // Valid JSON that is not an object (null, number, string, ...) cannot be
    // annotated below.
    if (json === null || typeof json !== 'object') {
      logger.error('mqttConnect - non-object JSON payload on topic', topic);
      return;
    }

    // Topic layout: iot-2/type/<type>/id/<device>/evt/<event>/fmt/json
    const topicArray = topic.split('/');
    json.topic = topic;
    json.type = topicArray[2];
    json.device = topicArray[4];
    json.event = topicArray[6];
    busEmitter.emit('saveData', json);

    // Messages from 'Ti-CC3200' devices do not feed the watchdog.
    if (json.type !== 'Ti-CC3200') {
      console.log(json);
      resetWatchdog();
      if (wdTimedOut) {
        const now = new Date();
        const s = 'Receiving again :::\n\nLast received: ' + lastReceived +
          'Now: ' + now + '\n\n' + raw;
        logger.warn(s);
        wdTimedOut = false;
      }
      lastReceived = new Date();
    }
    count++;
  });

  /**
   * @returns {number} Number of messages accepted (parsed and published on
   *   the internal bus) by this client.
   */
  this.getCount = function() {
    return count;
  };
};

module.exports.mqttClient = mqttClient;