var Promise = require("bluebird");
var spawn = require("child_process").spawn;
var assert = require("assert");
var argv = require("optimist").argv;
var ARGS = [].concat(
    process.execArgv,
    __filename,
    process.argv.slice(2)
);
function sanitizeCpCount(input) {
    return Math.min(Math.max(1, (+input | 0)), 16);
}

if (typeof argv["child-processes"] === "number") {
    var CHILD_PROCESSES = sanitizeCpCount(argv["child-processes"]);
} else if (process.env.CHILD_PROCESSES) {
    var CHILD_PROCESSES = sanitizeCpCount(process.env.CHILD_PROCESSES);
} else {
    var CHILD_PROCESSES = 4;
}


var debugging = false;

function debug() {
    if (debugging) {
        var msg = [].slice.call(arguments).join(" ");
        console.log(msg);
    }
}

function ResultWithOutput(result, stdout, stderr) {
    this.result = result;
    this.stdout = stdout;
    this.stderr = stderr;
}

var jobRunner = (function() {
    var taskId = 0;
    var workerCount;
    var workers = [];
    var tasks = [];
    var killed = true;

    function leastTotalRunningTime(a, b) {
        return a.totalRunningTime() - b.totalRunningTime();
    }

    function each(fn) {
        if (typeof fn === "string") {
            var args = [].slice.call(arguments, 1);
            return workers.forEach(function(worker) {
                worker[fn].apply(worker, args);
            });
        }
        return workers.forEach(fn);
    }

    function retLogger(result) {
        if (result instanceof ResultWithOutput) {
            process.stdout.write(result.stdout);
            process.stderr.write(result.stderr);
        }
        return result;
    }

    function throwLogger(result) {
        if (result && (result.stderr || result.stdout)) {
            process.stdout.write(result.stdout);
            process.stderr.write(result.stderr);
        }
        throw result;
    }

    function reinit() {
        each("init");
    }

    function checkShutDown(secondCheck) {
        if (tasks.length > 0) return;
        var anyWorkerHasTasks = workers.some(function(w) {
            return w.hasTasks();
        });
        if (anyWorkerHasTasks) return;
        if (secondCheck) return ret.exit();
        setTimeout(function() {
            checkShutDown(true);
        }, 10);
    }

    function schedule(task, queued) {
        var worker = workers.filter(function(worker) {
            return task.isolated ?
                !worker.hasTasks() : !worker._runningIsolatedTask;
        }).sort(leastTotalRunningTime)[0];

        if (!worker) {
            if (!queued) tasks.push(task);
            return false;
        } else {
            assert(task.isolated ? !worker.hasTasks() : true);
            debug("found free worker", worker._id, "for task", task.id);
            worker.performTask(task);
            return true;
        }
    }

    var ret = {
        init: function(requirePath, initTaskFn) {
            if (workers.length) return;
            if (typeof requirePath !== "string") throw new TypeError();
            var count = CHILD_PROCESSES;
            workerCount = count;
            var id = 0;
            for (var i = 0; i < count; ++i) {
                workers.push(new Worker(id++, requirePath, initTaskFn));
            }
            process.on("exit", ret.exit);
        },

        exit: function() {
            if (killed) return;
            killed = true;
            each("kill");
        },

        run: function(task, opts) {
            if (!workerCount) throw new Error("task runner has not been initialized");
            if (typeof task !== "function") throw new TypeError("fn not function");
            if (killed) {
                killed = false;
                reinit();
            }
            opts = opts || {};
            var context = opts.context || {};
            var log = opts.log === false ? false : true;
            var estimate = typeof opts.estimate === "number" ? opts.estimate : null;
            var progress = typeof opts.progress === "function" ? opts.progress : null;
            var isolated = !!opts.isolated;
            var resolve, reject;
            var promise = new Promise(function() {
                resolve = arguments[0];
                reject = arguments[1];
            });
            var task = {
                isolated: isolated,
                task: {
                    code: task + "",
                    context: context
                },
                resolve: resolve,
                reject: reject,
                estimate: estimate,
                id: taskId++,
                log: log,
                progress: progress
            };
            schedule(task, false);
            if (log) promise = promise.then(retLogger, throwLogger);
            return promise;
        },

        setVerbose: function(v) {
            debugging = !!v;
        },

        _workerIdleNotification: function() {
            var _t = tasks;
            if (_t.length === 0) {
                return checkShutDown();
            }
            while(_t.length > 0) {
                var task = _t.shift();
                if (!schedule(task, true)) {
                    _t.unshift(task);
                    return;
                }
            }
        }
    };
    return ret;
})();

