2018-08-31 14:16:20 -04:00
|
|
|
const Timer = require('../util/timer');
|
|
|
|
|
|
|
|
/**
 * This class uses the token bucket algorithm to control a queue of tasks.
 *
 * Tokens accumulate in the bucket at `refillRate` tokens per second, up to `maxTokens`.
 * Each task has a cost; tasks run in FIFO order, each one as soon as the bucket holds
 * enough tokens to pay for it.
 */
class TaskQueue {
    /**
     * Creates an instance of TaskQueue.
     * To allow bursts, set `maxTokens` to several times the average task cost.
     * To prevent bursts, set `maxTokens` to the cost of the largest tasks.
     * Note that tasks with a cost greater than `maxTokens` will be rejected.
     *
     * @param {number} maxTokens - the maximum number of tokens in the bucket (burst size).
     * @param {number} refillRate - the number of tokens to be added per second (sustain rate).
     * @param {?object} [options] - optional settings for the new task queue instance.
     *   May be omitted, `undefined`, or `null`.
     * @property {number} startingTokens - the number of tokens the bucket starts with (default: `maxTokens`).
     * @property {number} maxTotalCost - reject a task if total queue cost would pass this limit (default: no limit).
     * @memberof TaskQueue
     */
    constructor (maxTokens, refillRate, options = {}) {
        // A default parameter only replaces `undefined`, so an explicit `null` must be handled here.
        const settings = options || {};
        // Borrow `hasOwnProperty` from Object.prototype so prototype-less option objects work too.
        const hasSetting = key => Object.prototype.hasOwnProperty.call(settings, key);

        this._maxTokens = maxTokens;
        this._refillRate = refillRate;
        this._pendingTaskRecords = [];
        this._tokenCount = hasSetting('startingTokens') ? settings.startingTokens : maxTokens;
        this._maxTotalCost = hasSetting('maxTotalCost') ? settings.maxTotalCost : Infinity;
        this._timer = new Timer();
        this._timer.start();
        this._timeout = null;
        this._lastUpdateTime = this._timer.timeElapsed();

        // `_runTasks` is handed to the timer as a callback, so it must keep its `this`.
        this._runTasks = this._runTasks.bind(this);
    }

    /**
     * Get the number of queued tasks which have not yet started.
     *
     * @readonly
     * @memberof TaskQueue
     */
    get length () {
        return this._pendingTaskRecords.length;
    }

    /**
     * Wait until the token bucket is full enough, then run the provided task.
     *
     * The returned promise rejects if:
     * - adding the task would push the total queued cost past `maxTotalCost`
     *   (rejection reason is the string 'Maximum total cost exceeded'),
     * - the task's cost is greater than the bucket's `maxTokens`,
     * - the task is canceled before it starts, or
     * - the task itself throws.
     *
     * @param {Function} task - the task to run.
     * @param {number} [cost=1] - the number of tokens this task consumes from the bucket.
     * @returns {Promise} - a promise for the task's return value.
     * @memberof TaskQueue
     */
    do (task, cost = 1) {
        if (this._maxTotalCost < Infinity) {
            const currentTotalCost = this._pendingTaskRecords.reduce((t, r) => t + r.cost, 0);
            if (currentTotalCost + cost > this._maxTotalCost) {
                // The reason is a plain string; it is kept as-is because callers may compare against it.
                return Promise.reject('Maximum total cost exceeded');
            }
        }

        const newRecord = {
            cost
        };
        newRecord.promise = new Promise((resolve, reject) => {
            newRecord.cancel = () => {
                reject(new Error('Task canceled'));
            };

            // Lets `_runTasks()` fail this task without running it (e.g. the cost can never be afforded).
            newRecord.reject = reject;

            // The caller, `_runTasks()`, is responsible for cost-checking and spending tokens.
            newRecord.wrappedTask = () => {
                try {
                    resolve(task());
                } catch (e) {
                    reject(e);
                }
            };
        });
        this._pendingTaskRecords.push(newRecord);

        // If the queue has been idle we need to prime the pump
        if (this._pendingTaskRecords.length === 1) {
            this._runTasks();
        }

        return newRecord.promise;
    }

