mirror of
https://github.com/scratchfoundation/scratch-vm.git
synced 2025-05-15 07:51:04 -04:00
Refactor common code in {central,worker}-dispatch
Central dispatch and worker dispatch share most of their code now by inheriting from a new shared dispatch class. Also, the format of passed messages has been altered to make it easier to understand in a debugger.
This commit is contained in:
parent
97d67d75f4
commit
b4c0cfe940
6 changed files with 328 additions and 258 deletions
|
@ -1,3 +1,5 @@
|
|||
const SharedDispatch = require('./shared-dispatch');
|
||||
|
||||
const log = require('../util/log');
|
||||
|
||||
/**
|
||||
|
@ -7,21 +9,9 @@ const log = require('../util/log');
|
|||
* context. The dispatch system will forward function arguments and return values across worker boundaries as needed.
|
||||
* @see {WorkerDispatch}
|
||||
*/
|
||||
class CentralDispatch {
|
||||
class CentralDispatch extends SharedDispatch {
|
||||
constructor () {
|
||||
/**
|
||||
* List of callback registrations for promises waiting for a response from a call to a service on another
|
||||
* worker. A callback registration is an array of [resolve,reject] Promise functions.
|
||||
* Calls to local services don't enter this list.
|
||||
* @type {Array.<[Function,Function]>}
|
||||
*/
|
||||
this.callbacks = [];
|
||||
|
||||
/**
|
||||
* The next callback ID to be used.
|
||||
* @type {int}
|
||||
*/
|
||||
this.nextCallback = 0;
|
||||
super();
|
||||
|
||||
/**
|
||||
* Map of channel name to worker or local service provider.
|
||||
|
@ -45,65 +35,6 @@ class CentralDispatch {
|
|||
this.workers = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.call('vm', 'setData', 'cat', 42);
|
||||
* // this finds the worker for the 'vm' service, then on that worker calls:
|
||||
* vm.setData('cat', 42);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
call (service, method, ...args) {
|
||||
return this.transferCall(service, method, null, ...args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
|
||||
* // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
|
||||
* vm.setData('cat', myArrayBuffer);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
transferCall (service, method, transfer, ...args) {
|
||||
return new Promise((resolve, reject) => {
|
||||
try {
|
||||
if (this.services.hasOwnProperty(service)) {
|
||||
const provider = this.services[service];
|
||||
if (provider instanceof this.workerClass) {
|
||||
const callbackId = this.nextCallback++;
|
||||
this.callbacks[callbackId] = [resolve, reject];
|
||||
if (transfer) {
|
||||
provider.postMessage([service, method, callbackId, args], transfer);
|
||||
} else {
|
||||
provider.postMessage([service, method, callbackId, args]);
|
||||
}
|
||||
} else {
|
||||
const result = provider[method].apply(provider, args);
|
||||
resolve(result);
|
||||
}
|
||||
} else {
|
||||
reject(new Error(`Service not found: ${service}`));
|
||||
}
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a local object as the global provider of the specified service.
|
||||
* WARNING: Any method on the provider can be called from any worker within the dispatch system.
|
||||
|
@ -112,13 +43,16 @@ class CentralDispatch {
|
|||
* @returns {Promise} - a promise which will resolve once the service is registered.
|
||||
*/
|
||||
setService (service, provider) {
|
||||
return new Promise(resolve => {
|
||||
/** Return a promise for consistency with {@link WorkerDispatch#setService} */
|
||||
try {
|
||||
if (this.services.hasOwnProperty(service)) {
|
||||
log.warn(`Replacing existing service provider for ${service}`);
|
||||
log.warn(`Central dispatch replacing existing service provider for ${service}`);
|
||||
}
|
||||
this.services[service] = provider;
|
||||
resolve();
|
||||
});
|
||||
return Promise.resolve();
|
||||
} catch (e) {
|
||||
return Promise.reject(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -130,58 +64,47 @@ class CentralDispatch {
|
|||
if (this.workers.indexOf(worker) === -1) {
|
||||
this.workers.push(worker);
|
||||
worker.onmessage = this._onMessage.bind(this, worker);
|
||||
worker.postMessage(['dispatch', '_handshake']);
|
||||
this._remoteCall(worker, 'dispatch', 'handshake').catch(e => {
|
||||
log.error(`Could not handshake with worker: ${JSON.stringify(e)}`);
|
||||
});
|
||||
} else {
|
||||
log.warn('Ignoring attempt to add duplicate worker');
|
||||
log.warn('Central dispatch ignoring attempt to add duplicate worker');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a message event received from a connected worker.
|
||||
* Fetch the service provider object for a particular service name.
|
||||
* @override
|
||||
* @param {string} service - the name of the service to look up
|
||||
* @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
|
||||
* @protected
|
||||
*/
|
||||
_getServiceProvider (service) {
|
||||
const provider = this.services[service];
|
||||
return provider && {
|
||||
provider,
|
||||
isRemote: provider instanceof this.workerClass
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a call message sent to the dispatch service itself
|
||||
* @override
|
||||
* @param {Worker} worker - the worker which sent the message.
|
||||
* @param {MessageEvent} event - the message event to be handled.
|
||||
* @private
|
||||
* @param {DispatchCallMessage} message - the message to be handled.
|
||||
* @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
|
||||
* @protected
|
||||
*/
|
||||
_onMessage (worker, event) {
|
||||
const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data;
|
||||
_onDispatchMessage (worker, message) {
|
||||
let promise;
|
||||
if (service === 'dispatch') {
|
||||
switch (method) {
|
||||
case '_callback':
|
||||
this._callback(callbackId, ...args);
|
||||
break;
|
||||
case 'setService':
|
||||
promise = this.setService(args[0], worker);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
promise = this.call(service, method, ...args);
|
||||
}
|
||||
if (promise) {
|
||||
promise.then(
|
||||
result => {
|
||||
worker.postMessage(['dispatch', '_callback', callbackId, result]);
|
||||
},
|
||||
error => {
|
||||
worker.postMessage(['dispatch', '_callback', callbackId, null, error]);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a callback from a worker. This should only be called as the result of a message from a worker.
|
||||
* @param {int} callbackId - the ID of the callback to call.
|
||||
* @param {*} result - if `error` is not truthy, resolve the callback promise with this value.
|
||||
* @param {*} [error] - if this is truthy, reject the callback promise with this value.
|
||||
* @private
|
||||
*/
|
||||
_callback (callbackId, result, error) {
|
||||
const [resolve, reject] = this.callbacks[callbackId];
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(result);
|
||||
switch (message.method) {
|
||||
case 'setService':
|
||||
promise = this.setService(message.args[0], worker);
|
||||
break;
|
||||
default:
|
||||
log.error(`Central dispatch received message for unknown method: ${message.method}`);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
217
src/dispatch/shared-dispatch.js
Normal file
217
src/dispatch/shared-dispatch.js
Normal file
|
@ -0,0 +1,217 @@
|
|||
const log = require('../util/log');
|
||||
|
||||
/**
|
||||
* @typedef {object} DispatchCallMessage - a message to the dispatch system representing a service method call
|
||||
* @property {*} responseId - send a response message with this response ID. See {@link DispatchResponseMessage}
|
||||
* @property {string} service - the name of the service to be called
|
||||
* @property {string} method - the name of the method to be called
|
||||
* @property {Array|undefined} args - the arguments to be passed to the method
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {object} DispatchResponseMessage - a message to the dispatch system representing the results of a call
|
||||
* @property {*} responseId - a copy of the response ID from the call which generated this response
|
||||
* @property {*|undefined} error - if this is truthy, then it contains results from a failed call (such as an exception)
|
||||
* @property {*|undefined} result - if error is not truthy, then this contains the return value of the call (if any)
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {DispatchCallMessage|DispatchResponseMessage} DispatchMessage
|
||||
* Any message to the dispatch system.
|
||||
*/
|
||||
|
||||
/**
|
||||
* The SharedDispatch class is responsible for dispatch features shared by
|
||||
* {@link CentralDispatch} and {@link WorkerDispatch}.
|
||||
*/
|
||||
class SharedDispatch {
|
||||
constructor () {
|
||||
/**
|
||||
* List of callback registrations for promises waiting for a response from a call to a service on another
|
||||
* worker. A callback registration is an array of [resolve,reject] Promise functions.
|
||||
* Calls to local services don't enter this list.
|
||||
* @type {Array.<[Function,Function]>}
|
||||
*/
|
||||
this.callbacks = [];
|
||||
|
||||
/**
|
||||
* The next response ID to be used.
|
||||
* @type {int}
|
||||
*/
|
||||
this.nextResponseId = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.call('vm', 'setData', 'cat', 42);
|
||||
* // this finds the worker for the 'vm' service, then on that worker calls:
|
||||
* vm.setData('cat', 42);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
call (service, method, ...args) {
|
||||
return this.transferCall(service, method, null, ...args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
|
||||
* // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
|
||||
* vm.setData('cat', myArrayBuffer);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
transferCall (service, method, transfer, ...args) {
|
||||
try {
|
||||
const {provider, isRemote} = this._getServiceProvider(service);
|
||||
if (provider) {
|
||||
if (isRemote) {
|
||||
return this._remoteTransferCall(provider, service, method, transfer, ...args);
|
||||
}
|
||||
|
||||
const result = provider[method].apply(provider, args);
|
||||
return Promise.resolve(result);
|
||||
}
|
||||
return Promise.reject(new Error(`Service not found: ${service}`));
|
||||
} catch (e) {
|
||||
return Promise.reject(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Like {@link call}, but force the call to be posted through a particular communication channel.
|
||||
* @param {object} provider - send the call through this object's `postMessage` function.
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
_remoteCall (provider, service, method, ...args) {
|
||||
return this._remoteTransferCall(provider, service, method, null, ...args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Like {@link transferCall}, but force the call to be posted through a particular communication channel.
|
||||
* @param {object} provider - send the call through this object's `postMessage` function.
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
_remoteTransferCall (provider, service, method, transfer, ...args) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const responseId = this._storeCallbacks(resolve, reject);
|
||||
if (transfer) {
|
||||
provider.postMessage({service, method, responseId, args}, transfer);
|
||||
} else {
|
||||
provider.postMessage({service, method, responseId, args});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Store callback functions pending a response message.
|
||||
* @param {Function} resolve - function to call if the service method returns.
|
||||
* @param {Function} reject - function to call if the service method throws.
|
||||
* @returns {*} - a unique response ID for this set of callbacks. See {@link _deliverResponse}.
|
||||
* @protected
|
||||
*/
|
||||
_storeCallbacks (resolve, reject) {
|
||||
const responseId = this.nextResponseId++;
|
||||
this.callbacks[responseId] = [resolve, reject];
|
||||
return responseId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deliver call response from a worker. This should only be called as the result of a message from a worker.
|
||||
* @param {int} responseId - the response ID of the callback set to call.
|
||||
* @param {DispatchResponseMessage} message - the message containing the response value(s).
|
||||
* @protected
|
||||
*/
|
||||
_deliverResponse (responseId, message) {
|
||||
try {
|
||||
const [resolve, reject] = this.callbacks[responseId];
|
||||
delete this.callbacks[responseId];
|
||||
if (message.error) {
|
||||
reject(message.error);
|
||||
} else {
|
||||
resolve(message.result);
|
||||
}
|
||||
} catch (e) {
|
||||
log.error(`Dispatch callback failed: ${JSON.stringify(e)}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a message event received from a connected worker.
|
||||
* @param {Worker} worker - the worker which sent the message, or the global object if running in a worker.
|
||||
* @param {MessageEvent} event - the message event to be handled.
|
||||
* @protected
|
||||
*/
|
||||
_onMessage (worker, event) {
|
||||
/** @type {DispatchMessage} */
|
||||
const message = event.data;
|
||||
message.args = message.args || [];
|
||||
let promise;
|
||||
if (message.service) {
|
||||
if (message.service === 'dispatch') {
|
||||
promise = this._onDispatchMessage(worker, message);
|
||||
} else {
|
||||
promise = this.call(message.service, message.method, ...message.args);
|
||||
}
|
||||
} else if (typeof message.responseId === 'undefined') {
|
||||
log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`);
|
||||
} else {
|
||||
this._deliverResponse(message.responseId, message);
|
||||
}
|
||||
if (promise) {
|
||||
if (typeof message.responseId === 'undefined') {
|
||||
log.error(`Dispatch message missing required response ID: ${JSON.stringify(event)}`);
|
||||
} else {
|
||||
promise.then(
|
||||
result => worker.postMessage({responseId: message.responseId, result}),
|
||||
error => worker.postMessage({responseId: message.responseId, error})
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the service provider object for a particular service name.
|
||||
* @abstract
|
||||
* @param {string} service - the name of the service to look up
|
||||
* @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
|
||||
* @protected
|
||||
*/
|
||||
_getServiceProvider (service) {
|
||||
throw new Error(`Could not get provider for ${service}: _getServiceProvider not implemented`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a call message sent to the dispatch service itself
|
||||
* @abstract
|
||||
* @param {Worker} worker - the worker which sent the message.
|
||||
* @param {DispatchCallMessage} message - the message to be handled.
|
||||
* @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
|
||||
* @private
|
||||
*/
|
||||
_onDispatchMessage (worker, message) {
|
||||
throw new Error(`Unimplemented dispatch message handler cannot handle ${message.method} method`);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = SharedDispatch;
|
|
@ -1,3 +1,5 @@
|
|||
const SharedDispatch = require('./shared-dispatch');
|
||||
|
||||
const log = require('../util/log');
|
||||
|
||||
/**
|
||||
|
@ -7,15 +9,9 @@ const log = require('../util/log');
|
|||
* worker boundaries as needed.
|
||||
* @see {CentralDispatch}
|
||||
*/
|
||||
class WorkerDispatch {
|
||||
class WorkerDispatch extends SharedDispatch {
|
||||
constructor () {
|
||||
/**
|
||||
* List of callback registrations for promises waiting for a response from a call to a service on another
|
||||
* worker. A callback registration is an array of [resolve,reject] Promise functions.
|
||||
* Calls to local services don't enter this list.
|
||||
* @type {Array.<[Function,Function]>}
|
||||
*/
|
||||
this.callbacks = [];
|
||||
super();
|
||||
|
||||
/**
|
||||
* This promise will be resolved when we have successfully connected to central dispatch.
|
||||
|
@ -27,12 +23,6 @@ class WorkerDispatch {
|
|||
this._onConnect = resolve;
|
||||
});
|
||||
|
||||
/**
|
||||
* The next callback ID to be used.
|
||||
* @type {int}
|
||||
*/
|
||||
this.nextCallback = 0;
|
||||
|
||||
/**
|
||||
* Map of service name to local service provider.
|
||||
* If a service is not listed here, it is assumed to be provided by another context (another Worker or the main
|
||||
|
@ -42,7 +32,7 @@ class WorkerDispatch {
|
|||
*/
|
||||
this.services = {};
|
||||
|
||||
this._onMessage = this._onMessage.bind(this);
|
||||
this._onMessage = this._onMessage.bind(this, self);
|
||||
if (typeof self !== 'undefined') {
|
||||
self.onmessage = this._onMessage;
|
||||
}
|
||||
|
@ -60,61 +50,6 @@ class WorkerDispatch {
|
|||
return this._connectionPromise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.call('vm', 'setData', 'cat', 42);
|
||||
* // this finds the worker for the 'vm' service, then on that worker calls:
|
||||
* vm.setData('cat', 42);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
call (service, method, ...args) {
|
||||
return this.transferCall(service, method, null, ...args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
|
||||
* // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
|
||||
* vm.setData('cat', myArrayBuffer);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
transferCall (service, method, transfer, ...args) {
|
||||
return new Promise((resolve, reject) => {
|
||||
try {
|
||||
if (this.services.hasOwnProperty(service)) {
|
||||
const provider = this.services[service];
|
||||
const result = provider[method].apply(provider, args);
|
||||
resolve(result);
|
||||
} else {
|
||||
const callbackId = this.nextCallback++;
|
||||
this.callbacks[callbackId] = [resolve, reject];
|
||||
if (transfer) {
|
||||
self.postMessage([service, method, callbackId, ...args], transfer);
|
||||
} else {
|
||||
self.postMessage([service, method, callbackId, ...args]);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a local object as the global provider of the specified service.
|
||||
* WARNING: Any method on the provider can be called from any worker within the dispatch system.
|
||||
|
@ -124,56 +59,51 @@ class WorkerDispatch {
|
|||
*/
|
||||
setService (service, provider) {
|
||||
if (this.services.hasOwnProperty(service)) {
|
||||
log.warn(`Replacing existing service provider for ${service}`);
|
||||
log.warn(`Worker dispatch replacing existing service provider for ${service}`);
|
||||
}
|
||||
this.services[service] = provider;
|
||||
return this.waitForConnection.then(() => this.call('dispatch', 'setService', service));
|
||||
return this.waitForConnection.then(() => this._remoteCall(self, 'dispatch', 'setService', service));
|
||||
}
|
||||
|
||||
/**
|
||||
* Message handler active after the dispatcher handshake. This only handles method calls.
|
||||
* @param {MessageEvent} event - the message event to be handled.
|
||||
* @private
|
||||
* Fetch the service provider object for a particular service name.
|
||||
* @override
|
||||
* @param {string} service - the name of the service to look up
|
||||
* @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
|
||||
* @protected
|
||||
*/
|
||||
_onMessage (event) {
|
||||
const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data;
|
||||
if (service === 'dispatch') {
|
||||
switch (method) {
|
||||
case '_handshake':
|
||||
this._onConnect();
|
||||
break;
|
||||
case '_callback':
|
||||
this._callback(callbackId, ...args);
|
||||
break;
|
||||
case '_terminate':
|
||||
self.close();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
this.call(service, method, ...args).then(
|
||||
result => {
|
||||
self.postMessage(['dispatch', '_callback', callbackId, result]);
|
||||
},
|
||||
error => {
|
||||
self.postMessage(['dispatch', '_callback', callbackId, null, error]);
|
||||
});
|
||||
}
|
||||
_getServiceProvider (service) {
|
||||
// if we don't have a local service by this name, contact central dispatch by calling `postMessage` on self
|
||||
const provider = this.services[service];
|
||||
return {
|
||||
provider: provider || self,
|
||||
isRemote: !provider
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a callback from a worker. This should only be called as the result of a message from a worker.
|
||||
* @param {int} callbackId - the ID of the callback to call.
|
||||
* @param {*} result - if `error` is not truthy, resolve the callback promise with this value.
|
||||
* @param {*} [error] - if this is truthy, reject the callback promise with this value.
|
||||
* @private
|
||||
* Handle a call message sent to the dispatch service itself
|
||||
* @override
|
||||
* @param {Worker} worker - the worker which sent the message.
|
||||
* @param {DispatchCallMessage} message - the message to be handled.
|
||||
* @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
|
||||
* @protected
|
||||
*/
|
||||
_callback (callbackId, result, error) {
|
||||
const [resolve, reject] = this.callbacks[callbackId];
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(result);
|
||||
_onDispatchMessage (worker, message) {
|
||||
let promise;
|
||||
switch (message.method) {
|
||||
case 'handshake':
|
||||
promise = this._onConnect();
|
||||
break;
|
||||
case 'terminate':
|
||||
// Don't close until next tick, after sending confirmation back
|
||||
setTimeout(() => self.close(), 0);
|
||||
promise = Promise.resolve();
|
||||
break;
|
||||
default:
|
||||
log.error(`Worker dispatch received message for unknown method: ${message.method}`);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
5
test/fixtures/dispatch-test-service.js
vendored
5
test/fixtures/dispatch-test-service.js
vendored
|
@ -10,11 +10,6 @@ class DispatchTestService {
|
|||
throwException () {
|
||||
throw new Error('This is a test exception thrown by DispatchTest');
|
||||
}
|
||||
|
||||
close () {
|
||||
// eslint-disable-next-line no-undef
|
||||
self.close();
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = DispatchTestService;
|
||||
|
|
5
test/fixtures/dispatch-test-worker.js
vendored
5
test/fixtures/dispatch-test-worker.js
vendored
|
@ -1,8 +1,11 @@
|
|||
const dispatch = require('../../src/dispatch/worker-dispatch');
|
||||
const DispatchTestService = require('./dispatch-test-service');
|
||||
const log = require('../../src/util/log');
|
||||
|
||||
dispatch.setService('RemoteDispatchTest', new DispatchTestService());
|
||||
|
||||
dispatch.waitForConnection.then(() => {
|
||||
dispatch.call('test', 'onWorkerReady');
|
||||
dispatch.call('test', 'onWorkerReady').catch(e => {
|
||||
log(`Test worker failed to call onWorkerReady: ${JSON.stringify(e)}`);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -12,31 +12,37 @@ dispatch.workerClass = Worker;
|
|||
const runServiceTest = function (serviceName, t) {
|
||||
const promises = [];
|
||||
|
||||
promises.push(dispatch.call(serviceName, 'returnFortyTwo').then(x => {
|
||||
t.equal(x, 42);
|
||||
}));
|
||||
promises.push(dispatch.call(serviceName, 'returnFortyTwo')
|
||||
.then(
|
||||
x => t.equal(x, 42),
|
||||
e => t.fail(e)
|
||||
));
|
||||
|
||||
promises.push(dispatch.call(serviceName, 'doubleArgument', 9).then(x => {
|
||||
t.equal(x, 18);
|
||||
}));
|
||||
promises.push(dispatch.call(serviceName, 'doubleArgument', 9)
|
||||
.then(
|
||||
x => t.equal(x, 18),
|
||||
e => t.fail(e)
|
||||
));
|
||||
|
||||
promises.push(dispatch.call(serviceName, 'doubleArgument', 123).then(x => {
|
||||
t.equal(x, 246);
|
||||
}));
|
||||
promises.push(dispatch.call(serviceName, 'doubleArgument', 123)
|
||||
.then(
|
||||
x => t.equal(x, 246),
|
||||
e => t.fail(e)
|
||||
));
|
||||
|
||||
// I tried using `t.rejects` here but ran into https://github.com/tapjs/node-tap/issues/384
|
||||
promises.push(dispatch.call(serviceName, 'throwException')
|
||||
.then(() => {
|
||||
t.fail('exception was not propagated as expected');
|
||||
}, () => {
|
||||
t.pass('exception was propagated as expected');
|
||||
}));
|
||||
.then(
|
||||
() => t.fail('exception was not propagated as expected'),
|
||||
() => t.pass('exception was propagated as expected')
|
||||
));
|
||||
|
||||
return Promise.all(promises);
|
||||
};
|
||||
|
||||
test('local', t => {
|
||||
dispatch.setService('LocalDispatchTest', new DispatchTestService());
|
||||
dispatch.setService('LocalDispatchTest', new DispatchTestService())
|
||||
.catch(e => t.fail(e));
|
||||
|
||||
return runServiceTest('LocalDispatchTest', t);
|
||||
});
|
||||
|
@ -47,15 +53,11 @@ test('remote', t => {
|
|||
dispatch.addWorker(worker);
|
||||
|
||||
const waitForWorker = new Promise(resolve => {
|
||||
dispatch.setService('test', {
|
||||
onWorkerReady: resolve
|
||||
});
|
||||
dispatch.setService('test', {onWorkerReady: resolve})
|
||||
.catch(e => t.fail(e));
|
||||
});
|
||||
|
||||
return waitForWorker
|
||||
.then(() => runServiceTest('RemoteDispatchTest', t))
|
||||
.then(() => {
|
||||
// Allow some time for the worker to finish, then terminate it
|
||||
setTimeout(() => dispatch.call('RemoteDispatchTest', 'close'), 10);
|
||||
});
|
||||
.then(() => runServiceTest('RemoteDispatchTest', t), e => t.fail(e))
|
||||
.then(() => dispatch._remoteCall(worker, 'dispatch', 'terminate'), e => t.fail(e));
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue