diff --git a/src/util/task-queue.js b/src/util/task-queue.js new file mode 100644 index 000000000..db890e329 --- /dev/null +++ b/src/util/task-queue.js @@ -0,0 +1,162 @@ +const Timer = require('../util/timer'); + +/** + * This class uses the token bucket algorithm to control a queue of tasks. + */ +class TaskQueue { + /** + * Creates an instance of TaskQueue. + * To allow bursts, set `maxTokens` to several times the average task cost. + * To prevent bursts, set `maxTokens` to the cost of the largest tasks. + * Note that tasks with a cost greater than `maxTokens` will be rejected. + * + * @param {number} maxTokens - the maximum number of tokens in the bucket (burst size). + * @param {number} refillRate - the number of tokens to be added per second (sustain rate). + * @param {number} [startingTokens=maxTokens] - the number of tokens the bucket starts with. + * @memberof TaskQueue + */ + constructor (maxTokens, refillRate, startingTokens = maxTokens) { + this._maxTokens = maxTokens; + this._refillRate = refillRate; + this._pendingTaskRecords = []; + this._tokenCount = startingTokens; + this._timer = new Timer(); + this._timer.start(); + this._timeout = null; + this._lastUpdateTime = this._timer.timeElapsed(); + } + + /** + * Wait until the token bucket is full enough, then run the provided task. + * + * @param {Function} task - the task to run. + * @param {number} [cost=1] - the number of tokens this task consumes from the bucket. + * @returns {Promise} - a promise for the task's return value. + * @memberof TaskQueue + */ + do (task, cost = 1) { + const newRecord = {}; + const promise = new Promise((resolve, reject) => { + newRecord.wrappedTask = () => { + const canRun = this._refillAndSpend(cost); + if (canRun) { + // Remove this task from the queue and run it + this._pendingTaskRecords.shift(); + try { + resolve(task()); + } catch (e) { + reject(e); + } + + // Tell the next wrapper to start trying to run its task + if (this._pendingTaskRecords.length > 0) { + const nextRecord = this._pendingTaskRecords[0]; + nextRecord.wrappedTask(); + } + } else { + // This task can't run yet. Estimate when it will be able to, then try again. + newRecord.reject = reject; + this._waitUntilAffordable(cost).then(() => newRecord.wrappedTask()); + } + }; + }); + this._pendingTaskRecords.push(newRecord); + + if (this._pendingTaskRecords.length === 1) { + newRecord.wrappedTask(); + } + + return promise; + } + + /** + * Cancel all pending tasks, rejecting all their promises. + * + * @memberof TaskQueue + */ + cancelAll () { + if (this._timeout !== null) { + clearTimeout(this._timeout); + this._timeout = null; + } + this._pendingTaskRecords.forEach(r => r.reject()); + this._pendingTaskRecords = []; + } + + /** + * Shorthand for calling @ _refill() then _spend(cost). + * + * @see {@link TaskQueue#_refill} + * @see {@link TaskQueue#_spend} + * @param {number} cost - the number of tokens to try to spend. + * @returns {boolean} true if we had enough tokens; false otherwise. + * @memberof TaskQueue + */ + _refillAndSpend (cost) { + this._refill(); + return this._spend(cost); + } + + /** + * Refill the token bucket based on the amount of time since the last refill. + * + * @memberof TaskQueue + */ + _refill () { + const now = this._timer.timeElapsed(); + const timeSinceRefill = now - this._lastUpdateTime; + if (timeSinceRefill <= 0) return; + + this._lastUpdateTime = now; + this._tokenCount += timeSinceRefill * this._refillRate / 1000; + this._tokenCount = Math.min(this._tokenCount, this._maxTokens); + } + + /** + * If we can "afford" the given cost, subtract that many tokens and return true. + * Otherwise, return false. + * + * @param {number} cost - the number of tokens to try to spend. + * @returns {boolean} true if we had enough tokens; false otherwise. + * @memberof TaskQueue + */ + _spend (cost) { + if (cost <= this._tokenCount) { + this._tokenCount -= cost; + return true; + } + return false; + } + + /** + * Create a Promise which will resolve when the bucket will be able to "afford" the given cost. + * Note that this won't refill the bucket, so make sure to refill after the promise resolves. + * + * @param {number} cost - wait until the token count is at least this much. + * @returns {Promise} - to be resolved once the bucket is due for a token count greater than or equal to the cost. + * @memberof TaskQueue + */ + _waitUntilAffordable (cost) { + if (cost <= this._tokenCount) { + return Promise.resolve(); + } + if (!(cost <= this._maxTokens)) { + return Promise.reject(new Error(`Task cost ${cost} is greater than bucket limit ${this._maxTokens}`)); + } + return new Promise(resolve => { + const tokensNeeded = Math.max(cost - this._tokenCount, 0); + const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate); + + let timeout = null; + const onTimeout = () => { + if (this._timeout === timeout) { + this._timeout = null; + } + resolve(); + }; + this._timeout = timeout = setTimeout(onTimeout, estimatedWait); + }); + } +} + +module.exports = TaskQueue; diff --git a/test/fixtures/test-compare.js b/test/fixtures/test-compare.js new file mode 100644 index 000000000..4aa625ea7 --- /dev/null +++ b/test/fixtures/test-compare.js @@ -0,0 +1,15 @@ +const testCompare = (t, lhs, op, rhs, message) => { + const details = `Expected: ${lhs} ${op} ${rhs}`; + const extra = {details}; + switch (op) { + case '<': return t.ok(lhs < rhs, message, extra); + case '<=': return t.ok(lhs <= rhs, message, extra); + case '===': return t.ok(lhs === rhs, message, extra); + case '!==': return t.ok(lhs !== rhs, message, extra); + case '>=': return t.ok(lhs >= rhs, message, extra); + case '>': return t.ok(lhs > rhs, message, extra); + default: return t.fail(`Unrecognized op: ${op}`); + } +}; + +module.exports = testCompare; diff --git a/test/unit/util_task-queue.js b/test/unit/util_task-queue.js new file mode 100644 index 000000000..2e67770fd --- /dev/null +++ b/test/unit/util_task-queue.js @@ -0,0 +1,41 @@ +const test = require('tap').test; + +const Timer = require('../../src/util/timer'); +const TaskQueue = require('../../src/util/task-queue'); + +const testCompare = require('../fixtures/test-compare'); + +test('constructor', t => { + // Max tokens = 1000, refill 1000 tokens per second (1 per millisecond), and start with 0 tokens + const bukkit = new TaskQueue(1000, 1000, 0); + + const timer = new Timer(); + timer.start(); + + const taskResults = []; + const promises = []; + const goodCancelMessage = 'Task was canceled correctly'; + bukkit.do(() => taskResults.push('nope'), 999).then( + () => { + t.fail('Task should have been canceled'); + }, + () => { + taskResults.push(goodCancelMessage); + } + ); + bukkit.cancelAll(); + promises.push( + bukkit.do(() => taskResults.push('a'), 50).then(() => + testCompare(t, timer.timeElapsed(), '>=', 50, 'Costly task must wait') + ), + bukkit.do(() => taskResults.push('b'), 10).then(() => + testCompare(t, timer.timeElapsed(), '>=', 60, 'Tasks must run in serial') + ), + bukkit.do(() => taskResults.push('c'), 1).then(() => + testCompare(t, timer.timeElapsed(), '<', 80, 'Cheap task should run soon') + ) + ); + return Promise.all(promises).then(() => { + t.deepEqual(taskResults, [goodCancelMessage, 'a', 'b', 'c'], 'All tasks must run in correct order'); + }); +});