mirror of
https://github.com/scratchfoundation/scratch-vm.git
synced 2024-12-23 06:23:37 -05:00
Merge pull request #1998 from cwillisf/task-queue-enhancements
Task queue enhancements
This commit is contained in:
commit
bb42c0019c
6 changed files with 556 additions and 75 deletions
|
@ -12,18 +12,33 @@ class TaskQueue {
|
||||||
*
|
*
|
||||||
* @param {number} maxTokens - the maximum number of tokens in the bucket (burst size).
|
* @param {number} maxTokens - the maximum number of tokens in the bucket (burst size).
|
||||||
* @param {number} refillRate - the number of tokens to be added per second (sustain rate).
|
* @param {number} refillRate - the number of tokens to be added per second (sustain rate).
|
||||||
* @param {number} [startingTokens=maxTokens] - the number of tokens the bucket starts with.
|
* @param {object} options - optional settings for the new task queue instance.
|
||||||
|
* @property {number} startingTokens - the number of tokens the bucket starts with (default: `maxTokens`).
|
||||||
|
* @property {number} maxTotalCost - reject a task if total queue cost would pass this limit (default: no limit).
|
||||||
* @memberof TaskQueue
|
* @memberof TaskQueue
|
||||||
*/
|
*/
|
||||||
constructor (maxTokens, refillRate, startingTokens = maxTokens) {
|
constructor (maxTokens, refillRate, options = {}) {
|
||||||
this._maxTokens = maxTokens;
|
this._maxTokens = maxTokens;
|
||||||
this._refillRate = refillRate;
|
this._refillRate = refillRate;
|
||||||
this._pendingTaskRecords = [];
|
this._pendingTaskRecords = [];
|
||||||
this._tokenCount = startingTokens;
|
this._tokenCount = options.hasOwnProperty('startingTokens') ? options.startingTokens : maxTokens;
|
||||||
|
this._maxTotalCost = options.hasOwnProperty('maxTotalCost') ? options.maxTotalCost : Infinity;
|
||||||
this._timer = new Timer();
|
this._timer = new Timer();
|
||||||
this._timer.start();
|
this._timer.start();
|
||||||
this._timeout = null;
|
this._timeout = null;
|
||||||
this._lastUpdateTime = this._timer.timeElapsed();
|
this._lastUpdateTime = this._timer.timeElapsed();
|
||||||
|
|
||||||
|
this._runTasks = this._runTasks.bind(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of queued tasks which have not yet started.
|
||||||
|
*
|
||||||
|
* @readonly
|
||||||
|
* @memberof TaskQueue
|
||||||
|
*/
|
||||||
|
get length () {
|
||||||
|
return this._pendingTaskRecords.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -35,38 +50,57 @@ class TaskQueue {
|
||||||
* @memberof TaskQueue
|
* @memberof TaskQueue
|
||||||
*/
|
*/
|
||||||
do (task, cost = 1) {
|
do (task, cost = 1) {
|
||||||
const newRecord = {};
|
if (this._maxTotalCost < Infinity) {
|
||||||
const promise = new Promise((resolve, reject) => {
|
const currentTotalCost = this._pendingTaskRecords.reduce((t, r) => t + r.cost, 0);
|
||||||
newRecord.wrappedTask = () => {
|
if (currentTotalCost + cost > this._maxTotalCost) {
|
||||||
const canRun = this._refillAndSpend(cost);
|
return Promise.reject('Maximum total cost exceeded');
|
||||||
if (canRun) {
|
}
|
||||||
// Remove this task from the queue and run it
|
}
|
||||||
this._pendingTaskRecords.shift();
|
const newRecord = {
|
||||||
try {
|
cost
|
||||||
resolve(task());
|
};
|
||||||
} catch (e) {
|
newRecord.promise = new Promise((resolve, reject) => {
|
||||||
reject(e);
|
newRecord.cancel = () => {
|
||||||
}
|
reject(new Error('Task canceled'));
|
||||||
|
};
|
||||||
|
|
||||||
// Tell the next wrapper to start trying to run its task
|
// The caller, `_runTasks()`, is responsible for cost-checking and spending tokens.
|
||||||
if (this._pendingTaskRecords.length > 0) {
|
newRecord.wrappedTask = () => {
|
||||||
const nextRecord = this._pendingTaskRecords[0];
|
try {
|
||||||
nextRecord.wrappedTask();
|
resolve(task());
|
||||||
}
|
} catch (e) {
|
||||||
} else {
|
reject(e);
|
||||||
// This task can't run yet. Estimate when it will be able to, then try again.
|
|
||||||
newRecord.reject = reject;
|
|
||||||
this._waitUntilAffordable(cost).then(() => newRecord.wrappedTask());
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
this._pendingTaskRecords.push(newRecord);
|
this._pendingTaskRecords.push(newRecord);
|
||||||
|
|
||||||
|
// If the queue has been idle we need to prime the pump
|
||||||
if (this._pendingTaskRecords.length === 1) {
|
if (this._pendingTaskRecords.length === 1) {
|
||||||
newRecord.wrappedTask();
|
this._runTasks();
|
||||||
}
|
}
|
||||||
|
|
||||||
return promise;
|
return newRecord.promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel one pending task, rejecting its promise.
|
||||||
|
*
|
||||||
|
* @param {Promise} taskPromise - the promise returned by `do()`.
|
||||||
|
* @returns {boolean} - true if the task was found, or false otherwise.
|
||||||
|
* @memberof TaskQueue
|
||||||
|
*/
|
||||||
|
cancel (taskPromise) {
|
||||||
|
const taskIndex = this._pendingTaskRecords.findIndex(r => r.promise === taskPromise);
|
||||||
|
if (taskIndex !== -1) {
|
||||||
|
const [taskRecord] = this._pendingTaskRecords.splice(taskIndex, 1);
|
||||||
|
taskRecord.cancel();
|
||||||
|
if (taskIndex === 0 && this._pendingTaskRecords.length > 0) {
|
||||||
|
this._runTasks();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -76,15 +110,16 @@ class TaskQueue {
|
||||||
*/
|
*/
|
||||||
cancelAll () {
|
cancelAll () {
|
||||||
if (this._timeout !== null) {
|
if (this._timeout !== null) {
|
||||||
clearTimeout(this._timeout);
|
this._timer.clearTimeout(this._timeout);
|
||||||
this._timeout = null;
|
this._timeout = null;
|
||||||
}
|
}
|
||||||
this._pendingTaskRecords.forEach(r => r.reject());
|
const oldTasks = this._pendingTaskRecords;
|
||||||
this._pendingTaskRecords = [];
|
this._pendingTaskRecords = [];
|
||||||
|
oldTasks.forEach(r => r.cancel());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shorthand for calling @ _refill() then _spend(cost).
|
* Shorthand for calling _refill() then _spend(cost).
|
||||||
*
|
*
|
||||||
* @see {@link TaskQueue#_refill}
|
* @see {@link TaskQueue#_refill}
|
||||||
* @see {@link TaskQueue#_spend}
|
* @see {@link TaskQueue#_spend}
|
||||||
|
@ -129,33 +164,37 @@ class TaskQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a Promise which will resolve when the bucket will be able to "afford" the given cost.
|
* Loop until the task queue is empty, running each task and spending tokens to do so.
|
||||||
* Note that this won't refill the bucket, so make sure to refill after the promise resolves.
|
* Any time the bucket can't afford the next task, delay asynchronously until it can.
|
||||||
*
|
*
|
||||||
* @param {number} cost - wait until the token count is at least this much.
|
|
||||||
* @returns {Promise} - to be resolved once the bucket is due for a token count greater than or equal to the cost.
|
|
||||||
* @memberof TaskQueue
|
* @memberof TaskQueue
|
||||||
*/
|
*/
|
||||||
_waitUntilAffordable (cost) {
|
_runTasks () {
|
||||||
if (cost <= this._tokenCount) {
|
if (this._timeout) {
|
||||||
return Promise.resolve();
|
this._timer.clearTimeout(this._timeout);
|
||||||
|
this._timeout = null;
|
||||||
}
|
}
|
||||||
if (!(cost <= this._maxTokens)) {
|
for (;;) {
|
||||||
return Promise.reject(new Error(`Task cost ${cost} is greater than bucket limit ${this._maxTokens}`));
|
const nextRecord = this._pendingTaskRecords.shift();
|
||||||
|
if (!nextRecord) {
|
||||||
|
// We ran out of work. Go idle until someone adds another task to the queue.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (nextRecord.cost > this._maxTokens) {
|
||||||
|
throw new Error(`Task cost ${nextRecord.cost} is greater than bucket limit ${this._maxTokens}`);
|
||||||
|
}
|
||||||
|
// Refill before each task in case the time it took for the last task to run was enough to afford the next.
|
||||||
|
if (this._refillAndSpend(nextRecord.cost)) {
|
||||||
|
nextRecord.wrappedTask();
|
||||||
|
} else {
|
||||||
|
// We can't currently afford this task. Put it back and wait until we can and try again.
|
||||||
|
this._pendingTaskRecords.unshift(nextRecord);
|
||||||
|
const tokensNeeded = Math.max(nextRecord.cost - this._tokenCount, 0);
|
||||||
|
const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
|
||||||
|
this._timeout = this._timer.setTimeout(this._runTasks, estimatedWait);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return new Promise(resolve => {
|
|
||||||
const tokensNeeded = Math.max(cost - this._tokenCount, 0);
|
|
||||||
const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
|
|
||||||
|
|
||||||
let timeout = null;
|
|
||||||
const onTimeout = () => {
|
|
||||||
if (this._timeout === timeout) {
|
|
||||||
this._timeout = null;
|
|
||||||
}
|
|
||||||
resolve();
|
|
||||||
};
|
|
||||||
this._timeout = timeout = setTimeout(onTimeout, estimatedWait);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -90,6 +90,25 @@ class Timer {
|
||||||
timeElapsed () {
|
timeElapsed () {
|
||||||
return this.nowObj.now() - this.startTime;
|
return this.nowObj.now() - this.startTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call a handler function after a specified amount of time has elapsed.
|
||||||
|
* @param {function} handler - function to call after the timeout
|
||||||
|
* @param {number} timeout - number of milliseconds to delay before calling the handler
|
||||||
|
* @returns {number} - the ID of the new timeout
|
||||||
|
*/
|
||||||
|
setTimeout (handler, timeout) {
|
||||||
|
return global.setTimeout(handler, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear a timeout from the pending timeout pool.
|
||||||
|
* @param {number} timeoutId - the ID returned by `setTimeout()`
|
||||||
|
* @memberof Timer
|
||||||
|
*/
|
||||||
|
clearTimeout (timeoutId) {
|
||||||
|
global.clearTimeout(timeoutId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = Timer;
|
module.exports = Timer;
|
||||||
|
|
167
test/fixtures/mock-timer.js
vendored
Normal file
167
test/fixtures/mock-timer.js
vendored
Normal file
|
@ -0,0 +1,167 @@
|
||||||
|
/**
|
||||||
|
* Mimic the Timer class with external control of the "time" value, allowing tests to run more quickly and
|
||||||
|
* reliably. Multiple instances of this class operate independently: they may report different time values, and
|
||||||
|
* advancing one timer will not trigger timeouts set on another.
|
||||||
|
*/
|
||||||
|
class MockTimer {
|
||||||
|
/**
|
||||||
|
* Creates an instance of MockTimer.
|
||||||
|
* @param {*} [nowObj=null] - alert the caller that this parameter, supported by Timer, is not supported here.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
constructor (nowObj = null) {
|
||||||
|
if (nowObj) {
|
||||||
|
throw new Error('nowObj is not implemented in MockTimer');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The fake "current time" value, in epoch milliseconds.
|
||||||
|
* @type {number}
|
||||||
|
*/
|
||||||
|
this._mockTime = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to store the start time of a timer action.
|
||||||
|
* Updated when calling `timer.start`.
|
||||||
|
* @type {number}
|
||||||
|
*/
|
||||||
|
this.startTime = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The ID to use the next time `setTimeout` is called.
|
||||||
|
* @type {number}
|
||||||
|
*/
|
||||||
|
this._nextTimeoutId = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map of timeout ID to pending timeout callback info.
|
||||||
|
* @type {Map.<Object>}
|
||||||
|
* @property {number} time - the time at/after which this handler should run
|
||||||
|
* @property {Function} handler - the handler to call when the time comes
|
||||||
|
*/
|
||||||
|
this._timeouts = new Map();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Advance this MockTimer's idea of "current time", running timeout handlers if appropriate.
|
||||||
|
*
|
||||||
|
* @param {number} milliseconds - the amount of time to add to the current mock time value, in milliseconds.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
advanceMockTime (milliseconds) {
|
||||||
|
if (milliseconds < 0) {
|
||||||
|
throw new Error('Time may not move backward');
|
||||||
|
}
|
||||||
|
this._mockTime += milliseconds;
|
||||||
|
this._runTimeouts();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Advance this MockTimer's idea of "current time", running timeout handlers if appropriate.
|
||||||
|
*
|
||||||
|
* @param {number} milliseconds - the amount of time to add to the current mock time value, in milliseconds.
|
||||||
|
* @returns {Promise} - promise which resolves after timeout handlers have had an opportunity to run.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
advanceMockTimeAsync (milliseconds) {
|
||||||
|
return new Promise(resolve => {
|
||||||
|
this.advanceMockTime(milliseconds);
|
||||||
|
global.setTimeout(resolve, 0);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @returns {number} - current mock time elapsed since 1 January 1970 00:00:00 UTC.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
time () {
|
||||||
|
return this._mockTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a time accurate relative to other times produced by this function.
|
||||||
|
* @returns {number} ms-scale accurate time relative to other relative times.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
relativeTime () {
|
||||||
|
return this._mockTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start a timer for measuring elapsed time.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
start () {
|
||||||
|
this.startTime = this._mockTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @returns {number} - the time elapsed since `start()` was called.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
timeElapsed () {
|
||||||
|
return this._mockTime - this.startTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call a handler function after a specified amount of time has elapsed.
|
||||||
|
* Guaranteed to happen in between "ticks" of JavaScript.
|
||||||
|
* @param {function} handler - function to call after the timeout
|
||||||
|
* @param {number} timeout - number of milliseconds to delay before calling the handler
|
||||||
|
* @returns {number} - the ID of the new timeout.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
setTimeout (handler, timeout) {
|
||||||
|
const timeoutId = this._nextTimeoutId++;
|
||||||
|
this._timeouts.set(timeoutId, {
|
||||||
|
time: this._mockTime + timeout,
|
||||||
|
handler
|
||||||
|
});
|
||||||
|
this._runTimeouts();
|
||||||
|
return timeoutId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear a particular timeout from the pending timeout pool.
|
||||||
|
* @param {number} timeoutId - the value returned from `setTimeout()`
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
clearTimeout (timeoutId) {
|
||||||
|
this._timeouts.delete(timeoutId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* WARNING: this method has no equivalent in `Timer`. Do not use this method outside of tests!
|
||||||
|
* @returns {boolean} - true if there are any pending timeouts, false otherwise.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
hasTimeouts () {
|
||||||
|
return this._timeouts.size > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run any timeout handlers whose timeouts have expired.
|
||||||
|
* @memberof MockTimer
|
||||||
|
*/
|
||||||
|
_runTimeouts () {
|
||||||
|
const ready = [];
|
||||||
|
|
||||||
|
this._timeouts.forEach((timeoutRecord, timeoutId) => {
|
||||||
|
const isReady = timeoutRecord.time <= this._mockTime;
|
||||||
|
if (isReady) {
|
||||||
|
ready.push(timeoutRecord);
|
||||||
|
this._timeouts.delete(timeoutId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// sort so that earlier timeouts run before later timeouts
|
||||||
|
ready.sort((a, b) => a.time < b.time);
|
||||||
|
|
||||||
|
// next tick, call everything that's ready
|
||||||
|
global.setTimeout(() => {
|
||||||
|
ready.forEach(o => o.handler());
|
||||||
|
}, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = MockTimer;
|
91
test/unit/mock-timer.js
Normal file
91
test/unit/mock-timer.js
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
const test = require('tap').test;
|
||||||
|
const MockTimer = require('../fixtures/mock-timer');
|
||||||
|
|
||||||
|
test('spec', t => {
|
||||||
|
const timer = new MockTimer();
|
||||||
|
|
||||||
|
t.type(MockTimer, 'function');
|
||||||
|
t.type(timer, 'object');
|
||||||
|
|
||||||
|
// Most members of MockTimer mimic members of Timer.
|
||||||
|
t.type(timer.startTime, 'number');
|
||||||
|
t.type(timer.time, 'function');
|
||||||
|
t.type(timer.start, 'function');
|
||||||
|
t.type(timer.timeElapsed, 'function');
|
||||||
|
t.type(timer.setTimeout, 'function');
|
||||||
|
t.type(timer.clearTimeout, 'function');
|
||||||
|
|
||||||
|
// A few members of MockTimer have no Timer equivalent and should only be used in tests.
|
||||||
|
t.type(timer.advanceMockTime, 'function');
|
||||||
|
t.type(timer.advanceMockTimeAsync, 'function');
|
||||||
|
t.type(timer.hasTimeouts, 'function');
|
||||||
|
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('time', t => {
|
||||||
|
const timer = new MockTimer();
|
||||||
|
const delta = 1;
|
||||||
|
|
||||||
|
const time1 = timer.time();
|
||||||
|
const time2 = timer.time();
|
||||||
|
timer.advanceMockTime(delta);
|
||||||
|
const time3 = timer.time();
|
||||||
|
|
||||||
|
t.equal(time1, time2);
|
||||||
|
t.equal(time2 + delta, time3);
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('start / timeElapsed', t => new Promise(resolve => {
|
||||||
|
const timer = new MockTimer();
|
||||||
|
const halfDelay = 1;
|
||||||
|
const fullDelay = halfDelay + halfDelay;
|
||||||
|
|
||||||
|
timer.start();
|
||||||
|
|
||||||
|
let timeoutCalled = 0;
|
||||||
|
|
||||||
|
// Wait and measure timer
|
||||||
|
timer.setTimeout(() => {
|
||||||
|
t.equal(timeoutCalled, 0);
|
||||||
|
++timeoutCalled;
|
||||||
|
|
||||||
|
const timeElapsed = timer.timeElapsed();
|
||||||
|
t.equal(timeElapsed, fullDelay);
|
||||||
|
t.end();
|
||||||
|
|
||||||
|
resolve();
|
||||||
|
}, fullDelay);
|
||||||
|
|
||||||
|
// this should not trigger the callback
|
||||||
|
timer.advanceMockTime(halfDelay);
|
||||||
|
|
||||||
|
// give the mock timer a chance to run tasks
|
||||||
|
global.setTimeout(() => {
|
||||||
|
// we've only mock-waited for half the delay so it should not have run yet
|
||||||
|
t.equal(timeoutCalled, 0);
|
||||||
|
|
||||||
|
// this should trigger the callback
|
||||||
|
timer.advanceMockTime(halfDelay);
|
||||||
|
}, 0);
|
||||||
|
}));
|
||||||
|
|
||||||
|
test('clearTimeout / hasTimeouts', t => new Promise((resolve, reject) => {
|
||||||
|
const timer = new MockTimer();
|
||||||
|
|
||||||
|
const timeoutId = timer.setTimeout(() => {
|
||||||
|
reject(new Error('Canceled task ran'));
|
||||||
|
}, 1);
|
||||||
|
|
||||||
|
timer.setTimeout(() => {
|
||||||
|
resolve('Non-canceled task ran');
|
||||||
|
t.end();
|
||||||
|
}, 2);
|
||||||
|
|
||||||
|
timer.clearTimeout(timeoutId);
|
||||||
|
|
||||||
|
while (timer.hasTimeouts()) {
|
||||||
|
timer.advanceMockTime(1);
|
||||||
|
}
|
||||||
|
}));
|
|
@ -1,21 +1,92 @@
|
||||||
const test = require('tap').skip;
|
const test = require('tap').test;
|
||||||
|
|
||||||
const TaskQueue = require('../../src/util/task-queue');
|
const TaskQueue = require('../../src/util/task-queue');
|
||||||
|
|
||||||
|
const MockTimer = require('../fixtures/mock-timer');
|
||||||
const testCompare = require('../fixtures/test-compare');
|
const testCompare = require('../fixtures/test-compare');
|
||||||
|
|
||||||
test('constructor', t => {
|
// Max tokens = 1000
|
||||||
// Max tokens = 1000, refill 1000 tokens per second (1 per millisecond), and start with 0 tokens
|
// Refill 1000 tokens per second (1 per millisecond)
|
||||||
const bukkit = new TaskQueue(1000, 1000, 0);
|
// Token bucket starts empty
|
||||||
|
// Max total cost of queued tasks = 10000 tokens = 10 seconds
|
||||||
|
const makeTestQueue = () => {
|
||||||
|
const bukkit = new TaskQueue(1000, 1000, {
|
||||||
|
startingTokens: 0,
|
||||||
|
maxTotalCost: 10000
|
||||||
|
});
|
||||||
|
|
||||||
// Simulate time passing with a stubbed timer
|
const mockTimer = new MockTimer();
|
||||||
const simulatedTimeStart = Date.now();
|
bukkit._timer = mockTimer;
|
||||||
bukkit._timer = {timeElapsed: () => Date.now() - simulatedTimeStart};
|
mockTimer.start();
|
||||||
|
|
||||||
|
return bukkit;
|
||||||
|
};
|
||||||
|
|
||||||
|
test('spec', t => {
|
||||||
|
t.type(TaskQueue, 'function');
|
||||||
|
const bukkit = makeTestQueue();
|
||||||
|
|
||||||
|
t.type(bukkit, 'object');
|
||||||
|
|
||||||
|
t.type(bukkit.length, 'number');
|
||||||
|
t.type(bukkit.do, 'function');
|
||||||
|
t.type(bukkit.cancel, 'function');
|
||||||
|
t.type(bukkit.cancelAll, 'function');
|
||||||
|
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('constructor', t => {
|
||||||
|
t.ok(new TaskQueue(1, 1));
|
||||||
|
t.ok(new TaskQueue(1, 1, {}));
|
||||||
|
t.ok(new TaskQueue(1, 1, {startingTokens: 0}));
|
||||||
|
t.ok(new TaskQueue(1, 1, {maxTotalCost: 999}));
|
||||||
|
t.ok(new TaskQueue(1, 1, {startingTokens: 0, maxTotalCost: 999}));
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('run tasks', async t => {
|
||||||
|
const bukkit = makeTestQueue();
|
||||||
|
|
||||||
|
const taskResults = [];
|
||||||
|
|
||||||
|
const promises = [
|
||||||
|
bukkit.do(() => {
|
||||||
|
taskResults.push('a');
|
||||||
|
testCompare(t, bukkit._timer.timeElapsed(), '>=', 50, 'Costly task must wait');
|
||||||
|
}, 50),
|
||||||
|
bukkit.do(() => {
|
||||||
|
taskResults.push('b');
|
||||||
|
testCompare(t, bukkit._timer.timeElapsed(), '>=', 60, 'Tasks must run in serial');
|
||||||
|
}, 10),
|
||||||
|
bukkit.do(() => {
|
||||||
|
taskResults.push('c');
|
||||||
|
testCompare(t, bukkit._timer.timeElapsed(), '<=', 70, 'Cheap task should run soon');
|
||||||
|
}, 1)
|
||||||
|
];
|
||||||
|
|
||||||
|
// advance 10 simulated milliseconds per JS tick
|
||||||
|
while (bukkit.length > 0) {
|
||||||
|
await bukkit._timer.advanceMockTimeAsync(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Promise.all(promises).then(() => {
|
||||||
|
t.deepEqual(taskResults, ['a', 'b', 'c'], 'All tasks must run in correct order');
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test('cancel', async t => {
|
||||||
|
const bukkit = makeTestQueue();
|
||||||
|
|
||||||
const taskResults = [];
|
const taskResults = [];
|
||||||
const promises = [];
|
|
||||||
const goodCancelMessage = 'Task was canceled correctly';
|
const goodCancelMessage = 'Task was canceled correctly';
|
||||||
bukkit.do(() => taskResults.push('nope'), 999).then(
|
const afterCancelMessage = 'Task was run correctly';
|
||||||
|
const cancelTaskPromise = bukkit.do(
|
||||||
|
() => {
|
||||||
|
taskResults.push('nope');
|
||||||
|
}, 999);
|
||||||
|
const cancelCheckPromise = cancelTaskPromise.then(
|
||||||
() => {
|
() => {
|
||||||
t.fail('Task should have been canceled');
|
t.fail('Task should have been canceled');
|
||||||
},
|
},
|
||||||
|
@ -23,19 +94,99 @@ test('constructor', t => {
|
||||||
taskResults.push(goodCancelMessage);
|
taskResults.push(goodCancelMessage);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
bukkit.cancelAll();
|
const keepTaskPromise = bukkit.do(
|
||||||
promises.push(
|
() => {
|
||||||
bukkit.do(() => taskResults.push('a'), 50).then(() =>
|
taskResults.push(afterCancelMessage);
|
||||||
testCompare(t, bukkit._timer.timeElapsed(), '>=', 50, 'Costly task must wait')
|
testCompare(t, bukkit._timer.timeElapsed(), '<', 10, 'Canceled task must not delay other tasks');
|
||||||
),
|
}, 5);
|
||||||
bukkit.do(() => taskResults.push('b'), 10).then(() =>
|
|
||||||
testCompare(t, bukkit._timer.timeElapsed(), '>=', 60, 'Tasks must run in serial')
|
// give the bucket a chance to make a mistake
|
||||||
),
|
await bukkit._timer.advanceMockTimeAsync(1);
|
||||||
bukkit.do(() => taskResults.push('c'), 1).then(() =>
|
|
||||||
testCompare(t, bukkit._timer.timeElapsed(), '<', 80, 'Cheap task should run soon')
|
t.equal(bukkit.length, 2);
|
||||||
)
|
const taskWasCanceled = bukkit.cancel(cancelTaskPromise);
|
||||||
);
|
t.ok(taskWasCanceled);
|
||||||
return Promise.all(promises).then(() => {
|
t.equal(bukkit.length, 1);
|
||||||
t.deepEqual(taskResults, [goodCancelMessage, 'a', 'b', 'c'], 'All tasks must run in correct order');
|
|
||||||
|
while (bukkit.length > 0) {
|
||||||
|
await bukkit._timer.advanceMockTimeAsync(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Promise.all([cancelCheckPromise, keepTaskPromise]).then(() => {
|
||||||
|
t.deepEqual(taskResults, [goodCancelMessage, afterCancelMessage]);
|
||||||
|
t.end();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('cancelAll', async t => {
|
||||||
|
const bukkit = makeTestQueue();
|
||||||
|
|
||||||
|
const taskResults = [];
|
||||||
|
const goodCancelMessage1 = 'Task1 was canceled correctly';
|
||||||
|
const goodCancelMessage2 = 'Task2 was canceled correctly';
|
||||||
|
|
||||||
|
const promises = [
|
||||||
|
bukkit.do(() => taskResults.push('nope'), 999).then(
|
||||||
|
() => {
|
||||||
|
t.fail('Task1 should have been canceled');
|
||||||
|
},
|
||||||
|
() => {
|
||||||
|
taskResults.push(goodCancelMessage1);
|
||||||
|
}
|
||||||
|
),
|
||||||
|
bukkit.do(() => taskResults.push('nah'), 999).then(
|
||||||
|
() => {
|
||||||
|
t.fail('Task2 should have been canceled');
|
||||||
|
},
|
||||||
|
() => {
|
||||||
|
taskResults.push(goodCancelMessage2);
|
||||||
|
}
|
||||||
|
)
|
||||||
|
];
|
||||||
|
|
||||||
|
// advance time, but not enough that any task should run
|
||||||
|
await bukkit._timer.advanceMockTimeAsync(100);
|
||||||
|
|
||||||
|
bukkit.cancelAll();
|
||||||
|
|
||||||
|
// advance enough that both tasks would run if they hadn't been canceled
|
||||||
|
await bukkit._timer.advanceMockTimeAsync(10000);
|
||||||
|
|
||||||
|
return Promise.all(promises).then(() => {
|
||||||
|
t.deepEqual(taskResults, [goodCancelMessage1, goodCancelMessage2], 'Tasks should cancel in order');
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test('max total cost', async t => {
|
||||||
|
const bukkit = makeTestQueue();
|
||||||
|
|
||||||
|
let numTasks = 0;
|
||||||
|
|
||||||
|
const task = () => ++numTasks;
|
||||||
|
|
||||||
|
// Fill the queue
|
||||||
|
for (let i = 0; i < 10; ++i) {
|
||||||
|
bukkit.do(task, 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This one should be rejected because the queue is full
|
||||||
|
bukkit
|
||||||
|
.do(task, 1000)
|
||||||
|
.then(
|
||||||
|
() => {
|
||||||
|
t.fail('Full queue did not reject task');
|
||||||
|
},
|
||||||
|
() => {
|
||||||
|
t.pass();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
while (bukkit.length > 0) {
|
||||||
|
await bukkit._timer.advanceMockTimeAsync(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
// this should be 10 if the last task is rejected or 11 if it runs
|
||||||
|
t.equal(numTasks, 10);
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
|
|
@ -11,6 +11,8 @@ test('spec', t => {
|
||||||
t.type(timer.time, 'function');
|
t.type(timer.time, 'function');
|
||||||
t.type(timer.start, 'function');
|
t.type(timer.start, 'function');
|
||||||
t.type(timer.timeElapsed, 'function');
|
t.type(timer.timeElapsed, 'function');
|
||||||
|
t.type(timer.setTimeout, 'function');
|
||||||
|
t.type(timer.clearTimeout, 'function');
|
||||||
|
|
||||||
t.end();
|
t.end();
|
||||||
});
|
});
|
||||||
|
@ -32,7 +34,7 @@ test('start / timeElapsed', t => {
|
||||||
timer.start();
|
timer.start();
|
||||||
|
|
||||||
// Wait and measure timer
|
// Wait and measure timer
|
||||||
setTimeout(() => {
|
timer.setTimeout(() => {
|
||||||
const timeElapsed = timer.timeElapsed();
|
const timeElapsed = timer.timeElapsed();
|
||||||
t.ok(timeElapsed >= 0);
|
t.ok(timeElapsed >= 0);
|
||||||
t.ok(timeElapsed >= (delay - threshold) &&
|
t.ok(timeElapsed >= (delay - threshold) &&
|
||||||
|
@ -40,3 +42,15 @@ test('start / timeElapsed', t => {
|
||||||
t.end();
|
t.end();
|
||||||
}, delay);
|
}, delay);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('setTimeout / clearTimeout', t => new Promise((resolve, reject) => {
|
||||||
|
const timer = new Timer();
|
||||||
|
const cancelId = timer.setTimeout(() => {
|
||||||
|
reject(new Error('Canceled task ran'));
|
||||||
|
}, 1);
|
||||||
|
timer.setTimeout(() => {
|
||||||
|
resolve('Non-canceled task ran');
|
||||||
|
t.end();
|
||||||
|
}, 2);
|
||||||
|
timer.clearTimeout(cancelId);
|
||||||
|
}));
|
||||||
|
|
Loading…
Reference in a new issue