var mqtt = require('mqtt'); var request = require('request'); var util = require('util'); var logger = require('log4js').getLogger(); var EventEmitter = require('events'); //var nano = require('nano')('http://martind2000:1V3D4m526i@localhost:5984'); var nano = require('nano')('http://localhost:5984'); var busEmitter = new EventEmitter(); var db_name = 'mdot'; var dbCouch = nano.use(db_name); var db = require('../server/db-connector').dbConnection; var dbSave = require('../server/db-save')(db); function dataBuilder(obj) { 'use strict'; var now = new Date(); var newObj = {device_type: 'mDot', evt_type: 'update', timestamp: {}, evt: {}}; newObj.device_id = obj.topic.split('/')[4]; newObj.evt = obj; newObj.timestamp['$date'] = now.getTime(); return newObj; } function insertEntry(obj) { var newObj = dataBuilder(obj); newObj.rtype = 1; logger.debug('Inserting into couch...'); // logger.info(util.inspect(newObj)); dbCouch.insert(newObj, function(err, body, header) { if (err) { logger.error('Error inserting into couch'); return; } }); logger.debug('Insert done..'); } var doInsertEntry = (obj) => { // Logger.info('sendSocket: ' + JSON.stringify(obj)); // insertEntry(obj); dbSave.addNewEvent(obj) .then(function(d) { 'use strict'; logger.info('Finished - Raw',d); }) .catch(function(e) { 'use strict'; logger.error(e); }); dbSave.addProcessedEvent(obj) .then(function(d) { 'use strict'; logger.info('Finished - Processed',d); }) .catch(function(e) { 'use strict'; logger.error(e); }); }; var mqttClient = function() { var subscribeTopic; var orgId = 'qz0da4'; var userName = 'a-qz0da4-dfwwdkmkzr'; var address = '.messaging.internetofthings.ibmcloud.com'; var appKey = '9txJEf3Cjy7hkSOvkv'; var prefix = 'iot-2/type/mDot/id/'; var deviceId = ['CENSIS-LoRa-1','CENSIS-LoRa-2','CENSIS-LoRa-3','CENSIS-LoRa-4','HIE-mobile-1','HIE-mobile-2','HIE-smart-campus-1','HIE-smart-campus-2','HIE-smart-campus-3','HIE-smart-campus-4','HIE-smart-campus-5','HIE-smart-campus-6','HIE-smart-campus-7','HIE-mDot-1']; //Var subscribeTopic = prefix + deviceId + '/evt/+/fmt/json'; this.connected = false; var options = { keepalive: 3600, clientId: 'a:' + orgId + ':' + Date.now(), username: userName, password: new Buffer(appKey) }; this.client = mqtt.connect('mqtt://' + orgId + address, options); this.client.on('connect', function() { connected = true; logger.info('Connected to ', address); }.bind(this)); for (var t = 0;t < deviceId.length;t++) { subscribeTopic = prefix + deviceId[t] + '/evt/+/fmt/json'; logger.info('Subscribing:', subscribeTopic); this.client.subscribe(subscribeTopic); } this.client.on('message', function(topic, message) { console.log('onMessage',topic); var json = JSON.parse(message.toString()); json.topic = topic; // logger.debug(JSON.stringify(json)); busEmitter.emit('saveData', json); }.bind(this)); this.isConnected = function() { return this.connected; }; this.client.on('connected', function() { logger.debug('mqttConnect - doConnection - Connected'); }); this.client.on('disconnect', function(e) { logger.error('mqttConnect - doConnection - disconnect'); logger.error(e); }); this.client.on('error', function(e) { logger.error('mqttConnect - doConnection - disconnect'); logger.error(e); }); busEmitter.on('saveData', doInsertEntry); }; module.exports.mqttClient = mqttClient;