mirror of
https://github.com/scratchfoundation/scratch-vm.git
synced 2025-07-06 03:00:30 -04:00
TaskQueue: reduce number of async steps before task runs
This commit is contained in:
parent
5a17bb1451
commit
d27dc5d54a
2 changed files with 138 additions and 71 deletions
|
@ -24,6 +24,17 @@ class TaskQueue {
|
||||||
this._timer.start();
|
this._timer.start();
|
||||||
this._timeout = null;
|
this._timeout = null;
|
||||||
this._lastUpdateTime = this._timer.timeElapsed();
|
this._lastUpdateTime = this._timer.timeElapsed();
|
||||||
|
|
||||||
|
this._runTasks = this._runTasks.bind(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of queued tasks which have not yet started.
|
||||||
|
* @readonly
|
||||||
|
* @memberof TaskQueue
|
||||||
|
*/
|
||||||
|
get length () {
|
||||||
|
return this._pendingTaskRecords.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -35,35 +46,32 @@ class TaskQueue {
|
||||||
* @memberof TaskQueue
|
* @memberof TaskQueue
|
||||||
*/
|
*/
|
||||||
do (task, cost = 1) {
|
do (task, cost = 1) {
|
||||||
const newRecord = {};
|
const newRecord = {
|
||||||
|
cost
|
||||||
|
};
|
||||||
const promise = new Promise((resolve, reject) => {
|
const promise = new Promise((resolve, reject) => {
|
||||||
newRecord.wrappedTask = () => {
|
newRecord.cancel = () => {
|
||||||
const canRun = this._refillAndSpend(cost);
|
const index = this._pendingTaskRecords.indexOf(newRecord);
|
||||||
if (canRun) {
|
if (index >= 0) {
|
||||||
// Remove this task from the queue and run it
|
this._pendingTaskRecords.splice(index, 1);
|
||||||
this._pendingTaskRecords.shift();
|
}
|
||||||
try {
|
reject(new Error('Task canceled'));
|
||||||
resolve(task());
|
};
|
||||||
} catch (e) {
|
|
||||||
reject(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tell the next wrapper to start trying to run its task
|
// The caller, `_runTasks()`, is responsible for cost-checking and spending tokens.
|
||||||
if (this._pendingTaskRecords.length > 0) {
|
newRecord.wrappedTask = () => {
|
||||||
const nextRecord = this._pendingTaskRecords[0];
|
try {
|
||||||
nextRecord.wrappedTask();
|
resolve(task());
|
||||||
}
|
} catch (e) {
|
||||||
} else {
|
reject(e);
|
||||||
// This task can't run yet. Estimate when it will be able to, then try again.
|
|
||||||
newRecord.reject = reject;
|
|
||||||
this._waitUntilAffordable(cost).then(() => newRecord.wrappedTask());
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
this._pendingTaskRecords.push(newRecord);
|
this._pendingTaskRecords.push(newRecord);
|
||||||
|
|
||||||
|
// If the queue has been idle we need to prime the pump
|
||||||
if (this._pendingTaskRecords.length === 1) {
|
if (this._pendingTaskRecords.length === 1) {
|
||||||
newRecord.wrappedTask();
|
this._runTasks();
|
||||||
}
|
}
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
|
@ -76,15 +84,16 @@ class TaskQueue {
|
||||||
*/
|
*/
|
||||||
cancelAll () {
|
cancelAll () {
|
||||||
if (this._timeout !== null) {
|
if (this._timeout !== null) {
|
||||||
clearTimeout(this._timeout);
|
this._timer.clearTimeout(this._timeout);
|
||||||
this._timeout = null;
|
this._timeout = null;
|
||||||
}
|
}
|
||||||
this._pendingTaskRecords.forEach(r => r.reject());
|
const oldTasks = this._pendingTaskRecords;
|
||||||
this._pendingTaskRecords = [];
|
this._pendingTaskRecords = [];
|
||||||
|
oldTasks.forEach(r => r.cancel());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shorthand for calling @ _refill() then _spend(cost).
|
* Shorthand for calling _refill() then _spend(cost).
|
||||||
*
|
*
|
||||||
* @see {@link TaskQueue#_refill}
|
* @see {@link TaskQueue#_refill}
|
||||||
* @see {@link TaskQueue#_spend}
|
* @see {@link TaskQueue#_spend}
|
||||||
|
@ -129,33 +138,36 @@ class TaskQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a Promise which will resolve when the bucket will be able to "afford" the given cost.
|
* Loop until the task queue is empty, running each task and spending tokens to do so.
|
||||||
* Note that this won't refill the bucket, so make sure to refill after the promise resolves.
|
* Any time the bucket can't afford the next task, delay asynchronously until it can.
|
||||||
*
|
|
||||||
* @param {number} cost - wait until the token count is at least this much.
|
|
||||||
* @returns {Promise} - to be resolved once the bucket is due for a token count greater than or equal to the cost.
|
|
||||||
* @memberof TaskQueue
|
* @memberof TaskQueue
|
||||||
*/
|
*/
|
||||||
_waitUntilAffordable (cost) {
|
_runTasks () {
|
||||||
if (cost <= this._tokenCount) {
|
if (this._timeout) {
|
||||||
return Promise.resolve();
|
this._timer.clearTimeout(this._timeout);
|
||||||
|
this._timeout = null;
|
||||||
}
|
}
|
||||||
if (!(cost <= this._maxTokens)) {
|
for (;;) {
|
||||||
return Promise.reject(new Error(`Task cost ${cost} is greater than bucket limit ${this._maxTokens}`));
|
const nextRecord = this._pendingTaskRecords.shift();
|
||||||
|
if (!nextRecord) {
|
||||||
|
// We ran out of work. Go idle until someone adds another task to the queue.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (nextRecord.cost > this._maxTokens) {
|
||||||
|
throw new Error(`Task cost ${nextRecord.cost} is greater than bucket limit ${this._maxTokens}`);
|
||||||
|
}
|
||||||
|
// Refill before each task in case the time it took for the last task to run was enough to afford the next.
|
||||||
|
if (this._refillAndSpend(nextRecord.cost)) {
|
||||||
|
nextRecord.wrappedTask();
|
||||||
|
} else {
|
||||||
|
// We can't currently afford this task. Put it back and wait until we can and try again.
|
||||||
|
this._pendingTaskRecords.unshift(nextRecord);
|
||||||
|
const tokensNeeded = Math.max(nextRecord.cost - this._tokenCount, 0);
|
||||||
|
const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
|
||||||
|
this._timeout = this._timer.setTimeout(this._runTasks, estimatedWait);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return new Promise(resolve => {
|
|
||||||
const tokensNeeded = Math.max(cost - this._tokenCount, 0);
|
|
||||||
const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
|
|
||||||
|
|
||||||
let timeout = null;
|
|
||||||
const onTimeout = () => {
|
|
||||||
if (this._timeout === timeout) {
|
|
||||||
this._timeout = null;
|
|
||||||
}
|
|
||||||
resolve();
|
|
||||||
};
|
|
||||||
this._timeout = timeout = this._timer.setTimeout(onTimeout, estimatedWait);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,48 +5,103 @@ const TaskQueue = require('../../src/util/task-queue');
|
||||||
const MockTimer = require('../fixtures/mock-timer');
|
const MockTimer = require('../fixtures/mock-timer');
|
||||||
const testCompare = require('../fixtures/test-compare');
|
const testCompare = require('../fixtures/test-compare');
|
||||||
|
|
||||||
test('constructor', t => {
|
// Max tokens = 1000
|
||||||
// Max tokens = 1000, refill 1000 tokens per second (1 per millisecond), and start with 0 tokens
|
// Refill 1000 tokens per second (1 per millisecond)
|
||||||
|
// Token bucket starts empty
|
||||||
|
const makeTestQueue = () => {
|
||||||
const bukkit = new TaskQueue(1000, 1000, 0);
|
const bukkit = new TaskQueue(1000, 1000, 0);
|
||||||
|
|
||||||
const mockTimer = new MockTimer();
|
const mockTimer = new MockTimer();
|
||||||
bukkit._timer = mockTimer;
|
bukkit._timer = mockTimer;
|
||||||
mockTimer.start();
|
mockTimer.start();
|
||||||
|
|
||||||
|
return bukkit;
|
||||||
|
};
|
||||||
|
|
||||||
|
test('spec', t => {
|
||||||
|
t.type(TaskQueue, 'function');
|
||||||
|
const bukkit = makeTestQueue();
|
||||||
|
|
||||||
|
t.type(bukkit, 'object');
|
||||||
|
|
||||||
|
t.type(bukkit.length, 'number');
|
||||||
|
t.type(bukkit.do, 'function');
|
||||||
|
t.type(bukkit.cancelAll, 'function');
|
||||||
|
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('cancelAll', t => {
|
||||||
|
const bukkit = makeTestQueue();
|
||||||
|
|
||||||
const taskResults = [];
|
const taskResults = [];
|
||||||
const promises = [];
|
const goodCancelMessage1 = 'Task1 was canceled correctly';
|
||||||
const goodCancelMessage = 'Task was canceled correctly';
|
const goodCancelMessage2 = 'Task2 was canceled correctly';
|
||||||
bukkit.do(() => taskResults.push('nope'), 999).then(
|
|
||||||
() => {
|
const promises = [
|
||||||
t.fail('Task should have been canceled');
|
bukkit.do(() => taskResults.push('nope'), 999).then(
|
||||||
},
|
() => {
|
||||||
() => {
|
t.fail('Task1 should have been canceled');
|
||||||
taskResults.push(goodCancelMessage);
|
},
|
||||||
}
|
() => {
|
||||||
);
|
taskResults.push(goodCancelMessage1);
|
||||||
|
}
|
||||||
|
),
|
||||||
|
bukkit.do(() => taskResults.push('nah'), 999).then(
|
||||||
|
() => {
|
||||||
|
t.fail('Task2 should have been canceled');
|
||||||
|
},
|
||||||
|
() => {
|
||||||
|
taskResults.push(goodCancelMessage2);
|
||||||
|
}
|
||||||
|
)
|
||||||
|
];
|
||||||
|
|
||||||
|
// advance time, but not enough that any task should run
|
||||||
|
bukkit._timer.advanceMockTime(100);
|
||||||
|
|
||||||
bukkit.cancelAll();
|
bukkit.cancelAll();
|
||||||
promises.push(
|
|
||||||
bukkit.do(() => taskResults.push('a'), 50).then(() => {
|
// advance enough that both tasks would run if they hadn't been canceled
|
||||||
|
bukkit._timer.advanceMockTime(10000);
|
||||||
|
|
||||||
|
return Promise.all(promises).then(() => {
|
||||||
|
t.deepEqual(taskResults, [goodCancelMessage1, goodCancelMessage2], 'Tasks should cancel in order');
|
||||||
|
t.end();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test('run tasks', t => {
|
||||||
|
const bukkit = makeTestQueue();
|
||||||
|
|
||||||
|
const taskResults = [];
|
||||||
|
|
||||||
|
const promises = [
|
||||||
|
bukkit.do(() => {
|
||||||
|
taskResults.push('a');
|
||||||
testCompare(t, bukkit._timer.timeElapsed(), '>=', 50, 'Costly task must wait');
|
testCompare(t, bukkit._timer.timeElapsed(), '>=', 50, 'Costly task must wait');
|
||||||
}),
|
}, 50),
|
||||||
bukkit.do(() => taskResults.push('b'), 10).then(() => {
|
bukkit.do(() => {
|
||||||
|
taskResults.push('b');
|
||||||
testCompare(t, bukkit._timer.timeElapsed(), '>=', 60, 'Tasks must run in serial');
|
testCompare(t, bukkit._timer.timeElapsed(), '>=', 60, 'Tasks must run in serial');
|
||||||
}),
|
}, 10),
|
||||||
bukkit.do(() => taskResults.push('c'), 1).then(() => {
|
bukkit.do(() => {
|
||||||
testCompare(t, bukkit._timer.timeElapsed(), '<=', 80, 'Cheap task should run soon');
|
taskResults.push('c');
|
||||||
})
|
testCompare(t, bukkit._timer.timeElapsed(), '<=', 70, 'Cheap task should run soon');
|
||||||
);
|
}, 1)
|
||||||
|
];
|
||||||
|
|
||||||
// advance 10 simulated milliseconds per JS tick
|
// advance 10 simulated milliseconds per JS tick
|
||||||
const step = () => {
|
const step = () => {
|
||||||
mockTimer.advanceMockTime(10);
|
bukkit._timer.advanceMockTime(10);
|
||||||
if (mockTimer.timeElapsed() < 100) {
|
if (bukkit.length > 0) {
|
||||||
global.setTimeout(step, 0);
|
global.setTimeout(step, 0);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
step();
|
step();
|
||||||
|
|
||||||
return Promise.all(promises).then(() => {
|
return Promise.all(promises).then(() => {
|
||||||
t.deepEqual(taskResults, [goodCancelMessage, 'a', 'b', 'c'], 'All tasks must run in correct order');
|
t.deepEqual(taskResults, ['a', 'b', 'c'], 'All tasks must run in correct order');
|
||||||
|
t.end();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue