203 lines
4.7 KiB
JavaScript
203 lines
4.7 KiB
JavaScript
'use strict';
|
|
var mqtt = require('mqtt');
|
|
var logger = require('log4js').getLogger();
|
|
|
|
var EventEmitter = require('events');
|
|
|
|
var busEmitter = new EventEmitter();
|
|
|
|
var requestify = require('requestify');
|
|
|
|
var db = require('../server/db-connector').dbConnection;
|
|
|
|
var dbSave = require('../server/db-save')(db);
|
|
|
|
//let remoteHost = process.env.NODE_ENV === 'production' ? 'http://mdotserver.mybluemix.net/apiv2/message' : 'http://localhost:3011/apiv2/message';
|
|
const remoteHost = process.env.NODE_ENV === 'production' ? 'http://ec2-52-211-111-57.eu-west-1.compute.amazonaws.com/apiv2/message' : 'http://localhost:3011/apiv2/message';
|
|
|
|
function dataBuilder(obj) {
|
|
|
|
var now = new Date();
|
|
var newObj = {
|
|
device_type: 'mDot', evt_type: 'update', timestamp: {}, evt: {}
|
|
};
|
|
|
|
newObj.device_id = obj.topic.split('/')[4];
|
|
|
|
newObj.evt = obj;
|
|
newObj.timestamp['$date'] = now.getTime();
|
|
|
|
return newObj;
|
|
}
|
|
|
|
function saveToDB(data) {
|
|
// logger.debug('Inserting into couch...');
|
|
// Logger.info(util.inspect(obj));
|
|
dbCouch.insert(data, function(err, body, header) {
|
|
if (err) {
|
|
logger.error('Error inserting into couch');
|
|
logger.error(err);
|
|
return;
|
|
}
|
|
});
|
|
}
|
|
|
|
var doSendMessage = (obj) => {
|
|
'use strict';
|
|
|
|
requestify.post(remoteHost, obj)
|
|
.then(function(response) {
|
|
// Get the response body
|
|
// logger.debug(response.getBody());
|
|
})
|
|
.catch(function(e) {
|
|
logger.error('doSendMessage', e);
|
|
});
|
|
};
|
|
|
|
var doInsertEntry = (obj) => {
|
|
// Logger.info('sendSocket: ' + JSON.stringify(obj));
|
|
|
|
// insertEntry(obj);
|
|
|
|
dbSave.addNewEvent(obj)
|
|
.then(function(d) {
|
|
'use strict';
|
|
// logger.info('Obj', obj.type);
|
|
// saveToDB(obj);
|
|
if (obj.type === 'mDot') {
|
|
busEmitter.emit('sendMessage', obj);
|
|
}
|
|
})
|
|
.catch(function(e) {
|
|
'use strict';
|
|
logger.error(e);
|
|
});
|
|
|
|
};
|
|
var lastReceived;
|
|
|
|
var watchDog = 0;
|
|
var watchDogHie = 0;
|
|
var wdTimedOut = false;
|
|
var watchDogTimeout = () => {
|
|
'use strict';
|
|
var now = new Date();
|
|
logger.warn(
|
|
'Watchdog timeout. Message has not been received for over 20 minutes.');
|
|
logger.warn('Last received:' + lastReceived + 'Now:' + now);
|
|
|
|
|
|
wdTimedOut = true;
|
|
};
|
|
|
|
var mqttClient = function() {
|
|
|
|
var count = 0;
|
|
var subscribeTopic;
|
|
var orgId = 'qz0da4';
|
|
var userName = 'a-qz0da4-dfwwdkmkzr';
|
|
var address = '.messaging.internetofthings.ibmcloud.com';
|
|
var appKey = '9txJEf3Cjy7hkSOvkv';
|
|
|
|
var connection = 'mqtt://' + orgId + address;
|
|
|
|
this.connected = false;
|
|
|
|
var options = {
|
|
keepalive: 10,
|
|
clientId: 'a:' + orgId + ':' + Date.now(),
|
|
username: userName,
|
|
password: new Buffer(appKey),
|
|
reconnectPeriod: 1000,
|
|
connectTimeout: 30 * 1000
|
|
|
|
};
|
|
|
|
logger.info('Connecting to ', connection);
|
|
//this.client = mqtt.connect(connection, options);
|
|
this.client = mqtt.connect('mqtt://' + orgId + address, options);
|
|
|
|
this.client.on('connect', function() {
|
|
// connected = true;
|
|
logger.info('Connected to ', address);
|
|
|
|
clearTimeout(watchDog);
|
|
watchDog = setTimeout(watchDogTimeout, 1200000);
|
|
lastReceived = new Date();
|
|
|
|
}.bind(this));
|
|
|
|
this.client.on('connected', function() {
|
|
logger.debug('mqttConnect - doConnection - Connected');
|
|
|
|
});
|
|
this.client.on('close', function() {
|
|
logger.warn('mqttConnect - Connection closed');
|
|
|
|
});
|
|
|
|
this.client.on('offline', function() {
|
|
logger.warn('mqttConnect - OFFLINE!');
|
|
|
|
});
|
|
|
|
this.client.on('error', function(e) {
|
|
logger.error('mqttConnect - error');
|
|
logger.error(e);
|
|
});
|
|
|
|
this.client.on('reconnect', function() {
|
|
logger.warn('mqttConnect - Attempting to reconnect...');
|
|
|
|
});
|
|
|
|
subscribeTopic = 'iot-2/type/+/id/+/evt/+/fmt/json';
|
|
//subscribeTopic = 'livingroomTemp';
|
|
logger.info('Subscribing:', subscribeTopic);
|
|
|
|
this.client.subscribe(subscribeTopic);
|
|
|
|
this.client.on('message', function(topic, message) {
|
|
|
|
var json = JSON.parse(message.toString());
|
|
|
|
var topicArray = topic.split('/');
|
|
json.topic = topic;
|
|
json.type = topicArray[2];
|
|
json.device = topicArray[4];
|
|
json.event = topicArray[6];
|
|
|
|
busEmitter.emit('saveData', json);
|
|
//console.log(json.type);
|
|
if (json.type !== 'Ti-CC3200') {
|
|
console.log(json);
|
|
clearTimeout(watchDog);
|
|
watchDog = setTimeout(watchDogTimeout, 1200000);
|
|
if (wdTimedOut) {
|
|
var now = new Date();
|
|
var s = 'Receiving again :::\n\nLast received: ' + lastReceived + 'Now: ' + now + "\n\n" + message.toString();
|
|
logger.warn(s);
|
|
|
|
wdTimedOut = false;
|
|
}
|
|
lastReceived = new Date();
|
|
|
|
}
|
|
|
|
count++;
|
|
|
|
}.bind(this));
|
|
|
|
busEmitter.on('saveData', doInsertEntry);
|
|
busEmitter.on('sendMessage', doSendMessage);
|
|
|
|
this.getCount = function() {
|
|
'use strict';
|
|
return count;
|
|
};
|
|
|
|
};
|
|
|
|
module.exports.mqttClient = mqttClient;
|