function Worker(id, requirePath, initTaskFn) {
    this._initTaskFn = initTaskFn;
    this._requirePath = requirePath;
    this._id = id;
    this._runningTaskCount = 0;
    this._runningIsolatedTask = false;
    this._performingIsolatedTask = false;
    this._queuedIsolatedTask = null;
    this._bufferingStdio = false;
    this._runningTime = 0;
    this._c = null;
    this._stdout = "";
    this._stderr = "";
    this._tasks = {};
    this._onStdOut = bind(this._onStdOut, this);
    this._onStdErr = bind(this._onStdErr, this);
    this._onError = bind(this._onError, this);
    this._onMessage = bind(this._onMessage, this);
}

Worker.prototype.totalRunningTime = function() {
    var ret = this._runningTime;
    var ids = Object.keys(this._tasks);
    var now = Date.now();
    for (var i = 0; i < ids.length; ++i) {
        var task = this._tasks[ids[i]];
        ret += task.estimate === null ? (now - task.started + 1): task.estimate;
    }
    return ret;
};

Worker.prototype._onStdOut = function(data) {
    data = data.toString();
    if (this._bufferingStdio) {
        this._stdout += data;
    } else {
        process.stdout.write(data);
    }
};

Worker.prototype._onStdErr = function(data) {
    data = data.toString();

    if (this._bufferingStdio) {
        this._stderr += data;
    } else {
        process.stderr.write(data);
    }
};

Worker.prototype._onError = function(e) {
    process.stderr.write(e && e.stack && e.stack + "" || (e + ""));
};

Worker.prototype._onMessage = function(payload) {
    var self = this;
    setImmediate(function() {
        self[payload.type].call(self, payload);
    });
};

Worker.prototype.removeListeners = function() {
    var c = this._c;
    c.stdout.removeListener("data", this._onStdOut);
    c.stderr.removeListener("data", this._onStdErr);
    c.removeListener("message", this._onMessage);
    c.removeListener("error", this._onError);
};

Worker.prototype.debug = function(msg, task) {
    debug("worker", this._id, msg, (task.isolated ?
        "isolated" : ""), "task", task.id);
};

Worker.prototype.hasTasks = function() {
    return this.runningTaskCount() > 0 ||
        this._queuedIsolatedTask ||
        this._runningIsolatedTask;
};

Worker.prototype.runningTaskCount = function() {
    return this._runningTaskCount;
};

Worker.prototype.performTask = function(task) {
    if (task !== this._queuedIsolatedTask) {
        assert(!this._runningIsolatedTask);
        if (task.isolated) {
            this._runningIsolatedTask = true;
            if (this.runningTaskCount() > 0) {
                this.debug("queued", task);
                this._queuedIsolatedTask = task;
                return;
            } else {
                assert(this._queuedIsolatedTask === null);
                this._performingIsolatedTask = true;
            }
        }
    } else {
        assert(this.runningTaskCount() === 0);
        assert(this._runningIsolatedTask);
        this._queuedIsolatedTask = null;
        this._performingIsolatedTask = true;
    }
    this._runningTaskCount++;
    assert(this._performingIsolatedTask ? this._runningTaskCount === 1 : true);
    this._tasks[task.id] = task;
    task.started = Date.now();
    this.debug("starts to perform", task);
    this._c.send({
        type: "newTask",
        id: task.id,
        task: task.task,
        isolated: task.isolated,
        log: task.log,
        progress: !!task.progress
    });

};

function getFunctionSource(fn) {
   return (fn + "")
        .replace(/^\s*function\s*\(\s*\)\s*{/, "")
        .replace(/}\s*$/, "");
}

Worker.prototype.init = function() {
    assert(Array.isArray(ARGS));
    assert(!this._c);
    var env = {};
    Object.getOwnPropertyNames(process.env).forEach(function(key) {
        env[key] = process.env[key];
    });
    env.requirePath = this._requirePath;
    if (typeof this._initTaskFn === "function") {
        env.initialCode = getFunctionSource(this._initTaskFn);
    }

    var c = spawn(process.execPath, ARGS, {
        env: env,
        stdio: ["pipe", "pipe", "pipe", "ipc"],
        cwd: this._requirePath
    });
    assert(typeof c.send === "function");
    this._c = c;
    c.stdout.on("data", this._onStdOut);
    c.stderr.on("data", this._onStdErr);
    c.on("error", this._onError);
    c.on("message", this._onMessage);
};

