cryptpad/lib/plan.js

236 lines
8.9 KiB
JavaScript

/*
There are many situations where we want to do lots of little jobs
in parallel and with few constraints as to their ordering.
One example is recursing over a bunch of directories and reading files.
The naive way to do this is to recurse over all the subdirectories
relative to a root while adding files to a list. Then to iterate over
the files in that list. Unfortunately, this means holding the complete
list of file paths in memory, which can't possible scale as our database grows.
A better way to do this is to recurse into one directory and
iterate over its contents until there are no more, then to backtrack
to the next directory and repeat until no more directories exist.
This kind of thing is easy enough when you perform one task at a time
and use synchronous code, but with multiple asynchronous tasks it's
easy to introduce subtle bugs.
This module is designed for these situations. It allows you to easily
and efficiently schedule a large number of tasks with an associated
degree of priority from 0 (highest priority) to Number.MAX_SAFE_INTEGER.
Initialize your scheduler with a degree of parallelism, and start planning
some initial jobs. Set it to run and it will keep going until all jobs are
complete, at which point it will optionally execute a 'done' callback.
Getting back to the original example:
List the contents of the root directory, then plan subsequent jobs
with a priority of 1 to recurse into subdirectories. The callback
of each of these recursions can then plan higher priority tasks
to actually process the contained files with a priority of 0.
As long as there are more files scheduled it will continue to process
them first. When there are no more files the scheduler will read
the next directory and repopulate the list of files to process.
This will repeat until everything is done.
// load the module
const Plan = require("./plan");
// instantiate a scheduler with a parallelism of 5
var plan = Plan(5)
// plan the first job which schedules more jobs...
.job(1, function (next) {
listRootDirectory(function (files) {
files.forEach(function (file) {
// highest priority, run as soon as there is a free worker
plan.job(0, function (next) {
processFile(file, function (result) {
console.log(result);
// don't forget to call next
next();
});
});
});
next(); // call 'next' to free up one worker
});
})
// chain commands together if you want
.done(function () {
console.log("DONE");
})
// it won't run unless you launch it
.start();
*/
module.exports = function (max) {
var plan = {};
max = max || 5;
// finds an id that isn't in use in a particular map
// accepts an id in case you have one already chosen
// otherwise generates random new ids if one is not passed
// or if there is a collision
var uid = function (map, id) {
if (typeof(id) === 'undefined') {
id = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
}
if (id && typeof(map[id]) === 'undefined') {
return id;
}
return uid(map);
};
// the queue of jobs is an array, which will be populated
// with maps for each level of priority
var jobs = [];
// the count of currently running jobs
var count = 0;
// a list of callbacks to be executed once everything is done
var completeHandlers = [];
// the recommended usage is to create a new scheduler for every job
// use it for internals in a scope, and let the garbage collector
// clean up when everything stops. This means you shouldn't
// go passing 'plan' around in a long-lived process!
var FINISHED = false;
var done = function () {
// 'done' gets called when there are no more jobs in the queue
// but other jobs might still be running...
// the count of running processes should never be less than zero
// because we guard against multiple callbacks
if (count < 0) { throw new Error("should never happen"); }
// greater than zero is definitely possible, it just means you aren't done yet
if (count !== 0) { return; }
// you will finish twice if you call 'start' a second time
// this behaviour isn't supported yet.
if (FINISHED) { throw new Error('finished twice'); }
FINISHED = true;
// execute all your 'done' callbacks
completeHandlers.forEach(function (f) { f(); });
};
var run;
// this 'next' is internal only.
// it iterates over all known jobs, running them until
// the scheduler achieves the desired amount of parallelism.
// If there are no more jobs it will call 'done'
// which will shortcircuit if there are still pending tasks.
// Whenever any tasks finishes it will return its lock and
// run as many new jobs as are allowed.
var next = function () {
// array.some skips over bare indexes in sparse arrays
var pending = jobs.some(function (bag /*, priority*/) {
if (!bag || typeof(bag) !== 'object') { return; }
// a bag is a map of jobs for any particular degree of priority
// iterate over jobs in the bag until you're out of 'workers'
for (var id in bag) {
// bail out if you hit max parallelism
if (count >= max) { return true; }
run(bag, id, next);
}
});
// check whether you're done if you hit the end of the array
if (!pending) { done(); }
};
// and here's the part that actually handles jobs...
run = function (bag, id) {
// this is just a sanity check.
// there should only ever be jobs in each bag.
if (typeof(bag[id]) !== 'function') {
throw new Error("expected function");
}
// keep a local reference to the function
var f = bag[id];
// remove it from the bag.
delete bag[id];
// increment the count of running jobs
count++;
// guard against it being called twice.
var called = false;
f(function () {
// watch out! it'll bite you.
// maybe this should just return?
// support that option for 'production' ?
if (called) { throw new Error("called twice"); }
// the code below is safe because we can't call back a second time
called = true;
// decrement the count of running jobs...
count--;
// and finally call next to replace this worker with more job(s)
next();
});
};
// this is exposed as API
plan.job = function (priority, cb) {
// you have to pass both the priority (a non-negative number) and an actual job
if (typeof(priority) !== 'number' || priority < 0) { throw new Error('expected a non-negative number'); }
// a job is an asynchronous function that takes a single parameter:
// a 'next' callback which will keep the whole thing going.
// forgetting to call 'next' means you'll never complete.
if (typeof(cb) !== 'function') { throw new Error('expected function'); }
// initialize the specified priority level if it doesn't already exist
var bag = jobs[priority] = jobs[priority] || {};
// choose a random id that isn't already in use for this priority level
var id = uid(bag);
// add the job to this priority level's bag
// most (all?) javascript engines will append this job to the bottom
// of the map. Meaning when we iterate it will be run later than
// other jobs that were scheduled first, effectively making a FIFO queue.
// However, this is undefined behaviour and you shouldn't ever rely on it.
bag[id] = function (next) {
cb(next);
};
// returning 'plan' lets us chain methods together.
return plan;
};
var started = false;
plan.start = function () {
// don't allow multiple starts
// even though it should work, it's simpler not to.
if (started) { return plan; }
// this seems to imply a 'stop' method
// but I don't need it, so I'm not implementing it now --ansuz
started = true;
// start asynchronously, otherwise jobs will start running
// before you've had a chance to return 'plan', and weird things
// happen.
setTimeout(function () {
next();
});
return plan;
};
// you can pass any number of functions to be executed
// when all pending jobs are complete.
// We don't pass any arguments, so you need to handle return values
// yourself if you want them.
plan.done = function (f) {
if (typeof(f) !== 'function') { throw new Error('expected function'); }
completeHandlers.push(f);
return plan;
};
// That's all! I hope you had fun reading this!
return plan;
};