diff --git a/app.js b/app.js index 82a80a3..fbf5a7c 100644 --- a/app.js +++ b/app.js @@ -15,7 +15,7 @@ var authentication = require('basic-authentication'); var cfenv = require('cfenv'); -var WebSocketServer = require('websocket').server; +// var WebSocketServer = require('websocket').server; var EventEmitter = require('events'); var busEmitter = new EventEmitter(); @@ -26,7 +26,6 @@ var mqtt = new mqttClient.mqttClient(busEmitter); var historianApi = require('./lib/historian/emulator.js'); - var app = express(); var port = (process.env.VCAP_APP_PORT || 3010); @@ -39,10 +38,26 @@ if (process.env.NODE_ENV === 'production') { isProduction = true; } +var restartTimer = 0; + logger.warn('isProduction:', isProduction); +var doRestart = () => { + mqtt = new mqttClient.mqttClient(busEmitter); +}; + +busEmitter.on('MAJORERROR', () => { + 'use strict'; + logger.warn('Major error discovered. Restarting MQTT Service in 10 seconds.'); + + clearTimeout(restartTimer); + restartTimer = setTimeout(doRestart, 10000); + +}); + var heartBeat = function() { + console.log(this); this.pingTimer = 0; this.count = 0; this.rate = 90000; diff --git a/lib/mailer.js b/lib/mailer.js new file mode 100644 index 0000000..b456f2a --- /dev/null +++ b/lib/mailer.js @@ -0,0 +1,44 @@ +/** + * + * User: Martin Donnelly + * Date: 2016-04-08 + * Time: 16:35 + * + */ +var UltraSES = require('ultrases'), dateFormat = require('dateformat'); + +var logger = require('log4js').getLogger(); + +var mailer = new UltraSES({ + aws: { + accessKeyId: 'AKIAJWJS75F7WNCGK64A', + secretAccessKey: '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', + region: 'eu-west-1' + }, defaults: { + from: 'Martin Donnelly ' + } +}); + +const prefix = process.env.NODE_ENV === 'production' ? 'Production' : 'Dev'; + +module.exports = { + + sendEmailV1: function(contents) { + var now = new Date(); + var email = { + to: 'martind2000@gmail.com', subject: 'MQTT ' + prefix + ' Archiver Alert' + }; + + var msg = '

MQTT ' + prefix + ' Archiver Alert

Alert logged at ' + dateFormat(now, "dddd, mmmm dS, yyyy, HH:MM:ss") + '

