* Twitter now cleans db on restart

* Server now handles websockets and sends tweets to websocket

* Added new ecosystem for pm2 to control server/twitter
This commit is contained in:
Martin Donnelly 2020-08-19 11:25:53 +01:00
parent 1e03d1697a
commit da5d1384c5
8 changed files with 237 additions and 36 deletions

1
.gitignore vendored
View File

@ -151,3 +151,4 @@ fabric.properties
/src/es2016/bundle.js.map /src/es2016/bundle.js.map
/src/backbone/bundle.js.map /src/backbone/bundle.js.map
!/db/twitter.db

14
ecosystem.config.js Normal file
View File

@ -0,0 +1,14 @@
module.exports = {
'apps' : [{
'name': 'Traintimes Server',
'script': './server.js',
'watch': ['./server.js', 'live/build']
}, {
'name': 'Twitter Grabber',
'script': './twitter.js',
'watch': './twitter.js',
'watch_delay': 10000,
'cron_restart' : '0 4 * * */3'
}]
};

93
package-lock.json generated
View File

@ -1923,6 +1923,11 @@
"integrity": "sha1-lGfQMsOM+u+58teVASUwYvh/ob0=", "integrity": "sha1-lGfQMsOM+u+58teVASUwYvh/ob0=",
"dev": true "dev": true
}, },
"cuid": {
"version": "2.1.8",
"resolved": "https://registry.npmjs.org/cuid/-/cuid-2.1.8.tgz",
"integrity": "sha512-xiEMER6E7TlTPnDxrM4eRiC6TRgjNX9xzEZ5U/Se2YJKr7Mq4pJn/2XEHjl3STcSh96GmkHPcBXLES8M29wyyg=="
},
"dashdash": { "dashdash": {
"version": "1.14.1", "version": "1.14.1",
"resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.1.tgz", "resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.1.tgz",
@ -2170,6 +2175,11 @@
"integrity": "sha1-ED01J/0xUo9AGIEwyEHv3XgmTlw=", "integrity": "sha1-ED01J/0xUo9AGIEwyEHv3XgmTlw=",
"optional": true "optional": true
}, },
"duplexer": {
"version": "0.1.2",
"resolved": "https://registry.npmjs.org/duplexer/-/duplexer-0.1.2.tgz",
"integrity": "sha512-jtD6YG370ZCIi/9GTaJKQxWTZD045+4R4hTk/x1UyoqadyJ9x9CgSi1RlVDQF8U2sxLLSnFkCaMihqljHIWgMg=="
},
"duplexer2": { "duplexer2": {
"version": "0.1.4", "version": "0.1.4",
"resolved": "https://registry.npmjs.org/duplexer2/-/duplexer2-0.1.4.tgz", "resolved": "https://registry.npmjs.org/duplexer2/-/duplexer2-0.1.4.tgz",
@ -2578,6 +2588,20 @@
"resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz", "resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz",
"integrity": "sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc=" "integrity": "sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc="
}, },
"event-stream": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/event-stream/-/event-stream-4.0.1.tgz",
"integrity": "sha512-qACXdu/9VHPBzcyhdOWR5/IahhGMf0roTeZJfzz077GwylcDd90yOHLouhmv7GJ5XzPi6ekaQWd8AvPP2nOvpA==",
"requires": {
"duplexer": "^0.1.1",
"from": "^0.1.7",
"map-stream": "0.0.7",
"pause-stream": "^0.0.11",
"split": "^1.0.1",
"stream-combiner": "^0.2.2",
"through": "^2.3.8"
}
},
"events": { "events": {
"version": "1.1.1", "version": "1.1.1",
"resolved": "https://registry.npmjs.org/events/-/events-1.1.1.tgz", "resolved": "https://registry.npmjs.org/events/-/events-1.1.1.tgz",
@ -2933,6 +2957,11 @@
"resolved": "https://registry.npmjs.org/fresh/-/fresh-0.5.2.tgz", "resolved": "https://registry.npmjs.org/fresh/-/fresh-0.5.2.tgz",
"integrity": "sha1-PYyt2Q2XZWn6g1qx+OSyOhBWBac=" "integrity": "sha1-PYyt2Q2XZWn6g1qx+OSyOhBWBac="
}, },
"from": {
"version": "0.1.7",
"resolved": "https://registry.npmjs.org/from/-/from-0.1.7.tgz",
"integrity": "sha1-g8YK/Fi5xWmXAH7Rp2izqzA6RP4="
},
"fs-minipass": { "fs-minipass": {
"version": "1.2.7", "version": "1.2.7",
"resolved": "https://registry.npmjs.org/fs-minipass/-/fs-minipass-1.2.7.tgz", "resolved": "https://registry.npmjs.org/fs-minipass/-/fs-minipass-1.2.7.tgz",
@ -5121,8 +5150,7 @@
"lodash.assign": { "lodash.assign": {
"version": "4.2.0", "version": "4.2.0",
"resolved": "https://registry.npmjs.org/lodash.assign/-/lodash.assign-4.2.0.tgz", "resolved": "https://registry.npmjs.org/lodash.assign/-/lodash.assign-4.2.0.tgz",
"integrity": "sha1-DZnzzNem0mHRm9rrkkUAXShYCOc=", "integrity": "sha1-DZnzzNem0mHRm9rrkkUAXShYCOc="
"dev": true
}, },
"lodash.create": { "lodash.create": {
"version": "3.1.1", "version": "3.1.1",
@ -5444,6 +5472,11 @@
"pify": "^3.0.0" "pify": "^3.0.0"
} }
}, },
"map-stream": {
"version": "0.0.7",
"resolved": "https://registry.npmjs.org/map-stream/-/map-stream-0.0.7.tgz",
"integrity": "sha1-ih8HiW2CsQkmvTdEokIACfiJdKg="
},
"md5.js": { "md5.js": {
"version": "1.3.4", "version": "1.3.4",
"resolved": "https://registry.npmjs.org/md5.js/-/md5.js-1.3.4.tgz", "resolved": "https://registry.npmjs.org/md5.js/-/md5.js-1.3.4.tgz",
@ -5533,6 +5566,11 @@
"integrity": "sha1-5md4PZLonb00KBi1IwudYqZyrRg=", "integrity": "sha1-5md4PZLonb00KBi1IwudYqZyrRg=",
"dev": true "dev": true
}, },
"mingo": {
"version": "1.3.3",
"resolved": "https://registry.npmjs.org/mingo/-/mingo-1.3.3.tgz",
"integrity": "sha1-aSLE0Ufvx3GgFCWixMj3eER4xUY="
},
"minibus": { "minibus": {
"version": "3.1.0", "version": "3.1.0",
"resolved": "https://registry.npmjs.org/minibus/-/minibus-3.1.0.tgz", "resolved": "https://registry.npmjs.org/minibus/-/minibus-3.1.0.tgz",
@ -6403,6 +6441,14 @@
"resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-0.1.7.tgz", "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-0.1.7.tgz",
"integrity": "sha1-32BBeABfUi8V60SQ5yR6G/qmf4w=" "integrity": "sha1-32BBeABfUi8V60SQ5yR6G/qmf4w="
}, },
"pause-stream": {
"version": "0.0.11",
"resolved": "https://registry.npmjs.org/pause-stream/-/pause-stream-0.0.11.tgz",
"integrity": "sha1-/lo0sMvOErWqaitAPuLnO2AvFEU=",
"requires": {
"through": "~2.3"
}
},
"pbkdf2": { "pbkdf2": {
"version": "3.0.14", "version": "3.0.14",
"resolved": "https://registry.npmjs.org/pbkdf2/-/pbkdf2-3.0.14.tgz", "resolved": "https://registry.npmjs.org/pbkdf2/-/pbkdf2-3.0.14.tgz",
@ -7053,6 +7099,32 @@
"integrity": "sha512-1HwIYD/8UlOtFS3QO3w7ey+SdSDFE4HRNLZoZRYVQefrOY3l17epswImeB1ijgJFQJodIaHcwkp3r/myBjFVbg==", "integrity": "sha512-1HwIYD/8UlOtFS3QO3w7ey+SdSDFE4HRNLZoZRYVQefrOY3l17epswImeB1ijgJFQJodIaHcwkp3r/myBjFVbg==",
"dev": true "dev": true
}, },
"save": {
"version": "2.4.0",
"resolved": "https://registry.npmjs.org/save/-/save-2.4.0.tgz",
"integrity": "sha512-wd5L2uVnsKYkIUaK6i8Ie66IOHaI328gMF0MPuTJtYOjXgUolC33LSIk7Qr8WVA55QHaGwfiVS8a7EFIeGOR3w==",
"requires": {
"async": "^2.6.2",
"event-stream": "^4.0.1",
"lodash.assign": "^4.2.0",
"mingo": "1"
},
"dependencies": {
"async": {
"version": "2.6.3",
"resolved": "https://registry.npmjs.org/async/-/async-2.6.3.tgz",
"integrity": "sha512-zflvls11DCy+dQWzTW2dzuilv8Z5X/pjfmZOWba6TNIVDm+2UDaJmXSOXlasHKfNBs8oo3M0aT50fDEWfKZjXg==",
"requires": {
"lodash": "^4.17.14"
}
},
"lodash": {
"version": "4.17.20",
"resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz",
"integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA=="
}
}
},
"sax": { "sax": {
"version": "1.2.4", "version": "1.2.4",
"resolved": "https://registry.npmjs.org/sax/-/sax-1.2.4.tgz", "resolved": "https://registry.npmjs.org/sax/-/sax-1.2.4.tgz",
@ -7363,6 +7435,14 @@
"source-map": "^0.5.6" "source-map": "^0.5.6"
} }
}, },
"split": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/split/-/split-1.0.1.tgz",
"integrity": "sha512-mTyOoPbrivtXnwnIxZRFYRrPNtEFKlpB2fvjSnCQUiAA6qAZzqwna5envK4uk6OIeP17CsdF3rSBGYVBsU0Tkg==",
"requires": {
"through": "2"
}
},
"sprintf-js": { "sprintf-js": {
"version": "1.0.3", "version": "1.0.3",
"resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz", "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz",
@ -7415,6 +7495,15 @@
"readable-stream": "^2.0.2" "readable-stream": "^2.0.2"
} }
}, },
"stream-combiner": {
"version": "0.2.2",
"resolved": "https://registry.npmjs.org/stream-combiner/-/stream-combiner-0.2.2.tgz",
"integrity": "sha1-rsjLrBd7Vrb0+kec7YwZEs7lKFg=",
"requires": {
"duplexer": "~0.1.1",
"through": "~2.3.4"
}
},
"stream-combiner2": { "stream-combiner2": {
"version": "1.1.1", "version": "1.1.1",
"resolved": "https://registry.npmjs.org/stream-combiner2/-/stream-combiner2-1.1.1.tgz", "resolved": "https://registry.npmjs.org/stream-combiner2/-/stream-combiner2-1.1.1.tgz",

View File

@ -13,6 +13,7 @@
"browserify": "^14.5.0", "browserify": "^14.5.0",
"compressjs": "^1.0.3", "compressjs": "^1.0.3",
"cors": "^2.8.4", "cors": "^2.8.4",
"cuid": "^2.1.8",
"dotenv": "^8.2.0", "dotenv": "^8.2.0",
"es6-promise": "^4.1.1", "es6-promise": "^4.1.1",
"express": "^4.16.2", "express": "^4.16.2",
@ -24,6 +25,7 @@
"minibus": "^3.1.0", "minibus": "^3.1.0",
"muicss": "^0.9.33", "muicss": "^0.9.33",
"node-crc": "^1.3.0", "node-crc": "^1.3.0",
"save": "^2.4.0",
"sqlite3": "^4.2.0", "sqlite3": "^4.2.0",
"twitter-lite": "^0.11.0", "twitter-lite": "^0.11.0",
"underscore": "^1.8.3", "underscore": "^1.8.3",

View File

@ -22,6 +22,34 @@ function prepareData(_obj) {
exports.prepareData = prepareData; exports.prepareData = prepareData;
exports.vacuum = () => {
console.log('>> vacuum:');
return new Promise((resolve, reject) => {
db.run('VACUUM', function(err) {
if (err)
reject(err);
resolve({ 'msg':'Vacuum', 'changes': this.changes });
});
});
};
exports.deleteUpTo = (ms) => {
console.log('>> deleteUpTo:', ms);
const sql = 'delete from tweets where createdAt < ?';
return new Promise((resolve, reject) => {
db.run(sql, ms, function(err) {
if (err)
reject(err);
resolve({ 'msg':'Rows deleted', 'changes': this.changes });
});
});
};
exports.getAll = (list) => { exports.getAll = (list) => {
console.log('>> getAll:', list); console.log('>> getAll:', list);
const outgoing = []; const outgoing = [];

View File

@ -20,12 +20,15 @@ class PubSubManager {
this.brokerId = setInterval(() => { this.brokerId = setInterval(() => {
this.broker(); this.broker();
}, 1000); }, 1000);
this.updater = setInterval(() => {
this.doUpdater();
}, 60000);
} }
subscribe(subscriber, channel) { subscribe(subscriber, channel) {
const channelSplit = channel.split('-'); const channelSplit = channel.split('-');
if (channelSplit[0] === 't') { if (channelSplit[0] === 't') {
if (!this.channels.twitter.hasOwnProperty(channel)) if (!this.channels.twitter.hasOwnProperty(channel))
this.channels.twitter[channel] = { this.channels.twitter[channel] = {
'message': null, 'message': null,
@ -38,6 +41,15 @@ class PubSubManager {
} }
} }
unsubscribe(id) {
console.log('Forcing unsub for', id);
for (const channel in this.channels.twitter)
if (this.channels.twitter.hasOwnProperty(channel)) {
const channelObj = this.channels.twitter[channel];
channelObj.subscribers = channelObj.subscribers.filter(item => item.cuid !== id);
}
}
removeBroker() { removeBroker() {
clearInterval(this.brokerId); clearInterval(this.brokerId);
} }
@ -47,7 +59,7 @@ class PubSubManager {
} }
publishTwitter( channel, message) { publishTwitter( channel, message) {
logger.debug('Publish:', channel, message); // logger.debug('Publish:', channel, message);
if (!this.channels.twitter.hasOwnProperty(channel)) if (!this.channels.twitter.hasOwnProperty(channel))
this.channels.twitter[channel] = { this.channels.twitter[channel] = {
@ -58,6 +70,15 @@ class PubSubManager {
this.channels.twitter[channel].message = message; this.channels.twitter[channel].message = message;
} }
doUpdater() {
for (const channel in this.channels.twitter)
// logger.debug(channel);
if (this.channels.twitter.hasOwnProperty(channel)) {
const channelObj = this.channels.twitter[channel];
console.log(`Subscribers: ${channelObj.subscribers.length} in ${channel}`);
// console.log(channelObj.subscribers);
}
}
broker() { broker() {
// logger.debug('Broker...'); // logger.debug('Broker...');
for (const channel in this.channels.twitter) for (const channel in this.channels.twitter)
@ -76,22 +97,6 @@ class PubSubManager {
channelObj.message = ''; channelObj.message = '';
} }
} }
/* for (const channel in this.channels)
if (this.channels.hasOwnProperty(channel)) {
const channelObj = this.channels[channel];
if (channelObj.message) {
console.log(`found message: ${channelObj.message} in ${channel}`);
channelObj.subscribers.forEach(subscriber => {
subscriber.send(JSON.stringify({
'message': channelObj.message
}));
});
channelObj.message = '';
}
}*/
} }
} }
module.exports = PubSubManager; module.exports = PubSubManager;

View File

@ -10,6 +10,7 @@
// https://medium.com/unprogrammer/implementing-publisher-subscriber-pattern-using-javascript-nodejs-and-websockets-82036da7e174 // https://medium.com/unprogrammer/implementing-publisher-subscriber-pattern-using-javascript-nodejs-and-websockets-82036da7e174
const url = require('url'); const url = require('url');
const cuid = require('cuid');
const PubSubManager = require('./pubsub'); const PubSubManager = require('./pubsub');
const logger = require('log4js').getLogger('wshandlerv3'); const logger = require('log4js').getLogger('wshandlerv3');
@ -17,14 +18,25 @@ logger.level = 'debug';
const pubSubManager = new PubSubManager(); const pubSubManager = new PubSubManager();
function noop() {}
function heartbeat() {
this.isAlive = true;
}
module.exports = function(events, wsServer) { module.exports = function(events, wsServer) {
'use strict'; 'use strict';
logger.debug('>> new WS', wsServer); logger.debug('>> new WS', wsServer);
wsServer.on('connection', (ws, req) => { wsServer.on('connection', (ws, req) => {
console.log(`Connection request from: ${req.connection.remoteAddress}`); ws.cuid = cuid();
console.log(`Connection request for ${ws.cuid}`);
ws.isAlive = true;
ws.on('pong', heartbeat);
ws.on('message', (data) => { ws.on('message', (data) => {
console.log(`data: ${ data}`); console.log(`data: ${ data}`, this);
const json = JSON.parse(data); const json = JSON.parse(data);
const request = json.request; const request = json.request;
const message = json.message; const message = json.message;
@ -42,24 +54,22 @@ module.exports = function(events, wsServer) {
} }
}); });
ws.on('close', () => { ws.on('close', () => {
console.log('Stopping client connection.'); console.log('Stopping client connection.', ws.cuid);
pubSubManager.unsubscribe(ws.cuid);
}); });
}); });
const interval = setInterval(function ping() {
wsServer.clients.forEach(function each(ws) {
if (ws.isAlive === false) return ws.terminate();
ws.isAlive = false;
ws.ping(noop);
});
}, 30000);
const sendTweetHandler = (id, obj) => { const sendTweetHandler = (id, obj) => {
logger.debug('sendTweetHandler', id, obj);
pubSubManager.publishTwitter(id, obj); pubSubManager.publishTwitter(id, obj);
/*
try {
ws.send(JSON.stringify(obj));
}
catch (err) {
logger.error(err);
logger.warn('Offending object: ', obj);
}
*/
}; };
events.on('sendTweet', sendTweetHandler); events.on('sendTweet', sendTweetHandler);

View File

@ -40,6 +40,7 @@ const accounts = [
{'name':'ScotRail', 'id':61569136}, {'name':'ScotRail', 'id':61569136},
{'name':'Stansted_Exp', 'id':257511611}, {'name':'Stansted_Exp', 'id':257511611},
{'name':'TfL', 'id':47319664}, {'name':'TfL', 'id':47319664},
{'name':'WestMidRailway', 'id':915554470175657984},
{'name':'NetworkRailBHM', 'id':583910976}, {'name':'NetworkRailBHM', 'id':583910976},
{'name':'NetworkRailEDB', 'id':586614081}, {'name':'NetworkRailEDB', 'id':586614081},
@ -49,11 +50,37 @@ const accounts = [
{'name':'NetworkRailLST', 'id':581826097}, {'name':'NetworkRailLST', 'id':581826097},
{'name':'NetworkRailMAN', 'id':583895871}, {'name':'NetworkRailMAN', 'id':583895871},
{'name':'NetworkRailVIC', 'id':587354752}, {'name':'NetworkRailVIC', 'id':587354752},
{'name':'BTP', 'id':266094415},
{'name':'BTPAvonSomerset', 'id':738664125132345344},
{'name':'BTPBhm', 'id':952003488},
{'name':'BTPBlackCountry', 'id':767698362866999297},
{'name':'BTPCambs', 'id':2574726074},
{'name':'BTPCardiff_NWP', 'id':951714852},
{'name':'BTPEAnglia', 'id':4479942923},
{'name':'BTPEssex', 'id':2949032015},
{'name':'BTPGtrMcr', 'id':1670204977},
{'name':'BTPLeics', 'id':761147194598711296},
{'name':'BTPLiverpoolSt', 'id':951912242},
{'name':'BTPLondon', 'id':957226980},
{'name':'BTPLondonBridge', 'id':3346645594},
{'name':'BTPMersey', 'id':951748434},
{'name':'BTPNorthScot', 'id':2238888007},
{'name':'BTPNorthWales', 'id':951487338},
{'name':'BTPOxon', 'id':741228701791178753},
{'name':'BTPPontypridd', 'id':1672678292},
{'name':'BTPScotland', 'id':957256160}, {'name':'BTPScotland', 'id':957256160},
{'name':'BTPSouthYorks', 'id':3384315676},
{'name':'BTPTeesValley', 'id':802182849872936962},
{'name':'BTPUnderground', 'id':986236195049897985},
{'name':'BTPWales', 'id':1430734374},
{'name':'BTPWaterloo', 'id':951997044},
{'name':'BTPWestScot', 'id':951757261}
]; ];
// {'name':'', 'id':},
(async function(){ (async function(){
/* const response = await user.getBearerToken(); /* const response = await user.getBearerToken();
@ -61,6 +88,29 @@ const accounts = [
bearer_token: response.access_token bearer_token: response.access_token
});*/ });*/
const cleanDB = async function(){
const aDay = 86400000;
const now = new Date().getTime() ;
const oldDate = now - (15 * aDay);
console.log('Clean up to:', oldDate );
await dbmanager.deleteUpTo(oldDate).then((v) => {
console.log(v);
}).catch((err) => {
console.error(err);
});
await dbmanager.vacuum().then((v) => {
console.log(v);
}).catch((err) => {
console.error(err);
});
};
const validAccounts = accounts.map(item => { const validAccounts = accounts.map(item => {
return item.id; return item.id;
}) })
@ -72,6 +122,8 @@ const accounts = [
console.log('Starting:', parameters); console.log('Starting:', parameters);
await cleanDB();
const stream = client.stream("statuses/filter", parameters) const stream = client.stream("statuses/filter", parameters)
.on("start", response => console.log('Started!')) .on("start", response => console.log('Started!'))
.on("data", tweet => { .on("data", tweet => {