Stop using MQTT for this service and use messaging instead

This commit is contained in:
Martin Donnelly 2016-08-30 14:34:34 +01:00
parent 93a2e9063f
commit 8429b48f93
7 changed files with 186 additions and 53 deletions

15
app.js
View File

@ -32,6 +32,8 @@ var mdotApiV2 = require('./lib/mdot/apiv2.js');
var trackApi = require('./lib/mdot/track.js');
var meetingApi = require('./lib/mdot/meeting.js');
var messageApi = require('./lib/mdot/message.js');
process.env.NODE_ENV = process.env.NODE_ENV || 'development';
if (process.env.NODE_ENV === 'production') {
@ -86,6 +88,9 @@ function originIsAllowed(origin) {
// Put logic here to detect whether the specified origin is allowed.
return true;
}
app.get('/graph', authentication(({
hash: 'sha1', // Type of hash
file: 'htpasswd', // Path of file
@ -98,22 +103,26 @@ app.get('/meeting', function(req, res) {
res.render('meeting',{delimiter: '^'});
});
// Glue routes
mdotApi(app);
mdotApiV2(app);
trackApi(app);
meetingApi(app);
messageApi(app);
var mqttClient = require('./lib/mqtt/mqttClient');
//var mqttClient = require('./lib/mqtt/mqttClient');
var mqtt;
if (isProduction) {
mqtt = new mqttClient.mqttClient(busEmitter);
// mqtt = new mqttClient.mqttClient(busEmitter);
}
var port = (process.env.VCAP_APP_PORT || 3010);
var port = (process.env.VCAP_APP_PORT || 3011);
var host = (process.env.VCAP_APP_HOST || 'localhost');
app.get('*', function(req, res) {

View File

@ -14,6 +14,19 @@ var mdot = require('./lib/mdot/mdot.js');
(
function() {
var doBulkInsert = (qArray) => {
'use strict';
logger.info('Doing bulk insert...');
dbSave.doBulkInsert(qArray)
.then(function(d) {
logger.info('Finished bulk insert');
})
.catch(function(e) {
logger.error(e);
});
};
var doInsertEntry = (obj) => {
// Logger.info('sendSocket: ' + JSON.stringify(obj));
@ -44,12 +57,18 @@ var mdot = require('./lib/mdot/mdot.js');
};
var device = 13;
var queries = [];
var device = 8;
var ids = ['CENSIS-LoRa-1','CENSIS-LoRa-2','CENSIS-LoRa-3','CENSIS-LoRa-4','HIE-mobile-1','HIE-demo','HIE-mobile-2','HIE-smart-campus-1','HIE-smart-campus-2','HIE-smart-campus-3','HIE-smart-campus-4','HIE-smart-campus-5','HIE-smart-campus-6','HIE-smart-campus-7','HIE-mDot-1'];
var data = {id: ids[device]};
var startTs = Sugar.Date.create('2016-08-19 14:46:48');
var endTs = Sugar.Date.create('2016-08-21 10:03:39');
//var startTs = Sugar.Date.create('2015-07-19 14:46:48');
var startTs = Sugar.Date.create('2015-8-29 08:46:48');
var endTs = Sugar.Date.create('2016-08-29 09:35:36');
// first record recorded "2016-08-29 09:44:59+00"
// logger.debug(startTs, endTs);
// logger.debug(startTs.getTime(), endTs.getTime());
@ -61,19 +80,16 @@ var mdot = require('./lib/mdot/mdot.js');
.then(function(d) {
'use strict';
//Logger.debug(d);
logger.info('Number of items:', d.events.length);
for (var item in d.events) {
var newItem = d.events[item].evt;
newItem.topic=['','','','',ids[device]].join('/');
// logger.debug(newItem);
var gwTime = new Date(newItem.gateway_info['0'].gw_time);
if ((gwTime > startTs) && (gwTime < endTs)) {
doInsertEntry(newItem);
} else {
logger.warn('Out of range...');
}
var newItem = d.events[item];
queries.push(dbSave.genRawQuery(newItem));
}
doBulkInsert(queries);
logger.info('Number of queries:', queries.length);
})
.catch(function(err) {
'use strict';

View File

@ -31,13 +31,14 @@ function getData(data, prev) {
var options = {
uri: 'https://qz0da4.internetofthings.ibmcloud.com/api/v0002/historian/types/mDot/devices/' + data.id + range,
//uri: 'https://qz0da4.internetofthings.ibmcloud.com/api/v0002/historian/types/Ti-CC3200/devices/LightingISP15',
headers: {
Authorization: buildAuthentication()
},
json: true
};
logger.debug(options);
// logger.debug(options);
// This is put together to get around the Paging that bluemix uses.
// They limit the number of results to 100 per query
@ -51,6 +52,7 @@ function getData(data, prev) {
Array.prototype.push.apply(prev.events, d.events);
if (d.hasOwnProperty('bookmark')) {
logger.info('Getting more...');
data.bookmark = d.bookmark;
getData(data, prev).then(function(d) {
resolve(d);

71
lib/mdot/message.js Normal file
View File

@ -0,0 +1,71 @@
'use strict';
var logger = require('log4js').getLogger();
var mdot = require('./mdot.js');
var db = require('../server/db-connector').dbConnection;
var dbSave = require('../server/db-save')(db);
module.exports = function(app) {
var express = require('express');
var messageRouter = express.Router();
messageRouter.post('/', function(req, res) {
var obj = req.body;
if (req.body.type !== 'mDot') {
logger.error('Not of the required type!');
res.status(500).json({});
return -1;
} else {
logger.debug(req.body.type, req.body.device);
dbSave.addProcessedEvent(obj)
.then(function(d) {
'use strict';
// logger.info('Finished - Processed');
res.status(200).json({});
})
.catch(function(e) {
'use strict';
logger.error(e);
res.status(500).json({});
});
}
/* If (!req.params.id) {
logger.error('MDot','Missing required parameter');
res.status(400).send({
status: 'error',
error: 'missing required parameter'
});
return;
}
if (req.query.hasOwnProperty('start') && req.query.hasOwnProperty('end')) {
data.start = req.query.start;
data.end = req.query.end;
}
if (req.params.hasOwnProperty('id')) {
data.id = req.params.id;
dbMdot.doGet(data)
.then((d) => {
res.json({events:d});
})
.catch((e) => {
logger.error(e);
res.status(500).json({});
});
}*/
});
app.use('/apiv2/message/', messageRouter);
};

View File

@ -29,7 +29,7 @@ function insertEntry(obj) {
newObj.rtype = 1;
logger.debug('Inserting into couch...');
// logger.info(util.inspect(newObj));
// Logger.info(util.inspect(newObj));
dbCouch.insert(newObj, function(err, body, header) {
if (err) {
logger.error('Error inserting into couch');
@ -42,7 +42,7 @@ function insertEntry(obj) {
var doInsertEntry = (obj) => {
// Logger.info('sendSocket: ' + JSON.stringify(obj));
// insertEntry(obj);
// insertEntry(obj);
dbSave.addNewEvent(obj)
.then(function(d) {
@ -75,9 +75,9 @@ var mqttClient = function() {
var address = '.messaging.internetofthings.ibmcloud.com';
var appKey = '9txJEf3Cjy7hkSOvkv';
var prefix = 'iot-2/type/mDot/id/';
//var deviceId = ['CENSIS-LoRa-1','CENSIS-LoRa-2','CENSIS-LoRa-3','CENSIS-LoRa-4','HIE-mobile-1','HIE-mobile-2','HIE-smart-campus-1','HIE-smart-campus-2','HIE-smart-campus-3','HIE-smart-campus-4','HIE-smart-campus-5','HIE-smart-campus-6','HIE-smart-campus-7','HIE-mDot-1'];
//Var deviceId = ['CENSIS-LoRa-1','CENSIS-LoRa-2','CENSIS-LoRa-3','CENSIS-LoRa-4','HIE-mobile-1','HIE-mobile-2','HIE-smart-campus-1','HIE-smart-campus-2','HIE-smart-campus-3','HIE-smart-campus-4','HIE-smart-campus-5','HIE-smart-campus-6','HIE-smart-campus-7','HIE-mDot-1'];
var deviceId = ['CENSIS-LoRa-1','CENSIS-LoRa-2','CENSIS-LoRa-3','CENSIS-LoRa-4','HIE-mobile-1----','HIE-mobile-2----','HIE-smart-campus-1','HIE-smart-campus-2','HIE-smart-campus-3','HIE-smart-campus-4','HIE-smart-campus-5','HIE-smart-campus-6','HIE-smart-campus-7','HIE-mDot-1'];
var deviceId = ['CENSIS-LoRa-1','CENSIS-LoRa-2','CENSIS-LoRa-3','CENSIS-LoRa-4','HIE-mobile-1----','HIE-mobile-2----','HIE-smart-campus-1','HIE-smart-campus-2','HIE-smart-campus-3','HIE-smart-campus-4','HIE-smart-campus-5','HIE-smart-campus-6','HIE-smart-campus-7','HIE-mDot-1'];
//Var subscribeTopic = prefix + deviceId + '/evt/+/fmt/json';
@ -86,10 +86,12 @@ var mqttClient = function() {
this.connected = false;
var options = {
keepalive: 3600,
keepalive: 10,
clientId: 'a:' + orgId + ':' + Date.now(),
username: userName,
password: new Buffer(appKey)
password: new Buffer(appKey),
reconnectPeriod: 1000,
connectTimeout: 30 * 1000
};
@ -115,31 +117,44 @@ var mqttClient = function() {
var json = JSON.parse(message.toString());
json.topic = topic;
// logger.debug(JSON.stringify(json));
// Logger.debug(JSON.stringify(json));
busEmitter.emit('saveData', json);
}.bind(this));
this.client.on('connected', function() {
logger.debug('mqttConnect - doConnection - Connected');
});
this.client.on('close', function(e) {
logger.error('mqttConnect - close');
logger.error(e);
});
this.client.on('offline', function(e) {
logger.error('mqttConnect - offline');
logger.error(e);
});
this.client.on('error', function(e) {
logger.error('mqttConnect - error');
logger.error(e);
});
this.client.on('reconnect', function(e) {
logger.error('mqttConnect - reconnect');
logger.error(e);
});
this.isConnected = function() {
return this.connected;
};
this.client.on('connected', function() {
logger.debug('mqttConnect - doConnection - Connected');
});
this.client.on('disconnect', function(e) {
logger.error('mqttConnect - doConnection - disconnect');
logger.error(e);
});
this.client.on('error', function(e) {
logger.error('mqttConnect - doConnection - disconnect');
logger.error(e);
});
busEmitter.on('saveData', doInsertEntry);

View File

@ -9,7 +9,6 @@
var pgp = require('pg-promise')();
/*
var cn = {
host: 'localhost',
port: 5432,
@ -17,17 +16,16 @@ var cn = {
user: 'postgres',
password: ''
};
*/
// ElephantSql settings
var cn = {
/*var cn = {
host: 'jumbo.db.elephantsql.com',
port: 5432,
database: 'amlrxqev',
user: 'amlrxqev',
password: 'K11cvCplk0--oNafsYj4ISN-rVQmVS3y'
};
database: 'vmlcokon',
user: 'vmlcokon',
password: 'PQUYLiIW4M6r7SWyZevrES_rRAULYFkp'
};*/
exports.dbConnection = pgp(cn);

View File

@ -6,6 +6,25 @@ module.exports = function(db) {
var module = {};
module.deviceIds = ['CENSIS-LoRa-1','CENSIS-LoRa-2','CENSIS-LoRa-3','CENSIS-LoRa-4','HIE-mobile-1','HIE-demo','HIE-mobile-2','HIE-smart-campus-1','HIE-smart-campus-2','HIE-smart-campus-3','HIE-smart-campus-4','HIE-smart-campus-5','HIE-smart-campus-6','HIE-smart-campus-7','HIE-mDot-1'];
module.doBulkInsert = function(bulkData) {
return new Promise(function(resolve, reject) {
db.tx(function(t) {
// t = this;
// creating a sequence of transaction queries:
// returning a promise that determines a successful transaction:
return this.batch(bulkData); // all of the queries are to be resolved;
})
.then(function(data) {
return resolve(data);
})
.catch(function(error) {
return reject(error);
});
});
};
module.sqlInsertRawEvent = function(data) {
let _data = data;
console.log('sqlInsertRawEvent');
@ -16,7 +35,7 @@ module.exports = function(db) {
return resolve('ok');
})
.catch((err)=> {
console.error(err);
console.error(err);
return reject(err);
});
});
@ -24,8 +43,6 @@ module.exports = function(db) {
module.sqlInsertDecoded = function(data) {
let _data = data;
console.log('sqlInsertDecoded', _data.deviceid, _data.timestamp);
return new Promise(function(resolve, reject) {
db.func('insert_decoded',
[_data.deviceid, _data.timestamp, _data.lux, _data.co2, _data.temp, _data.humidity, _data.sound])
@ -33,13 +50,18 @@ module.exports = function(db) {
return resolve('ok');
})
.catch((err)=> {
console.error(err);
console.error(err);
return reject(err);
});
});
};
module.genRawQuery = function(data) {
const timestamp = new Date(data.timestamp['$date']);
return db.func('insert_raw', [timestamp, data.device_type, data.device_id, data.evt_type, data.evt]);
};
module.addNewEvent = function(data) {
console.log('addNewEvent');
var self = this;
@ -62,7 +84,7 @@ module.exports = function(db) {
};
module.addProcessedEvent = function(data) {
console.log('addProcessedEvent');
// console.log('addProcessedEvent');
var self = this;
return new Promise((resolve, reject) => {
@ -70,11 +92,11 @@ module.exports = function(db) {
self.sqlInsertDecoded(_data)
.then((d)=> {
console.log('Postgres returns', d);
// console.log('Postgres returns', d);
return resolve({reply: 'Processed event inserted'});
})
.catch((err)=> {
console.error(err);
console.error(err);
return reject(err);
});
@ -95,7 +117,7 @@ module.exports = function(db) {
_obj.humid = (parseInt(_data[12] + _data[13] + _data[14] + _data[15] + _data[16], 10) / 10);
_obj.noise = parseInt('0x' + ('0' + bytes[17]).substr(-2) + ('0' + bytes[18]).substr(-2));
_obj.binData = bytes;
console.log(_obj);
// console.log(_obj);
return _obj;
};
@ -104,7 +126,7 @@ module.exports = function(db) {
var workObj = {};
var device_name = data.topic.split('/')[4];
console.log('Device_name', device_name);
// console.log('Device_name', device_name);
workObj.deviceid = self.deviceIds.indexOf(device_name);
if (data.hasOwnProperty('data')) {