From c8dc8fb5b953216a0856bb2e7c65ba94c4a272ba Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <cwillisf@media.mit.edu> Date: Fri, 31 Aug 2018 11:16:20 -0700 Subject: [PATCH 1/5] Add TokenBucket utility class --- src/util/token-bucket.js | 131 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 src/util/token-bucket.js diff --git a/src/util/token-bucket.js b/src/util/token-bucket.js new file mode 100644 index 000000000..c25bc4963 --- /dev/null +++ b/src/util/token-bucket.js @@ -0,0 +1,131 @@ +const Timer = require('../util/timer'); + +/** + * This class uses the token bucket algorithm to control a queue of tasks. + */ +class TokenBucket { + /** + * Creates an instance of TokenBucket. + * @param {number} maxTokens - the maximum number of tokens in the bucket (burst size). + * @param {number} refillRate - the number of tokens to be added per second (sustain rate). + * @param {*} [startingTokens=maxTokens] - the number of tokens the bucket starts with. + * @memberof TokenBucket + */ + constructor (maxTokens, refillRate, startingTokens = maxTokens) { + this._maxTokens = maxTokens; + this._refillRate = refillRate; + this._pendingTasks = []; + this._tokenCount = startingTokens; + this._timer = new Timer(); + this._timer.start(); + this._lastUpdateTime = this._timer.timeElapsed(); + } + + /** + * Wait until the token bucket is full enough, then run the provided task. + * + * @param {Function} task - the task to run. + * @param {number} [cost=1] - the number of tokens this task consumes from the bucket. + * @returns {Promise} - a promise for the task's return value. + * @memberof TokenBucket + */ + do (task, cost = 1) { + let wrappedTask; + const promise = new Promise((resolve, reject) => { + wrappedTask = () => { + const canRun = this._refillAndSpend(cost); + if (canRun) { + // Remove this task from the queue and run it + this._pendingTasks.shift(); + try { + resolve(task()); + } catch (e) { + reject(e); + } + + // Tell the next wrapper to start trying to run its task + if (this._pendingTasks.length > 0) { + const nextWrappedTask = this._pendingTasks[0]; + nextWrappedTask(); + } + } else { + // This task can't run yet. Estimate when it will be able to, then try again. + this._waitUntilAffordable(cost).then(() => wrappedTask()); + } + }; + }); + this._pendingTasks.push(wrappedTask); + + if (this._pendingTasks.length === 1) { + wrappedTask(); + } + + return promise; + } + + /** + * Shorthand for calling @ _refill() then _spend(cost). + * @see {@link TokenBucket#_refill} + * @see {@link TokenBucket#_spend} + * @param {number} cost - the number of tokens to try to spend. + * @returns {boolean} true if we had enough tokens; false otherwise. + * @memberof TokenBucket + */ + _refillAndSpend (cost) { + this._refill(); + return this._spend(cost); + } + + /** + * Refill the token bucket based on the amount of time since the last refill. + * @memberof TokenBucket + */ + _refill () { + const now = this._timer.timeElapsed(); + const timeSinceRefill = now - this._lastUpdateTime; + if (timeSinceRefill <= 0) return; + + this._lastUpdateTime = now; + this._tokenCount += timeSinceRefill * this._refillRate / 1000; + this._tokenCount = Math.min(this._tokenCount, this._maxTokens); + } + + /** + * If we can "afford" the given cost, subtract that many tokens and return true. + * Otherwise, return false. + * @param {number} cost - the number of tokens to try to spend. + * @returns {boolean} true if we had enough tokens; false otherwise. + * @memberof TokenBucket + */ + _spend (cost) { + if (cost <= this._tokenCount) { + this._tokenCount -= cost; + return true; + } + return false; + } + + /** + * Create a Promise which will resolve when the bucket will be able to "afford" the given cost. + * Note that this won't refill the bucket, so make sure to refill after the promise resolves. + * + * @param {number} cost - wait until the token count is at least this much. + * @returns {Promise} - to be resolved once the bucket is due for a token count greater than or equal to the cost. + * @memberof TokenBucket + */ + _waitUntilAffordable (cost) { + if (cost <= this._tokenCount) { + return Promise.resolve(); + } + if (cost > this._limit) { + return Promise.reject(new Error('Task cost is greater than bucket limit')); + } + return new Promise(resolve => { + const tokensNeeded = this._tokenCount - cost; + const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate); + setTimeout(resolve, estimatedWait); + }); + } +} + +module.exports = TokenBucket; From 28621c986089ecad957010c9ab95cb428f21b185 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <cwillisf@media.mit.edu> Date: Fri, 31 Aug 2018 11:35:54 -0700 Subject: [PATCH 2/5] Protect against NaN task cost --- src/util/token-bucket.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/token-bucket.js b/src/util/token-bucket.js index c25bc4963..b5449bff9 100644 --- a/src/util/token-bucket.js +++ b/src/util/token-bucket.js @@ -117,7 +117,7 @@ class TokenBucket { if (cost <= this._tokenCount) { return Promise.resolve(); } - if (cost > this._limit) { + if (!(cost <= this._limit)) { return Promise.reject(new Error('Task cost is greater than bucket limit')); } return new Promise(resolve => { From 33e0197ad50103e9b9a58b5567f9f8436d2a587f Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <cwillisf@media.mit.edu> Date: Fri, 31 Aug 2018 12:38:06 -0700 Subject: [PATCH 3/5] Add tests for TokenBucket --- src/util/token-bucket.js | 4 ++-- test/fixtures/test-compare.js | 15 +++++++++++++++ test/unit/util_token-bucket.js | 31 +++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 test/fixtures/test-compare.js create mode 100644 test/unit/util_token-bucket.js diff --git a/src/util/token-bucket.js b/src/util/token-bucket.js index b5449bff9..0a1c20686 100644 --- a/src/util/token-bucket.js +++ b/src/util/token-bucket.js @@ -117,8 +117,8 @@ class TokenBucket { if (cost <= this._tokenCount) { return Promise.resolve(); } - if (!(cost <= this._limit)) { - return Promise.reject(new Error('Task cost is greater than bucket limit')); + if (!(cost <= this._maxTokens)) { + return Promise.reject(new Error(`Task cost ${cost} is greater than bucket limit ${this._maxTokens}`)); } return new Promise(resolve => { const tokensNeeded = this._tokenCount - cost; diff --git a/test/fixtures/test-compare.js b/test/fixtures/test-compare.js new file mode 100644 index 000000000..4aa625ea7 --- /dev/null +++ b/test/fixtures/test-compare.js @@ -0,0 +1,15 @@ +const testCompare = (t, lhs, op, rhs, message) => { + const details = `Expected: ${lhs} ${op} ${rhs}`; + const extra = {details}; + switch (op) { + case '<': return t.ok(lhs < rhs, message, extra); + case '<=': return t.ok(lhs <= rhs, message, extra); + case '===': return t.ok(lhs === rhs, message, extra); + case '!==': return t.ok(lhs !== rhs, message, extra); + case '>=': return t.ok(lhs >= rhs, message, extra); + case '>': return t.ok(lhs > rhs, message, extra); + default: return t.fail(`Unrecognized op: ${op}`); + } +}; + +module.exports = testCompare; diff --git a/test/unit/util_token-bucket.js b/test/unit/util_token-bucket.js new file mode 100644 index 000000000..a70764de2 --- /dev/null +++ b/test/unit/util_token-bucket.js @@ -0,0 +1,31 @@ +const test = require('tap').test; + +const Timer = require('../../src/util/timer'); +const TokenBucket = require('../../src/util/token-bucket'); + +const testCompare = require('../fixtures/test-compare'); + +test('constructor', t => { + // Max tokens = 1000, refill 1000 tokens per second (1 per millisecond), and start with 0 tokens + const bukkit = new TokenBucket(1000, 1000, 0); + + const timer = new Timer(); + timer.start(); + + const taskResults = []; + const promises = []; + promises.push( + bukkit.do(() => taskResults.push('a'), 100).then(() => + testCompare(t, timer.timeElapsed(), '>=', 100, 'Costly task must wait') + ), + bukkit.do(() => taskResults.push('b'), 0).then(() => + testCompare(t, timer.timeElapsed(), '<', 150, 'Cheap task should run soon') + ), + bukkit.do(() => taskResults.push('c'), 101).then(() => + testCompare(t, timer.timeElapsed(), '>=', 200, 'Tasks must run in serial') + ) + ); + return Promise.all(promises).then(() => { + t.deepEqual(taskResults, ['a', 'b', 'c'], 'All tasks must run in correct order'); + }); +}); From 70c6ad4ef1c2068ac369bcbc1f047af9426d69b1 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <cwillisf@media.mit.edu> Date: Fri, 14 Sep 2018 22:10:17 -0400 Subject: [PATCH 4/5] Add `cancelAll` method to clear the task queue --- src/util/token-bucket.js | 48 +++++++++++++++++++++++++--------- test/unit/util_token-bucket.js | 24 ++++++++++++----- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/src/util/token-bucket.js b/src/util/token-bucket.js index 0a1c20686..d66071254 100644 --- a/src/util/token-bucket.js +++ b/src/util/token-bucket.js @@ -14,10 +14,11 @@ class TokenBucket { constructor (maxTokens, refillRate, startingTokens = maxTokens) { this._maxTokens = maxTokens; this._refillRate = refillRate; - this._pendingTasks = []; + this._pendingTaskRecords = []; this._tokenCount = startingTokens; this._timer = new Timer(); this._timer.start(); + this._timeout = null; this._lastUpdateTime = this._timer.timeElapsed(); } @@ -30,13 +31,13 @@ class TokenBucket { * @memberof TokenBucket */ do (task, cost = 1) { - let wrappedTask; + const newRecord = {}; const promise = new Promise((resolve, reject) => { - wrappedTask = () => { + newRecord.wrappedTask = () => { const canRun = this._refillAndSpend(cost); if (canRun) { // Remove this task from the queue and run it - this._pendingTasks.shift(); + this._pendingTaskRecords.shift(); try { resolve(task()); } catch (e) { @@ -44,25 +45,38 @@ class TokenBucket { } // Tell the next wrapper to start trying to run its task - if (this._pendingTasks.length > 0) { - const nextWrappedTask = this._pendingTasks[0]; - nextWrappedTask(); + if (this._pendingTaskRecords.length > 0) { + const nextRecord = this._pendingTaskRecords[0]; + nextRecord.wrappedTask(); } } else { // This task can't run yet. Estimate when it will be able to, then try again. - this._waitUntilAffordable(cost).then(() => wrappedTask()); + newRecord.reject = reject; + this._waitUntilAffordable(cost).then(() => newRecord.wrappedTask()); } }; }); - this._pendingTasks.push(wrappedTask); + this._pendingTaskRecords.push(newRecord); - if (this._pendingTasks.length === 1) { - wrappedTask(); + if (this._pendingTaskRecords.length === 1) { + newRecord.wrappedTask(); } return promise; } + /** + * Cancel all pending tasks, rejecting all their promises. + */ + cancelAll () { + if (this._timeout !== null) { + clearTimeout(this._timeout); + this._timeout = null; + } + this._pendingTaskRecords.forEach(r => r.reject()); + this._pendingTaskRecords = []; + } + /** * Shorthand for calling @ _refill() then _spend(cost). * @see {@link TokenBucket#_refill} @@ -121,9 +135,17 @@ class TokenBucket { return Promise.reject(new Error(`Task cost ${cost} is greater than bucket limit ${this._maxTokens}`)); } return new Promise(resolve => { - const tokensNeeded = this._tokenCount - cost; + const tokensNeeded = Math.max(cost - this._tokenCount, 0); const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate); - setTimeout(resolve, estimatedWait); + + let timeout = null; + const onTimeout = () => { + if (this._timeout === timeout) { + this._timeout = null; + } + resolve(); + }; + this._timeout = timeout = setTimeout(onTimeout, estimatedWait); }); } } diff --git a/test/unit/util_token-bucket.js b/test/unit/util_token-bucket.js index a70764de2..2223d3710 100644 --- a/test/unit/util_token-bucket.js +++ b/test/unit/util_token-bucket.js @@ -14,18 +14,28 @@ test('constructor', t => { const taskResults = []; const promises = []; + const goodCancelMessage = 'Task was canceled correctly'; + bukkit.do(() => taskResults.push('nope'), 999).then( + () => { + t.fail('Task should have been canceled'); + }, + () => { + taskResults.push(goodCancelMessage); + } + ); + bukkit.cancelAll(); promises.push( - bukkit.do(() => taskResults.push('a'), 100).then(() => - testCompare(t, timer.timeElapsed(), '>=', 100, 'Costly task must wait') + bukkit.do(() => taskResults.push('a'), 50).then(() => + testCompare(t, timer.timeElapsed(), '>=', 50, 'Costly task must wait') ), - bukkit.do(() => taskResults.push('b'), 0).then(() => - testCompare(t, timer.timeElapsed(), '<', 150, 'Cheap task should run soon') + bukkit.do(() => taskResults.push('b'), 10).then(() => + testCompare(t, timer.timeElapsed(), '>=', 60, 'Tasks must run in serial') ), - bukkit.do(() => taskResults.push('c'), 101).then(() => - testCompare(t, timer.timeElapsed(), '>=', 200, 'Tasks must run in serial') + bukkit.do(() => taskResults.push('c'), 1).then(() => + testCompare(t, timer.timeElapsed(), '<', 80, 'Cheap task should run soon') ) ); return Promise.all(promises).then(() => { - t.deepEqual(taskResults, ['a', 'b', 'c'], 'All tasks must run in correct order'); + t.deepEqual(taskResults, [goodCancelMessage, 'a', 'b', 'c'], 'All tasks must run in correct order'); }); }); From 69ba2045fe33fde75bb4f1c51eea566b0f131add Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford <cwillisf@media.mit.edu> Date: Fri, 14 Sep 2018 22:34:34 -0400 Subject: [PATCH 5/5] Rename `TokenBucket` to `TaskQueue` --- src/util/{token-bucket.js => task-queue.js} | 33 ++++++++++++------- ...til_token-bucket.js => util_task-queue.js} | 4 +-- 2 files changed, 23 insertions(+), 14 deletions(-) rename src/util/{token-bucket.js => task-queue.js} (87%) rename test/unit/{util_token-bucket.js => util_task-queue.js} (92%) diff --git a/src/util/token-bucket.js b/src/util/task-queue.js similarity index 87% rename from src/util/token-bucket.js rename to src/util/task-queue.js index d66071254..db890e329 100644 --- a/src/util/token-bucket.js +++ b/src/util/task-queue.js @@ -3,13 +3,17 @@ const Timer = require('../util/timer'); /** * This class uses the token bucket algorithm to control a queue of tasks. */ -class TokenBucket { +class TaskQueue { /** - * Creates an instance of TokenBucket. + * Creates an instance of TaskQueue. + * To allow bursts, set `maxTokens` to several times the average task cost. + * To prevent bursts, set `maxTokens` to the cost of the largest tasks. + * Note that tasks with a cost greater than `maxTokens` will be rejected. + * * @param {number} maxTokens - the maximum number of tokens in the bucket (burst size). * @param {number} refillRate - the number of tokens to be added per second (sustain rate). - * @param {*} [startingTokens=maxTokens] - the number of tokens the bucket starts with. - * @memberof TokenBucket + * @param {number} [startingTokens=maxTokens] - the number of tokens the bucket starts with. + * @memberof TaskQueue */ constructor (maxTokens, refillRate, startingTokens = maxTokens) { this._maxTokens = maxTokens; @@ -28,7 +32,7 @@ class TokenBucket { * @param {Function} task - the task to run. * @param {number} [cost=1] - the number of tokens this task consumes from the bucket. * @returns {Promise} - a promise for the task's return value. - * @memberof TokenBucket + * @memberof TaskQueue */ do (task, cost = 1) { const newRecord = {}; @@ -67,6 +71,8 @@ class TokenBucket { /** * Cancel all pending tasks, rejecting all their promises. + * + * @memberof TaskQueue */ cancelAll () { if (this._timeout !== null) { @@ -79,11 +85,12 @@ class TokenBucket { /** * Shorthand for calling @ _refill() then _spend(cost). - * @see {@link TokenBucket#_refill} - * @see {@link TokenBucket#_spend} + * + * @see {@link TaskQueue#_refill} + * @see {@link TaskQueue#_spend} * @param {number} cost - the number of tokens to try to spend. * @returns {boolean} true if we had enough tokens; false otherwise. - * @memberof TokenBucket + * @memberof TaskQueue */ _refillAndSpend (cost) { this._refill(); @@ -92,7 +99,8 @@ class TokenBucket { /** * Refill the token bucket based on the amount of time since the last refill. - * @memberof TokenBucket + * + * @memberof TaskQueue */ _refill () { const now = this._timer.timeElapsed(); @@ -107,9 +115,10 @@ class TokenBucket { /** * If we can "afford" the given cost, subtract that many tokens and return true. * Otherwise, return false. + * * @param {number} cost - the number of tokens to try to spend. * @returns {boolean} true if we had enough tokens; false otherwise. - * @memberof TokenBucket + * @memberof TaskQueue */ _spend (cost) { if (cost <= this._tokenCount) { @@ -125,7 +134,7 @@ class TokenBucket { * * @param {number} cost - wait until the token count is at least this much. * @returns {Promise} - to be resolved once the bucket is due for a token count greater than or equal to the cost. - * @memberof TokenBucket + * @memberof TaskQueue */ _waitUntilAffordable (cost) { if (cost <= this._tokenCount) { @@ -150,4 +159,4 @@ class TokenBucket { } } -module.exports = TokenBucket; +module.exports = TaskQueue; diff --git a/test/unit/util_token-bucket.js b/test/unit/util_task-queue.js similarity index 92% rename from test/unit/util_token-bucket.js rename to test/unit/util_task-queue.js index 2223d3710..2e67770fd 100644 --- a/test/unit/util_token-bucket.js +++ b/test/unit/util_task-queue.js @@ -1,13 +1,13 @@ const test = require('tap').test; const Timer = require('../../src/util/timer'); -const TokenBucket = require('../../src/util/token-bucket'); +const TaskQueue = require('../../src/util/task-queue'); const testCompare = require('../fixtures/test-compare'); test('constructor', t => { // Max tokens = 1000, refill 1000 tokens per second (1 per millisecond), and start with 0 tokens - const bukkit = new TokenBucket(1000, 1000, 0); + const bukkit = new TaskQueue(1000, 1000, 0); const timer = new Timer(); timer.start();