TaskQueue: support canceling a single task

This commit is contained in:
Christopher Willis-Ford 2019-02-12 17:37:04 -08:00
parent d27dc5d54a
commit 2a9a3b11b9
4 changed files with 122 additions and 47 deletions

View file

@ -30,6 +30,7 @@ class TaskQueue {
/** /**
* Get the number of queued tasks which have not yet started. * Get the number of queued tasks which have not yet started.
*
* @readonly * @readonly
* @memberof TaskQueue * @memberof TaskQueue
*/ */
@ -49,12 +50,8 @@ class TaskQueue {
const newRecord = { const newRecord = {
cost cost
}; };
const promise = new Promise((resolve, reject) => { newRecord.promise = new Promise((resolve, reject) => {
newRecord.cancel = () => { newRecord.cancel = () => {
const index = this._pendingTaskRecords.indexOf(newRecord);
if (index >= 0) {
this._pendingTaskRecords.splice(index, 1);
}
reject(new Error('Task canceled')); reject(new Error('Task canceled'));
}; };
@ -74,7 +71,27 @@ class TaskQueue {
this._runTasks(); this._runTasks();
} }
return promise; return newRecord.promise;
}
/**
* Cancel one pending task, rejecting its promise.
*
* @param {Promise} taskPromise - the promise returned by `do()`.
* @returns {boolean} - true if the task was found, or false otherwise.
* @memberof TaskQueue
*/
cancel (taskPromise) {
const taskIndex = this._pendingTaskRecords.findIndex(r => r.promise === taskPromise);
if (taskIndex !== -1) {
const [taskRecord] = this._pendingTaskRecords.splice(taskIndex, 1);
taskRecord.cancel();
if (taskIndex === 0 && this._pendingTaskRecords.length > 0) {
this._runTasks();
}
return true;
}
return false;
} }
/** /**
@ -140,6 +157,7 @@ class TaskQueue {
/** /**
* Loop until the task queue is empty, running each task and spending tokens to do so. * Loop until the task queue is empty, running each task and spending tokens to do so.
* Any time the bucket can't afford the next task, delay asynchronously until it can. * Any time the bucket can't afford the next task, delay asynchronously until it can.
*
* @memberof TaskQueue * @memberof TaskQueue
*/ */
_runTasks () { _runTasks () {

View file

@ -44,17 +44,32 @@ class MockTimer {
/** /**
* Advance this MockTimer's idea of "current time", running timeout handlers if appropriate. * Advance this MockTimer's idea of "current time", running timeout handlers if appropriate.
* @param {number} delta - the amount of time to add to the current mock time value, in milliseconds. *
* @param {number} milliseconds - the amount of time to add to the current mock time value, in milliseconds.
* @memberof MockTimer * @memberof MockTimer
*/ */
advanceMockTime (delta) { advanceMockTime (milliseconds) {
if (delta < 0) { if (milliseconds < 0) {
throw new Error('Time may not move backward'); throw new Error('Time may not move backward');
} }
this._mockTime += delta; this._mockTime += milliseconds;
this._runTimeouts(); this._runTimeouts();
} }
/**
* Advance this MockTimer's idea of "current time", running timeout handlers if appropriate.
*
* @param {number} milliseconds - the amount of time to add to the current mock time value, in milliseconds.
* @returns {Promise} - promise which resolves after timeout handlers have had an opportunity to run.
* @memberof MockTimer
*/
advanceMockTimeAsync (milliseconds) {
return new Promise(resolve => {
this.advanceMockTime(milliseconds);
global.setTimeout(resolve, 0);
});
}
/** /**
* @returns {number} - current mock time elapsed since 1 January 1970 00:00:00 UTC. * @returns {number} - current mock time elapsed since 1 January 1970 00:00:00 UTC.
* @memberof MockTimer * @memberof MockTimer

View file

@ -16,6 +16,8 @@ test('spec', t => {
t.type(timer.clearTimeout, 'function'); t.type(timer.clearTimeout, 'function');
// A few members of MockTimer have no Timer equivalent and should only be used in tests. // A few members of MockTimer have no Timer equivalent and should only be used in tests.
t.type(timer.advanceMockTime, 'function');
t.type(timer.advanceMockTimeAsync, 'function');
t.type(timer.hasTimeouts, 'function'); t.type(timer.hasTimeouts, 'function');
t.end(); t.end();

View file

@ -26,12 +26,86 @@ test('spec', t => {
t.type(bukkit.length, 'number'); t.type(bukkit.length, 'number');
t.type(bukkit.do, 'function'); t.type(bukkit.do, 'function');
t.type(bukkit.cancel, 'function');
t.type(bukkit.cancelAll, 'function'); t.type(bukkit.cancelAll, 'function');
t.end(); t.end();
}); });
test('cancelAll', t => { test('run tasks', async t => {
const bukkit = makeTestQueue();
const taskResults = [];
const promises = [
bukkit.do(() => {
taskResults.push('a');
testCompare(t, bukkit._timer.timeElapsed(), '>=', 50, 'Costly task must wait');
}, 50),
bukkit.do(() => {
taskResults.push('b');
testCompare(t, bukkit._timer.timeElapsed(), '>=', 60, 'Tasks must run in serial');
}, 10),
bukkit.do(() => {
taskResults.push('c');
testCompare(t, bukkit._timer.timeElapsed(), '<=', 70, 'Cheap task should run soon');
}, 1)
];
// advance 10 simulated milliseconds per JS tick
while (bukkit.length > 0) {
await bukkit._timer.advanceMockTimeAsync(10);
}
return Promise.all(promises).then(() => {
t.deepEqual(taskResults, ['a', 'b', 'c'], 'All tasks must run in correct order');
t.end();
});
});
test('cancel', async t => {
const bukkit = makeTestQueue();
const taskResults = [];
const goodCancelMessage = 'Task was canceled correctly';
const afterCancelMessage = 'Task was run correctly';
const cancelTaskPromise = bukkit.do(
() => {
taskResults.push('nope');
}, 999);
const cancelCheckPromise = cancelTaskPromise.then(
() => {
t.fail('Task should have been canceled');
},
() => {
taskResults.push(goodCancelMessage);
}
);
const keepTaskPromise = bukkit.do(
() => {
taskResults.push(afterCancelMessage);
testCompare(t, bukkit._timer.timeElapsed(), '<', 10, 'Canceled task must not delay other tasks');
}, 5);
// give the bucket a chance to make a mistake
await bukkit._timer.advanceMockTimeAsync(1);
t.equal(bukkit.length, 2);
const taskWasCanceled = bukkit.cancel(cancelTaskPromise);
t.ok(taskWasCanceled);
t.equal(bukkit.length, 1);
while (bukkit.length > 0) {
await bukkit._timer.advanceMockTimeAsync(1);
}
return Promise.all([cancelCheckPromise, keepTaskPromise]).then(() => {
t.deepEqual(taskResults, [goodCancelMessage, afterCancelMessage]);
t.end();
});
});
test('cancelAll', async t => {
const bukkit = makeTestQueue(); const bukkit = makeTestQueue();
const taskResults = []; const taskResults = [];
@ -58,12 +132,12 @@ test('cancelAll', t => {
]; ];
// advance time, but not enough that any task should run // advance time, but not enough that any task should run
bukkit._timer.advanceMockTime(100); await bukkit._timer.advanceMockTimeAsync(100);
bukkit.cancelAll(); bukkit.cancelAll();
// advance enough that both tasks would run if they hadn't been canceled // advance enough that both tasks would run if they hadn't been canceled
bukkit._timer.advanceMockTime(10000); await bukkit._timer.advanceMockTimeAsync(10000);
return Promise.all(promises).then(() => { return Promise.all(promises).then(() => {
t.deepEqual(taskResults, [goodCancelMessage1, goodCancelMessage2], 'Tasks should cancel in order'); t.deepEqual(taskResults, [goodCancelMessage1, goodCancelMessage2], 'Tasks should cancel in order');
@ -71,37 +145,3 @@ test('cancelAll', t => {
}); });
}); });
test('run tasks', t => {
const bukkit = makeTestQueue();
const taskResults = [];
const promises = [
bukkit.do(() => {
taskResults.push('a');
testCompare(t, bukkit._timer.timeElapsed(), '>=', 50, 'Costly task must wait');
}, 50),
bukkit.do(() => {
taskResults.push('b');
testCompare(t, bukkit._timer.timeElapsed(), '>=', 60, 'Tasks must run in serial');
}, 10),
bukkit.do(() => {
taskResults.push('c');
testCompare(t, bukkit._timer.timeElapsed(), '<=', 70, 'Cheap task should run soon');
}, 1)
];
// advance 10 simulated milliseconds per JS tick
const step = () => {
bukkit._timer.advanceMockTime(10);
if (bukkit.length > 0) {
global.setTimeout(step, 0);
}
};
step();
return Promise.all(promises).then(() => {
t.deepEqual(taskResults, ['a', 'b', 'c'], 'All tasks must run in correct order');
t.end();
});
});