' + contents + '
'; + + mailer.sendHTML(email, msg, function(err){ + if(err) { + logger.error(err); + throw err; + } + logger.info('email sent!'); + }); + + + } +}; diff --git a/lib/mqtt/mqttClient.js b/lib/mqtt/mqttClient.js index 4dd7db0..b8b0018 100644 --- a/lib/mqtt/mqttClient.js +++ b/lib/mqtt/mqttClient.js @@ -1,3 +1,4 @@ +'use strict'; var mqtt = require('mqtt'); var logger = require('log4js').getLogger(); @@ -10,17 +11,26 @@ var requestify = require('requestify'); var db = require('../server/db-connector').dbConnection; var dbSave = require('../server/db-save')(db); +var emailer = require('../mailer'); //var nano = require('nano')('http://martind2000:1V3D4m526i@localhost:5984'); +/* var nano = require('nano')('http://localhost:5984'); var db_name = 'mqttarchive'; var dbCouch = nano.use(db_name); +*/ +let remoteHost = process.env.NODE_ENV === 'production' ? 'http://mdotserver.mybluemix.net/apiv2/message' : 'http://localhost:3011/apiv2/message'; function dataBuilder(obj) { - 'use strict'; + var now = new Date(); - var newObj = {device_type: 'mDot', evt_type: 'update', timestamp: {}, evt: {}}; + var newObj = { + device_type: 'mDot', + evt_type: 'update', + timestamp: {}, + evt: {} + }; newObj.device_id = obj.topic.split('/')[4]; @@ -30,7 +40,6 @@ function dataBuilder(obj) { return newObj; } - function saveToDB(data) { // logger.debug('Inserting into couch...'); // Logger.info(util.inspect(obj)); @@ -45,28 +54,28 @@ function saveToDB(data) { var doSendMessage = (obj) => { 'use strict'; - requestify.post('http://localhost:3011/apiv2/message', obj) - //requestify.post('http://mdotserver.mybluemix.net/apiv2/message', obj) - .then(function(response) { - // Get the response body - // logger.debug(response.getBody()); - }) + + + requestify.post(remoteHost, obj) + .then(function(response) { + // Get the response body + // logger.debug(response.getBody()); + }) .catch(function(e) { - logger.error('doSendMessage',e); + logger.error('doSendMessage', e); }); }; - var doInsertEntry = (obj) => { // Logger.info('sendSocket: ' + JSON.stringify(obj)); - // insertEntry(obj); + // insertEntry(obj); dbSave.addNewEvent(obj) .then(function(d) { 'use strict'; - //logger.info('Finished - Raw',d); - saveToDB(obj); + // logger.info('Obj', obj.type); + // saveToDB(obj); if (obj.type === 'mDot') { busEmitter.emit('sendMessage', obj); } @@ -76,92 +85,118 @@ var doInsertEntry = (obj) => { logger.error(e); }); - +}; +var lastReceived; +var watchDog = 0; +var wdTimedOut = false; +var watchDogTimeout = () => { + 'use strict'; + var now = new Date(); + logger.warn( + 'Watchdog timeout. Message has not been received for over 20 minutes.'); + logger.warn('Last received:' + lastReceived + 'Now:' + now); + emailer.sendEmailV1('Watchdog timeout. Message has not been received for over 20 Minutes. \n\nLast received:' + lastReceived + 'Now:' + now); + wdTimedOut = true; }; - var mqttClient = function() { -var count=0; - var subscribeTopic; - var orgId = 'qz0da4'; - var userName = 'a-qz0da4-dfwwdkmkzr'; - var address = '.messaging.internetofthings.ibmcloud.com'; - var appKey = '9txJEf3Cjy7hkSOvkv'; - //Var subscribeTopic = prefix + deviceId + '/evt/+/fmt/json'; + var count = 0; + var subscribeTopic; + var orgId = 'qz0da4'; + var userName = 'a-qz0da4-dfwwdkmkzr'; + var address = '.messaging.internetofthings.ibmcloud.com'; + var appKey = '9txJEf3Cjy7hkSOvkv'; + var connection = 'mqtt://' + orgId + address; - this.connected = false; + this.connected = false; - var options = { - keepalive: 10, - clientId: 'a:' + orgId + ':' + Date.now(), - username: userName, - password: new Buffer(appKey), - reconnectPeriod:1000, - connectTimeout:30 * 1000 + var options = { + keepalive: 10, + clientId: 'a:' + orgId + ':' + Date.now(), + username: userName, + password: new Buffer(appKey), + reconnectPeriod: 1000, + connectTimeout: 30 * 1000 - }; + }; - this.client = mqtt.connect('mqtt://' + orgId + address, options); + logger.info('Connecting to ', connection); + //this.client = mqtt.connect(connection, options); + this.client = mqtt.connect('mqtt://' + orgId + address, options); - this.client.on('connect', function() { - connected = true; - logger.info('Connected to ', address); - }.bind(this)); + this.client.on('connect', function() { + // connected = true; + logger.info('Connected to ', address); + emailer.sendEmailV1('Connected to ' + address); + }.bind(this)); this.client.on('connected', function() { - logger.debug('mqttConnect - doConnection - Connected'); + logger.debug('mqttConnect - doConnection - Connected'); - }); + watchDog = setTimeout(watchDogTimeout, 1200000); + console.log(watchDog); + + }); this.client.on('close', function() { logger.warn('mqttConnect - Connection closed'); + }); this.client.on('offline', function() { logger.warn('mqttConnect - OFFLINE!'); + emailer.sendEmailV1('mqttConnect - OFFLINE!'); }); - this.client.on('error', function(e) { logger.error('mqttConnect - error'); logger.error(e); }); this.client.on('reconnect', function() { - logger.warn('mqttConnect - Attempting to reconnect...'); + logger.warn('mqttConnect - Attempting to reconnect...'); - }); + }); subscribeTopic = 'iot-2/type/+/id/+/evt/+/fmt/json'; + //subscribeTopic = 'livingroomTemp'; logger.info('Subscribing:', subscribeTopic); + this.client.subscribe(subscribeTopic); - this.client.on('message', function(topic, message) { + this.client.on('message', function(topic, message) { - var json = JSON.parse(message.toString()); + var json = JSON.parse(message.toString()); - var topicArray = topic.split('/'); - json.topic = topic; - json.type = topicArray[2]; - json.device = topicArray[4]; - json.event = topicArray[6]; + var topicArray = topic.split('/'); + json.topic = topic; + json.type = topicArray[2]; + json.device = topicArray[4]; + json.event = topicArray[6]; - busEmitter.emit('saveData', json); - count++; + busEmitter.emit('saveData', json); + clearTimeout(watchDog); + watchDog = setTimeout(watchDogTimeout, 1200000); + lastReceived = new Date(); + console.log('wdTimedOut',wdTimedOut); + if (wdTimedOut) { - }.bind(this)); + emailer.sendEmailV1('Receiving again: ' + message.toString()); + wdTimedOut = false; + } + count++; - busEmitter.on('saveData', doInsertEntry); - busEmitter.on('sendMessage', doSendMessage); + }.bind(this)); + + busEmitter.on('saveData', doInsertEntry); + busEmitter.on('sendMessage', doSendMessage); this.getCount = function() { 'use strict'; return count; } - }; - - +}; module.exports.mqttClient = mqttClient; diff --git a/lib/server/db-connector.js b/lib/server/db-connector.js index 48207b0..8007530 100644 --- a/lib/server/db-connector.js +++ b/lib/server/db-connector.js @@ -9,7 +9,7 @@ var pgp = require('pg-promise')(); -var cn = { +var localCN = { host: 'localhost', port: 5432, database: 'mqttstore', @@ -19,15 +19,16 @@ var cn = { // ElephantSql settings -/* -var cn = { +var remoteCN = { host: 'jumbo.db.elephantsql.com', port: 5432, database: 'vmlcokon', user: 'vmlcokon', password: 'PQUYLiIW4M6r7SWyZevrES_rRAULYFkp' }; -*/ + + +const cn = process.env.NODE_ENV === 'production' ? remoteCN : localCN; exports.dbConnection = pgp(cn); diff --git a/package.json b/package.json index c70ab63..ca77843 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "mdot_mqtt", + "name": "mqttArchiver", "version": "1.0.0", "description": "", "main": "index.js", @@ -14,6 +14,7 @@ "body-parser": "^1.15.2", "cfenv": "1.0.x", "cookie-parser": "^1.4.3", + "dateformat": "^1.0.12", "ejs": "^2.5.1", "errorhandler": "^1.4.3", "events": "^1.1.1", @@ -35,6 +36,7 @@ "routes": "^2.1.0", "sugar": "^2.0.1", "sugar-date": "^2.0.0", + "ultrases": "^0.1.3", "websocket": "^1.0.22" }, "devDependencies": { @@ -81,6 +83,6 @@ "author": "Martin Donnelly ", "license": "ISC", "engines": { - "node": "5.7.0" + "node": "6.0.0" } }