diff --git a/src/util/task-queue.js b/src/util/task-queue.js index accddfbbb..8013961a3 100644 --- a/src/util/task-queue.js +++ b/src/util/task-queue.js @@ -24,6 +24,17 @@ class TaskQueue { this._timer.start(); this._timeout = null; this._lastUpdateTime = this._timer.timeElapsed(); + + this._runTasks = this._runTasks.bind(this); + } + + /** + * Get the number of queued tasks which have not yet started. + * @readonly + * @memberof TaskQueue + */ + get length () { + return this._pendingTaskRecords.length; } /** @@ -35,35 +46,32 @@ class TaskQueue { * @memberof TaskQueue */ do (task, cost = 1) { - const newRecord = {}; + const newRecord = { + cost + }; const promise = new Promise((resolve, reject) => { - newRecord.wrappedTask = () => { - const canRun = this._refillAndSpend(cost); - if (canRun) { - // Remove this task from the queue and run it - this._pendingTaskRecords.shift(); - try { - resolve(task()); - } catch (e) { - reject(e); - } + newRecord.cancel = () => { + const index = this._pendingTaskRecords.indexOf(newRecord); + if (index >= 0) { + this._pendingTaskRecords.splice(index, 1); + } + reject(new Error('Task canceled')); + }; - // Tell the next wrapper to start trying to run its task - if (this._pendingTaskRecords.length > 0) { - const nextRecord = this._pendingTaskRecords[0]; - nextRecord.wrappedTask(); - } - } else { - // This task can't run yet. Estimate when it will be able to, then try again. - newRecord.reject = reject; - this._waitUntilAffordable(cost).then(() => newRecord.wrappedTask()); + // The caller, `_runTasks()`, is responsible for cost-checking and spending tokens. + newRecord.wrappedTask = () => { + try { + resolve(task()); + } catch (e) { + reject(e); } }; }); this._pendingTaskRecords.push(newRecord); + // If the queue has been idle we need to prime the pump if (this._pendingTaskRecords.length === 1) { - newRecord.wrappedTask(); + this._runTasks(); } return promise; @@ -76,15 +84,16 @@ class TaskQueue { */ cancelAll () { if (this._timeout !== null) { - clearTimeout(this._timeout); + this._timer.clearTimeout(this._timeout); this._timeout = null; } - this._pendingTaskRecords.forEach(r => r.reject()); + const oldTasks = this._pendingTaskRecords; this._pendingTaskRecords = []; + oldTasks.forEach(r => r.cancel()); } /** - * Shorthand for calling @ _refill() then _spend(cost). + * Shorthand for calling _refill() then _spend(cost). * * @see {@link TaskQueue#_refill} * @see {@link TaskQueue#_spend} @@ -129,33 +138,36 @@ class TaskQueue { } /** - * Create a Promise which will resolve when the bucket will be able to "afford" the given cost. - * Note that this won't refill the bucket, so make sure to refill after the promise resolves. - * - * @param {number} cost - wait until the token count is at least this much. - * @returns {Promise} - to be resolved once the bucket is due for a token count greater than or equal to the cost. + * Loop until the task queue is empty, running each task and spending tokens to do so. + * Any time the bucket can't afford the next task, delay asynchronously until it can. * @memberof TaskQueue */ - _waitUntilAffordable (cost) { - if (cost <= this._tokenCount) { - return Promise.resolve(); + _runTasks () { + if (this._timeout) { + this._timer.clearTimeout(this._timeout); + this._timeout = null; } - if (!(cost <= this._maxTokens)) { - return Promise.reject(new Error(`Task cost ${cost} is greater than bucket limit ${this._maxTokens}`)); + for (;;) { + const nextRecord = this._pendingTaskRecords.shift(); + if (!nextRecord) { + // We ran out of work. Go idle until someone adds another task to the queue. + return; + } + if (nextRecord.cost > this._maxTokens) { + throw new Error(`Task cost ${nextRecord.cost} is greater than bucket limit ${this._maxTokens}`); + } + // Refill before each task in case the time it took for the last task to run was enough to afford the next. + if (this._refillAndSpend(nextRecord.cost)) { + nextRecord.wrappedTask(); + } else { + // We can't currently afford this task. Put it back and wait until we can and try again. + this._pendingTaskRecords.unshift(nextRecord); + const tokensNeeded = Math.max(nextRecord.cost - this._tokenCount, 0); + const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate); + this._timeout = this._timer.setTimeout(this._runTasks, estimatedWait); + return; + } } - return new Promise(resolve => { - const tokensNeeded = Math.max(cost - this._tokenCount, 0); - const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate); - - let timeout = null; - const onTimeout = () => { - if (this._timeout === timeout) { - this._timeout = null; - } - resolve(); - }; - this._timeout = timeout = this._timer.setTimeout(onTimeout, estimatedWait); - }); } } diff --git a/test/unit/util_task-queue.js b/test/unit/util_task-queue.js index 4dd68c3b5..d9ac12eb7 100644 --- a/test/unit/util_task-queue.js +++ b/test/unit/util_task-queue.js @@ -5,48 +5,103 @@ const TaskQueue = require('../../src/util/task-queue'); const MockTimer = require('../fixtures/mock-timer'); const testCompare = require('../fixtures/test-compare'); -test('constructor', t => { - // Max tokens = 1000, refill 1000 tokens per second (1 per millisecond), and start with 0 tokens +// Max tokens = 1000 +// Refill 1000 tokens per second (1 per millisecond) +// Token bucket starts empty +const makeTestQueue = () => { const bukkit = new TaskQueue(1000, 1000, 0); const mockTimer = new MockTimer(); bukkit._timer = mockTimer; mockTimer.start(); + return bukkit; +}; + +test('spec', t => { + t.type(TaskQueue, 'function'); + const bukkit = makeTestQueue(); + + t.type(bukkit, 'object'); + + t.type(bukkit.length, 'number'); + t.type(bukkit.do, 'function'); + t.type(bukkit.cancelAll, 'function'); + + t.end(); +}); + +test('cancelAll', t => { + const bukkit = makeTestQueue(); + const taskResults = []; - const promises = []; - const goodCancelMessage = 'Task was canceled correctly'; - bukkit.do(() => taskResults.push('nope'), 999).then( - () => { - t.fail('Task should have been canceled'); - }, - () => { - taskResults.push(goodCancelMessage); - } - ); + const goodCancelMessage1 = 'Task1 was canceled correctly'; + const goodCancelMessage2 = 'Task2 was canceled correctly'; + + const promises = [ + bukkit.do(() => taskResults.push('nope'), 999).then( + () => { + t.fail('Task1 should have been canceled'); + }, + () => { + taskResults.push(goodCancelMessage1); + } + ), + bukkit.do(() => taskResults.push('nah'), 999).then( + () => { + t.fail('Task2 should have been canceled'); + }, + () => { + taskResults.push(goodCancelMessage2); + } + ) + ]; + + // advance time, but not enough that any task should run + bukkit._timer.advanceMockTime(100); + bukkit.cancelAll(); - promises.push( - bukkit.do(() => taskResults.push('a'), 50).then(() => { + + // advance enough that both tasks would run if they hadn't been canceled + bukkit._timer.advanceMockTime(10000); + + return Promise.all(promises).then(() => { + t.deepEqual(taskResults, [goodCancelMessage1, goodCancelMessage2], 'Tasks should cancel in order'); + t.end(); + }); +}); + +test('run tasks', t => { + const bukkit = makeTestQueue(); + + const taskResults = []; + + const promises = [ + bukkit.do(() => { + taskResults.push('a'); testCompare(t, bukkit._timer.timeElapsed(), '>=', 50, 'Costly task must wait'); - }), - bukkit.do(() => taskResults.push('b'), 10).then(() => { + }, 50), + bukkit.do(() => { + taskResults.push('b'); testCompare(t, bukkit._timer.timeElapsed(), '>=', 60, 'Tasks must run in serial'); - }), - bukkit.do(() => taskResults.push('c'), 1).then(() => { - testCompare(t, bukkit._timer.timeElapsed(), '<=', 80, 'Cheap task should run soon'); - }) - ); + }, 10), + bukkit.do(() => { + taskResults.push('c'); + testCompare(t, bukkit._timer.timeElapsed(), '<=', 70, 'Cheap task should run soon'); + }, 1) + ]; // advance 10 simulated milliseconds per JS tick const step = () => { - mockTimer.advanceMockTime(10); - if (mockTimer.timeElapsed() < 100) { + bukkit._timer.advanceMockTime(10); + if (bukkit.length > 0) { global.setTimeout(step, 0); } }; step(); return Promise.all(promises).then(() => { - t.deepEqual(taskResults, [goodCancelMessage, 'a', 'b', 'c'], 'All tasks must run in correct order'); + t.deepEqual(taskResults, ['a', 'b', 'c'], 'All tasks must run in correct order'); + t.end(); }); });