diff --git a/lib/distributor.js b/lib/distributor.js index a729c96..d046ace 100644 --- a/lib/distributor.js +++ b/lib/distributor.js @@ -2,7 +2,9 @@ var Steppy = require('twostep').Steppy, _ = require('underscore'), - Node = require('./node').Node; + Node = require('./node').Node, + EventEmitter = require('events').EventEmitter, + inherits = require('util').inherits; function Distributor(params) { @@ -14,7 +16,7 @@ function Distributor(params) { // queued projects to build self.queue = []; - self.onBuildUpdate = params.onBuildUpdate || function(build, callback) { + self.saveBuild = params.saveBuild || function(build, callback) { callback(null, build); }; @@ -22,6 +24,8 @@ function Distributor(params) { }; } +inherits(Distributor, EventEmitter); + exports.Distributor = Distributor; Distributor.prototype._createNode = function(nodeParams) { @@ -51,27 +55,33 @@ Distributor.prototype._runNext = function(callback) { var queueItem = self.queue[queueItemIndex]; this.pass(queueItemIndex, queueItem); - queueItem.build.startDate = Date.now(); - queueItem.build.status = 'in-progress'; - self._updateBuild(queueItem.build, this.slot()); + self._updateBuild( + queueItem.build, + {startDate: Date.now(), status: 'in-progress'}, + this.slot() + ); }, function(err, node, queueItemIndex, queueItem, build) { self.queue.splice(queueItemIndex, 1); var stepCallback = this.slot(); var executor = node.run(queueItem.project, build.params, function(err) { - build.endDate = Date.now(); - build.status = err ? 'error' : 'done'; - build.error = err; - self._updateBuild(build, function(err, build) { - // try to run next project from the queue - self._runNext(stepCallback); - }); + self._updateBuild( + build, + { + endDate: Date.now(), + status: err ? 'error' : 'done', + error: err + }, + function(err, build) { + // try to run next project from the queue + self._runNext(stepCallback); + } + ); }); executor.on('currentStep', function(stepLabel) { - build.currentStep = stepLabel; - self._updateBuild(build); + self._updateBuild(build, {currentStep: stepLabel}); }); executor.on('data', function(data) { @@ -79,24 +89,43 @@ Distributor.prototype._runNext = function(callback) { }); executor.once('scmData', function(scmData) { - build.scm = scmData; - self._updateBuild(build); + self._updateBuild(build, {scm: scmData}); }); }, callback ); }; -Distributor.prototype._updateBuild = function(build, callback) { +Distributor.prototype._updateBuild = function(build, changes, callback) { + var self = this; callback = callback || _.noop; - this.onBuildUpdate(build, callback); + + Steppy( + function() { + _(build).extend(changes); + + // skip saving to db of unimportant data + if (changes.currentStep && _(changes).keys().length === 1) { + this.pass(null); + } else { + self.saveBuild(build, this.slot()); + } + }, + function() { + // emits only after get an id (at save build) + self.emit('buildUpdate', build, changes); + + this.pass(build); + }, + callback + ); }; Distributor.prototype.run = function(project, params, callback) { var self = this; Steppy( function() { - self._updateBuild({ + self._updateBuild({}, { project: project, params: params, createDate: Date.now(), diff --git a/resources/projects.js b/resources/projects.js index cccbfe8..087cc1f 100644 --- a/resources/projects.js +++ b/resources/projects.js @@ -24,27 +24,12 @@ module.exports = function(app) { var distributor = new Distributor({ nodes: [{type: 'local', maxExecutorsCount: 1}], - onBuildUpdate: function(build, callback) { + saveBuild: function(build, callback) { Steppy( function() { db.builds.put(build, this.slot()); }, function() { - var buildsResource = app.dataio.resource('builds'); - - if (build.status === 'queued') { - // create resource for build data - var buildDataResource = app.dataio.resource('build' + build.id); - buildDataResource.on('connection', function(client) { - client.emit('sync', 'data', '< collected data >'); - }); - } - - buildsResource.clientEmitSync( - build.status === 'queued' ? 'create' : 'update', - build - ); - this.pass(build); }, callback @@ -55,6 +40,22 @@ module.exports = function(app) { } }); + distributor.on('buildUpdate', function(build, changes) { + var buildsResource = app.dataio.resource('builds'); + + if (build.status === 'queued') { + // create resource for build data + var buildDataResource = app.dataio.resource('build' + build.id); + buildDataResource.on('connection', function(client) { + client.emit('sync', 'data', '< collected data >'); + }); + } + + buildsResource.clientEmitSync('change', { + buildId: build.id, changes: changes + }); + }); + var resource = app.dataio.resource('projects'); resource.use('read', function(req, res) { diff --git a/static/js/app/stores/builds.js b/static/js/app/stores/builds.js index 960e74d..ddd2d57 100644 --- a/static/js/app/stores/builds.js +++ b/static/js/app/stores/builds.js @@ -10,19 +10,21 @@ define([ listenables: BuildActions, builds: [], - _onAction: function(build, action) { - var oldBuild = _(this.builds).findWhere({id: build.id}); + onChange: function(data, action) { + var oldBuild = _(this.builds).findWhere({id: data.buildId}); if (oldBuild) { - _(oldBuild).extend(build); + _(oldBuild).extend(data.changes); } else { - this.builds.unshift(build); + this.builds.unshift( + _({id: data.buildId}).extend(data.changes) + ); } this.trigger(this.builds); }, init: function() { - resource.subscribe('create', 'update', this._onAction); + resource.subscribe('change', this.onChange); }, onReadAll: function() { diff --git a/test/distributor.js b/test/distributor.js index 3978b54..96abf61 100644 --- a/test/distributor.js +++ b/test/distributor.js @@ -51,7 +51,7 @@ describe('Distributor', function() { 3: {queue: {length: 0}, build: {status: 'done'}}, 4: 'Should never happend' }; - var onBuildUpdate = function(build, callback) { + var saveBuild = function(build, callback) { expectUpdateBuild(distributor, build, number, conditionsHash); number++; callback(null, build) @@ -59,7 +59,7 @@ describe('Distributor', function() { distributor = new Distributor({ nodes: [{type: 'local', maxExecutorsCount: 1}], - onBuildUpdate: onBuildUpdate + saveBuild: saveBuild }); }); @@ -103,7 +103,7 @@ describe('Distributor', function() { }, 4: 'Should never happend' }; - var onBuildUpdate = function(build, callback) { + var saveBuild = function(build, callback) { expectUpdateBuild(distributor, build, number, conditionsHash); number++; callback(null, build) @@ -111,7 +111,7 @@ describe('Distributor', function() { distributor = new Distributor({ nodes: [{type: 'local', maxExecutorsCount: 1}], - onBuildUpdate: onBuildUpdate + saveBuild: saveBuild }); });