Worker.prototype.taskComplete = function(payload) {
    var task = this._tasks[payload.id];
    this.debug("completed", task);
    delete this._tasks[payload.id];
    this._runningTaskCount--;
    var resolve, result;
    if (payload.isError) {
        resolve = task.reject;
        var err = payload.error;
        if (err.__isErrorInstance__) {
            result = new Error();
            Object.keys(err).forEach(function(key) {
                if (key === "__isErrorInstance__") return;;
                result[key] = err[key];
            });
            result.name = err.name;
            result.stack = err.stack;
            result.message = err.message;
        } else {
            result = err;
        }
    } else {
        resolve = task.resolve;
        result = payload.result;
    }
    if (this._runningIsolatedTask) {
        if (this._queuedIsolatedTask) {
            if (this.runningTaskCount() === 0) {
                this.performTask(this._queuedIsolatedTask);
            }
        } else {
            if (payload.error) {
                result.stdout = this._stdout;
                result.stderr = this._stderr;
            } else {
                result = new ResultWithOutput(result, this._stdout, this._stderr);
            }
            this._stderr = this._stdout = "";
            this._performingIsolatedTask = false;
            this._runningIsolatedTask = false;
            this._bufferingStdio = false;
            this.kill();
            this.init();
        }
    }
    resolve(result);
    if (!this._runningIsolatedTask) {
        jobRunner._workerIdleNotification();
    }
};

Worker.prototype.kill = function() {
    if (this._c) {
        this.removeListeners();
        this._c.kill("SIGKILL");
        this._c = null;
    }
};

Worker.prototype.progress = function(payload) {
    var id = payload.id;
    var task = this._tasks[id];
    if (task && typeof task.progress === "function") {
        task.progress.call(undefined, payload.value);
    }
};

Worker.prototype.outputFlushed = function() {
    this._bufferingStdio = true;
    this._c.send({type: "outputFlushedAck"});
};



function bind(fn, ctx) {return function() {return fn.apply(ctx, arguments); };}


function getTaskFunction(context, code) {
    with (context) {
        return eval( "(" + code + ")");
    }
}

if (require.main === module) {
    var __requirePath = process.env.requirePath;
    var __oldreq = require;
    var __path = require("path");
    require = function(p) {
        if (p.charAt(0) === ".") {
            p = __path.join(__requirePath, p);
        }
        return __oldreq(p);
    };
    if (process.env.initialCode) {
        eval(process.env.initialCode);
    }
    (function() {
        function waitForOutput() {
            return new Promise(function(resolve, reject) {
                var flushCount = 0;
                function onFlushed() {
                    flushCount++;
                    if (flushCount === 2) {
                        resolve();
                    }
                }
                function checkStream(stream) {
                    if (stream.bufferSize === 0) {
                        onFlushed();
                    } else {
                        stream.write("", "utf-8", onFlushed);
                    }
                }
                checkStream(process.stdout);
                checkStream(process.stderr);
            });
        }

        function waitForPreviousOutput(id) {
            return waitForOutput().then(function() {
                var ret = waitForFlushAck(id);
                process.send({type: "outputFlushed", id: id});
                return ret;
            });
        }

        var ackWaitResolve = null;
        function waitForFlushAck(id) {
            assert(ackWaitResolve === null);
            return new Promise(function(resolve) {
                ackWaitResolve = resolve;
            });
        }
        var noop  = function() {};
        var noopWrite = function(_, a, b) {
            if (typeof a === "function") return a();
            if (typeof b === "function") return b();
        };

        function toSerializableError(err) {
            if (err instanceof Error) {
                var ret = Object.create(null);
                Object.keys(err).forEach(function(key){
                    ret[key] = err[key];
                });
                ret.name = err.name;
                ret.stack = err.stack;
                ret.message = err.message;
                ret.__isErrorInstance__ = true;
                return ret;
            } else {
                return err;
            }
        }

        var actions = {
            newTask: function(payload) {
                var task = payload.task;
                var code = task.code;
                var context = task.context;
                var id = payload.id;
                var promise = payload.isolated
                                    ? waitForPreviousOutput(id) : Promise.resolve();


                return promise
                    .then(function() {
                        if (payload.log === false && payload.isolated) {
                            process.stdout.write = noopWrite;
                        }
                        var fn = getTaskFunction(context, code);
                        if (typeof fn !== "function")
                            throw new Error("fn must be function");
                        return fn(payload.progress ? function(value) {
                            process.send({type: "progress", id: id, value: value});
                        } : noop);
                    })
                    .finally(function() {
                        if (payload.isolated) {
                            return waitForOutput();
                        }
                    })
                    .then(function(result) {
                        process.send({
                            type: "taskComplete",
                            id: payload.id,
                            result: result
                        });
                    })
                    .catch(function(error) {
                        process.send({
                            type: "taskComplete",
                            id: payload.id,
                            error: toSerializableError(error),
                            isError: true
                        });
                    });
            },

            outputFlushedAck: function() {
                var resolve = ackWaitResolve;
                ackWaitResolve = null;
                resolve();
            },

            addGlobals: function(payload) {
                new Function(payload.code)();
            }
        };

        process.on("message", function(payload) {
            setImmediate(function() {
                actions[payload.type].call(actions, payload);
            });
        });
    })();
} else {
    module.exports = jobRunner;
    Object.defineProperty(module.exports, "CHILD_PROCESSES", {
        value: CHILD_PROCESSES,
        writable: false
    });
}