nci/distributor.js
2015-10-15 08:36:04 +03:00

159 lines
3.9 KiB
JavaScript

'use strict';
var Steppy = require('twostep').Steppy,
_ = require('underscore'),
Distributor = require('./lib/distributor').Distributor,
getAvgProjectBuildDuration = (
require('./lib/project').getAvgProjectBuildDuration
),
db = require('./db'),
path = require('path'),
fs = require('fs'),
logger = require('./lib/logger')('distributor');
exports.init = function(app, callback) {
var distributor = new Distributor({
nodes: app.config.nodes,
projects: app.projects,
saveBuild: function(build, callback) {
Steppy(
function() {
if (!_(build.project).has('avgBuildDuration')) {
getAvgProjectBuildDuration(build.project.name, this.slot());
} else {
this.pass(build.project.avgBuildDuration);
}
},
function(err, avgBuildDuration) {
build.project.avgBuildDuration = avgBuildDuration;
db.builds.put(build, this.slot());
},
function() {
this.pass(build);
},
callback
);
}
});
var getBuildLogPath = function(buildId) {
return path.join(app.config.paths.builds, buildId + '.log');
};
// create resource for build data
var createBuildDataResource = function(buildId) {
if (buildId in buildDataResourcesHash) {
return;
}
var buildDataResource = app.dataio.resource('build' + buildId);
buildDataResource.on('connection', function(client) {
var callback = this.async(),
buildLogPath = getBuildLogPath(buildId);
var stream = fs.createReadStream(buildLogPath, {
encoding: 'utf8'
});
stream
.on('readable', function() {
var data = stream.read();
while (data) {
client.emit('sync', 'data', {lines: [{text: data}]});
data = stream.read();
}
})
.on('end', callback)
.on('error', function(err) {
logger.error(
'Error during read "' + buildLogPath + '":',
err.stack || err
);
});
});
buildDataResourcesHash[buildId] = buildDataResource;
};
exports.createBuildDataResource = createBuildDataResource;
var buildDataResourcesHash = {};
distributor.on('buildUpdate', function(build, changes) {
var buildsResource = app.dataio.resource('builds');
if (build.status === 'queued') {
// remove prev log if it exists - for development
fs.unlink(getBuildLogPath(build.id));
createBuildDataResource(build.id);
}
// notify about build's project change, coz building affects project
// related stat (last build date, avg build time, etc)
if (changes.completed) {
var projectsResource = app.dataio.resource('projects');
projectsResource.clientEmitSyncChange({name: build.project.name});
}
buildsResource.clientEmitSync('change', {
buildId: build.id, changes: changes
});
});
var writeStreamsHash = {};
var buildLogLineNumbersHash = {};
distributor.on('buildData', function(build, data) {
if (!/\n$/.test(data)) {
data += '\n';
}
if (buildLogLineNumbersHash[build.id]) {
buildLogLineNumbersHash[build.id]++;
} else {
buildLogLineNumbersHash[build.id] = 1;
}
var logLineNumber = buildLogLineNumbersHash[build.id];
var filePath = getBuildLogPath(build.id);
if (!writeStreamsHash[filePath]) {
writeStreamsHash[filePath] = fs.createWriteStream(
getBuildLogPath(build.id), {encoding: 'utf8'}
);
writeStreamsHash[filePath].on('error', function(err) {
logger.error(
'Error during write "' + filePath + '":',
err.stack || err
);
});
}
// TODO: close ended files
writeStreamsHash[filePath].write(data);
app.dataio.resource('build' + build.id).clientEmitSync(
'data',
{lines: [{number: logLineNumber, text: data}]}
);
// write build logs to db
db.logLines.put({
buildId: build.id,
number: logLineNumber,
text: data
}, function(err) {
if (err) {
logger.error(
'Error during write log line "' + logLineNumber +
'" for build "' + build.id + '":',
err.stack || err
);
}
});
});
callback(null, distributor);
};