send only build changes to the client

This commit is contained in:
oleg 2015-05-10 13:04:54 +03:00
parent 1339b1dfed
commit ed8b86567b
4 changed files with 76 additions and 44 deletions

View File

@ -2,7 +2,9 @@
var Steppy = require('twostep').Steppy, var Steppy = require('twostep').Steppy,
_ = require('underscore'), _ = require('underscore'),
Node = require('./node').Node; Node = require('./node').Node,
EventEmitter = require('events').EventEmitter,
inherits = require('util').inherits;
function Distributor(params) { function Distributor(params) {
@ -14,7 +16,7 @@ function Distributor(params) {
// queued projects to build // queued projects to build
self.queue = []; self.queue = [];
self.onBuildUpdate = params.onBuildUpdate || function(build, callback) { self.saveBuild = params.saveBuild || function(build, callback) {
callback(null, build); callback(null, build);
}; };
@ -22,6 +24,8 @@ function Distributor(params) {
}; };
} }
inherits(Distributor, EventEmitter);
exports.Distributor = Distributor; exports.Distributor = Distributor;
Distributor.prototype._createNode = function(nodeParams) { Distributor.prototype._createNode = function(nodeParams) {
@ -51,27 +55,33 @@ Distributor.prototype._runNext = function(callback) {
var queueItem = self.queue[queueItemIndex]; var queueItem = self.queue[queueItemIndex];
this.pass(queueItemIndex, queueItem); this.pass(queueItemIndex, queueItem);
queueItem.build.startDate = Date.now(); self._updateBuild(
queueItem.build.status = 'in-progress'; queueItem.build,
self._updateBuild(queueItem.build, this.slot()); {startDate: Date.now(), status: 'in-progress'},
this.slot()
);
}, },
function(err, node, queueItemIndex, queueItem, build) { function(err, node, queueItemIndex, queueItem, build) {
self.queue.splice(queueItemIndex, 1); self.queue.splice(queueItemIndex, 1);
var stepCallback = this.slot(); var stepCallback = this.slot();
var executor = node.run(queueItem.project, build.params, function(err) { var executor = node.run(queueItem.project, build.params, function(err) {
build.endDate = Date.now(); self._updateBuild(
build.status = err ? 'error' : 'done'; build,
build.error = err; {
self._updateBuild(build, function(err, build) { endDate: Date.now(),
// try to run next project from the queue status: err ? 'error' : 'done',
self._runNext(stepCallback); error: err
}); },
function(err, build) {
// try to run next project from the queue
self._runNext(stepCallback);
}
);
}); });
executor.on('currentStep', function(stepLabel) { executor.on('currentStep', function(stepLabel) {
build.currentStep = stepLabel; self._updateBuild(build, {currentStep: stepLabel});
self._updateBuild(build);
}); });
executor.on('data', function(data) { executor.on('data', function(data) {
@ -79,24 +89,43 @@ Distributor.prototype._runNext = function(callback) {
}); });
executor.once('scmData', function(scmData) { executor.once('scmData', function(scmData) {
build.scm = scmData; self._updateBuild(build, {scm: scmData});
self._updateBuild(build);
}); });
}, },
callback callback
); );
}; };
Distributor.prototype._updateBuild = function(build, callback) { Distributor.prototype._updateBuild = function(build, changes, callback) {
var self = this;
callback = callback || _.noop; callback = callback || _.noop;
this.onBuildUpdate(build, callback);
Steppy(
function() {
_(build).extend(changes);
// skip saving to db of unimportant data
if (changes.currentStep && _(changes).keys().length === 1) {
this.pass(null);
} else {
self.saveBuild(build, this.slot());
}
},
function() {
// emits only after get an id (at save build)
self.emit('buildUpdate', build, changes);
this.pass(build);
},
callback
);
}; };
Distributor.prototype.run = function(project, params, callback) { Distributor.prototype.run = function(project, params, callback) {
var self = this; var self = this;
Steppy( Steppy(
function() { function() {
self._updateBuild({ self._updateBuild({}, {
project: project, project: project,
params: params, params: params,
createDate: Date.now(), createDate: Date.now(),

View File

@ -24,27 +24,12 @@ module.exports = function(app) {
var distributor = new Distributor({ var distributor = new Distributor({
nodes: [{type: 'local', maxExecutorsCount: 1}], nodes: [{type: 'local', maxExecutorsCount: 1}],
onBuildUpdate: function(build, callback) { saveBuild: function(build, callback) {
Steppy( Steppy(
function() { function() {
db.builds.put(build, this.slot()); db.builds.put(build, this.slot());
}, },
function() { function() {
var buildsResource = app.dataio.resource('builds');
if (build.status === 'queued') {
// create resource for build data
var buildDataResource = app.dataio.resource('build' + build.id);
buildDataResource.on('connection', function(client) {
client.emit('sync', 'data', '< collected data >');
});
}
buildsResource.clientEmitSync(
build.status === 'queued' ? 'create' : 'update',
build
);
this.pass(build); this.pass(build);
}, },
callback callback
@ -55,6 +40,22 @@ module.exports = function(app) {
} }
}); });
distributor.on('buildUpdate', function(build, changes) {
var buildsResource = app.dataio.resource('builds');
if (build.status === 'queued') {
// create resource for build data
var buildDataResource = app.dataio.resource('build' + build.id);
buildDataResource.on('connection', function(client) {
client.emit('sync', 'data', '< collected data >');
});
}
buildsResource.clientEmitSync('change', {
buildId: build.id, changes: changes
});
});
var resource = app.dataio.resource('projects'); var resource = app.dataio.resource('projects');
resource.use('read', function(req, res) { resource.use('read', function(req, res) {

View File

@ -10,19 +10,21 @@ define([
listenables: BuildActions, listenables: BuildActions,
builds: [], builds: [],
_onAction: function(build, action) { onChange: function(data, action) {
var oldBuild = _(this.builds).findWhere({id: build.id}); var oldBuild = _(this.builds).findWhere({id: data.buildId});
if (oldBuild) { if (oldBuild) {
_(oldBuild).extend(build); _(oldBuild).extend(data.changes);
} else { } else {
this.builds.unshift(build); this.builds.unshift(
_({id: data.buildId}).extend(data.changes)
);
} }
this.trigger(this.builds); this.trigger(this.builds);
}, },
init: function() { init: function() {
resource.subscribe('create', 'update', this._onAction); resource.subscribe('change', this.onChange);
}, },
onReadAll: function() { onReadAll: function() {

View File

@ -51,7 +51,7 @@ describe('Distributor', function() {
3: {queue: {length: 0}, build: {status: 'done'}}, 3: {queue: {length: 0}, build: {status: 'done'}},
4: 'Should never happend' 4: 'Should never happend'
}; };
var onBuildUpdate = function(build, callback) { var saveBuild = function(build, callback) {
expectUpdateBuild(distributor, build, number, conditionsHash); expectUpdateBuild(distributor, build, number, conditionsHash);
number++; number++;
callback(null, build) callback(null, build)
@ -59,7 +59,7 @@ describe('Distributor', function() {
distributor = new Distributor({ distributor = new Distributor({
nodes: [{type: 'local', maxExecutorsCount: 1}], nodes: [{type: 'local', maxExecutorsCount: 1}],
onBuildUpdate: onBuildUpdate saveBuild: saveBuild
}); });
}); });
@ -103,7 +103,7 @@ describe('Distributor', function() {
}, },
4: 'Should never happend' 4: 'Should never happend'
}; };
var onBuildUpdate = function(build, callback) { var saveBuild = function(build, callback) {
expectUpdateBuild(distributor, build, number, conditionsHash); expectUpdateBuild(distributor, build, number, conditionsHash);
number++; number++;
callback(null, build) callback(null, build)
@ -111,7 +111,7 @@ describe('Distributor', function() {
distributor = new Distributor({ distributor = new Distributor({
nodes: [{type: 'local', maxExecutorsCount: 1}], nodes: [{type: 'local', maxExecutorsCount: 1}],
onBuildUpdate: onBuildUpdate saveBuild: saveBuild
}); });
}); });