diff --git a/src/util/task-queue.js b/src/util/task-queue.js index db890e329..0b452ffb9 100644 --- a/src/util/task-queue.js +++ b/src/util/task-queue.js @@ -12,18 +12,33 @@ class TaskQueue { * * @param {number} maxTokens - the maximum number of tokens in the bucket (burst size). * @param {number} refillRate - the number of tokens to be added per second (sustain rate). - * @param {number} [startingTokens=maxTokens] - the number of tokens the bucket starts with. + * @param {object} options - optional settings for the new task queue instance. + * @property {number} startingTokens - the number of tokens the bucket starts with (default: `maxTokens`). + * @property {number} maxTotalCost - reject a task if total queue cost would pass this limit (default: no limit). * @memberof TaskQueue */ - constructor (maxTokens, refillRate, startingTokens = maxTokens) { + constructor (maxTokens, refillRate, options = {}) { this._maxTokens = maxTokens; this._refillRate = refillRate; this._pendingTaskRecords = []; - this._tokenCount = startingTokens; + this._tokenCount = options.hasOwnProperty('startingTokens') ? options.startingTokens : maxTokens; + this._maxTotalCost = options.hasOwnProperty('maxTotalCost') ? options.maxTotalCost : Infinity; this._timer = new Timer(); this._timer.start(); this._timeout = null; this._lastUpdateTime = this._timer.timeElapsed(); + + this._runTasks = this._runTasks.bind(this); + } + + /** + * Get the number of queued tasks which have not yet started. + * + * @readonly + * @memberof TaskQueue + */ + get length () { + return this._pendingTaskRecords.length; } /** @@ -35,38 +50,57 @@ class TaskQueue { * @memberof TaskQueue */ do (task, cost = 1) { - const newRecord = {}; - const promise = new Promise((resolve, reject) => { - newRecord.wrappedTask = () => { - const canRun = this._refillAndSpend(cost); - if (canRun) { - // Remove this task from the queue and run it - this._pendingTaskRecords.shift(); - try { - resolve(task()); - } catch (e) { - reject(e); - } + if (this._maxTotalCost < Infinity) { + const currentTotalCost = this._pendingTaskRecords.reduce((t, r) => t + r.cost, 0); + if (currentTotalCost + cost > this._maxTotalCost) { + return Promise.reject('Maximum total cost exceeded'); + } + } + const newRecord = { + cost + }; + newRecord.promise = new Promise((resolve, reject) => { + newRecord.cancel = () => { + reject(new Error('Task canceled')); + }; - // Tell the next wrapper to start trying to run its task - if (this._pendingTaskRecords.length > 0) { - const nextRecord = this._pendingTaskRecords[0]; - nextRecord.wrappedTask(); - } - } else { - // This task can't run yet. Estimate when it will be able to, then try again. - newRecord.reject = reject; - this._waitUntilAffordable(cost).then(() => newRecord.wrappedTask()); + // The caller, `_runTasks()`, is responsible for cost-checking and spending tokens. + newRecord.wrappedTask = () => { + try { + resolve(task()); + } catch (e) { + reject(e); } }; }); this._pendingTaskRecords.push(newRecord); + // If the queue has been idle we need to prime the pump if (this._pendingTaskRecords.length === 1) { - newRecord.wrappedTask(); + this._runTasks(); } - return promise; + return newRecord.promise; + } + + /** + * Cancel one pending task, rejecting its promise. + * + * @param {Promise} taskPromise - the promise returned by `do()`. + * @returns {boolean} - true if the task was found, or false otherwise. + * @memberof TaskQueue + */ + cancel (taskPromise) { + const taskIndex = this._pendingTaskRecords.findIndex(r => r.promise === taskPromise); + if (taskIndex !== -1) { + const [taskRecord] = this._pendingTaskRecords.splice(taskIndex, 1); + taskRecord.cancel(); + if (taskIndex === 0 && this._pendingTaskRecords.length > 0) { + this._runTasks(); + } + return true; + } + return false; } /** @@ -76,15 +110,16 @@ class TaskQueue { */ cancelAll () { if (this._timeout !== null) { - clearTimeout(this._timeout); + this._timer.clearTimeout(this._timeout); this._timeout = null; } - this._pendingTaskRecords.forEach(r => r.reject()); + const oldTasks = this._pendingTaskRecords; this._pendingTaskRecords = []; + oldTasks.forEach(r => r.cancel()); } /** - * Shorthand for calling @ _refill() then _spend(cost). + * Shorthand for calling _refill() then _spend(cost). * * @see {@link TaskQueue#_refill} * @see {@link TaskQueue#_spend} @@ -129,33 +164,37 @@ class TaskQueue { } /** - * Create a Promise which will resolve when the bucket will be able to "afford" the given cost. - * Note that this won't refill the bucket, so make sure to refill after the promise resolves. + * Loop until the task queue is empty, running each task and spending tokens to do so. + * Any time the bucket can't afford the next task, delay asynchronously until it can. * - * @param {number} cost - wait until the token count is at least this much. - * @returns {Promise} - to be resolved once the bucket is due for a token count greater than or equal to the cost. * @memberof TaskQueue */ - _waitUntilAffordable (cost) { - if (cost <= this._tokenCount) { - return Promise.resolve(); + _runTasks () { + if (this._timeout) { + this._timer.clearTimeout(this._timeout); + this._timeout = null; } - if (!(cost <= this._maxTokens)) { - return Promise.reject(new Error(`Task cost ${cost} is greater than bucket limit ${this._maxTokens}`)); + for (;;) { + const nextRecord = this._pendingTaskRecords.shift(); + if (!nextRecord) { + // We ran out of work. Go idle until someone adds another task to the queue. + return; + } + if (nextRecord.cost > this._maxTokens) { + throw new Error(`Task cost ${nextRecord.cost} is greater than bucket limit ${this._maxTokens}`); + } + // Refill before each task in case the time it took for the last task to run was enough to afford the next. + if (this._refillAndSpend(nextRecord.cost)) { + nextRecord.wrappedTask(); + } else { + // We can't currently afford this task. Put it back and wait until we can and try again. + this._pendingTaskRecords.unshift(nextRecord); + const tokensNeeded = Math.max(nextRecord.cost - this._tokenCount, 0); + const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate); + this._timeout = this._timer.setTimeout(this._runTasks, estimatedWait); + return; + } } - return new Promise(resolve => { - const tokensNeeded = Math.max(cost - this._tokenCount, 0); - const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate); - - let timeout = null; - const onTimeout = () => { - if (this._timeout === timeout) { - this._timeout = null; - } - resolve(); - }; - this._timeout = timeout = setTimeout(onTimeout, estimatedWait); - }); } } diff --git a/src/util/timer.js b/src/util/timer.js index 4ed1d82af..fba3e94e9 100644 --- a/src/util/timer.js +++ b/src/util/timer.js @@ -90,6 +90,25 @@ class Timer { timeElapsed () { return this.nowObj.now() - this.startTime; } + + /** + * Call a handler function after a specified amount of time has elapsed. + * @param {function} handler - function to call after the timeout + * @param {number} timeout - number of milliseconds to delay before calling the handler + * @returns {number} - the ID of the new timeout + */ + setTimeout (handler, timeout) { + return global.setTimeout(handler, timeout); + } + + /** + * Clear a timeout from the pending timeout pool. + * @param {number} timeoutId - the ID returned by `setTimeout()` + * @memberof Timer + */ + clearTimeout (timeoutId) { + global.clearTimeout(timeoutId); + } } module.exports = Timer; diff --git a/test/fixtures/mock-timer.js b/test/fixtures/mock-timer.js new file mode 100644 index 000000000..c0e3ae495 --- /dev/null +++ b/test/fixtures/mock-timer.js @@ -0,0 +1,167 @@ +/** + * Mimic the Timer class with external control of the "time" value, allowing tests to run more quickly and + * reliably. Multiple instances of this class operate independently: they may report different time values, and + * advancing one timer will not trigger timeouts set on another. + */ +class MockTimer { + /** + * Creates an instance of MockTimer. + * @param {*} [nowObj=null] - alert the caller that this parameter, supported by Timer, is not supported here. + * @memberof MockTimer + */ + constructor (nowObj = null) { + if (nowObj) { + throw new Error('nowObj is not implemented in MockTimer'); + } + + /** + * The fake "current time" value, in epoch milliseconds. + * @type {number} + */ + this._mockTime = 0; + + /** + * Used to store the start time of a timer action. + * Updated when calling `timer.start`. + * @type {number} + */ + this.startTime = 0; + + /** + * The ID to use the next time `setTimeout` is called. + * @type {number} + */ + this._nextTimeoutId = 1; + + /** + * Map of timeout ID to pending timeout callback info. + * @type {Map.} + * @property {number} time - the time at/after which this handler should run + * @property {Function} handler - the handler to call when the time comes + */ + this._timeouts = new Map(); + } + + /** + * Advance this MockTimer's idea of "current time", running timeout handlers if appropriate. + * + * @param {number} milliseconds - the amount of time to add to the current mock time value, in milliseconds. + * @memberof MockTimer + */ + advanceMockTime (milliseconds) { + if (milliseconds < 0) { + throw new Error('Time may not move backward'); + } + this._mockTime += milliseconds; + this._runTimeouts(); + } + + /** + * Advance this MockTimer's idea of "current time", running timeout handlers if appropriate. + * + * @param {number} milliseconds - the amount of time to add to the current mock time value, in milliseconds. + * @returns {Promise} - promise which resolves after timeout handlers have had an opportunity to run. + * @memberof MockTimer + */ + advanceMockTimeAsync (milliseconds) { + return new Promise(resolve => { + this.advanceMockTime(milliseconds); + global.setTimeout(resolve, 0); + }); + } + + /** + * @returns {number} - current mock time elapsed since 1 January 1970 00:00:00 UTC. + * @memberof MockTimer + */ + time () { + return this._mockTime; + } + + /** + * Returns a time accurate relative to other times produced by this function. + * @returns {number} ms-scale accurate time relative to other relative times. + * @memberof MockTimer + */ + relativeTime () { + return this._mockTime; + } + + /** + * Start a timer for measuring elapsed time. + * @memberof MockTimer + */ + start () { + this.startTime = this._mockTime; + } + + /** + * @returns {number} - the time elapsed since `start()` was called. + * @memberof MockTimer + */ + timeElapsed () { + return this._mockTime - this.startTime; + } + + /** + * Call a handler function after a specified amount of time has elapsed. + * Guaranteed to happen in between "ticks" of JavaScript. + * @param {function} handler - function to call after the timeout + * @param {number} timeout - number of milliseconds to delay before calling the handler + * @returns {number} - the ID of the new timeout. + * @memberof MockTimer + */ + setTimeout (handler, timeout) { + const timeoutId = this._nextTimeoutId++; + this._timeouts.set(timeoutId, { + time: this._mockTime + timeout, + handler + }); + this._runTimeouts(); + return timeoutId; + } + + /** + * Clear a particular timeout from the pending timeout pool. + * @param {number} timeoutId - the value returned from `setTimeout()` + * @memberof MockTimer + */ + clearTimeout (timeoutId) { + this._timeouts.delete(timeoutId); + } + + /** + * WARNING: this method has no equivalent in `Timer`. Do not use this method outside of tests! + * @returns {boolean} - true if there are any pending timeouts, false otherwise. + * @memberof MockTimer + */ + hasTimeouts () { + return this._timeouts.size > 0; + } + + /** + * Run any timeout handlers whose timeouts have expired. + * @memberof MockTimer + */ + _runTimeouts () { + const ready = []; + + this._timeouts.forEach((timeoutRecord, timeoutId) => { + const isReady = timeoutRecord.time <= this._mockTime; + if (isReady) { + ready.push(timeoutRecord); + this._timeouts.delete(timeoutId); + } + }); + + // sort so that earlier timeouts run before later timeouts + ready.sort((a, b) => a.time < b.time); + + // next tick, call everything that's ready + global.setTimeout(() => { + ready.forEach(o => o.handler()); + }, 0); + } +} + +module.exports = MockTimer; diff --git a/test/unit/mock-timer.js b/test/unit/mock-timer.js new file mode 100644 index 000000000..db5878942 --- /dev/null +++ b/test/unit/mock-timer.js @@ -0,0 +1,91 @@ +const test = require('tap').test; +const MockTimer = require('../fixtures/mock-timer'); + +test('spec', t => { + const timer = new MockTimer(); + + t.type(MockTimer, 'function'); + t.type(timer, 'object'); + + // Most members of MockTimer mimic members of Timer. + t.type(timer.startTime, 'number'); + t.type(timer.time, 'function'); + t.type(timer.start, 'function'); + t.type(timer.timeElapsed, 'function'); + t.type(timer.setTimeout, 'function'); + t.type(timer.clearTimeout, 'function'); + + // A few members of MockTimer have no Timer equivalent and should only be used in tests. + t.type(timer.advanceMockTime, 'function'); + t.type(timer.advanceMockTimeAsync, 'function'); + t.type(timer.hasTimeouts, 'function'); + + t.end(); +}); + +test('time', t => { + const timer = new MockTimer(); + const delta = 1; + + const time1 = timer.time(); + const time2 = timer.time(); + timer.advanceMockTime(delta); + const time3 = timer.time(); + + t.equal(time1, time2); + t.equal(time2 + delta, time3); + t.end(); +}); + +test('start / timeElapsed', t => new Promise(resolve => { + const timer = new MockTimer(); + const halfDelay = 1; + const fullDelay = halfDelay + halfDelay; + + timer.start(); + + let timeoutCalled = 0; + + // Wait and measure timer + timer.setTimeout(() => { + t.equal(timeoutCalled, 0); + ++timeoutCalled; + + const timeElapsed = timer.timeElapsed(); + t.equal(timeElapsed, fullDelay); + t.end(); + + resolve(); + }, fullDelay); + + // this should not trigger the callback + timer.advanceMockTime(halfDelay); + + // give the mock timer a chance to run tasks + global.setTimeout(() => { + // we've only mock-waited for half the delay so it should not have run yet + t.equal(timeoutCalled, 0); + + // this should trigger the callback + timer.advanceMockTime(halfDelay); + }, 0); +})); + +test('clearTimeout / hasTimeouts', t => new Promise((resolve, reject) => { + const timer = new MockTimer(); + + const timeoutId = timer.setTimeout(() => { + reject(new Error('Canceled task ran')); + }, 1); + + timer.setTimeout(() => { + resolve('Non-canceled task ran'); + t.end(); + }, 2); + + timer.clearTimeout(timeoutId); + + while (timer.hasTimeouts()) { + timer.advanceMockTime(1); + } +})); diff --git a/test/unit/util_task-queue.js b/test/unit/util_task-queue.js index ab8b5c42c..6e484a7a4 100644 --- a/test/unit/util_task-queue.js +++ b/test/unit/util_task-queue.js @@ -1,21 +1,92 @@ -const test = require('tap').skip; +const test = require('tap').test; const TaskQueue = require('../../src/util/task-queue'); +const MockTimer = require('../fixtures/mock-timer'); const testCompare = require('../fixtures/test-compare'); -test('constructor', t => { - // Max tokens = 1000, refill 1000 tokens per second (1 per millisecond), and start with 0 tokens - const bukkit = new TaskQueue(1000, 1000, 0); +// Max tokens = 1000 +// Refill 1000 tokens per second (1 per millisecond) +// Token bucket starts empty +// Max total cost of queued tasks = 10000 tokens = 10 seconds +const makeTestQueue = () => { + const bukkit = new TaskQueue(1000, 1000, { + startingTokens: 0, + maxTotalCost: 10000 + }); - // Simulate time passing with a stubbed timer - const simulatedTimeStart = Date.now(); - bukkit._timer = {timeElapsed: () => Date.now() - simulatedTimeStart}; + const mockTimer = new MockTimer(); + bukkit._timer = mockTimer; + mockTimer.start(); + + return bukkit; +}; + +test('spec', t => { + t.type(TaskQueue, 'function'); + const bukkit = makeTestQueue(); + + t.type(bukkit, 'object'); + + t.type(bukkit.length, 'number'); + t.type(bukkit.do, 'function'); + t.type(bukkit.cancel, 'function'); + t.type(bukkit.cancelAll, 'function'); + + t.end(); +}); + +test('constructor', t => { + t.ok(new TaskQueue(1, 1)); + t.ok(new TaskQueue(1, 1, {})); + t.ok(new TaskQueue(1, 1, {startingTokens: 0})); + t.ok(new TaskQueue(1, 1, {maxTotalCost: 999})); + t.ok(new TaskQueue(1, 1, {startingTokens: 0, maxTotalCost: 999})); + t.end(); +}); + +test('run tasks', async t => { + const bukkit = makeTestQueue(); + + const taskResults = []; + + const promises = [ + bukkit.do(() => { + taskResults.push('a'); + testCompare(t, bukkit._timer.timeElapsed(), '>=', 50, 'Costly task must wait'); + }, 50), + bukkit.do(() => { + taskResults.push('b'); + testCompare(t, bukkit._timer.timeElapsed(), '>=', 60, 'Tasks must run in serial'); + }, 10), + bukkit.do(() => { + taskResults.push('c'); + testCompare(t, bukkit._timer.timeElapsed(), '<=', 70, 'Cheap task should run soon'); + }, 1) + ]; + + // advance 10 simulated milliseconds per JS tick + while (bukkit.length > 0) { + await bukkit._timer.advanceMockTimeAsync(10); + } + + return Promise.all(promises).then(() => { + t.deepEqual(taskResults, ['a', 'b', 'c'], 'All tasks must run in correct order'); + t.end(); + }); +}); + +test('cancel', async t => { + const bukkit = makeTestQueue(); const taskResults = []; - const promises = []; const goodCancelMessage = 'Task was canceled correctly'; - bukkit.do(() => taskResults.push('nope'), 999).then( + const afterCancelMessage = 'Task was run correctly'; + const cancelTaskPromise = bukkit.do( + () => { + taskResults.push('nope'); + }, 999); + const cancelCheckPromise = cancelTaskPromise.then( () => { t.fail('Task should have been canceled'); }, @@ -23,19 +94,99 @@ test('constructor', t => { taskResults.push(goodCancelMessage); } ); - bukkit.cancelAll(); - promises.push( - bukkit.do(() => taskResults.push('a'), 50).then(() => - testCompare(t, bukkit._timer.timeElapsed(), '>=', 50, 'Costly task must wait') - ), - bukkit.do(() => taskResults.push('b'), 10).then(() => - testCompare(t, bukkit._timer.timeElapsed(), '>=', 60, 'Tasks must run in serial') - ), - bukkit.do(() => taskResults.push('c'), 1).then(() => - testCompare(t, bukkit._timer.timeElapsed(), '<', 80, 'Cheap task should run soon') - ) - ); - return Promise.all(promises).then(() => { - t.deepEqual(taskResults, [goodCancelMessage, 'a', 'b', 'c'], 'All tasks must run in correct order'); + const keepTaskPromise = bukkit.do( + () => { + taskResults.push(afterCancelMessage); + testCompare(t, bukkit._timer.timeElapsed(), '<', 10, 'Canceled task must not delay other tasks'); + }, 5); + + // give the bucket a chance to make a mistake + await bukkit._timer.advanceMockTimeAsync(1); + + t.equal(bukkit.length, 2); + const taskWasCanceled = bukkit.cancel(cancelTaskPromise); + t.ok(taskWasCanceled); + t.equal(bukkit.length, 1); + + while (bukkit.length > 0) { + await bukkit._timer.advanceMockTimeAsync(1); + } + + return Promise.all([cancelCheckPromise, keepTaskPromise]).then(() => { + t.deepEqual(taskResults, [goodCancelMessage, afterCancelMessage]); + t.end(); }); }); + +test('cancelAll', async t => { + const bukkit = makeTestQueue(); + + const taskResults = []; + const goodCancelMessage1 = 'Task1 was canceled correctly'; + const goodCancelMessage2 = 'Task2 was canceled correctly'; + + const promises = [ + bukkit.do(() => taskResults.push('nope'), 999).then( + () => { + t.fail('Task1 should have been canceled'); + }, + () => { + taskResults.push(goodCancelMessage1); + } + ), + bukkit.do(() => taskResults.push('nah'), 999).then( + () => { + t.fail('Task2 should have been canceled'); + }, + () => { + taskResults.push(goodCancelMessage2); + } + ) + ]; + + // advance time, but not enough that any task should run + await bukkit._timer.advanceMockTimeAsync(100); + + bukkit.cancelAll(); + + // advance enough that both tasks would run if they hadn't been canceled + await bukkit._timer.advanceMockTimeAsync(10000); + + return Promise.all(promises).then(() => { + t.deepEqual(taskResults, [goodCancelMessage1, goodCancelMessage2], 'Tasks should cancel in order'); + t.end(); + }); +}); + +test('max total cost', async t => { + const bukkit = makeTestQueue(); + + let numTasks = 0; + + const task = () => ++numTasks; + + // Fill the queue + for (let i = 0; i < 10; ++i) { + bukkit.do(task, 1000); + } + + // This one should be rejected because the queue is full + bukkit + .do(task, 1000) + .then( + () => { + t.fail('Full queue did not reject task'); + }, + () => { + t.pass(); + } + ); + + while (bukkit.length > 0) { + await bukkit._timer.advanceMockTimeAsync(1000); + } + + // this should be 10 if the last task is rejected or 11 if it runs + t.equal(numTasks, 10); + t.end(); +}); diff --git a/test/unit/util_timer.js b/test/unit/util_timer.js index bf6d6b9ac..a6665fadd 100644 --- a/test/unit/util_timer.js +++ b/test/unit/util_timer.js @@ -11,6 +11,8 @@ test('spec', t => { t.type(timer.time, 'function'); t.type(timer.start, 'function'); t.type(timer.timeElapsed, 'function'); + t.type(timer.setTimeout, 'function'); + t.type(timer.clearTimeout, 'function'); t.end(); }); @@ -32,7 +34,7 @@ test('start / timeElapsed', t => { timer.start(); // Wait and measure timer - setTimeout(() => { + timer.setTimeout(() => { const timeElapsed = timer.timeElapsed(); t.ok(timeElapsed >= 0); t.ok(timeElapsed >= (delay - threshold) && @@ -40,3 +42,15 @@ test('start / timeElapsed', t => { t.end(); }, delay); }); + +test('setTimeout / clearTimeout', t => new Promise((resolve, reject) => { + const timer = new Timer(); + const cancelId = timer.setTimeout(() => { + reject(new Error('Canceled task ran')); + }, 1); + timer.setTimeout(() => { + resolve('Non-canceled task ran'); + t.end(); + }, 2); + timer.clearTimeout(cancelId); +}));