This commit is contained in:
Christopher Willis-Ford 2025-05-08 12:29:22 -07:00 committed by GitHub
commit 7f5a2cbbe4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 75 additions and 9 deletions

View file

@ -1,5 +1,7 @@
const Timer = require('../util/timer');
const noop = () => {};
/**
* This class uses the token bucket algorithm to control a queue of tasks.
*/
@ -48,10 +50,37 @@ class TaskQueue {
*
* @param {Function} task - the task to run.
* @param {number} [cost=1] - the number of tokens this task consumes from the bucket.
* @returns {Promise} - a promise for the task's return value.
* @returns {Promise} - a promise for the task's return value, or `undefined` if the task gets canceled.
* @memberof TaskQueue
*/
do (task, cost = 1) {
return this._do(task, false, cost);
}
/**
* Wait until the token bucket is full enough, then run the provided task.
*
* @param {Function} task - the task to run.
* @param {number} [cost=1] - the number of tokens this task consumes from the bucket.
* @returns {Promise} - a promise for the task's return value. This promise will reject if the task gets canceled.
* @memberof TaskQueue
*/
doOrReject (task, cost = 1) {
return this._do(task, true, cost);
}
/**
* Wait until the token bucket is full enough, then run the provided task.
*
* @param {Function} task - the task to run.
* @param {boolean} rejectOnCancel - choose what happens if the queue cancels this task:
* true: reject the promise returned by this function
* false: resolve the promise returned by this function with a value of `undefined`
* @param {number} [cost=1] - the number of tokens this task consumes from the bucket.
* @returns {Promise} - a promise for the task's return value.
* @memberof TaskQueue
*/
_do (task, rejectOnCancel, cost) {
if (this._maxTotalCost < Infinity) {
const currentTotalCost = this._pendingTaskRecords.reduce((t, r) => t + r.cost, 0);
if (currentTotalCost + cost > this._maxTotalCost) {
@ -62,9 +91,13 @@ class TaskQueue {
cost
};
newRecord.promise = new Promise((resolve, reject) => {
newRecord.cancel = () => {
reject(new Error('Task canceled'));
};
if (rejectOnCancel) {
newRecord.cancel = () => {
reject(new Error('Task canceled'));
};
} else {
newRecord.cancel = noop;
}
// The caller, `_runTasks()`, is responsible for cost-checking and spending tokens.
newRecord.wrappedTask = () => {

View file

@ -76,13 +76,13 @@ test('run tasks', async t => {
});
});
test('cancel', async t => {
test('cancel with doOrReject', async t => {
const bukkit = makeTestQueue();
const taskResults = [];
const goodCancelMessage = 'Task was canceled correctly';
const afterCancelMessage = 'Task was run correctly';
const cancelTaskPromise = bukkit.do(
const cancelTaskPromise = bukkit.doOrReject(
() => {
taskResults.push('nope');
}, 999);
@ -94,7 +94,7 @@ test('cancel', async t => {
taskResults.push(goodCancelMessage);
}
);
const keepTaskPromise = bukkit.do(
const keepTaskPromise = bukkit.doOrReject(
() => {
taskResults.push(afterCancelMessage);
testCompare(t, bukkit._timer.timeElapsed(), '<', 10, 'Canceled task must not delay other tasks');
@ -118,6 +118,39 @@ test('cancel', async t => {
});
});
test('cancel with do (no reject)', async t => {
const bukkit = makeTestQueue();
const taskResults = [];
const afterCancelMessage = 'Task was run correctly';
const cancelTaskPromise = bukkit.do(
() => {
taskResults.push('nope');
}, 999);
const keepTaskPromise = bukkit.do(
() => {
taskResults.push(afterCancelMessage);
testCompare(t, bukkit._timer.timeElapsed(), '<', 10, 'Canceled task must not delay other tasks');
}, 5);
// give the bucket a chance to make a mistake
await bukkit._timer.advanceMockTimeAsync(1);
t.equal(bukkit.length, 2);
const taskWasCanceled = bukkit.cancel(cancelTaskPromise);
t.ok(taskWasCanceled);
t.equal(bukkit.length, 1);
while (bukkit.length > 0) {
await bukkit._timer.advanceMockTimeAsync(1);
}
return keepTaskPromise.then(() => {
t.deepEqual(taskResults, [afterCancelMessage]);
t.end();
});
});
test('cancelAll', async t => {
const bukkit = makeTestQueue();
@ -126,7 +159,7 @@ test('cancelAll', async t => {
const goodCancelMessage2 = 'Task2 was canceled correctly';
const promises = [
bukkit.do(() => taskResults.push('nope'), 999).then(
bukkit.doOrReject(() => taskResults.push('nope'), 999).then(
() => {
t.fail('Task1 should have been canceled');
},
@ -134,7 +167,7 @@ test('cancelAll', async t => {
taskResults.push(goodCancelMessage1);
}
),
bukkit.do(() => taskResults.push('nah'), 999).then(
bukkit.doOrReject(() => taskResults.push('nah'), 999).then(
() => {
t.fail('Task2 should have been canceled');
},