const logger = require('log4js').getLogger('pubsub');

logger.level = process.env.LOGGER_LEVEL || 'debug';

/** Delay, in milliseconds, between two broker delivery passes. */
const BROKER_INTERVAL_MS = 1000;

/** First dash-separated segment that marks a channel name as a twitter channel. */
const TWITTER_PREFIX = 't';

/**
 * Own-property check that goes through Object.prototype, so it keeps working
 * even if a caller-supplied channel name shadows `hasOwnProperty` on the
 * registry object.
 *
 * @param {object} target - Object to inspect.
 * @param {string} key - Property name to look for.
 * @returns {boolean} True when `key` is an own property of `target`.
 */
function hasOwn(target, key) {
  return Object.prototype.hasOwnProperty.call(target, key);
}

/**
 * Returns the channel record stored under `channel`, creating an empty one
 * (`message: null`, no subscribers) when it does not exist yet.
 *
 * @param {object} registry - Map of channel name to channel record.
 * @param {string} channel - Channel name.
 * @returns {{message: *, subscribers: Array}} The channel record.
 */
function ensureChannel(registry, channel) {
  if (!hasOwn(registry, channel)) {
    // defineProperty always creates an own data property, even for names such
    // as "__proto__" where a plain assignment would hit the inherited setter.
    Object.defineProperty(registry, channel, {
      value: { message: null, subscribers: [] },
      writable: true,
      enumerable: true,
      configurable: true,
    });
  }
  return registry[channel];
}

/**
 * In-memory publish/subscribe hub.
 *
 * `channels.twitter` is a registry of dynamically created channels, each
 * holding one pending message and a list of subscribers. A timer calls
 * `broker()` every second to push pending twitter messages to subscribers.
 * `channels.weather` and `channels.sports` are fixed records that `publish()`
 * can write to; nothing in this class delivers or subscribes to them.
 */
class PubSubManager {
  /**
   * Builds the channel table and starts the delivery timer.
   * Call `removeBroker()` to stop the timer. `doUpdater()` is not scheduled
   * here and only runs when invoked explicitly.
   */
  constructor() {
    this.channels = {
      twitter: {},
      weather: { message: '', subscribers: [] },
      sports: { message: '', subscribers: [] },
    };
    this.brokerId = setInterval(() => {
      this.broker();
    }, BROKER_INTERVAL_MS);
  }

  /**
   * Adds `subscriber` to a twitter channel, creating the channel on demand.
   * Only channel names whose first dash-separated segment is `t` are
   * accepted; any other channel is ignored.
   *
   * @param {object} subscriber - Object later used by `broker()` (via
   *   `send`) and by `unsubscribe()` (via `cuid`).
   * @param {string} channel - Channel name, e.g. `t-something`.
   */
  subscribe(subscriber, channel) {
    if (typeof channel !== 'string') {
      // Previously this threw on `channel.split`; a bad request is now ignored.
      logger.warn('Ignoring subscription to non-string channel', channel);
      return;
    }

    const [prefix] = channel.split('-');
    if (prefix !== TWITTER_PREFIX) {
      logger.debug('Ignoring subscription to unsupported channel', channel);
      return;
    }

    logger.info(`subscribing to ${channel}`);
    ensureChannel(this.channels.twitter, channel).subscribers.push(subscriber);
  }

  /**
   * Removes every subscriber whose `cuid` equals `id` from all twitter
   * channels. Channels are kept even when they end up with no subscribers.
   *
   * @param {*} id - Value compared (strictly) against each subscriber's `cuid`.
   */
  unsubscribe(id) {
    logger.info('Forcing unsub for', id);
    const registry = this.channels.twitter;
    Object.keys(registry).forEach((channel) => {
      const channelObj = registry[channel];
      channelObj.subscribers = channelObj.subscribers.filter((item) => item.cuid !== id);
    });
  }

  /** Stops the periodic delivery timer started by the constructor. */
  removeBroker() {
    clearInterval(this.brokerId);
  }

  /**
   * Stores `message` as the pending message of a top-level channel
   * (`weather` or `sports`). Unknown channels, and the `twitter` registry
   * itself, are rejected with a warning: writing to the registry would add a
   * non-channel entry that the twitter iteration methods cannot handle.
   *
   * @param {*} publisher - Unused; kept for interface compatibility.
   * @param {string} channel - Top-level channel name.
   * @param {*} message - Message to store.
   */
  publish(publisher, channel, message) {
    const target = hasOwn(this.channels, channel) ? this.channels[channel] : undefined;
    if (!target || !Array.isArray(target.subscribers)) {
      logger.warn('Ignoring publish to unknown channel', channel);
      return;
    }
    target.message = message;
  }

  /**
   * Stores `message` as the pending message of a twitter channel, creating
   * the channel if needed. A message that is still pending is overwritten.
   *
   * @param {string} channel - Twitter channel name.
   * @param {*} message - Message to deliver on the next broker pass.
   */
  publishTwitter(channel, message) {
    ensureChannel(this.channels.twitter, channel).message = message;
  }

  /** Logs the current subscriber count of every twitter channel. */
  doUpdater() {
    const registry = this.channels.twitter;
    Object.keys(registry).forEach((channel) => {
      const channelObj = registry[channel];
      logger.info(`Subscribers: ${channelObj.subscribers.length} in ${channel}`);
    });
  }

  /**
   * Delivers the pending message of each twitter channel to its subscribers
   * as the JSON string `{"message": ...}` and then leaves the channel with an
   * empty pending message. Falsy messages (`''`, `null`, ...) count as
   * "nothing pending".
   */
  broker() {
    const registry = this.channels.twitter;
    Object.keys(registry).forEach((channel) => {
      const channelObj = registry[channel];
      const { message } = channelObj;
      if (!message) {
        return;
      }

      logger.info(`found message: ${message} in ${channel}`);

      // Clear before delivering so a failure below cannot cause the same
      // message to be re-sent on every subsequent tick.
      channelObj.message = '';

      let payload;
      try {
        // Serialized once: the payload is identical for every subscriber.
        payload = JSON.stringify({ message });
      } catch (err) {
        logger.error(`Dropping unserializable message in ${channel}:`, err);
        return;
      }

      channelObj.subscribers.forEach((subscriber) => {
        try {
          subscriber.send(payload);
        } catch (err) {
          // One failing subscriber must not block the others or escape the
          // timer callback.
          logger.error(`Failed to deliver message in ${channel}:`, err);
        }
      });
    });
  }
}

module.exports = PubSubManager;