cf stupidity

This commit is contained in:
Martin Donnelly 2016-09-26 17:13:12 +01:00
parent 3a61e3f3fd
commit cc5d261f27
5 changed files with 162 additions and 65 deletions

19
app.js
View File

@ -15,7 +15,7 @@ var authentication = require('basic-authentication');
var cfenv = require('cfenv'); var cfenv = require('cfenv');
var WebSocketServer = require('websocket').server; // var WebSocketServer = require('websocket').server;
var EventEmitter = require('events'); var EventEmitter = require('events');
var busEmitter = new EventEmitter(); var busEmitter = new EventEmitter();
@ -26,7 +26,6 @@ var mqtt = new mqttClient.mqttClient(busEmitter);
var historianApi = require('./lib/historian/emulator.js'); var historianApi = require('./lib/historian/emulator.js');
var app = express(); var app = express();
var port = (process.env.VCAP_APP_PORT || 3010); var port = (process.env.VCAP_APP_PORT || 3010);
@ -39,10 +38,26 @@ if (process.env.NODE_ENV === 'production') {
isProduction = true; isProduction = true;
} }
var restartTimer = 0;
logger.warn('isProduction:', isProduction); logger.warn('isProduction:', isProduction);
var doRestart = () => {
mqtt = new mqttClient.mqttClient(busEmitter);
};
busEmitter.on('MAJORERROR', () => {
'use strict';
logger.warn('Major error discovered. Restarting MQTT Service in 10 seconds.');
clearTimeout(restartTimer);
restartTimer = setTimeout(doRestart, 10000);
});
var heartBeat = function() { var heartBeat = function() {
console.log(this);
this.pingTimer = 0; this.pingTimer = 0;
this.count = 0; this.count = 0;
this.rate = 90000; this.rate = 90000;

44
lib/mailer.js Normal file
View File

@ -0,0 +1,44 @@
/**
*
* User: Martin Donnelly
* Date: 2016-04-08
* Time: 16:35
*
*/
var UltraSES = require('ultrases'), dateFormat = require('dateformat');
var logger = require('log4js').getLogger();
var mailer = new UltraSES({
aws: {
accessKeyId: 'AKIAJWJS75F7WNCGK64A',
secretAccessKey: '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',
region: 'eu-west-1'
}, defaults: {
from: 'Martin Donnelly <martind2000@gmail.com>'
}
});
const prefix = process.env.NODE_ENV === 'production' ? 'Production' : 'Dev';
module.exports = {
sendEmailV1: function(contents) {
var now = new Date();
var email = {
to: 'martind2000@gmail.com', subject: 'MQTT ' + prefix + ' Archiver Alert'
};
var msg = '<h1>MQTT ' + prefix + ' Archiver Alert</h1><p>Alert logged at ' + dateFormat(now, "dddd, mmmm dS, yyyy, HH:MM:ss") + '</p><pre>' + contents + '</pre>';
mailer.sendHTML(email, msg, function(err){
if(err) {
logger.error(err);
throw err;
}
logger.info('email sent!');
});
}
};

View File

@ -1,3 +1,4 @@
'use strict';
var mqtt = require('mqtt'); var mqtt = require('mqtt');
var logger = require('log4js').getLogger(); var logger = require('log4js').getLogger();
@ -10,17 +11,26 @@ var requestify = require('requestify');
var db = require('../server/db-connector').dbConnection; var db = require('../server/db-connector').dbConnection;
var dbSave = require('../server/db-save')(db); var dbSave = require('../server/db-save')(db);
var emailer = require('../mailer');
//var nano = require('nano')('http://martind2000:1V3D4m526i@localhost:5984'); //var nano = require('nano')('http://martind2000:1V3D4m526i@localhost:5984');
/*
var nano = require('nano')('http://localhost:5984'); var nano = require('nano')('http://localhost:5984');
var db_name = 'mqttarchive'; var db_name = 'mqttarchive';
var dbCouch = nano.use(db_name); var dbCouch = nano.use(db_name);
*/
let remoteHost = process.env.NODE_ENV === 'production' ? 'http://mdotserver.mybluemix.net/apiv2/message' : 'http://localhost:3011/apiv2/message';
function dataBuilder(obj) { function dataBuilder(obj) {
'use strict';
var now = new Date(); var now = new Date();
var newObj = {device_type: 'mDot', evt_type: 'update', timestamp: {}, evt: {}}; var newObj = {
device_type: 'mDot',
evt_type: 'update',
timestamp: {},
evt: {}
};
newObj.device_id = obj.topic.split('/')[4]; newObj.device_id = obj.topic.split('/')[4];
@ -30,7 +40,6 @@ function dataBuilder(obj) {
return newObj; return newObj;
} }
function saveToDB(data) { function saveToDB(data) {
// logger.debug('Inserting into couch...'); // logger.debug('Inserting into couch...');
// Logger.info(util.inspect(obj)); // Logger.info(util.inspect(obj));
@ -45,28 +54,28 @@ function saveToDB(data) {
var doSendMessage = (obj) => { var doSendMessage = (obj) => {
'use strict'; 'use strict';
requestify.post('http://localhost:3011/apiv2/message', obj)
//requestify.post('http://mdotserver.mybluemix.net/apiv2/message', obj)
.then(function(response) { requestify.post(remoteHost, obj)
// Get the response body .then(function(response) {
// logger.debug(response.getBody()); // Get the response body
}) // logger.debug(response.getBody());
})
.catch(function(e) { .catch(function(e) {
logger.error('doSendMessage',e); logger.error('doSendMessage', e);
}); });
}; };
var doInsertEntry = (obj) => { var doInsertEntry = (obj) => {
// Logger.info('sendSocket: ' + JSON.stringify(obj)); // Logger.info('sendSocket: ' + JSON.stringify(obj));
// insertEntry(obj); // insertEntry(obj);
dbSave.addNewEvent(obj) dbSave.addNewEvent(obj)
.then(function(d) { .then(function(d) {
'use strict'; 'use strict';
//logger.info('Finished - Raw',d); // logger.info('Obj', obj.type);
saveToDB(obj); // saveToDB(obj);
if (obj.type === 'mDot') { if (obj.type === 'mDot') {
busEmitter.emit('sendMessage', obj); busEmitter.emit('sendMessage', obj);
} }
@ -76,92 +85,118 @@ var doInsertEntry = (obj) => {
logger.error(e); logger.error(e);
}); });
};
var lastReceived;
var watchDog = 0;
var wdTimedOut = false;
var watchDogTimeout = () => {
'use strict';
var now = new Date();
logger.warn(
'Watchdog timeout. Message has not been received for over 20 minutes.');
logger.warn('Last received:' + lastReceived + 'Now:' + now);
emailer.sendEmailV1('Watchdog timeout. Message has not been received for over 20 Minutes. \n\nLast received:' + lastReceived + 'Now:' + now);
wdTimedOut = true;
}; };
var mqttClient = function() { var mqttClient = function() {
var count=0;
var subscribeTopic;
var orgId = 'qz0da4';
var userName = 'a-qz0da4-dfwwdkmkzr';
var address = '.messaging.internetofthings.ibmcloud.com';
var appKey = '9txJEf3Cjy7hkSOvkv';
//Var subscribeTopic = prefix + deviceId + '/evt/+/fmt/json'; var count = 0;
var subscribeTopic;
var orgId = 'qz0da4';
var userName = 'a-qz0da4-dfwwdkmkzr';
var address = '.messaging.internetofthings.ibmcloud.com';
var appKey = '9txJEf3Cjy7hkSOvkv';
var connection = 'mqtt://' + orgId + address;
this.connected = false; this.connected = false;
var options = { var options = {
keepalive: 10, keepalive: 10,
clientId: 'a:' + orgId + ':' + Date.now(), clientId: 'a:' + orgId + ':' + Date.now(),
username: userName, username: userName,
password: new Buffer(appKey), password: new Buffer(appKey),
reconnectPeriod:1000, reconnectPeriod: 1000,
connectTimeout:30 * 1000 connectTimeout: 30 * 1000
}; };
this.client = mqtt.connect('mqtt://' + orgId + address, options); logger.info('Connecting to ', connection);
//this.client = mqtt.connect(connection, options);
this.client = mqtt.connect('mqtt://' + orgId + address, options);
this.client.on('connect', function() { this.client.on('connect', function() {
connected = true; // connected = true;
logger.info('Connected to ', address); logger.info('Connected to ', address);
}.bind(this)); emailer.sendEmailV1('Connected to ' + address);
}.bind(this));
this.client.on('connected', function() { this.client.on('connected', function() {
logger.debug('mqttConnect - doConnection - Connected'); logger.debug('mqttConnect - doConnection - Connected');
}); watchDog = setTimeout(watchDogTimeout, 1200000);
console.log(watchDog);
});
this.client.on('close', function() { this.client.on('close', function() {
logger.warn('mqttConnect - Connection closed'); logger.warn('mqttConnect - Connection closed');
}); });
this.client.on('offline', function() { this.client.on('offline', function() {
logger.warn('mqttConnect - OFFLINE!'); logger.warn('mqttConnect - OFFLINE!');
emailer.sendEmailV1('mqttConnect - OFFLINE!');
}); });
this.client.on('error', function(e) { this.client.on('error', function(e) {
logger.error('mqttConnect - error'); logger.error('mqttConnect - error');
logger.error(e); logger.error(e);
}); });
this.client.on('reconnect', function() { this.client.on('reconnect', function() {
logger.warn('mqttConnect - Attempting to reconnect...'); logger.warn('mqttConnect - Attempting to reconnect...');
}); });
subscribeTopic = 'iot-2/type/+/id/+/evt/+/fmt/json'; subscribeTopic = 'iot-2/type/+/id/+/evt/+/fmt/json';
//subscribeTopic = 'livingroomTemp';
logger.info('Subscribing:', subscribeTopic); logger.info('Subscribing:', subscribeTopic);
this.client.subscribe(subscribeTopic); this.client.subscribe(subscribeTopic);
this.client.on('message', function(topic, message) { this.client.on('message', function(topic, message) {
var json = JSON.parse(message.toString()); var json = JSON.parse(message.toString());
var topicArray = topic.split('/'); var topicArray = topic.split('/');
json.topic = topic; json.topic = topic;
json.type = topicArray[2]; json.type = topicArray[2];
json.device = topicArray[4]; json.device = topicArray[4];
json.event = topicArray[6]; json.event = topicArray[6];
busEmitter.emit('saveData', json); busEmitter.emit('saveData', json);
count++; clearTimeout(watchDog);
watchDog = setTimeout(watchDogTimeout, 1200000);
lastReceived = new Date();
console.log('wdTimedOut',wdTimedOut);
if (wdTimedOut) {
}.bind(this)); emailer.sendEmailV1('Receiving again: ' + message.toString());
wdTimedOut = false;
}
count++;
busEmitter.on('saveData', doInsertEntry); }.bind(this));
busEmitter.on('sendMessage', doSendMessage);
busEmitter.on('saveData', doInsertEntry);
busEmitter.on('sendMessage', doSendMessage);
this.getCount = function() { this.getCount = function() {
'use strict'; 'use strict';
return count; return count;
} }
}; };
module.exports.mqttClient = mqttClient; module.exports.mqttClient = mqttClient;

View File

@ -9,7 +9,7 @@
var pgp = require('pg-promise')(); var pgp = require('pg-promise')();
var cn = { var localCN = {
host: 'localhost', host: 'localhost',
port: 5432, port: 5432,
database: 'mqttstore', database: 'mqttstore',
@ -19,15 +19,16 @@ var cn = {
// ElephantSql settings // ElephantSql settings
/* var remoteCN = {
var cn = {
host: 'jumbo.db.elephantsql.com', host: 'jumbo.db.elephantsql.com',
port: 5432, port: 5432,
database: 'vmlcokon', database: 'vmlcokon',
user: 'vmlcokon', user: 'vmlcokon',
password: 'PQUYLiIW4M6r7SWyZevrES_rRAULYFkp' password: 'PQUYLiIW4M6r7SWyZevrES_rRAULYFkp'
}; };
*/
const cn = process.env.NODE_ENV === 'production' ? remoteCN : localCN;
exports.dbConnection = pgp(cn); exports.dbConnection = pgp(cn);

View File

@ -1,5 +1,5 @@
{ {
"name": "mdot_mqtt", "name": "mqttArchiver",
"version": "1.0.0", "version": "1.0.0",
"description": "", "description": "",
"main": "index.js", "main": "index.js",
@ -14,6 +14,7 @@
"body-parser": "^1.15.2", "body-parser": "^1.15.2",
"cfenv": "1.0.x", "cfenv": "1.0.x",
"cookie-parser": "^1.4.3", "cookie-parser": "^1.4.3",
"dateformat": "^1.0.12",
"ejs": "^2.5.1", "ejs": "^2.5.1",
"errorhandler": "^1.4.3", "errorhandler": "^1.4.3",
"events": "^1.1.1", "events": "^1.1.1",
@ -35,6 +36,7 @@
"routes": "^2.1.0", "routes": "^2.1.0",
"sugar": "^2.0.1", "sugar": "^2.0.1",
"sugar-date": "^2.0.0", "sugar-date": "^2.0.0",
"ultrases": "^0.1.3",
"websocket": "^1.0.22" "websocket": "^1.0.22"
}, },
"devDependencies": { "devDependencies": {
@ -81,6 +83,6 @@
"author": "Martin Donnelly <martind2000@gmail.com>", "author": "Martin Donnelly <martind2000@gmail.com>",
"license": "ISC", "license": "ISC",
"engines": { "engines": {
"node": "5.7.0" "node": "6.0.0"
} }
} }