Merge pull request #640 from cwillisf/feature/message-dispatch

Implement message dispatch system
This commit is contained in:
Chris Willis-Ford 2017-08-17 10:57:23 -07:00 committed by GitHub
commit 2665ef2b2b
8 changed files with 547 additions and 0 deletions

View file

@ -52,6 +52,7 @@
"socket.io-client": "1.7.3", "socket.io-client": "1.7.3",
"stats.js": "^0.17.0", "stats.js": "^0.17.0",
"tap": "^10.2.0", "tap": "^10.2.0",
"tiny-worker": "^2.1.1",
"webpack": "^2.4.1", "webpack": "^2.4.1",
"webpack-dev-server": "^2.4.1" "webpack-dev-server": "^2.4.1"
} }

View file

@ -0,0 +1,111 @@
const SharedDispatch = require('./shared-dispatch');
const log = require('../util/log');
/**
* This class serves as the central broker for message dispatch. It expects to operate on the main thread / Window and
* it must be informed of any Worker threads which will participate in the messaging system. From any context in the
* messaging system, the dispatcher's "call" method can call any method on any "service" provided in any participating
* context. The dispatch system will forward function arguments and return values across worker boundaries as needed.
* @see {WorkerDispatch}
*/
class CentralDispatch extends SharedDispatch {
constructor () {
super();
/**
* Map of channel name to worker or local service provider.
* If the entry is a Worker, the service is provided by an object on that worker.
* Otherwise, the service is provided locally and methods on the service will be called directly.
* @see {setService}
* @type {object.<Worker|object>}
*/
this.services = {};
/**
* The constructor we will use to recognize workers.
* @type {Function}
*/
this.workerClass = (typeof Worker === 'undefined' ? null : Worker);
/**
* List of workers attached to this dispatcher.
* @type {Array}
*/
this.workers = [];
}
/**
* Set a local object as the global provider of the specified service.
* WARNING: Any method on the provider can be called from any worker within the dispatch system.
* @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'.
* @param {object} provider - a local object which provides this service.
* @returns {Promise} - a promise which will resolve once the service is registered.
*/
setService (service, provider) {
/** Return a promise for consistency with {@link WorkerDispatch#setService} */
try {
if (this.services.hasOwnProperty(service)) {
log.warn(`Central dispatch replacing existing service provider for ${service}`);
}
this.services[service] = provider;
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}
/**
* Add a worker to the message dispatch system. The worker must implement a compatible message dispatch framework.
* The dispatcher will immediately attempt to "handshake" with the worker.
* @param {Worker} worker - the worker to add into the dispatch system.
*/
addWorker (worker) {
if (this.workers.indexOf(worker) === -1) {
this.workers.push(worker);
worker.onmessage = this._onMessage.bind(this, worker);
this._remoteCall(worker, 'dispatch', 'handshake').catch(e => {
log.error(`Could not handshake with worker: ${JSON.stringify(e)}`);
});
} else {
log.warn('Central dispatch ignoring attempt to add duplicate worker');
}
}
/**
* Fetch the service provider object for a particular service name.
* @override
* @param {string} service - the name of the service to look up
* @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
* @protected
*/
_getServiceProvider (service) {
const provider = this.services[service];
return provider && {
provider,
isRemote: provider instanceof this.workerClass
};
}
/**
* Handle a call message sent to the dispatch service itself
* @override
* @param {Worker} worker - the worker which sent the message.
* @param {DispatchCallMessage} message - the message to be handled.
* @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
* @protected
*/
_onDispatchMessage (worker, message) {
let promise;
switch (message.method) {
case 'setService':
promise = this.setService(message.args[0], worker);
break;
default:
log.error(`Central dispatch received message for unknown method: ${message.method}`);
}
return promise;
}
}
module.exports = new CentralDispatch();

View file

@ -0,0 +1,217 @@
const log = require('../util/log');
/**
* @typedef {object} DispatchCallMessage - a message to the dispatch system representing a service method call
* @property {*} responseId - send a response message with this response ID. See {@link DispatchResponseMessage}
* @property {string} service - the name of the service to be called
* @property {string} method - the name of the method to be called
* @property {Array|undefined} args - the arguments to be passed to the method
*/
/**
* @typedef {object} DispatchResponseMessage - a message to the dispatch system representing the results of a call
* @property {*} responseId - a copy of the response ID from the call which generated this response
* @property {*|undefined} error - if this is truthy, then it contains results from a failed call (such as an exception)
* @property {*|undefined} result - if error is not truthy, then this contains the return value of the call (if any)
*/
/**
* @typedef {DispatchCallMessage|DispatchResponseMessage} DispatchMessage
* Any message to the dispatch system.
*/
/**
* The SharedDispatch class is responsible for dispatch features shared by
* {@link CentralDispatch} and {@link WorkerDispatch}.
*/
class SharedDispatch {
constructor () {
/**
* List of callback registrations for promises waiting for a response from a call to a service on another
* worker. A callback registration is an array of [resolve,reject] Promise functions.
* Calls to local services don't enter this list.
* @type {Array.<[Function,Function]>}
*/
this.callbacks = [];
/**
* The next response ID to be used.
* @type {int}
*/
this.nextResponseId = 0;
}
/**
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
* transferred to the worker, and they should not be used after this call.
* @example
* dispatcher.call('vm', 'setData', 'cat', 42);
* // this finds the worker for the 'vm' service, then on that worker calls:
* vm.setData('cat', 42);
* @param {string} service - the name of the service.
* @param {string} method - the name of the method.
* @param {*} [args] - the arguments to be copied to the method, if any.
* @returns {Promise} - a promise for the return value of the service method.
*/
call (service, method, ...args) {
return this.transferCall(service, method, null, ...args);
}
/**
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
* transferred to the worker, and they should not be used after this call.
* @example
* dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
* // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
* vm.setData('cat', myArrayBuffer);
* @param {string} service - the name of the service.
* @param {string} method - the name of the method.
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
* @param {*} [args] - the arguments to be copied to the method, if any.
* @returns {Promise} - a promise for the return value of the service method.
*/
transferCall (service, method, transfer, ...args) {
try {
const {provider, isRemote} = this._getServiceProvider(service);
if (provider) {
if (isRemote) {
return this._remoteTransferCall(provider, service, method, transfer, ...args);
}
const result = provider[method].apply(provider, args);
return Promise.resolve(result);
}
return Promise.reject(new Error(`Service not found: ${service}`));
} catch (e) {
return Promise.reject(e);
}
}
/**
* Like {@link call}, but force the call to be posted through a particular communication channel.
* @param {object} provider - send the call through this object's `postMessage` function.
* @param {string} service - the name of the service.
* @param {string} method - the name of the method.
* @param {*} [args] - the arguments to be copied to the method, if any.
* @returns {Promise} - a promise for the return value of the service method.
*/
_remoteCall (provider, service, method, ...args) {
return this._remoteTransferCall(provider, service, method, null, ...args);
}
/**
* Like {@link transferCall}, but force the call to be posted through a particular communication channel.
* @param {object} provider - send the call through this object's `postMessage` function.
* @param {string} service - the name of the service.
* @param {string} method - the name of the method.
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
* @param {*} [args] - the arguments to be copied to the method, if any.
* @returns {Promise} - a promise for the return value of the service method.
*/
_remoteTransferCall (provider, service, method, transfer, ...args) {
return new Promise((resolve, reject) => {
const responseId = this._storeCallbacks(resolve, reject);
if (transfer) {
provider.postMessage({service, method, responseId, args}, transfer);
} else {
provider.postMessage({service, method, responseId, args});
}
});
}
/**
* Store callback functions pending a response message.
* @param {Function} resolve - function to call if the service method returns.
* @param {Function} reject - function to call if the service method throws.
* @returns {*} - a unique response ID for this set of callbacks. See {@link _deliverResponse}.
* @protected
*/
_storeCallbacks (resolve, reject) {
const responseId = this.nextResponseId++;
this.callbacks[responseId] = [resolve, reject];
return responseId;
}
/**
* Deliver call response from a worker. This should only be called as the result of a message from a worker.
* @param {int} responseId - the response ID of the callback set to call.
* @param {DispatchResponseMessage} message - the message containing the response value(s).
* @protected
*/
_deliverResponse (responseId, message) {
try {
const [resolve, reject] = this.callbacks[responseId];
delete this.callbacks[responseId];
if (message.error) {
reject(message.error);
} else {
resolve(message.result);
}
} catch (e) {
log.error(`Dispatch callback failed: ${JSON.stringify(e)}`);
}
}
/**
* Handle a message event received from a connected worker.
* @param {Worker} worker - the worker which sent the message, or the global object if running in a worker.
* @param {MessageEvent} event - the message event to be handled.
* @protected
*/
_onMessage (worker, event) {
/** @type {DispatchMessage} */
const message = event.data;
message.args = message.args || [];
let promise;
if (message.service) {
if (message.service === 'dispatch') {
promise = this._onDispatchMessage(worker, message);
} else {
promise = this.call(message.service, message.method, ...message.args);
}
} else if (typeof message.responseId === 'undefined') {
log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`);
} else {
this._deliverResponse(message.responseId, message);
}
if (promise) {
if (typeof message.responseId === 'undefined') {
log.error(`Dispatch message missing required response ID: ${JSON.stringify(event)}`);
} else {
promise.then(
result => worker.postMessage({responseId: message.responseId, result}),
error => worker.postMessage({responseId: message.responseId, error})
);
}
}
}
/**
* Fetch the service provider object for a particular service name.
* @abstract
* @param {string} service - the name of the service to look up
* @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
* @protected
*/
_getServiceProvider (service) {
throw new Error(`Could not get provider for ${service}: _getServiceProvider not implemented`);
}
/**
* Handle a call message sent to the dispatch service itself
* @abstract
* @param {Worker} worker - the worker which sent the message.
* @param {DispatchCallMessage} message - the message to be handled.
* @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
* @private
*/
_onDispatchMessage (worker, message) {
throw new Error(`Unimplemented dispatch message handler cannot handle ${message.method} method`);
}
}
module.exports = SharedDispatch;

View file

@ -0,0 +1,110 @@
const SharedDispatch = require('./shared-dispatch');
const log = require('../util/log');
/**
* This class provides a Worker with the means to participate in the message dispatch system managed by CentralDispatch.
* From any context in the messaging system, the dispatcher's "call" method can call any method on any "service"
* provided in any participating context. The dispatch system will forward function arguments and return values across
* worker boundaries as needed.
* @see {CentralDispatch}
*/
class WorkerDispatch extends SharedDispatch {
constructor () {
super();
/**
* This promise will be resolved when we have successfully connected to central dispatch.
* @type {Promise}
* @see {waitForConnection}
* @private
*/
this._connectionPromise = new Promise(resolve => {
this._onConnect = resolve;
});
/**
* Map of service name to local service provider.
* If a service is not listed here, it is assumed to be provided by another context (another Worker or the main
* thread).
* @see {setService}
* @type {object}
*/
this.services = {};
this._onMessage = this._onMessage.bind(this, self);
if (typeof self !== 'undefined') {
self.onmessage = this._onMessage;
}
}
/**
* @returns {Promise} a promise which will resolve upon connection to central dispatch. If you need to make a call
* immediately on "startup" you can attach a 'then' to this promise.
* @example
* dispatch.waitForConnection.then(() => {
* dispatch.call('myService', 'hello');
* })
*/
get waitForConnection () {
return this._connectionPromise;
}
/**
* Set a local object as the global provider of the specified service.
* WARNING: Any method on the provider can be called from any worker within the dispatch system.
* @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'.
* @param {object} provider - a local object which provides this service.
* @returns {Promise} - a promise which will resolve once the service is registered.
*/
setService (service, provider) {
if (this.services.hasOwnProperty(service)) {
log.warn(`Worker dispatch replacing existing service provider for ${service}`);
}
this.services[service] = provider;
return this.waitForConnection.then(() => this._remoteCall(self, 'dispatch', 'setService', service));
}
/**
* Fetch the service provider object for a particular service name.
* @override
* @param {string} service - the name of the service to look up
* @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
* @protected
*/
_getServiceProvider (service) {
// if we don't have a local service by this name, contact central dispatch by calling `postMessage` on self
const provider = this.services[service];
return {
provider: provider || self,
isRemote: !provider
};
}
/**
* Handle a call message sent to the dispatch service itself
* @override
* @param {Worker} worker - the worker which sent the message.
* @param {DispatchCallMessage} message - the message to be handled.
* @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
* @protected
*/
_onDispatchMessage (worker, message) {
let promise;
switch (message.method) {
case 'handshake':
promise = this._onConnect();
break;
case 'terminate':
// Don't close until next tick, after sending confirmation back
setTimeout(() => self.close(), 0);
promise = Promise.resolve();
break;
default:
log.error(`Worker dispatch received message for unknown method: ${message.method}`);
}
return promise;
}
}
module.exports = new WorkerDispatch();

15
test/fixtures/dispatch-test-service.js vendored Normal file
View file

@ -0,0 +1,15 @@
class DispatchTestService {
returnFortyTwo () {
return 42;
}
doubleArgument (x) {
return 2 * x;
}
throwException () {
throw new Error('This is a test exception thrown by DispatchTest');
}
}
module.exports = DispatchTestService;

View file

@ -0,0 +1,19 @@
const Module = require('module');
const callsite = require('callsite');
const path = require('path');
const oldRequire = Module.prototype.require;
Module.prototype.require = function (target) {
if (target.indexOf('/') === -1) {
return oldRequire.apply(this, arguments);
}
const stack = callsite();
const callerFile = stack[2].getFileName();
const callerDir = path.dirname(callerFile);
target = path.resolve(callerDir, target);
return oldRequire.call(this, target);
};
oldRequire(path.resolve(__dirname, 'dispatch-test-worker'));

11
test/fixtures/dispatch-test-worker.js vendored Normal file
View file

@ -0,0 +1,11 @@
const dispatch = require('../../src/dispatch/worker-dispatch');
const DispatchTestService = require('./dispatch-test-service');
const log = require('../../src/util/log');
dispatch.setService('RemoteDispatchTest', new DispatchTestService());
dispatch.waitForConnection.then(() => {
dispatch.call('test', 'onWorkerReady').catch(e => {
log(`Test worker failed to call onWorkerReady: ${JSON.stringify(e)}`);
});
});

63
test/unit/dispatch.js Normal file
View file

@ -0,0 +1,63 @@
const DispatchTestService = require('../fixtures/dispatch-test-service');
const Worker = require('tiny-worker');
const dispatch = require('../../src/dispatch/central-dispatch');
const path = require('path');
const test = require('tap').test;
// By default Central Dispatch works with the Worker class built into the browser. Tell it to use TinyWorker instead.
dispatch.workerClass = Worker;
const runServiceTest = function (serviceName, t) {
const promises = [];
promises.push(dispatch.call(serviceName, 'returnFortyTwo')
.then(
x => t.equal(x, 42),
e => t.fail(e)
));
promises.push(dispatch.call(serviceName, 'doubleArgument', 9)
.then(
x => t.equal(x, 18),
e => t.fail(e)
));
promises.push(dispatch.call(serviceName, 'doubleArgument', 123)
.then(
x => t.equal(x, 246),
e => t.fail(e)
));
// I tried using `t.rejects` here but ran into https://github.com/tapjs/node-tap/issues/384
promises.push(dispatch.call(serviceName, 'throwException')
.then(
() => t.fail('exception was not propagated as expected'),
() => t.pass('exception was propagated as expected')
));
return Promise.all(promises);
};
test('local', t => {
dispatch.setService('LocalDispatchTest', new DispatchTestService())
.catch(e => t.fail(e));
return runServiceTest('LocalDispatchTest', t);
});
test('remote', t => {
const fixturesDir = path.resolve(__dirname, '../fixtures');
const worker = new Worker('./test/fixtures/dispatch-test-worker-shim.js', null, {cwd: fixturesDir});
dispatch.addWorker(worker);
const waitForWorker = new Promise(resolve => {
dispatch.setService('test', {onWorkerReady: resolve})
.catch(e => t.fail(e));
});
return waitForWorker
.then(() => runServiceTest('RemoteDispatchTest', t), e => t.fail(e))
.then(() => dispatch._remoteCall(worker, 'dispatch', 'terminate'), e => t.fail(e));
});