mirror of
https://github.com/scratchfoundation/scratch-vm.git
synced 2025-07-20 11:13:04 -04:00
Merge pull request #1587 from cwillisf/util_token-bucket
Add a rate-limited task queue utility
This commit is contained in:
commit
c541e87da5
3 changed files with 218 additions and 0 deletions
162
src/util/task-queue.js
Normal file
162
src/util/task-queue.js
Normal file
|
@ -0,0 +1,162 @@
|
|||
const Timer = require('../util/timer');
|
||||
|
||||
/**
|
||||
* This class uses the token bucket algorithm to control a queue of tasks.
|
||||
*/
|
||||
class TaskQueue {
|
||||
/**
|
||||
* Creates an instance of TaskQueue.
|
||||
* To allow bursts, set `maxTokens` to several times the average task cost.
|
||||
* To prevent bursts, set `maxTokens` to the cost of the largest tasks.
|
||||
* Note that tasks with a cost greater than `maxTokens` will be rejected.
|
||||
*
|
||||
* @param {number} maxTokens - the maximum number of tokens in the bucket (burst size).
|
||||
* @param {number} refillRate - the number of tokens to be added per second (sustain rate).
|
||||
* @param {number} [startingTokens=maxTokens] - the number of tokens the bucket starts with.
|
||||
* @memberof TaskQueue
|
||||
*/
|
||||
constructor (maxTokens, refillRate, startingTokens = maxTokens) {
|
||||
this._maxTokens = maxTokens;
|
||||
this._refillRate = refillRate;
|
||||
this._pendingTaskRecords = [];
|
||||
this._tokenCount = startingTokens;
|
||||
this._timer = new Timer();
|
||||
this._timer.start();
|
||||
this._timeout = null;
|
||||
this._lastUpdateTime = this._timer.timeElapsed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until the token bucket is full enough, then run the provided task.
|
||||
*
|
||||
* @param {Function} task - the task to run.
|
||||
* @param {number} [cost=1] - the number of tokens this task consumes from the bucket.
|
||||
* @returns {Promise} - a promise for the task's return value.
|
||||
* @memberof TaskQueue
|
||||
*/
|
||||
do (task, cost = 1) {
|
||||
const newRecord = {};
|
||||
const promise = new Promise((resolve, reject) => {
|
||||
newRecord.wrappedTask = () => {
|
||||
const canRun = this._refillAndSpend(cost);
|
||||
if (canRun) {
|
||||
// Remove this task from the queue and run it
|
||||
this._pendingTaskRecords.shift();
|
||||
try {
|
||||
resolve(task());
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
}
|
||||
|
||||
// Tell the next wrapper to start trying to run its task
|
||||
if (this._pendingTaskRecords.length > 0) {
|
||||
const nextRecord = this._pendingTaskRecords[0];
|
||||
nextRecord.wrappedTask();
|
||||
}
|
||||
} else {
|
||||
// This task can't run yet. Estimate when it will be able to, then try again.
|
||||
newRecord.reject = reject;
|
||||
this._waitUntilAffordable(cost).then(() => newRecord.wrappedTask());
|
||||
}
|
||||
};
|
||||
});
|
||||
this._pendingTaskRecords.push(newRecord);
|
||||
|
||||
if (this._pendingTaskRecords.length === 1) {
|
||||
newRecord.wrappedTask();
|
||||
}
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel all pending tasks, rejecting all their promises.
|
||||
*
|
||||
* @memberof TaskQueue
|
||||
*/
|
||||
cancelAll () {
|
||||
if (this._timeout !== null) {
|
||||
clearTimeout(this._timeout);
|
||||
this._timeout = null;
|
||||
}
|
||||
this._pendingTaskRecords.forEach(r => r.reject());
|
||||
this._pendingTaskRecords = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Shorthand for calling @ _refill() then _spend(cost).
|
||||
*
|
||||
* @see {@link TaskQueue#_refill}
|
||||
* @see {@link TaskQueue#_spend}
|
||||
* @param {number} cost - the number of tokens to try to spend.
|
||||
* @returns {boolean} true if we had enough tokens; false otherwise.
|
||||
* @memberof TaskQueue
|
||||
*/
|
||||
_refillAndSpend (cost) {
|
||||
this._refill();
|
||||
return this._spend(cost);
|
||||
}
|
||||
|
||||
/**
|
||||
* Refill the token bucket based on the amount of time since the last refill.
|
||||
*
|
||||
* @memberof TaskQueue
|
||||
*/
|
||||
_refill () {
|
||||
const now = this._timer.timeElapsed();
|
||||
const timeSinceRefill = now - this._lastUpdateTime;
|
||||
if (timeSinceRefill <= 0) return;
|
||||
|
||||
this._lastUpdateTime = now;
|
||||
this._tokenCount += timeSinceRefill * this._refillRate / 1000;
|
||||
this._tokenCount = Math.min(this._tokenCount, this._maxTokens);
|
||||
}
|
||||
|
||||
/**
|
||||
* If we can "afford" the given cost, subtract that many tokens and return true.
|
||||
* Otherwise, return false.
|
||||
*
|
||||
* @param {number} cost - the number of tokens to try to spend.
|
||||
* @returns {boolean} true if we had enough tokens; false otherwise.
|
||||
* @memberof TaskQueue
|
||||
*/
|
||||
_spend (cost) {
|
||||
if (cost <= this._tokenCount) {
|
||||
this._tokenCount -= cost;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Promise which will resolve when the bucket will be able to "afford" the given cost.
|
||||
* Note that this won't refill the bucket, so make sure to refill after the promise resolves.
|
||||
*
|
||||
* @param {number} cost - wait until the token count is at least this much.
|
||||
* @returns {Promise} - to be resolved once the bucket is due for a token count greater than or equal to the cost.
|
||||
* @memberof TaskQueue
|
||||
*/
|
||||
_waitUntilAffordable (cost) {
|
||||
if (cost <= this._tokenCount) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
if (!(cost <= this._maxTokens)) {
|
||||
return Promise.reject(new Error(`Task cost ${cost} is greater than bucket limit ${this._maxTokens}`));
|
||||
}
|
||||
return new Promise(resolve => {
|
||||
const tokensNeeded = Math.max(cost - this._tokenCount, 0);
|
||||
const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
|
||||
|
||||
let timeout = null;
|
||||
const onTimeout = () => {
|
||||
if (this._timeout === timeout) {
|
||||
this._timeout = null;
|
||||
}
|
||||
resolve();
|
||||
};
|
||||
this._timeout = timeout = setTimeout(onTimeout, estimatedWait);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = TaskQueue;
|
15
test/fixtures/test-compare.js
vendored
Normal file
15
test/fixtures/test-compare.js
vendored
Normal file
|
@ -0,0 +1,15 @@
|
|||
const testCompare = (t, lhs, op, rhs, message) => {
|
||||
const details = `Expected: ${lhs} ${op} ${rhs}`;
|
||||
const extra = {details};
|
||||
switch (op) {
|
||||
case '<': return t.ok(lhs < rhs, message, extra);
|
||||
case '<=': return t.ok(lhs <= rhs, message, extra);
|
||||
case '===': return t.ok(lhs === rhs, message, extra);
|
||||
case '!==': return t.ok(lhs !== rhs, message, extra);
|
||||
case '>=': return t.ok(lhs >= rhs, message, extra);
|
||||
case '>': return t.ok(lhs > rhs, message, extra);
|
||||
default: return t.fail(`Unrecognized op: ${op}`);
|
||||
}
|
||||
};
|
||||
|
||||
module.exports = testCompare;
|
41
test/unit/util_task-queue.js
Normal file
41
test/unit/util_task-queue.js
Normal file
|
@ -0,0 +1,41 @@
|
|||
const test = require('tap').test;
|
||||
|
||||
const Timer = require('../../src/util/timer');
|
||||
const TaskQueue = require('../../src/util/task-queue');
|
||||
|
||||
const testCompare = require('../fixtures/test-compare');
|
||||
|
||||
test('constructor', t => {
|
||||
// Max tokens = 1000, refill 1000 tokens per second (1 per millisecond), and start with 0 tokens
|
||||
const bukkit = new TaskQueue(1000, 1000, 0);
|
||||
|
||||
const timer = new Timer();
|
||||
timer.start();
|
||||
|
||||
const taskResults = [];
|
||||
const promises = [];
|
||||
const goodCancelMessage = 'Task was canceled correctly';
|
||||
bukkit.do(() => taskResults.push('nope'), 999).then(
|
||||
() => {
|
||||
t.fail('Task should have been canceled');
|
||||
},
|
||||
() => {
|
||||
taskResults.push(goodCancelMessage);
|
||||
}
|
||||
);
|
||||
bukkit.cancelAll();
|
||||
promises.push(
|
||||
bukkit.do(() => taskResults.push('a'), 50).then(() =>
|
||||
testCompare(t, timer.timeElapsed(), '>=', 50, 'Costly task must wait')
|
||||
),
|
||||
bukkit.do(() => taskResults.push('b'), 10).then(() =>
|
||||
testCompare(t, timer.timeElapsed(), '>=', 60, 'Tasks must run in serial')
|
||||
),
|
||||
bukkit.do(() => taskResults.push('c'), 1).then(() =>
|
||||
testCompare(t, timer.timeElapsed(), '<', 80, 'Cheap task should run soon')
|
||||
)
|
||||
);
|
||||
return Promise.all(promises).then(() => {
|
||||
t.deepEqual(taskResults, [goodCancelMessage, 'a', 'b', 'c'], 'All tasks must run in correct order');
|
||||
});
|
||||
});
|
Loading…
Add table
Add a link
Reference in a new issue