diff --git a/mdot/mdot_server/mdot_server/app.js b/mdot/mdot_server/mdot_server/app.js index bbc581e..66a7770 100644 --- a/mdot/mdot_server/mdot_server/app.js +++ b/mdot/mdot_server/mdot_server/app.js @@ -32,6 +32,8 @@ var mdotApiV2 = require('./lib/mdot/apiv2.js'); var trackApi = require('./lib/mdot/track.js'); var meetingApi = require('./lib/mdot/meeting.js'); +var messageApi = require('./lib/mdot/message.js'); + process.env.NODE_ENV = process.env.NODE_ENV || 'development'; if (process.env.NODE_ENV === 'production') { @@ -86,6 +88,9 @@ function originIsAllowed(origin) { // Put logic here to detect whether the specified origin is allowed. return true; } + + + app.get('/graph', authentication(({ hash: 'sha1', // Type of hash file: 'htpasswd', // Path of file @@ -98,22 +103,26 @@ app.get('/meeting', function(req, res) { res.render('meeting',{delimiter: '^'}); }); + + // Glue routes mdotApi(app); mdotApiV2(app); trackApi(app); meetingApi(app); +messageApi(app); -var mqttClient = require('./lib/mqtt/mqttClient'); + +//var mqttClient = require('./lib/mqtt/mqttClient'); var mqtt; if (isProduction) { - mqtt = new mqttClient.mqttClient(busEmitter); +// mqtt = new mqttClient.mqttClient(busEmitter); } -var port = (process.env.VCAP_APP_PORT || 3010); +var port = (process.env.VCAP_APP_PORT || 3011); var host = (process.env.VCAP_APP_HOST || 'localhost'); app.get('*', function(req, res) { diff --git a/mdot/mdot_server/mdot_server/filler.js b/mdot/mdot_server/mdot_server/filler.js index e3959b4..363bf7e 100644 --- a/mdot/mdot_server/mdot_server/filler.js +++ b/mdot/mdot_server/mdot_server/filler.js @@ -14,6 +14,19 @@ var mdot = require('./lib/mdot/mdot.js'); ( function() { + var doBulkInsert = (qArray) => { + 'use strict'; + + logger.info('Doing bulk insert...'); + dbSave.doBulkInsert(qArray) + .then(function(d) { + logger.info('Finished bulk insert'); + }) + .catch(function(e) { + logger.error(e); + }); + }; + var doInsertEntry = (obj) => { // Logger.info('sendSocket: ' + JSON.stringify(obj)); @@ -44,12 +57,18 @@ var mdot = require('./lib/mdot/mdot.js'); }; - var device = 13; + var queries = []; + var device = 8; var ids = ['CENSIS-LoRa-1','CENSIS-LoRa-2','CENSIS-LoRa-3','CENSIS-LoRa-4','HIE-mobile-1','HIE-demo','HIE-mobile-2','HIE-smart-campus-1','HIE-smart-campus-2','HIE-smart-campus-3','HIE-smart-campus-4','HIE-smart-campus-5','HIE-smart-campus-6','HIE-smart-campus-7','HIE-mDot-1']; var data = {id: ids[device]}; - var startTs = Sugar.Date.create('2016-07-19 14:46:48'); - var endTs = Sugar.Date.create('2016-08-29 10:03:39'); + //var startTs = Sugar.Date.create('2015-07-19 14:46:48'); + var startTs = Sugar.Date.create('2015-8-29 08:46:48'); + var endTs = Sugar.Date.create('2016-08-29 09:35:36'); + + // first record recorded "2016-08-29 09:44:59+00" + + // logger.debug(startTs, endTs); // logger.debug(startTs.getTime(), endTs.getTime()); @@ -63,18 +82,14 @@ var mdot = require('./lib/mdot/mdot.js'); //Logger.debug(d); logger.info('Number of items:', d.events.length); for (var item in d.events) { - var newItem = d.events[item].evt; - newItem.topic=['','','','',ids[device]].join('/'); - // logger.debug(newItem); - // var gwTime = new Date(newItem.gateway_info['0'].gw_time); - /* if ((gwTime > startTs) && (gwTime < endTs)) { - // doInsertEntry(newItem); - } else { - logger.warn('Out of range...'); - }*/ + var newItem = d.events[item]; + queries.push(dbSave.genRawQuery(newItem)); } + doBulkInsert(queries); + logger.info('Number of queries:', queries.length); + }) .catch(function(err) { 'use strict'; diff --git a/mdot/mdot_server/mdot_server/lib/mdot/mdot.js b/mdot/mdot_server/mdot_server/lib/mdot/mdot.js index cb6883f..8ac5592 100644 --- a/mdot/mdot_server/mdot_server/lib/mdot/mdot.js +++ b/mdot/mdot_server/mdot_server/lib/mdot/mdot.js @@ -38,7 +38,7 @@ function getData(data, prev) { json: true }; - logger.debug(options); + // logger.debug(options); // This is put together to get around the Paging that bluemix uses. // They limit the number of results to 100 per query diff --git a/mdot/mdot_server/mdot_server/lib/mdot/message.js b/mdot/mdot_server/mdot_server/lib/mdot/message.js new file mode 100644 index 0000000..f62958a --- /dev/null +++ b/mdot/mdot_server/mdot_server/lib/mdot/message.js @@ -0,0 +1,71 @@ +'use strict'; + +var logger = require('log4js').getLogger(); +var mdot = require('./mdot.js'); + +var db = require('../server/db-connector').dbConnection; + +var dbSave = require('../server/db-save')(db); + + +module.exports = function(app) { + var express = require('express'); + var messageRouter = express.Router(); + + messageRouter.post('/', function(req, res) { + + var obj = req.body; + + if (req.body.type !== 'mDot') { + logger.error('Not of the required type!'); + res.status(500).json({}); + return -1; + } else { + logger.debug(req.body.type, req.body.device); + dbSave.addProcessedEvent(obj) + .then(function(d) { + 'use strict'; + // Logger.info('Finished - Processed'); + res.status(200).json({}); + }) + .catch(function(e) { + 'use strict'; + logger.error(e); + res.status(500).json({}); + }); + + } + + /* If (!req.params.id) { + logger.error('MDot','Missing required parameter'); + res.status(400).send({ + status: 'error', + error: 'missing required parameter' + }); + return; + } + + if (req.query.hasOwnProperty('start') && req.query.hasOwnProperty('end')) { + data.start = req.query.start; + data.end = req.query.end; + } + + if (req.params.hasOwnProperty('id')) { + data.id = req.params.id; + + dbMdot.doGet(data) + .then((d) => { + res.json({events:d}); + }) + .catch((e) => { + logger.error(e); + res.status(500).json({}); + }); + + }*/ + + }); + + app.use('/apiv2/message/', messageRouter); +}; + diff --git a/mdot/mdot_server/mdot_server/lib/server/db-connector.js b/mdot/mdot_server/mdot_server/lib/server/db-connector.js index 02e24b2..ebd300e 100644 --- a/mdot/mdot_server/mdot_server/lib/server/db-connector.js +++ b/mdot/mdot_server/mdot_server/lib/server/db-connector.js @@ -9,15 +9,13 @@ var pgp = require('pg-promise')(); -/* -var cn = { +/*var cn = { host: 'localhost', port: 5432, database: 'mdot', user: 'postgres', password: '' -}; -*/ +};*/ // ElephantSql settings diff --git a/mdot/mdot_server/mdot_server/lib/server/db-save.js b/mdot/mdot_server/mdot_server/lib/server/db-save.js index 8fc5f0f..12d82b9 100644 --- a/mdot/mdot_server/mdot_server/lib/server/db-save.js +++ b/mdot/mdot_server/mdot_server/lib/server/db-save.js @@ -6,6 +6,25 @@ module.exports = function(db) { var module = {}; module.deviceIds = ['CENSIS-LoRa-1','CENSIS-LoRa-2','CENSIS-LoRa-3','CENSIS-LoRa-4','HIE-mobile-1','HIE-demo','HIE-mobile-2','HIE-smart-campus-1','HIE-smart-campus-2','HIE-smart-campus-3','HIE-smart-campus-4','HIE-smart-campus-5','HIE-smart-campus-6','HIE-smart-campus-7','HIE-mDot-1']; + module.doBulkInsert = function(bulkData) { + return new Promise(function(resolve, reject) { + db.tx(function(t) { + // t = this; + // creating a sequence of transaction queries: + + // returning a promise that determines a successful transaction: + return this.batch(bulkData); // all of the queries are to be resolved; + }) + .then(function(data) { + return resolve(data); + }) + .catch(function(error) { + return reject(error); + }); + }); + }; + + module.sqlInsertRawEvent = function(data) { let _data = data; console.log('sqlInsertRawEvent'); @@ -16,7 +35,7 @@ module.exports = function(db) { return resolve('ok'); }) .catch((err)=> { - console.error(err); + console.error(err); return reject(err); }); }); @@ -24,8 +43,6 @@ module.exports = function(db) { module.sqlInsertDecoded = function(data) { let _data = data; - - console.log('sqlInsertDecoded', _data.deviceid, _data.timestamp); return new Promise(function(resolve, reject) { db.func('insert_decoded', [_data.deviceid, _data.timestamp, _data.lux, _data.co2, _data.temp, _data.humidity, _data.sound]) @@ -33,13 +50,18 @@ module.exports = function(db) { return resolve('ok'); }) .catch((err)=> { - console.error(err); + console.error(err); return reject(err); }); }); }; + module.genRawQuery = function(data) { + const timestamp = new Date(data.timestamp['$date']); + return db.func('insert_raw', [timestamp, data.device_type, data.device_id, data.evt_type, data.evt]); + }; + module.addNewEvent = function(data) { console.log('addNewEvent'); var self = this; @@ -62,7 +84,7 @@ module.exports = function(db) { }; module.addProcessedEvent = function(data) { - console.log('addProcessedEvent'); + // console.log('addProcessedEvent'); var self = this; return new Promise((resolve, reject) => { @@ -70,11 +92,11 @@ module.exports = function(db) { self.sqlInsertDecoded(_data) .then((d)=> { - console.log('Postgres returns', d); + // console.log('Postgres returns', d); return resolve({reply: 'Processed event inserted'}); }) .catch((err)=> { - console.error(err); + console.error(err); return reject(err); }); @@ -95,7 +117,7 @@ module.exports = function(db) { _obj.humid = (parseInt(_data[12] + _data[13] + _data[14] + _data[15] + _data[16], 10) / 10); _obj.noise = parseInt('0x' + ('0' + bytes[17]).substr(-2) + ('0' + bytes[18]).substr(-2)); _obj.binData = bytes; - console.log(_obj); + // console.log(_obj); return _obj; }; @@ -104,7 +126,7 @@ module.exports = function(db) { var workObj = {}; var device_name = data.topic.split('/')[4]; - console.log('Device_name', device_name); + // console.log('Device_name', device_name); workObj.deviceid = self.deviceIds.indexOf(device_name); if (data.hasOwnProperty('data')) {