2018-08-31 14:16:20 -04:00
|
|
|
const Timer = require('../util/timer');
|
|
|
|
|
|
|
|
/**
|
|
|
|
* This class uses the token bucket algorithm to control a queue of tasks.
|
|
|
|
*/
|
|
|
|
class TokenBucket {
    /**
     * Creates an instance of TokenBucket.
     * @param {number} maxTokens - the maximum number of tokens in the bucket (burst size).
     * @param {number} refillRate - the number of tokens to be added per second (sustain rate).
     * @param {number} [startingTokens=maxTokens] - the number of tokens the bucket starts with.
     * @memberof TokenBucket
     */
    constructor (maxTokens, refillRate, startingTokens = maxTokens) {
        this._maxTokens = maxTokens;
        this._refillRate = refillRate;
        // FIFO queue of task wrappers. Only the wrapper at index 0 is ever trying to run.
        this._pendingTasks = [];
        this._tokenCount = startingTokens;
        this._timer = new Timer();
        this._timer.start();
        // _refill() treats timer readings as milliseconds (it divides by 1000 to apply the per-second rate).
        this._lastUpdateTime = this._timer.timeElapsed();
    }

    /**
     * Wait until the token bucket is full enough, then run the provided task.
     * Tasks run in the order they were submitted: a task only starts trying to run once every
     * task queued before it has either run or been rejected.
     *
     * @param {Function} task - the task to run.
     * @param {number} [cost=1] - the number of tokens this task consumes from the bucket.
     * @returns {Promise} - a promise for the task's return value. It rejects if the task throws, or if the
     *   task can never be afforded because its cost is greater than the bucket's maximum token count.
     * @memberof TokenBucket
     */
    do (task, cost = 1) {
        let wrappedTask;
        const promise = new Promise((resolve, reject) => {
            wrappedTask = () => {
                if (!this._refillAndSpend(cost)) {
                    // This task can't run yet. Estimate when it will be able to, then try again.
                    this._waitUntilAffordable(cost).then(
                        () => wrappedTask(),
                        error => {
                            // The task can never run. Fail it and unblock the queue instead of
                            // leaving it stuck at the head forever.
                            this._pendingTasks.shift();
                            reject(error);
                            this._startNextTask();
                        }
                    );
                    return;
                }

                // Remove this task from the queue and run it
                this._pendingTasks.shift();
                try {
                    resolve(task());
                } catch (e) {
                    reject(e);
                }

                // Tell the next wrapper to start trying to run its task
                this._startNextTask();
            };
        });
        this._pendingTasks.push(wrappedTask);

        // If the queue was empty nothing else will kick this task off, so start it now.
        if (this._pendingTasks.length === 1) {
            wrappedTask();
        }

        return promise;
    }

    /**
     * If any task is waiting in the queue, tell the one at the head to start trying to run.
     * @memberof TokenBucket
     */
    _startNextTask () {
        if (this._pendingTasks.length > 0) {
            const nextWrappedTask = this._pendingTasks[0];
            nextWrappedTask();
        }
    }

    /**
     * Shorthand for calling _refill() then _spend(cost).
     * @see {@link TokenBucket#_refill}
     * @see {@link TokenBucket#_spend}
     * @param {number} cost - the number of tokens to try to spend.
     * @returns {boolean} true if we had enough tokens; false otherwise.
     * @memberof TokenBucket
     */
    _refillAndSpend (cost) {
        this._refill();
        return this._spend(cost);
    }

    /**
     * Refill the token bucket based on the amount of time since the last refill.
     * The token count is capped at the bucket's maximum.
     * @memberof TokenBucket
     */
    _refill () {
        const now = this._timer.timeElapsed();
        const timeSinceRefill = now - this._lastUpdateTime;
        if (timeSinceRefill <= 0) return;

        this._lastUpdateTime = now;
        // refillRate is tokens per second; the elapsed time is divided by 1000 to convert from milliseconds.
        this._tokenCount += timeSinceRefill * this._refillRate / 1000;
        this._tokenCount = Math.min(this._tokenCount, this._maxTokens);
    }

    /**
     * If we can "afford" the given cost, subtract that many tokens and return true.
     * Otherwise, return false.
     * @param {number} cost - the number of tokens to try to spend.
     * @returns {boolean} true if we had enough tokens; false otherwise.
     * @memberof TokenBucket
     */
    _spend (cost) {
        if (cost <= this._tokenCount) {
            this._tokenCount -= cost;
            return true;
        }
        return false;
    }

    /**
     * Create a Promise which will resolve when the bucket will be able to "afford" the given cost.
     * Note that this won't refill the bucket, so make sure to refill after the promise resolves.
     *
     * @param {number} cost - wait until the token count is at least this much.
     * @returns {Promise} - to be resolved once the bucket is due for a token count greater than or equal to the
     *   cost, or rejected if the cost is greater than the bucket's maximum token count.
     * @memberof TokenBucket
     */
    _waitUntilAffordable (cost) {
        if (cost <= this._tokenCount) {
            return Promise.resolve();
        }
        // Written as a negated <= so that a NaN or undefined cost is rejected as well.
        if (!(cost <= this._maxTokens)) {
            return Promise.reject(new Error('Task cost is greater than bucket limit'));
        }
        return new Promise(resolve => {
            // The shortfall: how many more tokens must be refilled before the cost is affordable.
            const tokensNeeded = cost - this._tokenCount;
            const estimatedWait = Math.ceil(1000 * tokensNeeded / this._refillRate);
            setTimeout(resolve, estimatedWait);
        });
    }
}
|
|
|
|
|
|
|
|
module.exports = TokenBucket;
|