diff --git a/.gitignore b/.gitignore index 858824f..d58cbd5 100644 --- a/.gitignore +++ b/.gitignore @@ -151,3 +151,4 @@ fabric.properties /src/es2016/bundle.js.map /src/backbone/bundle.js.map +!/db/twitter.db diff --git a/ecosystem.config.js b/ecosystem.config.js new file mode 100644 index 0000000..dab9398 --- /dev/null +++ b/ecosystem.config.js @@ -0,0 +1,14 @@ +module.exports = { + 'apps' : [{ + 'name': 'Traintimes Server', + 'script': './server.js', + 'watch': ['./server.js', 'live/build'] + }, { + 'name': 'Twitter Grabber', + 'script': './twitter.js', + 'watch': './twitter.js', + 'watch_delay': 10000, + 'cron_restart' : '0 4 * * */3' + }] + +}; diff --git a/package-lock.json b/package-lock.json index 2d87cbb..f65fda9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1923,6 +1923,11 @@ "integrity": "sha1-lGfQMsOM+u+58teVASUwYvh/ob0=", "dev": true }, + "cuid": { + "version": "2.1.8", + "resolved": "https://registry.npmjs.org/cuid/-/cuid-2.1.8.tgz", + "integrity": "sha512-xiEMER6E7TlTPnDxrM4eRiC6TRgjNX9xzEZ5U/Se2YJKr7Mq4pJn/2XEHjl3STcSh96GmkHPcBXLES8M29wyyg==" + }, "dashdash": { "version": "1.14.1", "resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.1.tgz", @@ -2170,6 +2175,11 @@ "integrity": "sha1-ED01J/0xUo9AGIEwyEHv3XgmTlw=", "optional": true }, + "duplexer": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/duplexer/-/duplexer-0.1.2.tgz", + "integrity": "sha512-jtD6YG370ZCIi/9GTaJKQxWTZD045+4R4hTk/x1UyoqadyJ9x9CgSi1RlVDQF8U2sxLLSnFkCaMihqljHIWgMg==" + }, "duplexer2": { "version": "0.1.4", "resolved": "https://registry.npmjs.org/duplexer2/-/duplexer2-0.1.4.tgz", @@ -2578,6 +2588,20 @@ "resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz", "integrity": "sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc=" }, + "event-stream": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/event-stream/-/event-stream-4.0.1.tgz", + "integrity": "sha512-qACXdu/9VHPBzcyhdOWR5/IahhGMf0roTeZJfzz077GwylcDd90yOHLouhmv7GJ5XzPi6ekaQWd8AvPP2nOvpA==", + "requires": { + "duplexer": "^0.1.1", + "from": "^0.1.7", + "map-stream": "0.0.7", + "pause-stream": "^0.0.11", + "split": "^1.0.1", + "stream-combiner": "^0.2.2", + "through": "^2.3.8" + } + }, "events": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/events/-/events-1.1.1.tgz", @@ -2933,6 +2957,11 @@ "resolved": "https://registry.npmjs.org/fresh/-/fresh-0.5.2.tgz", "integrity": "sha1-PYyt2Q2XZWn6g1qx+OSyOhBWBac=" }, + "from": { + "version": "0.1.7", + "resolved": "https://registry.npmjs.org/from/-/from-0.1.7.tgz", + "integrity": "sha1-g8YK/Fi5xWmXAH7Rp2izqzA6RP4=" + }, "fs-minipass": { "version": "1.2.7", "resolved": "https://registry.npmjs.org/fs-minipass/-/fs-minipass-1.2.7.tgz", @@ -5121,8 +5150,7 @@ "lodash.assign": { "version": "4.2.0", "resolved": "https://registry.npmjs.org/lodash.assign/-/lodash.assign-4.2.0.tgz", - "integrity": "sha1-DZnzzNem0mHRm9rrkkUAXShYCOc=", - "dev": true + "integrity": "sha1-DZnzzNem0mHRm9rrkkUAXShYCOc=" }, "lodash.create": { "version": "3.1.1", @@ -5444,6 +5472,11 @@ "pify": "^3.0.0" } }, + "map-stream": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/map-stream/-/map-stream-0.0.7.tgz", + "integrity": "sha1-ih8HiW2CsQkmvTdEokIACfiJdKg=" + }, "md5.js": { "version": "1.3.4", "resolved": "https://registry.npmjs.org/md5.js/-/md5.js-1.3.4.tgz", @@ -5533,6 +5566,11 @@ "integrity": "sha1-5md4PZLonb00KBi1IwudYqZyrRg=", "dev": true }, + "mingo": { + "version": "1.3.3", + "resolved": "https://registry.npmjs.org/mingo/-/mingo-1.3.3.tgz", + "integrity": "sha1-aSLE0Ufvx3GgFCWixMj3eER4xUY=" + }, "minibus": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/minibus/-/minibus-3.1.0.tgz", @@ -6403,6 +6441,14 @@ "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-0.1.7.tgz", "integrity": "sha1-32BBeABfUi8V60SQ5yR6G/qmf4w=" }, + "pause-stream": { + "version": "0.0.11", + "resolved": "https://registry.npmjs.org/pause-stream/-/pause-stream-0.0.11.tgz", + "integrity": "sha1-/lo0sMvOErWqaitAPuLnO2AvFEU=", + "requires": { + "through": "~2.3" + } + }, "pbkdf2": { "version": "3.0.14", "resolved": "https://registry.npmjs.org/pbkdf2/-/pbkdf2-3.0.14.tgz", @@ -7053,6 +7099,32 @@ "integrity": "sha512-1HwIYD/8UlOtFS3QO3w7ey+SdSDFE4HRNLZoZRYVQefrOY3l17epswImeB1ijgJFQJodIaHcwkp3r/myBjFVbg==", "dev": true }, + "save": { + "version": "2.4.0", + "resolved": "https://registry.npmjs.org/save/-/save-2.4.0.tgz", + "integrity": "sha512-wd5L2uVnsKYkIUaK6i8Ie66IOHaI328gMF0MPuTJtYOjXgUolC33LSIk7Qr8WVA55QHaGwfiVS8a7EFIeGOR3w==", + "requires": { + "async": "^2.6.2", + "event-stream": "^4.0.1", + "lodash.assign": "^4.2.0", + "mingo": "1" + }, + "dependencies": { + "async": { + "version": "2.6.3", + "resolved": "https://registry.npmjs.org/async/-/async-2.6.3.tgz", + "integrity": "sha512-zflvls11DCy+dQWzTW2dzuilv8Z5X/pjfmZOWba6TNIVDm+2UDaJmXSOXlasHKfNBs8oo3M0aT50fDEWfKZjXg==", + "requires": { + "lodash": "^4.17.14" + } + }, + "lodash": { + "version": "4.17.20", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz", + "integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==" + } + } + }, "sax": { "version": "1.2.4", "resolved": "https://registry.npmjs.org/sax/-/sax-1.2.4.tgz", @@ -7363,6 +7435,14 @@ "source-map": "^0.5.6" } }, + "split": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/split/-/split-1.0.1.tgz", + "integrity": "sha512-mTyOoPbrivtXnwnIxZRFYRrPNtEFKlpB2fvjSnCQUiAA6qAZzqwna5envK4uk6OIeP17CsdF3rSBGYVBsU0Tkg==", + "requires": { + "through": "2" + } + }, "sprintf-js": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz", @@ -7415,6 +7495,15 @@ "readable-stream": "^2.0.2" } }, + "stream-combiner": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/stream-combiner/-/stream-combiner-0.2.2.tgz", + "integrity": "sha1-rsjLrBd7Vrb0+kec7YwZEs7lKFg=", + "requires": { + "duplexer": "~0.1.1", + "through": "~2.3.4" + } + }, "stream-combiner2": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/stream-combiner2/-/stream-combiner2-1.1.1.tgz", diff --git a/package.json b/package.json index 30d9ba0..24ad99a 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "browserify": "^14.5.0", "compressjs": "^1.0.3", "cors": "^2.8.4", + "cuid": "^2.1.8", "dotenv": "^8.2.0", "es6-promise": "^4.1.1", "express": "^4.16.2", @@ -24,6 +25,7 @@ "minibus": "^3.1.0", "muicss": "^0.9.33", "node-crc": "^1.3.0", + "save": "^2.4.0", "sqlite3": "^4.2.0", "twitter-lite": "^0.11.0", "underscore": "^1.8.3", diff --git a/server/db/dbmanager.js b/server/db/dbmanager.js index 5b6c57e..6d0d13a 100644 --- a/server/db/dbmanager.js +++ b/server/db/dbmanager.js @@ -22,6 +22,34 @@ function prepareData(_obj) { exports.prepareData = prepareData; +exports.vacuum = () => { + console.log('>> vacuum:'); + + return new Promise((resolve, reject) => { + db.run('VACUUM', function(err) { + if (err) + reject(err); + + resolve({ 'msg':'Vacuum', 'changes': this.changes }); + }); + }); +}; + +exports.deleteUpTo = (ms) => { + console.log('>> deleteUpTo:', ms); + + const sql = 'delete from tweets where createdAt < ?'; + + return new Promise((resolve, reject) => { + db.run(sql, ms, function(err) { + if (err) + reject(err); + + resolve({ 'msg':'Rows deleted', 'changes': this.changes }); + }); + }); +}; + exports.getAll = (list) => { console.log('>> getAll:', list); const outgoing = []; diff --git a/server/lib/pubsub.js b/server/lib/pubsub.js index 20b9419..93b55b4 100644 --- a/server/lib/pubsub.js +++ b/server/lib/pubsub.js @@ -20,12 +20,15 @@ class PubSubManager { this.brokerId = setInterval(() => { this.broker(); }, 1000); + + this.updater = setInterval(() => { + this.doUpdater(); + }, 60000); } subscribe(subscriber, channel) { const channelSplit = channel.split('-'); if (channelSplit[0] === 't') { - if (!this.channels.twitter.hasOwnProperty(channel)) this.channels.twitter[channel] = { 'message': null, @@ -38,6 +41,15 @@ class PubSubManager { } } + unsubscribe(id) { + console.log('Forcing unsub for', id); + for (const channel in this.channels.twitter) + if (this.channels.twitter.hasOwnProperty(channel)) { + const channelObj = this.channels.twitter[channel]; + channelObj.subscribers = channelObj.subscribers.filter(item => item.cuid !== id); + } + } + removeBroker() { clearInterval(this.brokerId); } @@ -47,7 +59,7 @@ class PubSubManager { } publishTwitter( channel, message) { - logger.debug('Publish:', channel, message); + // logger.debug('Publish:', channel, message); if (!this.channels.twitter.hasOwnProperty(channel)) this.channels.twitter[channel] = { @@ -58,6 +70,15 @@ class PubSubManager { this.channels.twitter[channel].message = message; } + doUpdater() { + for (const channel in this.channels.twitter) + // logger.debug(channel); + if (this.channels.twitter.hasOwnProperty(channel)) { + const channelObj = this.channels.twitter[channel]; + console.log(`Subscribers: ${channelObj.subscribers.length} in ${channel}`); + // console.log(channelObj.subscribers); + } + } broker() { // logger.debug('Broker...'); for (const channel in this.channels.twitter) @@ -76,22 +97,6 @@ class PubSubManager { channelObj.message = ''; } } - - /* for (const channel in this.channels) - if (this.channels.hasOwnProperty(channel)) { - const channelObj = this.channels[channel]; - if (channelObj.message) { - console.log(`found message: ${channelObj.message} in ${channel}`); - - channelObj.subscribers.forEach(subscriber => { - subscriber.send(JSON.stringify({ - 'message': channelObj.message - })); - }); - - channelObj.message = ''; - } - }*/ } } module.exports = PubSubManager; diff --git a/server/lib/wshandlerv3.js b/server/lib/wshandlerv3.js index 5ae9cb1..7add8ab 100644 --- a/server/lib/wshandlerv3.js +++ b/server/lib/wshandlerv3.js @@ -10,6 +10,7 @@ // https://medium.com/unprogrammer/implementing-publisher-subscriber-pattern-using-javascript-nodejs-and-websockets-82036da7e174 const url = require('url'); +const cuid = require('cuid'); const PubSubManager = require('./pubsub'); const logger = require('log4js').getLogger('wshandlerv3'); @@ -17,14 +18,25 @@ logger.level = 'debug'; const pubSubManager = new PubSubManager(); +function noop() {} + +function heartbeat() { + this.isAlive = true; +} + module.exports = function(events, wsServer) { 'use strict'; logger.debug('>> new WS', wsServer); wsServer.on('connection', (ws, req) => { - console.log(`Connection request from: ${req.connection.remoteAddress}`); + ws.cuid = cuid(); + console.log(`Connection request for ${ws.cuid}`); + + ws.isAlive = true; + ws.on('pong', heartbeat); + ws.on('message', (data) => { - console.log(`data: ${ data}`); + console.log(`data: ${ data}`, this); const json = JSON.parse(data); const request = json.request; const message = json.message; @@ -42,24 +54,22 @@ module.exports = function(events, wsServer) { } }); ws.on('close', () => { - console.log('Stopping client connection.'); + console.log('Stopping client connection.', ws.cuid); + pubSubManager.unsubscribe(ws.cuid); }); }); + const interval = setInterval(function ping() { + wsServer.clients.forEach(function each(ws) { + if (ws.isAlive === false) return ws.terminate(); + + ws.isAlive = false; + ws.ping(noop); + }); + }, 30000); + const sendTweetHandler = (id, obj) => { - logger.debug('sendTweetHandler', id, obj); - pubSubManager.publishTwitter(id, obj); - - /* - try { - ws.send(JSON.stringify(obj)); - } - catch (err) { - logger.error(err); - logger.warn('Offending object: ', obj); - } -*/ }; events.on('sendTweet', sendTweetHandler); diff --git a/twitter.js b/twitter.js index eb653db..6d1457a 100644 --- a/twitter.js +++ b/twitter.js @@ -40,6 +40,7 @@ const accounts = [ {'name':'ScotRail', 'id':61569136}, {'name':'Stansted_Exp', 'id':257511611}, {'name':'TfL', 'id':47319664}, + {'name':'WestMidRailway', 'id':915554470175657984}, {'name':'NetworkRailBHM', 'id':583910976}, {'name':'NetworkRailEDB', 'id':586614081}, @@ -49,11 +50,37 @@ const accounts = [ {'name':'NetworkRailLST', 'id':581826097}, {'name':'NetworkRailMAN', 'id':583895871}, {'name':'NetworkRailVIC', 'id':587354752}, + + {'name':'BTP', 'id':266094415}, + {'name':'BTPAvonSomerset', 'id':738664125132345344}, + {'name':'BTPBhm', 'id':952003488}, + {'name':'BTPBlackCountry', 'id':767698362866999297}, + {'name':'BTPCambs', 'id':2574726074}, + {'name':'BTPCardiff_NWP', 'id':951714852}, + {'name':'BTPEAnglia', 'id':4479942923}, + {'name':'BTPEssex', 'id':2949032015}, + {'name':'BTPGtrMcr', 'id':1670204977}, + {'name':'BTPLeics', 'id':761147194598711296}, + {'name':'BTPLiverpoolSt', 'id':951912242}, + {'name':'BTPLondon', 'id':957226980}, + {'name':'BTPLondonBridge', 'id':3346645594}, + {'name':'BTPMersey', 'id':951748434}, + {'name':'BTPNorthScot', 'id':2238888007}, + {'name':'BTPNorthWales', 'id':951487338}, + {'name':'BTPOxon', 'id':741228701791178753}, + {'name':'BTPPontypridd', 'id':1672678292}, {'name':'BTPScotland', 'id':957256160}, + {'name':'BTPSouthYorks', 'id':3384315676}, + {'name':'BTPTeesValley', 'id':802182849872936962}, + {'name':'BTPUnderground', 'id':986236195049897985}, + {'name':'BTPWales', 'id':1430734374}, + {'name':'BTPWaterloo', 'id':951997044}, + {'name':'BTPWestScot', 'id':951757261} + ]; - +// {'name':'', 'id':}, (async function(){ /* const response = await user.getBearerToken(); @@ -61,6 +88,29 @@ const accounts = [ bearer_token: response.access_token });*/ + const cleanDB = async function(){ + + const aDay = 86400000; + const now = new Date().getTime() ; + + const oldDate = now - (15 * aDay); + + console.log('Clean up to:', oldDate ); + + await dbmanager.deleteUpTo(oldDate).then((v) => { + console.log(v); + }).catch((err) => { + console.error(err); + }); + + await dbmanager.vacuum().then((v) => { + console.log(v); + }).catch((err) => { + console.error(err); + }); + + }; + const validAccounts = accounts.map(item => { return item.id; }) @@ -72,6 +122,8 @@ const accounts = [ console.log('Starting:', parameters); + await cleanDB(); + const stream = client.stream("statuses/filter", parameters) .on("start", response => console.log('Started!')) .on("data", tweet => {