    /**
     * Cancel one pending task, rejecting its promise.
     *
     * @param {Promise} taskPromise - the promise returned by `do()`.
     * @returns {boolean} - true if the task was found, or false otherwise.
     * @memberof TaskQueue
     */
    cancel (taskPromise) {
        const taskIndex = this._pendingTaskRecords.findIndex(r => r.promise === taskPromise);
        if (taskIndex === -1) {
            return false;
        }

        const [taskRecord] = this._pendingTaskRecords.splice(taskIndex, 1);
        taskRecord.cancel();
        if (taskIndex === 0) {
            // Any pending timer was waiting on the task we just removed. Re-evaluate the queue:
            // this clears the stale timer and either runs/schedules the new head or goes idle.
            this._runTasks();
        }
        return true;
    }

    /**
     * Cancel all pending tasks, rejecting all their promises.
     *
     * @memberof TaskQueue
     */
    cancelAll () {
        if (this._timeout !== null) {
            this._timer.clearTimeout(this._timeout);
            this._timeout = null;
        }
        // Swap in an empty queue before rejecting so the queue is already consistent
        // if a rejection handler looks at it.
        const oldTasks = this._pendingTaskRecords;
        this._pendingTaskRecords = [];
        oldTasks.forEach(r => r.cancel());
    }

    /**
     * Shorthand for calling _refill() then _spend(cost).
     *
     * @see {@link TaskQueue#_refill}
     * @see {@link TaskQueue#_spend}
     * @param {number} cost - the number of tokens to try to spend.
     * @returns {boolean} true if we had enough tokens; false otherwise.
     * @memberof TaskQueue
     */
    _refillAndSpend (cost) {
        this._refill();
        return this._spend(cost);
    }

    /**
     * Refill the token bucket based on the amount of time since the last refill.
     * The bucket never holds more than `maxTokens`.
     *
     * @memberof TaskQueue
     */
    _refill () {
        const now = this._timer.timeElapsed();
        const timeSinceRefill = now - this._lastUpdateTime;
        if (timeSinceRefill <= 0) return;

        this._lastUpdateTime = now;
        // `timeSinceRefill` is in milliseconds while `_refillRate` is tokens per second.
        this._tokenCount += timeSinceRefill * this._refillRate / 1000;
        this._tokenCount = Math.min(this._tokenCount, this._maxTokens);
    }

    /**
     * If we can "afford" the given cost, subtract that many tokens and return true.
     * Otherwise, return false.
     *
     * @param {number} cost - the number of tokens to try to spend.
     * @returns {boolean} true if we had enough tokens; false otherwise.
     * @memberof TaskQueue
     */
    _spend (cost) {
        if (cost <= this._tokenCount) {
            this._tokenCount -= cost;
            return true;
        }
        return false;
    }

    /**
     * Loop until the task queue is empty, running each task and spending tokens to do so.
     * Any time the bucket can't afford the next task, delay asynchronously until it can.
     *
     * A task whose cost is greater than `maxTokens` could never be afforded, so its promise
     * is rejected and the loop moves on to the next task instead of stalling the queue.
     *
     * @memberof TaskQueue
     */
    _runTasks () {
        if (this._timeout !== null) {
            this._timer.clearTimeout(this._timeout);
            this._timeout = null;
        }

        for (;;) {
            const nextRecord = this._pendingTaskRecords.shift();
            if (!nextRecord) {
                // We ran out of work. Go idle until someone adds another task to the queue.
                return;
            }

            if (nextRecord.cost > this._maxTokens) {
                // Reject rather than throw: this may be running from a timer callback, where a
                // thrown error would be uncaught and would leave the remaining tasks stranded.
                nextRecord.reject(
                    new Error(`Task cost ${nextRecord.cost} is greater than bucket limit ${this._maxTokens}`)
                );
                continue;
            }

            // Refill before each task in case the time it took for the last task to run was enough to afford the next.
            if (this._refillAndSpend(nextRecord.cost)) {
                nextRecord.wrappedTask();
            } else {
                // We can't currently afford this task. Put it back and wait until we can and try again.
                this._pendingTaskRecords.unshift(nextRecord);
                const tokensNeeded = Math.max(nextRecord.cost - this._tokenCount, 0);
                const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
                this._timeout = this._timer.setTimeout(this._runTasks, estimatedWait);
                return;
            }
        }
    }
}
|
|
|
|
|
2018-09-14 22:34:34 -04:00
|
|
|
// CommonJS export: the TaskQueue class is this module's sole export.
module.exports = TaskQueue;
|