From 748008fbedccff40fb94b3ffaed4799ac15e4b72 Mon Sep 17 00:00:00 2001 From: oleg Date: Sun, 5 Jul 2015 23:35:57 +0300 Subject: [PATCH] move resource initialization to separate module --- app.js | 20 ++++--- distributor.js | 113 ++++++++++++++++++++++++++++++++++++++++ lib/notifier/console.js | 2 - resources/projects.js | 106 ++----------------------------------- 4 files changed, 129 insertions(+), 112 deletions(-) create mode 100644 distributor.js diff --git a/app.js b/app.js index 69aaaa9..1d79858 100644 --- a/app.js +++ b/app.js @@ -65,10 +65,8 @@ Steppy( this.pass(null); } - // register plugins + // register reader plugins require('./lib/reader/yaml').register(app); - require('./lib/notifier/console').register(app); - require('./lib/notifier/mail').register(app); reader.load(app.config.paths.data, 'config', this.slot()); }, @@ -77,8 +75,6 @@ Steppy( logger.log('Server config:', JSON.stringify(app.config, null, 4)); - notifier.init(app.config.notify, this.slot()); - db.init('path/to/db/ignored/for/memdown', { db: require('memdown'), valueEncoding: 'json' @@ -93,6 +89,18 @@ Steppy( app.projects = projects; logger.log('Loaded projects: ', _(app.projects).pluck('name')); + require('./distributor').init(app, this.slot()); + }, + function(err, distributor) { + app.distributor = distributor; + + // register other plugins + require('./lib/notifier/console').register(app); + require('./lib/notifier/mail').register(app); + require('./httpApi').register(app); + + notifier.init(app.config.notify, this.slot()); + // start file watcher for reloading projects on change var syncProject = function(filename, fileInfo) { var baseDir = app.config.paths.projects, @@ -146,8 +154,6 @@ Steppy( // init resources require('./resources')(app); - - require('./httpApi').register(app); }, function(err) { if (err) throw err; diff --git a/distributor.js b/distributor.js new file mode 100644 index 0000000..95cf1ef --- /dev/null +++ b/distributor.js @@ -0,0 +1,113 @@ +'use strict'; + +var Steppy = require('twostep').Steppy, + _ = require('underscore'), + Distributor = require('./lib/distributor').Distributor, + db = require('./db'), + path = require('path'), + fs = require('fs'), + logger = require('./lib/logger')('distributor'); + + +exports.init = function(app, callback) { + var distributor = new Distributor({ + nodes: app.config.nodes, + projects: app.projects, + saveBuild: function(build, callback) { + Steppy( + function() { + db.builds.put(build, this.slot()); + }, + function() { + this.pass(build); + }, + callback + ); + } + }); + + var getBuildLogPath = function(buildId) { + return path.join(app.config.paths.builds, buildId + '.log'); + }; + + // create resource for build data + var createBuildDataResource = function(build) { + if (build.id in buildDataResourcesHash) { + return; + } + var buildDataResource = app.dataio.resource('build' + build.id); + buildDataResource.on('connection', function(client) { + var callback = this.async(), + buildLogPath = getBuildLogPath(build.id); + + var stream = fs.createReadStream(buildLogPath, { + encoding: 'utf8' + }); + + stream + .on('readable', function() { + var data = stream.read(); + while (data) { + client.emit('sync', 'data', data); + data = stream.read(); + } + }) + .on('end', callback) + .on('error', function(err) { + logger.error( + 'Error during read "' + buildLogPath + '":', + err.stack || err + ); + }); + }); + buildDataResourcesHash[build.id] = buildDataResource; + }; + + exports.createBuildDataResource = createBuildDataResource; + + var buildDataResourcesHash = {}; + + distributor.on('buildUpdate', function(build, changes) { + var buildsResource = app.dataio.resource('builds'); + + if (build.status === 'queued') { + // remove prev log if it exists - for development + fs.unlink(getBuildLogPath(build.id)); + createBuildDataResource(build); + } + + buildsResource.clientEmitSync('change', { + buildId: build.id, changes: changes + }); + }); + + var writeStreamsHash = {}; + + distributor.on('buildData', function(build, data) { + if (!/\n$/.test(data)) { + data += '\n'; + } + + var filePath = getBuildLogPath(build.id); + + if (!writeStreamsHash[filePath]) { + writeStreamsHash[filePath] = fs.createWriteStream( + getBuildLogPath(build.id), {encoding: 'utf8'} + ); + writeStreamsHash[filePath].on('error', function(err) { + logger.error( + 'Error during write "' + filePath + '":', + err.stack || err + ); + }); + } + // TODO: close ended files + writeStreamsHash[filePath].write(data); + + app.dataio.resource('build' + build.id).clientEmitSync( + 'data', data + ); + }); + + callback(null, distributor); +}; diff --git a/lib/notifier/console.js b/lib/notifier/console.js index 7966fff..e4f7cff 100644 --- a/lib/notifier/console.js +++ b/lib/notifier/console.js @@ -12,8 +12,6 @@ exports.register = function(app) { app.lib.notifier.register('console', Notifier); }; -exports.Notifier = Notifier; - Notifier.prototype.send = function(params, callback) { var build = params.build; console.log( diff --git a/resources/projects.js b/resources/projects.js index b5dddfc..153bf2c 100644 --- a/resources/projects.js +++ b/resources/projects.js @@ -1,119 +1,19 @@ 'use strict'; var Steppy = require('twostep').Steppy, - _ = require('underscore'), - project = require('../lib/project'), - Distributor = require('../lib/distributor').Distributor, - db = require('../db'), - path = require('path'), - fs = require('fs'), + createBuildDataResource = require('../distributor').createBuildDataResource, logger = require('../lib/logger')('projects resource'); module.exports = function(app) { - var resource = app.dataio.resource('projects'); - - var distributor = new Distributor({ - nodes: app.config.nodes, - projects: app.projects, - saveBuild: function(build, callback) { - Steppy( - function() { - db.builds.put(build, this.slot()); - }, - function() { - this.pass(build); - }, - callback - ); - } - }); - - // expose distributor to the app - app.distributor = distributor; - - var getBuildLogPath = function(buildId) { - return path.join(app.config.paths.builds, buildId + '.log'); - }; - - var buildDataResourcesHash = {}; - - // create resource for build data - var createBuildDataResource = function(build) { - if (build.id in buildDataResourcesHash) { - return; - } - var buildDataResource = app.dataio.resource('build' + build.id); - buildDataResource.on('connection', function(client) { - var callback = this.async(), - buildLogPath = getBuildLogPath(build.id); - - var stream = fs.createReadStream(buildLogPath, {encoding: 'utf8'}); - - stream - .on('readable', function() { - var data = stream.read(); - while (data) { - client.emit('sync', 'data', data); - data = stream.read(); - } - }) - .on('end', callback) - .on('error', function(err) { - logger.error( - 'Error during read "' + buildLogPath + '":', - err.stack || err - ); - }); - }); - buildDataResourcesHash[build.id] = buildDataResource; - }; - - distributor.on('buildUpdate', function(build, changes) { - var buildsResource = app.dataio.resource('builds'); - - if (build.status === 'queued') { - // remove prev log if it exists - for development - fs.unlink(getBuildLogPath(build.id)); - createBuildDataResource(build); - } - - buildsResource.clientEmitSync('change', { - buildId: build.id, changes: changes - }); - }); + var resource = app.dataio.resource('projects'), + distributor = app.distributor; resource.use('createBuildDataResource', function(req, res) { createBuildDataResource({id: req.data.id}); res.send(); }); - var writeStreamsHash = {}; - - distributor.on('buildData', function(build, data) { - if (!/\n$/.test(data)) { - data += '\n'; - } - - var filePath = getBuildLogPath(build.id); - - if (!writeStreamsHash[filePath]) { - writeStreamsHash[filePath] = fs.createWriteStream( - getBuildLogPath(build.id), {encoding: 'utf8'} - ); - writeStreamsHash[filePath].on('error', function(err) { - logger.error( - 'Error during write "' + filePath + '":', - err.stack || err - ); - }); - } - // TODO: close ended files - writeStreamsHash[filePath].write(data); - - app.dataio.resource('build' + build.id).clientEmitSync('data', data); - }); - resource.use('readAll', function(req, res) { res.send(app.projects); });