scratch-vm/src/util/task-queue.js
2019-02-15 15:20:38 -08:00

201 lines
7.1 KiB
JavaScript

const Timer = require('../util/timer');
/**
* This class uses the token bucket algorithm to control a queue of tasks.
*/
class TaskQueue {
/**
* Creates an instance of TaskQueue.
* To allow bursts, set `maxTokens` to several times the average task cost.
* To prevent bursts, set `maxTokens` to the cost of the largest tasks.
* Note that tasks with a cost greater than `maxTokens` will be rejected.
*
* @param {number} maxTokens - the maximum number of tokens in the bucket (burst size).
* @param {number} refillRate - the number of tokens to be added per second (sustain rate).
* @param {object} options - optional settings for the new task queue instance.
* @property {number} startingTokens - the number of tokens the bucket starts with (default: `maxTokens`).
* @property {number} maxTotalCost - reject a task if total queue cost would pass this limit (default: no limit).
* @memberof TaskQueue
*/
constructor (maxTokens, refillRate, options = {}) {
this._maxTokens = maxTokens;
this._refillRate = refillRate;
this._pendingTaskRecords = [];
this._tokenCount = options.hasOwnProperty('startingTokens') ? options.startingTokens : maxTokens;
this._maxTotalCost = options.hasOwnProperty('maxTotalCost') ? options.maxTotalCost : Infinity;
this._timer = new Timer();
this._timer.start();
this._timeout = null;
this._lastUpdateTime = this._timer.timeElapsed();
this._runTasks = this._runTasks.bind(this);
}
/**
* Get the number of queued tasks which have not yet started.
*
* @readonly
* @memberof TaskQueue
*/
get length () {
return this._pendingTaskRecords.length;
}
/**
* Wait until the token bucket is full enough, then run the provided task.
*
* @param {Function} task - the task to run.
* @param {number} [cost=1] - the number of tokens this task consumes from the bucket.
* @returns {Promise} - a promise for the task's return value.
* @memberof TaskQueue
*/
do (task, cost = 1) {
if (this._maxTotalCost < Infinity) {
const currentTotalCost = this._pendingTaskRecords.reduce((t, r) => t + r.cost, 0);
if (currentTotalCost + cost > this._maxTotalCost) {
return Promise.reject('Maximum total cost exceeded');
}
}
const newRecord = {
cost
};
newRecord.promise = new Promise((resolve, reject) => {
newRecord.cancel = () => {
reject(new Error('Task canceled'));
};
// The caller, `_runTasks()`, is responsible for cost-checking and spending tokens.
newRecord.wrappedTask = () => {
try {
resolve(task());
} catch (e) {
reject(e);
}
};
});
this._pendingTaskRecords.push(newRecord);
// If the queue has been idle we need to prime the pump
if (this._pendingTaskRecords.length === 1) {
this._runTasks();
}
return newRecord.promise;
}
/**
* Cancel one pending task, rejecting its promise.
*
* @param {Promise} taskPromise - the promise returned by `do()`.
* @returns {boolean} - true if the task was found, or false otherwise.
* @memberof TaskQueue
*/
cancel (taskPromise) {
const taskIndex = this._pendingTaskRecords.findIndex(r => r.promise === taskPromise);
if (taskIndex !== -1) {
const [taskRecord] = this._pendingTaskRecords.splice(taskIndex, 1);
taskRecord.cancel();
if (taskIndex === 0 && this._pendingTaskRecords.length > 0) {
this._runTasks();
}
return true;
}
return false;
}
/**
* Cancel all pending tasks, rejecting all their promises.
*
* @memberof TaskQueue
*/
cancelAll () {
if (this._timeout !== null) {
this._timer.clearTimeout(this._timeout);
this._timeout = null;
}
const oldTasks = this._pendingTaskRecords;
this._pendingTaskRecords = [];
oldTasks.forEach(r => r.cancel());
}
/**
* Shorthand for calling _refill() then _spend(cost).
*
* @see {@link TaskQueue#_refill}
* @see {@link TaskQueue#_spend}
* @param {number} cost - the number of tokens to try to spend.
* @returns {boolean} true if we had enough tokens; false otherwise.
* @memberof TaskQueue
*/
_refillAndSpend (cost) {
this._refill();
return this._spend(cost);
}
/**
* Refill the token bucket based on the amount of time since the last refill.
*
* @memberof TaskQueue
*/
_refill () {
const now = this._timer.timeElapsed();
const timeSinceRefill = now - this._lastUpdateTime;
if (timeSinceRefill <= 0) return;
this._lastUpdateTime = now;
this._tokenCount += timeSinceRefill * this._refillRate / 1000;
this._tokenCount = Math.min(this._tokenCount, this._maxTokens);
}
/**
* If we can "afford" the given cost, subtract that many tokens and return true.
* Otherwise, return false.
*
* @param {number} cost - the number of tokens to try to spend.
* @returns {boolean} true if we had enough tokens; false otherwise.
* @memberof TaskQueue
*/
_spend (cost) {
if (cost <= this._tokenCount) {
this._tokenCount -= cost;
return true;
}
return false;
}
/**
* Loop until the task queue is empty, running each task and spending tokens to do so.
* Any time the bucket can't afford the next task, delay asynchronously until it can.
*
* @memberof TaskQueue
*/
_runTasks () {
if (this._timeout) {
this._timer.clearTimeout(this._timeout);
this._timeout = null;
}
for (;;) {
const nextRecord = this._pendingTaskRecords.shift();
if (!nextRecord) {
// We ran out of work. Go idle until someone adds another task to the queue.
return;
}
if (nextRecord.cost > this._maxTokens) {
throw new Error(`Task cost ${nextRecord.cost} is greater than bucket limit ${this._maxTokens}`);
}
// Refill before each task in case the time it took for the last task to run was enough to afford the next.
if (this._refillAndSpend(nextRecord.cost)) {
nextRecord.wrappedTask();
} else {
// We can't currently afford this task. Put it back and wait until we can and try again.
this._pendingTaskRecords.unshift(nextRecord);
const tokensNeeded = Math.max(nextRecord.cost - this._tokenCount, 0);
const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
this._timeout = this._timer.setTimeout(this._runTasks, estimatedWait);
return;
}
}
}
}
module.exports = TaskQueue;