move resource initialization to separate module

This commit is contained in:
oleg 2015-07-05 23:35:57 +03:00
parent 4e2fddb2dd
commit 748008fbed
4 changed files with 129 additions and 112 deletions

20
app.js
View File

@ -65,10 +65,8 @@ Steppy(
this.pass(null);
}
// register plugins
// register reader plugins
require('./lib/reader/yaml').register(app);
require('./lib/notifier/console').register(app);
require('./lib/notifier/mail').register(app);
reader.load(app.config.paths.data, 'config', this.slot());
},
@ -77,8 +75,6 @@ Steppy(
logger.log('Server config:', JSON.stringify(app.config, null, 4));
notifier.init(app.config.notify, this.slot());
db.init('path/to/db/ignored/for/memdown', {
db: require('memdown'),
valueEncoding: 'json'
@ -93,6 +89,18 @@ Steppy(
app.projects = projects;
logger.log('Loaded projects: ', _(app.projects).pluck('name'));
require('./distributor').init(app, this.slot());
},
function(err, distributor) {
app.distributor = distributor;
// register other plugins
require('./lib/notifier/console').register(app);
require('./lib/notifier/mail').register(app);
require('./httpApi').register(app);
notifier.init(app.config.notify, this.slot());
// start file watcher for reloading projects on change
var syncProject = function(filename, fileInfo) {
var baseDir = app.config.paths.projects,
@ -146,8 +154,6 @@ Steppy(
// init resources
require('./resources')(app);
require('./httpApi').register(app);
},
function(err) {
if (err) throw err;

113
distributor.js Normal file
View File

@ -0,0 +1,113 @@
'use strict';
var Steppy = require('twostep').Steppy,
_ = require('underscore'),
Distributor = require('./lib/distributor').Distributor,
db = require('./db'),
path = require('path'),
fs = require('fs'),
logger = require('./lib/logger')('distributor');
exports.init = function(app, callback) {
var distributor = new Distributor({
nodes: app.config.nodes,
projects: app.projects,
saveBuild: function(build, callback) {
Steppy(
function() {
db.builds.put(build, this.slot());
},
function() {
this.pass(build);
},
callback
);
}
});
var getBuildLogPath = function(buildId) {
return path.join(app.config.paths.builds, buildId + '.log');
};
// create resource for build data
var createBuildDataResource = function(build) {
if (build.id in buildDataResourcesHash) {
return;
}
var buildDataResource = app.dataio.resource('build' + build.id);
buildDataResource.on('connection', function(client) {
var callback = this.async(),
buildLogPath = getBuildLogPath(build.id);
var stream = fs.createReadStream(buildLogPath, {
encoding: 'utf8'
});
stream
.on('readable', function() {
var data = stream.read();
while (data) {
client.emit('sync', 'data', data);
data = stream.read();
}
})
.on('end', callback)
.on('error', function(err) {
logger.error(
'Error during read "' + buildLogPath + '":',
err.stack || err
);
});
});
buildDataResourcesHash[build.id] = buildDataResource;
};
exports.createBuildDataResource = createBuildDataResource;
var buildDataResourcesHash = {};
distributor.on('buildUpdate', function(build, changes) {
var buildsResource = app.dataio.resource('builds');
if (build.status === 'queued') {
// remove prev log if it exists - for development
fs.unlink(getBuildLogPath(build.id));
createBuildDataResource(build);
}
buildsResource.clientEmitSync('change', {
buildId: build.id, changes: changes
});
});
var writeStreamsHash = {};
distributor.on('buildData', function(build, data) {
if (!/\n$/.test(data)) {
data += '\n';
}
var filePath = getBuildLogPath(build.id);
if (!writeStreamsHash[filePath]) {
writeStreamsHash[filePath] = fs.createWriteStream(
getBuildLogPath(build.id), {encoding: 'utf8'}
);
writeStreamsHash[filePath].on('error', function(err) {
logger.error(
'Error during write "' + filePath + '":',
err.stack || err
);
});
}
// TODO: close ended files
writeStreamsHash[filePath].write(data);
app.dataio.resource('build' + build.id).clientEmitSync(
'data', data
);
});
callback(null, distributor);
};

View File

@ -12,8 +12,6 @@ exports.register = function(app) {
app.lib.notifier.register('console', Notifier);
};
exports.Notifier = Notifier;
Notifier.prototype.send = function(params, callback) {
var build = params.build;
console.log(

View File

@ -1,119 +1,19 @@
'use strict';
var Steppy = require('twostep').Steppy,
_ = require('underscore'),
project = require('../lib/project'),
Distributor = require('../lib/distributor').Distributor,
db = require('../db'),
path = require('path'),
fs = require('fs'),
createBuildDataResource = require('../distributor').createBuildDataResource,
logger = require('../lib/logger')('projects resource');
module.exports = function(app) {
var resource = app.dataio.resource('projects');
var distributor = new Distributor({
nodes: app.config.nodes,
projects: app.projects,
saveBuild: function(build, callback) {
Steppy(
function() {
db.builds.put(build, this.slot());
},
function() {
this.pass(build);
},
callback
);
}
});
// expose distributor to the app
app.distributor = distributor;
var getBuildLogPath = function(buildId) {
return path.join(app.config.paths.builds, buildId + '.log');
};
var buildDataResourcesHash = {};
// create resource for build data
var createBuildDataResource = function(build) {
if (build.id in buildDataResourcesHash) {
return;
}
var buildDataResource = app.dataio.resource('build' + build.id);
buildDataResource.on('connection', function(client) {
var callback = this.async(),
buildLogPath = getBuildLogPath(build.id);
var stream = fs.createReadStream(buildLogPath, {encoding: 'utf8'});
stream
.on('readable', function() {
var data = stream.read();
while (data) {
client.emit('sync', 'data', data);
data = stream.read();
}
})
.on('end', callback)
.on('error', function(err) {
logger.error(
'Error during read "' + buildLogPath + '":',
err.stack || err
);
});
});
buildDataResourcesHash[build.id] = buildDataResource;
};
distributor.on('buildUpdate', function(build, changes) {
var buildsResource = app.dataio.resource('builds');
if (build.status === 'queued') {
// remove prev log if it exists - for development
fs.unlink(getBuildLogPath(build.id));
createBuildDataResource(build);
}
buildsResource.clientEmitSync('change', {
buildId: build.id, changes: changes
});
});
var resource = app.dataio.resource('projects'),
distributor = app.distributor;
resource.use('createBuildDataResource', function(req, res) {
createBuildDataResource({id: req.data.id});
res.send();
});
var writeStreamsHash = {};
distributor.on('buildData', function(build, data) {
if (!/\n$/.test(data)) {
data += '\n';
}
var filePath = getBuildLogPath(build.id);
if (!writeStreamsHash[filePath]) {
writeStreamsHash[filePath] = fs.createWriteStream(
getBuildLogPath(build.id), {encoding: 'utf8'}
);
writeStreamsHash[filePath].on('error', function(err) {
logger.error(
'Error during write "' + filePath + '":',
err.stack || err
);
});
}
// TODO: close ended files
writeStreamsHash[filePath].write(data);
app.dataio.resource('build' + build.id).clientEmitSync('data', data);
});
resource.use('readAll', function(req, res) {
res.send(app.projects);
});