Stable version of listener and now messaging other app.

This commit is contained in:
Martin Donnelly 2016-09-05 10:27:12 +01:00
parent 589ec833d4
commit 3a61e3f3fd
8 changed files with 234 additions and 15 deletions

4
README.md Normal file
View File

@ -0,0 +1,4 @@
# MQTT Archiver
This app listens to all the available subscriptions for a given MQTT server then archives the raw data in a PGSQL database.

19
app.js
View File

@ -5,6 +5,7 @@ var ejs = require('ejs');
var morgan = require('morgan');
var cookieparser = require('cookie-parser');
var session = require('express-session');
var sessionLW = require('express-session-lw');
var methodoverride = require('method-override');
var bodyparser = require('body-parser');
var errorhandler = require('errorhandler');
@ -23,6 +24,9 @@ var mqttClient = require('./lib/mqtt/mqttClient');
var mqtt = new mqttClient.mqttClient(busEmitter);
var historianApi = require('./lib/historian/emulator.js');
var app = express();
var port = (process.env.VCAP_APP_PORT || 3010);
@ -41,6 +45,7 @@ logger.warn('isProduction:', isProduction);
var heartBeat = function() {
this.pingTimer = 0;
this.count = 0;
this.rate = 90000;
this.setupPing = function() {
logger.warn('Starting heartbeat...');
@ -49,10 +54,8 @@ var heartBeat = function() {
};
this.ping = function() {
//Logger.error('Ping!');
//this.sendRefresh();
var now = new Date;
var mod = 90000 - (now.getTime() % 90000);
var mod = this.rate - (now.getTime() % this.rate);
this.count++;
if (this.count > 5) {
this.count = 1;
@ -61,6 +64,7 @@ var heartBeat = function() {
var _newDots = ['.','.','.','.','.'];
_newDots[this.count - 1] = 'O';
logger.info(_newDots.join(''));
this.pingTimer = setTimeout(function() {this.ping();}.bind(this), mod + 10);
};
@ -75,9 +79,10 @@ app.set('views', __dirname + '/views');
app.set('view engine', 'ejs');
app.use(morgan('combined'));
app.use(cookieparser('your secret here'));
app.use(session({
secret: '1234567890QWERTY', resave: false, saveUninitialized: false
}));
app.use(session({secret: '1234567890QWERTY', resave: false, saveUninitialized: false}));
app.use(sessionLW());
/* 'default', 'short', 'tiny', 'dev' */
app.use(methodoverride());
@ -108,6 +113,8 @@ function originIsAllowed(origin) {
return true;
}
historianApi(app);
heartBeat();
app.get('*', function(req, res) {

60
lib/historian/emulator.js Normal file
View File

@ -0,0 +1,60 @@
/**
*
* User: Martin Donnelly
* Date: 2016-08-31
* Time: 10:03
*
*/
// Historian emulator
'use strict';
var logger = require('log4js').getLogger();
var db = require('../server/db-connector').dbConnection;
var dbHistorian = require('../server/db-historian.js')(db);
module.exports = function(app) {
var express = require('express');
var mdotRouter = express.Router();
mdotRouter.get('/types/:type/devices/:deviceid', function(req, res) {
var data = {};
if (!req.params.type && !req.params.deviceid) {
{
logger.error('Historian','Missing required parameter');
res.status(400).send({
status: 'error',
error: 'missing required parameter'
});
return;
}
}
if (req.query.hasOwnProperty('start') && req.query.hasOwnProperty('end')) {
data.start = req.query.start;
data.end = req.query.end;
}
data.type = req.params.type;
data.device = req.params.deviceid;
if (data.hasOwnProperty('type') && data.hasOwnProperty('device')) {
dbHistorian.doGet(data)
.then((d) => {
res.json({events:d});
})
.catch((e) => {
logger.error(e);
res.status(500).json({});
});
}
});
app.use('/apiv2/historian/', mdotRouter);
};

View File

@ -5,10 +5,17 @@ var EventEmitter = require('events');
var busEmitter = new EventEmitter();
var requestify = require('requestify');
var db = require('../server/db-connector').dbConnection;
var dbSave = require('../server/db-save')(db);
//var nano = require('nano')('http://martind2000:1V3D4m526i@localhost:5984');
var nano = require('nano')('http://localhost:5984');
var db_name = 'mqttarchive';
var dbCouch = nano.use(db_name);
function dataBuilder(obj) {
'use strict';
@ -24,6 +31,32 @@ function dataBuilder(obj) {
}
function saveToDB(data) {
// logger.debug('Inserting into couch...');
// Logger.info(util.inspect(obj));
dbCouch.insert(data, function(err, body, header) {
if (err) {
logger.error('Error inserting into couch');
logger.error(err);
return;
}
});
}
var doSendMessage = (obj) => {
'use strict';
requestify.post('http://localhost:3011/apiv2/message', obj)
//requestify.post('http://mdotserver.mybluemix.net/apiv2/message', obj)
.then(function(response) {
// Get the response body
// logger.debug(response.getBody());
})
.catch(function(e) {
logger.error('doSendMessage',e);
});
};
var doInsertEntry = (obj) => {
// Logger.info('sendSocket: ' + JSON.stringify(obj));
@ -33,6 +66,10 @@ var doInsertEntry = (obj) => {
.then(function(d) {
'use strict';
//logger.info('Finished - Raw',d);
saveToDB(obj);
if (obj.type === 'mDot') {
busEmitter.emit('sendMessage', obj);
}
})
.catch(function(e) {
'use strict';
@ -116,7 +153,7 @@ var count=0;
}.bind(this));
busEmitter.on('saveData', doInsertEntry);
busEmitter.on('sendMessage', doSendMessage);
this.getCount = function() {
'use strict';

View File

@ -21,11 +21,11 @@ var cn = {
/*
var cn = {
host: 'qdjjtnkv.db.elephantsql.com',
host: 'jumbo.db.elephantsql.com',
port: 5432,
database: 'gxwxnqoh',
user: 'gxwxnqoh',
password: 'LN5Uk39GNcBCxCHv5dfL8aKuowFT6WKG'
database: 'vmlcokon',
user: 'vmlcokon',
password: 'PQUYLiIW4M6r7SWyZevrES_rRAULYFkp'
};
*/

106
lib/server/db-historian.js Normal file
View File

@ -0,0 +1,106 @@
'use strict';
var logger = require('log4js').getLogger();
var Sugar = require('sugar/date');
module.exports = function(db) {
var module = {};
module.sqlGetAllRaw = function(type, deviceId) {
return new Promise(function(resolve, reject) {
db.any('select distinct on (raw.timestamp) * from raw where type=$1 and device=$2 order by timestamp asc;', [type,deviceId])
.then(function(d) {
return resolve(d);
})
.catch((err)=> {
logger.error(err);
return reject(err);
});
});
};
module.sqlGetRangedRaw = function(params) {
return new Promise(function(resolve, reject) {
db.any('select distinct on (raw.timestamp) * from raw where type=$1 and device=$2 and timestamp between $3 and $4 order by timestamp asc;', [params.type, params.device, params.startTS, params.endTS])
.then(function(d) {
return resolve(d);
})
.catch((err)=> {
logger.error(err);
return reject(err);
});
});
};
module.doGet = function(params) {
var self = this;
var _obj = {};
var useRange=false;
return new Promise(function(resolve, reject) {
logger.debug('historian.doGet', params);
_obj.type = params.type;
_obj.device = params.device;
if (params.hasOwnProperty('start') && params.hasOwnProperty('end')) {
try{
_obj.startTS = new Sugar.Date(parseInt(params.start,10)).raw;
_obj.endTS = new Sugar.Date(parseInt(params.end,10)).raw;
useRange = true;
}
catch(err)
{
logger.error(err);
useRange = false;
}
if ((_obj.start === null) || (_obj.end === null))
{
useRange = false;
}
}
if (useRange) {
logger.info('Userange:',_obj);
self.sqlGetRangedRaw(_obj)
.then(function(d) {
resolve(d);
})
.catch(function(e) {
logger.error(e);
reject(e);
});
} else {
self.sqlGetAllRaw(_obj.type, _obj.device)
.then(function(d) {
resolve(d);
})
.catch(function(e) {
logger.error(e);
reject(e);
});
}
});
};
return module;
};

View File

@ -3,6 +3,6 @@ applications:
memory: 256M
instances: 1
domain: mybluemix.net
name: mdotmqtt
host: mdotmqtt
name: mqttArchiver
host: mqttarchiver
disk_quota: 1024M

View File

@ -3,6 +3,7 @@
"version": "1.0.0",
"description": "",
"main": "index.js",
"private": true,
"scripts": {
"start": "node app.js",
"test": "echo \"Error: no test specified\" && exit 1"
@ -18,7 +19,9 @@
"events": "^1.1.1",
"express": "^4.14.0",
"express-session": "^1.14.1",
"express-session-lw": "^1.0.9",
"http": "0.0.0",
"http-post": "^0.1.1",
"log4js": "^0.6.36",
"method-override": "^2.3.6",
"morgan": "^1.7.0",
@ -26,8 +29,12 @@
"mqtt_over_websockets": "0.0.1",
"path": "^0.12.7",
"pg-promise": "^5.2.7",
"queue": "^4.0.0",
"request": "^2.72.0",
"requestify": "^0.2.3",
"routes": "^2.1.0",
"sugar": "^2.0.1",
"sugar-date": "^2.0.0",
"websocket": "^1.0.22"
},
"devDependencies": {
@ -68,8 +75,6 @@
"require-dir": "^0.3.0",
"should": "^10.0.0",
"string": "^3.3.1",
"sugar": "^2.0.1",
"sugar-date": "^2.0.0",
"superagent": "^2.1.0",
"supertest": "^2.0.0"
},