2017-08-05 02:52:31 -04:00
|
|
|
const log = require('../util/log');
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @typedef {object} DispatchCallMessage - a message to the dispatch system representing a service method call
|
|
|
|
* @property {*} responseId - send a response message with this response ID. See {@link DispatchResponseMessage}
|
|
|
|
* @property {string} service - the name of the service to be called
|
|
|
|
* @property {string} method - the name of the method to be called
|
|
|
|
* @property {Array|undefined} args - the arguments to be passed to the method
|
|
|
|
*/
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @typedef {object} DispatchResponseMessage - a message to the dispatch system representing the results of a call
|
|
|
|
* @property {*} responseId - a copy of the response ID from the call which generated this response
|
|
|
|
* @property {*|undefined} error - if this is truthy, then it contains results from a failed call (such as an exception)
|
|
|
|
* @property {*|undefined} result - if error is not truthy, then this contains the return value of the call (if any)
|
|
|
|
*/
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @typedef {DispatchCallMessage|DispatchResponseMessage} DispatchMessage
|
|
|
|
* Any message to the dispatch system.
|
|
|
|
*/
|
|
|
|
|
|
|
|
/**
|
|
|
|
* The SharedDispatch class is responsible for dispatch features shared by
|
|
|
|
* {@link CentralDispatch} and {@link WorkerDispatch}.
|
|
|
|
*/
|
|
|
|
class SharedDispatch {
|
|
|
|
constructor () {
|
|
|
|
/**
|
|
|
|
* List of callback registrations for promises waiting for a response from a call to a service on another
|
|
|
|
* worker. A callback registration is an array of [resolve,reject] Promise functions.
|
|
|
|
* Calls to local services don't enter this list.
|
2018-07-31 09:01:37 -04:00
|
|
|
* @type {Array.<Function[]>}
|
2017-08-05 02:52:31 -04:00
|
|
|
*/
|
|
|
|
this.callbacks = [];
|
|
|
|
|
|
|
|
/**
|
|
|
|
* The next response ID to be used.
|
|
|
|
* @type {int}
|
|
|
|
*/
|
|
|
|
this.nextResponseId = 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
|
|
|
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
|
|
|
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
|
|
|
* transferred to the worker, and they should not be used after this call.
|
|
|
|
* @example
|
|
|
|
* dispatcher.call('vm', 'setData', 'cat', 42);
|
|
|
|
* // this finds the worker for the 'vm' service, then on that worker calls:
|
|
|
|
* vm.setData('cat', 42);
|
|
|
|
* @param {string} service - the name of the service.
|
|
|
|
* @param {string} method - the name of the method.
|
|
|
|
* @param {*} [args] - the arguments to be copied to the method, if any.
|
|
|
|
* @returns {Promise} - a promise for the return value of the service method.
|
|
|
|
*/
|
|
|
|
call (service, method, ...args) {
|
|
|
|
return this.transferCall(service, method, null, ...args);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
|
|
|
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
|
|
|
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
|
|
|
* transferred to the worker, and they should not be used after this call.
|
|
|
|
* @example
|
|
|
|
* dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
|
|
|
|
* // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
|
|
|
|
* vm.setData('cat', myArrayBuffer);
|
|
|
|
* @param {string} service - the name of the service.
|
|
|
|
* @param {string} method - the name of the method.
|
|
|
|
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
|
|
|
|
* @param {*} [args] - the arguments to be copied to the method, if any.
|
|
|
|
* @returns {Promise} - a promise for the return value of the service method.
|
|
|
|
*/
|
|
|
|
transferCall (service, method, transfer, ...args) {
|
|
|
|
try {
|
|
|
|
const {provider, isRemote} = this._getServiceProvider(service);
|
|
|
|
if (provider) {
|
|
|
|
if (isRemote) {
|
|
|
|
return this._remoteTransferCall(provider, service, method, transfer, ...args);
|
|
|
|
}
|
|
|
|
|
|
|
|
const result = provider[method].apply(provider, args);
|
|
|
|
return Promise.resolve(result);
|
|
|
|
}
|
|
|
|
return Promise.reject(new Error(`Service not found: ${service}`));
|
|
|
|
} catch (e) {
|
|
|
|
return Promise.reject(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-10-04 18:40:25 -04:00
|
|
|
/**
|
|
|
|
* Check if a particular service lives on another worker.
|
|
|
|
* @param {string} service - the service to check.
|
|
|
|
* @returns {boolean} - true if the service is remote (calls must cross a Worker boundary), false otherwise.
|
|
|
|
* @private
|
|
|
|
*/
|
|
|
|
_isRemoteService (service) {
|
|
|
|
return this._getServiceProvider(service).isRemote;
|
|
|
|
}
|
|
|
|
|
2017-08-05 02:52:31 -04:00
|
|
|
/**
|
|
|
|
* Like {@link call}, but force the call to be posted through a particular communication channel.
|
|
|
|
* @param {object} provider - send the call through this object's `postMessage` function.
|
|
|
|
* @param {string} service - the name of the service.
|
|
|
|
* @param {string} method - the name of the method.
|
|
|
|
* @param {*} [args] - the arguments to be copied to the method, if any.
|
|
|
|
* @returns {Promise} - a promise for the return value of the service method.
|
|
|
|
*/
|
|
|
|
_remoteCall (provider, service, method, ...args) {
|
|
|
|
return this._remoteTransferCall(provider, service, method, null, ...args);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Like {@link transferCall}, but force the call to be posted through a particular communication channel.
|
|
|
|
* @param {object} provider - send the call through this object's `postMessage` function.
|
|
|
|
* @param {string} service - the name of the service.
|
|
|
|
* @param {string} method - the name of the method.
|
|
|
|
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
|
|
|
|
* @param {*} [args] - the arguments to be copied to the method, if any.
|
|
|
|
* @returns {Promise} - a promise for the return value of the service method.
|
|
|
|
*/
|
|
|
|
_remoteTransferCall (provider, service, method, transfer, ...args) {
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
const responseId = this._storeCallbacks(resolve, reject);
|
2017-08-29 14:43:09 -04:00
|
|
|
|
|
|
|
/** @TODO: remove this hack! this is just here so we don't try to send `util` to a worker */
|
|
|
|
if ((args.length > 0) && (typeof args[args.length - 1].yield === 'function')) {
|
|
|
|
args.pop();
|
|
|
|
}
|
|
|
|
|
2017-08-05 02:52:31 -04:00
|
|
|
if (transfer) {
|
|
|
|
provider.postMessage({service, method, responseId, args}, transfer);
|
|
|
|
} else {
|
|
|
|
provider.postMessage({service, method, responseId, args});
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Store callback functions pending a response message.
|
|
|
|
* @param {Function} resolve - function to call if the service method returns.
|
|
|
|
* @param {Function} reject - function to call if the service method throws.
|
|
|
|
* @returns {*} - a unique response ID for this set of callbacks. See {@link _deliverResponse}.
|
|
|
|
* @protected
|
|
|
|
*/
|
|
|
|
_storeCallbacks (resolve, reject) {
|
|
|
|
const responseId = this.nextResponseId++;
|
|
|
|
this.callbacks[responseId] = [resolve, reject];
|
|
|
|
return responseId;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Deliver call response from a worker. This should only be called as the result of a message from a worker.
|
|
|
|
* @param {int} responseId - the response ID of the callback set to call.
|
|
|
|
* @param {DispatchResponseMessage} message - the message containing the response value(s).
|
|
|
|
* @protected
|
|
|
|
*/
|
|
|
|
_deliverResponse (responseId, message) {
|
|
|
|
try {
|
|
|
|
const [resolve, reject] = this.callbacks[responseId];
|
|
|
|
delete this.callbacks[responseId];
|
|
|
|
if (message.error) {
|
|
|
|
reject(message.error);
|
|
|
|
} else {
|
|
|
|
resolve(message.result);
|
|
|
|
}
|
|
|
|
} catch (e) {
|
|
|
|
log.error(`Dispatch callback failed: ${JSON.stringify(e)}`);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Handle a message event received from a connected worker.
|
|
|
|
* @param {Worker} worker - the worker which sent the message, or the global object if running in a worker.
|
|
|
|
* @param {MessageEvent} event - the message event to be handled.
|
|
|
|
* @protected
|
|
|
|
*/
|
|
|
|
_onMessage (worker, event) {
|
|
|
|
/** @type {DispatchMessage} */
|
|
|
|
const message = event.data;
|
|
|
|
message.args = message.args || [];
|
|
|
|
let promise;
|
|
|
|
if (message.service) {
|
|
|
|
if (message.service === 'dispatch') {
|
|
|
|
promise = this._onDispatchMessage(worker, message);
|
|
|
|
} else {
|
|
|
|
promise = this.call(message.service, message.method, ...message.args);
|
|
|
|
}
|
|
|
|
} else if (typeof message.responseId === 'undefined') {
|
|
|
|
log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`);
|
|
|
|
} else {
|
|
|
|
this._deliverResponse(message.responseId, message);
|
|
|
|
}
|
|
|
|
if (promise) {
|
|
|
|
if (typeof message.responseId === 'undefined') {
|
|
|
|
log.error(`Dispatch message missing required response ID: ${JSON.stringify(event)}`);
|
|
|
|
} else {
|
|
|
|
promise.then(
|
|
|
|
result => worker.postMessage({responseId: message.responseId, result}),
|
|
|
|
error => worker.postMessage({responseId: message.responseId, error})
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Fetch the service provider object for a particular service name.
|
|
|
|
* @abstract
|
|
|
|
* @param {string} service - the name of the service to look up
|
|
|
|
* @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
|
|
|
|
* @protected
|
|
|
|
*/
|
|
|
|
_getServiceProvider (service) {
|
|
|
|
throw new Error(`Could not get provider for ${service}: _getServiceProvider not implemented`);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Handle a call message sent to the dispatch service itself
|
|
|
|
* @abstract
|
|
|
|
* @param {Worker} worker - the worker which sent the message.
|
|
|
|
* @param {DispatchCallMessage} message - the message to be handled.
|
|
|
|
* @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
|
|
|
|
* @private
|
|
|
|
*/
|
|
|
|
_onDispatchMessage (worker, message) {
|
|
|
|
throw new Error(`Unimplemented dispatch message handler cannot handle ${message.method} method`);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = SharedDispatch;
|