From 09d3fe330f9eecf193c26d11e166c5767e823a30 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford Date: Wed, 12 Jul 2017 18:48:36 -0400 Subject: [PATCH 1/5] Implement message dispatch system This message dispatch system allows a "service" to register itself under a particular name, and then anyone can call any method on that service from any context (the main "window" thread or any worker). Return values are passed back through promise resolution. --- src/dispatch/central-dispatch.js | 173 ++++++++++++++++++++++++++++ src/dispatch/worker-dispatch.js | 190 +++++++++++++++++++++++++++++++ 2 files changed, 363 insertions(+) create mode 100644 src/dispatch/central-dispatch.js create mode 100644 src/dispatch/worker-dispatch.js diff --git a/src/dispatch/central-dispatch.js b/src/dispatch/central-dispatch.js new file mode 100644 index 000000000..2eba7ecc8 --- /dev/null +++ b/src/dispatch/central-dispatch.js @@ -0,0 +1,173 @@ +const log = require('../util/log'); + +/** + * This class serves as the central broker for message dispatch. It expects to operate on the main thread / Window and + * it must be informed of any Worker threads which will participate in the messaging system. From any context in the + * messaging system, the dispatcher's "call" method can call any method on any "service" provided in any participating + * context. The dispatch system will forward function arguments and return values across worker boundaries as needed. + * @see {WorkerDispatch} + */ +class CentralDispatch { + constructor () { + /** + * List of callback registrations for promises waiting for a response from a call to a service on another + * worker. A callback registration is an array of [resolve,reject] Promise functions. + * Calls to services on this worker don't enter this list. + * @type {Array.<[Function,Function]>} + */ + this.callbacks = []; + + /** + * The next callback ID to be used. + * @type {int} + */ + this.nextCallback = 0; + + /** + * Map of channel name to worker or local service provider. + * If the entry is a Worker, the service is provided by an object on that worker. + * Otherwise, the service is provided locally and methods on the service will be called directly. + * @see {setService} + * @type {object.} + */ + this.services = {}; + + /** + * List of workers attached to this dispatcher. + * @type {Array} + */ + this.workers = []; + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.call('vm', 'setData', 'cat', 42); + * // this finds the worker for the 'vm' service, then on that worker calls: + * vm.setData('cat', 42); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + call (service, method, ...args) { + return this.transferCall(service, method, null, ...args); + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer); + * // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls: + * vm.setData('cat', myArrayBuffer); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + transferCall (service, method, transfer, ...args) { + return new Promise((resolve, reject) => { + if (this.services.hasOwnProperty(service)) { + const provider = this.services[service]; + if (provider instanceof Worker) { + const callbackId = this.nextCallback++; + this.callbacks[callbackId] = [resolve, reject]; + if (transfer) { + provider.postMessage([service, method, callbackId, args], transfer); + } else { + provider.postMessage([service, method, callbackId, args]); + } + } else { + const result = provider[method].apply(provider, args); + resolve(result); + } + } else { + reject(new Error(`Service not found: ${service}`)); + } + }); + } + + /** + * Set a local object as the global provider of the specified service. + * @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'. + * @param {object} provider - a local object which provides this service. + * WARNING: Any method on the provider can be called from any worker within the dispatch system. + */ + setService (service, provider) { + if (this.services.hasOwnProperty(service)) { + log.warn(`Replacing existing service provider for ${service}`); + } + this.services[service] = provider; + } + + /** + * Add a worker to the message dispatch system. The worker must implement a compatible message dispatch framework. + * The dispatcher will immediately attempt to "handshake" with the worker. + * @param {Worker} worker - the worker to add into the dispatch system. + */ + addWorker (worker) { + if (this.workers.indexOf(worker) === -1) { + this.workers.push(worker); + worker.onmessage = this._onMessage.bind(this); + worker.postMessage('dispatch-handshake'); + } else { + log.warn('Ignoring attempt to add duplicate worker'); + } + } + + /** + * Handle a message event received from a connected worker. + * @param {MessageEvent} event - the message event to be handled. + * @private + */ + _onMessage (event) { + const worker = event.target; + const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data; + if (service === 'dispatch') { + switch (method) { + case '_callback': + this._callback(callbackId, ...args); + break; + case 'setService': + this.setService(args[0], worker); + break; + } + } else { + this.call(service, method, ...args).then( + result => { + worker.postMessage(['dispatch', '_callback', callbackId, result]); + }, + error => { + worker.postMessage(['dispatch', '_callback', callbackId, null, error]); + }); + } + } + + /** + * Handle a callback from a worker. This should only be called as the result of a message from a worker. + * @param {int} callbackId - the ID of the callback to call. + * @param {*} result - if `error` is not truthy, resolve the callback promise with this value. + * @param {*} [error] - if this is truthy, reject the callback promise with this value. + * @private + */ + _callback (callbackId, result, error) { + const [resolve, reject] = this.callbacks[callbackId]; + if (error) { + reject(error); + } else { + resolve(result); + } + } +} + +const dispatch = new CentralDispatch(); +module.exports = dispatch; +self.Scratch = self.Scratch || {}; +self.Scratch.dispatch = dispatch; diff --git a/src/dispatch/worker-dispatch.js b/src/dispatch/worker-dispatch.js new file mode 100644 index 000000000..e8563faff --- /dev/null +++ b/src/dispatch/worker-dispatch.js @@ -0,0 +1,190 @@ +const log = require('../util/log'); + +/** + * This class provides a Worker with the means to participate in the message dispatch system managed by CentralDispatch. + * From any context in the messaging system, the dispatcher's "call" method can call any method on any "service" + * provided in any participating context. The dispatch system will forward function arguments and return values across + * worker boundaries as needed. + * @see {CentralDispatch} + */ +class WorkerDispatch { + constructor () { + /** + * List of callback registrations for promises waiting for a response from a call to a service on another + * worker. A callback registration is an array of [resolve,reject] Promise functions. + * Calls to services on this worker don't enter this list. + * @type {Array.<[Function,Function]>} + */ + this.callbacks = []; + + /** + * This promise will be resolved when we have successfully connected to central dispatch. + * @type {Promise} + * @see {waitForConnection} + * @private + */ + this._connectionPromise = new Promise(resolve => { + this._onConnect = resolve; + }).then(() => { + self.onmessage = this._onMessage.bind(this); + }); + + /** + * The next callback ID to be used. + * @type {int} + */ + this.nextCallback = 0; + + /** + * Map of service name to local service provider. + * If a service is not listed here, it is assumed to be provided by another context (another Worker or the main + * thread). + * @see {setService} + * @type {object} + */ + this.services = {}; + + self.onmessage = this._onHandshake.bind(this); + } + + /** + * @returns {Promise} a promise which will resolve upon connection to central dispatch. If you need to make a call + * immediately on "startup" you can attach a 'then' to this promise. + * @example + * dispatch.waitForConnection.then(() => { + * dispatch.call('myService', 'hello'); + * }) + */ + get waitForConnection () { + return this._connectionPromise; + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.call('vm', 'setData', 'cat', 42); + * // this finds the worker for the 'vm' service, then on that worker calls: + * vm.setData('cat', 42); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + call (service, method, ...args) { + return this.transferCall(service, method, null, ...args); + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer); + * // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls: + * vm.setData('cat', myArrayBuffer); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + transferCall (service, method, transfer, ...args) { + return new Promise((resolve, reject) => { + if (this.services.hasOwnProperty(service)) { + const provider = this.services[service]; + const result = provider[method].apply(provider, args); + resolve(result); + } else { + const callbackId = this.nextCallback++; + this.callbacks[callbackId] = [resolve, reject]; + if (transfer) { + self.postMessage([service, method, callbackId, ...args], transfer); + } else { + self.postMessage([service, method, callbackId, ...args]); + } + } + }); + } + + /** + * Set a local object as the global provider of the specified service. + * @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'. + * @param {object} provider - a local object which provides this service. + * WARNING: Any method on the provider can be called from any worker within the dispatch system. + */ + setService (service, provider) { + if (this.services.hasOwnProperty(service)) { + log.warn(`Replacing existing service provider for ${service}`); + } + this.services[service] = provider; + this.waitForConnection.then(() => { + this.call('dispatch', 'setService', service); + }); + } + + /** + * Message handler active until the dispatcher handshake arrives. + * @param {MessageEvent} event - the message event to be handled. + * @private + */ + _onHandshake (event) { + const message = event.data; + if (message === 'dispatch-handshake') { + this._onConnect(); + } else { + log.error(`WorkerDispatch received unexpected message before handshake: ${JSON.stringify(message)}`); + } + } + + /** + * Message handler active after the dispatcher handshake. This only handles method calls. + * @param {MessageEvent} event - the message event to be handled. + * @private + */ + _onMessage (event) { + const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data; + if (service === 'dispatch') { + switch (method) { + case '_callback': + this._callback(callbackId, ...args); + break; + case '_terminate': + self.close(); + break; + } + } else { + this.call(service, method, ...args).then( + result => { + self.postMessage(['dispatch', '_callback', callbackId, result]); + }, + error => { + self.postMessage(['dispatch', '_callback', callbackId, null, error]); + }); + } + } + + /** + * Handle a callback from a worker. This should only be called as the result of a message from a worker. + * @param {int} callbackId - the ID of the callback to call. + * @param {*} result - if `error` is not truthy, resolve the callback promise with this value. + * @param {*} [error] - if this is truthy, reject the callback promise with this value. + * @private + */ + _callback (callbackId, result, error) { + const [resolve, reject] = this.callbacks[callbackId]; + if (error) { + reject(error); + } else { + resolve(result); + } + } +} + +const dispatch = new WorkerDispatch(); +module.exports = dispatch; +self.Scratch = self.Scratch || {}; +self.Scratch.dispatch = dispatch; From 0fcc248ac12eec9bf8cded3d30677787e7aeb28e Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford Date: Sat, 15 Jul 2017 23:51:59 -0700 Subject: [PATCH 2/5] Add tests for message dispatch system; fix bugs The tests run using TinyWorker, which emulates web workers on Node. There are quite a few quirks in that situation due to the differences between Node and Webpack as well as the differences between TinyWorker and real Web Workers. The tests also exposed a few bugs in the dispatch system, which have now been fixed. Most notably, if a method called through the dispatch system throws an exception that exception will now be passed back to the caller. Previously the exception would escape the dispatch system and the caller would never hear any response at all. --- package.json | 1 + src/dispatch/central-dispatch.js | 49 +++++++++-------- src/dispatch/worker-dispatch.js | 55 ++++++++----------- test/fixtures/dispatch-test-service.js | 20 +++++++ test/fixtures/dispatch-test-worker-shim.js | 19 +++++++ test/fixtures/dispatch-test-worker.js | 8 +++ test/unit/dispatch.js | 61 ++++++++++++++++++++++ 7 files changed, 160 insertions(+), 53 deletions(-) create mode 100644 test/fixtures/dispatch-test-service.js create mode 100644 test/fixtures/dispatch-test-worker-shim.js create mode 100644 test/fixtures/dispatch-test-worker.js create mode 100644 test/unit/dispatch.js diff --git a/package.json b/package.json index d20b86728..ec4cf86f6 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,7 @@ "socket.io-client": "1.7.3", "stats.js": "^0.17.0", "tap": "^10.2.0", + "tiny-worker": "^2.1.1", "webpack": "^2.4.1", "webpack-dev-server": "^2.4.1" } diff --git a/src/dispatch/central-dispatch.js b/src/dispatch/central-dispatch.js index 2eba7ecc8..9d1388724 100644 --- a/src/dispatch/central-dispatch.js +++ b/src/dispatch/central-dispatch.js @@ -12,7 +12,7 @@ class CentralDispatch { /** * List of callback registrations for promises waiting for a response from a call to a service on another * worker. A callback registration is an array of [resolve,reject] Promise functions. - * Calls to services on this worker don't enter this list. + * Calls to local services don't enter this list. * @type {Array.<[Function,Function]>} */ this.callbacks = []; @@ -32,6 +32,12 @@ class CentralDispatch { */ this.services = {}; + /** + * The constructor we will use to recognize workers. + * @type {Function} + */ + this.workerClass = (typeof Worker === 'undefined' ? null : Worker); + /** * List of workers attached to this dispatcher. * @type {Array} @@ -74,22 +80,26 @@ class CentralDispatch { */ transferCall (service, method, transfer, ...args) { return new Promise((resolve, reject) => { - if (this.services.hasOwnProperty(service)) { - const provider = this.services[service]; - if (provider instanceof Worker) { - const callbackId = this.nextCallback++; - this.callbacks[callbackId] = [resolve, reject]; - if (transfer) { - provider.postMessage([service, method, callbackId, args], transfer); + try { + if (this.services.hasOwnProperty(service)) { + const provider = this.services[service]; + if (provider instanceof this.workerClass) { + const callbackId = this.nextCallback++; + this.callbacks[callbackId] = [resolve, reject]; + if (transfer) { + provider.postMessage([service, method, callbackId, args], transfer); + } else { + provider.postMessage([service, method, callbackId, args]); + } } else { - provider.postMessage([service, method, callbackId, args]); + const result = provider[method].apply(provider, args); + resolve(result); } } else { - const result = provider[method].apply(provider, args); - resolve(result); + reject(new Error(`Service not found: ${service}`)); } - } else { - reject(new Error(`Service not found: ${service}`)); + } catch (e) { + reject(e); } }); } @@ -115,8 +125,8 @@ class CentralDispatch { addWorker (worker) { if (this.workers.indexOf(worker) === -1) { this.workers.push(worker); - worker.onmessage = this._onMessage.bind(this); - worker.postMessage('dispatch-handshake'); + worker.onmessage = this._onMessage.bind(this, worker); + worker.postMessage(['dispatch', '_handshake']); } else { log.warn('Ignoring attempt to add duplicate worker'); } @@ -124,11 +134,11 @@ class CentralDispatch { /** * Handle a message event received from a connected worker. + * @param {Worker} worker - the worker which sent the message. * @param {MessageEvent} event - the message event to be handled. * @private */ - _onMessage (event) { - const worker = event.target; + _onMessage (worker, event) { const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data; if (service === 'dispatch') { switch (method) { @@ -167,7 +177,4 @@ class CentralDispatch { } } -const dispatch = new CentralDispatch(); -module.exports = dispatch; -self.Scratch = self.Scratch || {}; -self.Scratch.dispatch = dispatch; +module.exports = new CentralDispatch(); diff --git a/src/dispatch/worker-dispatch.js b/src/dispatch/worker-dispatch.js index e8563faff..a0127d067 100644 --- a/src/dispatch/worker-dispatch.js +++ b/src/dispatch/worker-dispatch.js @@ -12,7 +12,7 @@ class WorkerDispatch { /** * List of callback registrations for promises waiting for a response from a call to a service on another * worker. A callback registration is an array of [resolve,reject] Promise functions. - * Calls to services on this worker don't enter this list. + * Calls to local services don't enter this list. * @type {Array.<[Function,Function]>} */ this.callbacks = []; @@ -25,8 +25,6 @@ class WorkerDispatch { */ this._connectionPromise = new Promise(resolve => { this._onConnect = resolve; - }).then(() => { - self.onmessage = this._onMessage.bind(this); }); /** @@ -44,7 +42,10 @@ class WorkerDispatch { */ this.services = {}; - self.onmessage = this._onHandshake.bind(this); + this._onMessage = this._onMessage.bind(this); + if (typeof self !== 'undefined') { + self.onmessage = this._onMessage; + } } /** @@ -94,18 +95,22 @@ class WorkerDispatch { */ transferCall (service, method, transfer, ...args) { return new Promise((resolve, reject) => { - if (this.services.hasOwnProperty(service)) { - const provider = this.services[service]; - const result = provider[method].apply(provider, args); - resolve(result); - } else { - const callbackId = this.nextCallback++; - this.callbacks[callbackId] = [resolve, reject]; - if (transfer) { - self.postMessage([service, method, callbackId, ...args], transfer); + try { + if (this.services.hasOwnProperty(service)) { + const provider = this.services[service]; + const result = provider[method].apply(provider, args); + resolve(result); } else { - self.postMessage([service, method, callbackId, ...args]); + const callbackId = this.nextCallback++; + this.callbacks[callbackId] = [resolve, reject]; + if (transfer) { + self.postMessage([service, method, callbackId, ...args], transfer); + } else { + self.postMessage([service, method, callbackId, ...args]); + } } + } catch (e) { + reject(e); } }); } @@ -126,20 +131,6 @@ class WorkerDispatch { }); } - /** - * Message handler active until the dispatcher handshake arrives. - * @param {MessageEvent} event - the message event to be handled. - * @private - */ - _onHandshake (event) { - const message = event.data; - if (message === 'dispatch-handshake') { - this._onConnect(); - } else { - log.error(`WorkerDispatch received unexpected message before handshake: ${JSON.stringify(message)}`); - } - } - /** * Message handler active after the dispatcher handshake. This only handles method calls. * @param {MessageEvent} event - the message event to be handled. @@ -149,6 +140,9 @@ class WorkerDispatch { const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data; if (service === 'dispatch') { switch (method) { + case '_handshake': + this._onConnect(); + break; case '_callback': this._callback(callbackId, ...args); break; @@ -184,7 +178,4 @@ class WorkerDispatch { } } -const dispatch = new WorkerDispatch(); -module.exports = dispatch; -self.Scratch = self.Scratch || {}; -self.Scratch.dispatch = dispatch; +module.exports = new WorkerDispatch(); diff --git a/test/fixtures/dispatch-test-service.js b/test/fixtures/dispatch-test-service.js new file mode 100644 index 000000000..1be99ea95 --- /dev/null +++ b/test/fixtures/dispatch-test-service.js @@ -0,0 +1,20 @@ +class DispatchTestService { + returnFortyTwo () { + return 42; + } + + doubleArgument (x) { + return 2 * x; + } + + throwException () { + throw new Error('This is a test exception thrown by LocalDispatchTest'); + } + + close () { + // eslint-disable-next-line no-undef + self.close(); + } +} + +module.exports = DispatchTestService; diff --git a/test/fixtures/dispatch-test-worker-shim.js b/test/fixtures/dispatch-test-worker-shim.js new file mode 100644 index 000000000..ae11a4950 --- /dev/null +++ b/test/fixtures/dispatch-test-worker-shim.js @@ -0,0 +1,19 @@ +const Module = require('module'); + +const callsite = require('callsite'); +const path = require('path'); + +const oldRequire = Module.prototype.require; +Module.prototype.require = function (target) { + if (target.indexOf('/') === -1) { + return oldRequire.apply(this, arguments); + } + + const stack = callsite(); + const callerFile = stack[2].getFileName(); + const callerDir = path.dirname(callerFile); + target = path.resolve(callerDir, target); + return oldRequire.call(this, target); +}; + +oldRequire(path.resolve(__dirname, 'dispatch-test-worker')); diff --git a/test/fixtures/dispatch-test-worker.js b/test/fixtures/dispatch-test-worker.js new file mode 100644 index 000000000..4ca16a545 --- /dev/null +++ b/test/fixtures/dispatch-test-worker.js @@ -0,0 +1,8 @@ +const dispatch = require('../../src/dispatch/worker-dispatch'); +const DispatchTestService = require('./dispatch-test-service'); + +dispatch.setService('RemoteDispatchTest', new DispatchTestService()); + +dispatch.waitForConnection.then(() => { + dispatch.call('test', 'onWorkerReady'); +}); diff --git a/test/unit/dispatch.js b/test/unit/dispatch.js new file mode 100644 index 000000000..8e94735e2 --- /dev/null +++ b/test/unit/dispatch.js @@ -0,0 +1,61 @@ +const DispatchTestService = require('../fixtures/dispatch-test-service'); +const Worker = require('tiny-worker'); + +const dispatch = require('../../src/dispatch/central-dispatch'); +const path = require('path'); +const test = require('tap').test; + + +// By default Central Dispatch works with the Worker class built into the browser. Tell it to use TinyWorker instead. +dispatch.workerClass = Worker; + +const runServiceTest = function (serviceName, t) { + const promises = []; + + promises.push(dispatch.call(serviceName, 'returnFortyTwo').then(x => { + t.equal(x, 42); + })); + + promises.push(dispatch.call(serviceName, 'doubleArgument', 9).then(x => { + t.equal(x, 18); + })); + + promises.push(dispatch.call(serviceName, 'doubleArgument', 123).then(x => { + t.equal(x, 246); + })); + + // I tried using `t.rejects` here but ran into https://github.com/tapjs/node-tap/issues/384 + promises.push(dispatch.call(serviceName, 'throwException') + .then(() => { + t.fail('exception was not propagated as expected'); + }, () => { + t.pass('exception was propagated as expected'); + })); + + return Promise.all(promises); +}; + +test('local', t => { + dispatch.setService('LocalDispatchTest', new DispatchTestService()); + + return runServiceTest('LocalDispatchTest', t); +}); + +test('remote', t => { + const fixturesDir = path.resolve(__dirname, '../fixtures'); + const worker = new Worker('./test/fixtures/dispatch-test-worker-shim.js', null, {cwd: fixturesDir}); + dispatch.addWorker(worker); + + const waitForWorker = new Promise(resolve => { + dispatch.setService('test', { + onWorkerReady: resolve + }); + }); + + return waitForWorker + .then(() => runServiceTest('RemoteDispatchTest', t)) + .then(() => { + // Allow some time for the worker to finish, then terminate it + setTimeout(() => dispatch.call('RemoteDispatchTest', 'close'), 10); + }); +}); From 84ac66d6e5b3770bc2135f1c013857d6844b1499 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford Date: Sun, 16 Jul 2017 00:14:03 -0700 Subject: [PATCH 3/5] Fix exception message --- test/fixtures/dispatch-test-service.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fixtures/dispatch-test-service.js b/test/fixtures/dispatch-test-service.js index 1be99ea95..959a87637 100644 --- a/test/fixtures/dispatch-test-service.js +++ b/test/fixtures/dispatch-test-service.js @@ -8,7 +8,7 @@ class DispatchTestService { } throwException () { - throw new Error('This is a test exception thrown by LocalDispatchTest'); + throw new Error('This is a test exception thrown by DispatchTest'); } close () { From 97d67d75f480e02bb92b42f82b0fd24cbd8ced48 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford Date: Fri, 21 Jul 2017 13:13:45 -0700 Subject: [PATCH 4/5] Allow 'then' on service registration This allows a service to postpone communication with other services until it can be sure that it's registered with central dispatch. Service registration on the main thread always happens immediately, but that version of `setService` still returns a Promise for consistency. --- src/dispatch/central-dispatch.js | 22 +++++++++++++++------- src/dispatch/worker-dispatch.js | 7 +++---- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/dispatch/central-dispatch.js b/src/dispatch/central-dispatch.js index 9d1388724..98bb2805a 100644 --- a/src/dispatch/central-dispatch.js +++ b/src/dispatch/central-dispatch.js @@ -106,15 +106,19 @@ class CentralDispatch { /** * Set a local object as the global provider of the specified service. + * WARNING: Any method on the provider can be called from any worker within the dispatch system. * @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'. * @param {object} provider - a local object which provides this service. - * WARNING: Any method on the provider can be called from any worker within the dispatch system. + * @returns {Promise} - a promise which will resolve once the service is registered. */ setService (service, provider) { - if (this.services.hasOwnProperty(service)) { - log.warn(`Replacing existing service provider for ${service}`); - } - this.services[service] = provider; + return new Promise(resolve => { + if (this.services.hasOwnProperty(service)) { + log.warn(`Replacing existing service provider for ${service}`); + } + this.services[service] = provider; + resolve(); + }); } /** @@ -140,17 +144,21 @@ class CentralDispatch { */ _onMessage (worker, event) { const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data; + let promise; if (service === 'dispatch') { switch (method) { case '_callback': this._callback(callbackId, ...args); break; case 'setService': - this.setService(args[0], worker); + promise = this.setService(args[0], worker); break; } } else { - this.call(service, method, ...args).then( + promise = this.call(service, method, ...args); + } + if (promise) { + promise.then( result => { worker.postMessage(['dispatch', '_callback', callbackId, result]); }, diff --git a/src/dispatch/worker-dispatch.js b/src/dispatch/worker-dispatch.js index a0127d067..50328d84c 100644 --- a/src/dispatch/worker-dispatch.js +++ b/src/dispatch/worker-dispatch.js @@ -117,18 +117,17 @@ class WorkerDispatch { /** * Set a local object as the global provider of the specified service. + * WARNING: Any method on the provider can be called from any worker within the dispatch system. * @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'. * @param {object} provider - a local object which provides this service. - * WARNING: Any method on the provider can be called from any worker within the dispatch system. + * @returns {Promise} - a promise which will resolve once the service is registered. */ setService (service, provider) { if (this.services.hasOwnProperty(service)) { log.warn(`Replacing existing service provider for ${service}`); } this.services[service] = provider; - this.waitForConnection.then(() => { - this.call('dispatch', 'setService', service); - }); + return this.waitForConnection.then(() => this.call('dispatch', 'setService', service)); } /** From b4c0cfe94083e05954feeddbb3bfcf2dbe8cb42f Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford Date: Fri, 4 Aug 2017 23:52:31 -0700 Subject: [PATCH 5/5] Refactor common code in {central,worker}-dispatch Central dispatch and worker dispatch share most of their code now by inheriting from a new shared dispatch class. Also, the format of passed messages has been altered to make it easier to understand in a debugger. --- src/dispatch/central-dispatch.js | 163 +++++-------------- src/dispatch/shared-dispatch.js | 217 +++++++++++++++++++++++++ src/dispatch/worker-dispatch.js | 148 +++++------------ test/fixtures/dispatch-test-service.js | 5 - test/fixtures/dispatch-test-worker.js | 5 +- test/unit/dispatch.js | 48 +++--- 6 files changed, 328 insertions(+), 258 deletions(-) create mode 100644 src/dispatch/shared-dispatch.js diff --git a/src/dispatch/central-dispatch.js b/src/dispatch/central-dispatch.js index 98bb2805a..6dd236255 100644 --- a/src/dispatch/central-dispatch.js +++ b/src/dispatch/central-dispatch.js @@ -1,3 +1,5 @@ +const SharedDispatch = require('./shared-dispatch'); + const log = require('../util/log'); /** @@ -7,21 +9,9 @@ const log = require('../util/log'); * context. The dispatch system will forward function arguments and return values across worker boundaries as needed. * @see {WorkerDispatch} */ -class CentralDispatch { +class CentralDispatch extends SharedDispatch { constructor () { - /** - * List of callback registrations for promises waiting for a response from a call to a service on another - * worker. A callback registration is an array of [resolve,reject] Promise functions. - * Calls to local services don't enter this list. - * @type {Array.<[Function,Function]>} - */ - this.callbacks = []; - - /** - * The next callback ID to be used. - * @type {int} - */ - this.nextCallback = 0; + super(); /** * Map of channel name to worker or local service provider. @@ -45,65 +35,6 @@ class CentralDispatch { this.workers = []; } - /** - * Call a particular method on a particular service, regardless of whether that service is provided locally or on - * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone - * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be - * transferred to the worker, and they should not be used after this call. - * @example - * dispatcher.call('vm', 'setData', 'cat', 42); - * // this finds the worker for the 'vm' service, then on that worker calls: - * vm.setData('cat', 42); - * @param {string} service - the name of the service. - * @param {string} method - the name of the method. - * @param {*} [args] - the arguments to be copied to the method, if any. - * @returns {Promise} - a promise for the return value of the service method. - */ - call (service, method, ...args) { - return this.transferCall(service, method, null, ...args); - } - - /** - * Call a particular method on a particular service, regardless of whether that service is provided locally or on - * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone - * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be - * transferred to the worker, and they should not be used after this call. - * @example - * dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer); - * // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls: - * vm.setData('cat', myArrayBuffer); - * @param {string} service - the name of the service. - * @param {string} method - the name of the method. - * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. - * @param {*} [args] - the arguments to be copied to the method, if any. - * @returns {Promise} - a promise for the return value of the service method. - */ - transferCall (service, method, transfer, ...args) { - return new Promise((resolve, reject) => { - try { - if (this.services.hasOwnProperty(service)) { - const provider = this.services[service]; - if (provider instanceof this.workerClass) { - const callbackId = this.nextCallback++; - this.callbacks[callbackId] = [resolve, reject]; - if (transfer) { - provider.postMessage([service, method, callbackId, args], transfer); - } else { - provider.postMessage([service, method, callbackId, args]); - } - } else { - const result = provider[method].apply(provider, args); - resolve(result); - } - } else { - reject(new Error(`Service not found: ${service}`)); - } - } catch (e) { - reject(e); - } - }); - } - /** * Set a local object as the global provider of the specified service. * WARNING: Any method on the provider can be called from any worker within the dispatch system. @@ -112,13 +43,16 @@ class CentralDispatch { * @returns {Promise} - a promise which will resolve once the service is registered. */ setService (service, provider) { - return new Promise(resolve => { + /** Return a promise for consistency with {@link WorkerDispatch#setService} */ + try { if (this.services.hasOwnProperty(service)) { - log.warn(`Replacing existing service provider for ${service}`); + log.warn(`Central dispatch replacing existing service provider for ${service}`); } this.services[service] = provider; - resolve(); - }); + return Promise.resolve(); + } catch (e) { + return Promise.reject(e); + } } /** @@ -130,58 +64,47 @@ class CentralDispatch { if (this.workers.indexOf(worker) === -1) { this.workers.push(worker); worker.onmessage = this._onMessage.bind(this, worker); - worker.postMessage(['dispatch', '_handshake']); + this._remoteCall(worker, 'dispatch', 'handshake').catch(e => { + log.error(`Could not handshake with worker: ${JSON.stringify(e)}`); + }); } else { - log.warn('Ignoring attempt to add duplicate worker'); + log.warn('Central dispatch ignoring attempt to add duplicate worker'); } } /** - * Handle a message event received from a connected worker. + * Fetch the service provider object for a particular service name. + * @override + * @param {string} service - the name of the service to look up + * @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found + * @protected + */ + _getServiceProvider (service) { + const provider = this.services[service]; + return provider && { + provider, + isRemote: provider instanceof this.workerClass + }; + } + + /** + * Handle a call message sent to the dispatch service itself + * @override * @param {Worker} worker - the worker which sent the message. - * @param {MessageEvent} event - the message event to be handled. - * @private + * @param {DispatchCallMessage} message - the message to be handled. + * @returns {Promise|undefined} - a promise for the results of this operation, if appropriate + * @protected */ - _onMessage (worker, event) { - const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data; + _onDispatchMessage (worker, message) { let promise; - if (service === 'dispatch') { - switch (method) { - case '_callback': - this._callback(callbackId, ...args); - break; - case 'setService': - promise = this.setService(args[0], worker); - break; - } - } else { - promise = this.call(service, method, ...args); - } - if (promise) { - promise.then( - result => { - worker.postMessage(['dispatch', '_callback', callbackId, result]); - }, - error => { - worker.postMessage(['dispatch', '_callback', callbackId, null, error]); - }); - } - } - - /** - * Handle a callback from a worker. This should only be called as the result of a message from a worker. - * @param {int} callbackId - the ID of the callback to call. - * @param {*} result - if `error` is not truthy, resolve the callback promise with this value. - * @param {*} [error] - if this is truthy, reject the callback promise with this value. - * @private - */ - _callback (callbackId, result, error) { - const [resolve, reject] = this.callbacks[callbackId]; - if (error) { - reject(error); - } else { - resolve(result); + switch (message.method) { + case 'setService': + promise = this.setService(message.args[0], worker); + break; + default: + log.error(`Central dispatch received message for unknown method: ${message.method}`); } + return promise; } } diff --git a/src/dispatch/shared-dispatch.js b/src/dispatch/shared-dispatch.js new file mode 100644 index 000000000..d582dc170 --- /dev/null +++ b/src/dispatch/shared-dispatch.js @@ -0,0 +1,217 @@ +const log = require('../util/log'); + +/** + * @typedef {object} DispatchCallMessage - a message to the dispatch system representing a service method call + * @property {*} responseId - send a response message with this response ID. See {@link DispatchResponseMessage} + * @property {string} service - the name of the service to be called + * @property {string} method - the name of the method to be called + * @property {Array|undefined} args - the arguments to be passed to the method + */ + +/** + * @typedef {object} DispatchResponseMessage - a message to the dispatch system representing the results of a call + * @property {*} responseId - a copy of the response ID from the call which generated this response + * @property {*|undefined} error - if this is truthy, then it contains results from a failed call (such as an exception) + * @property {*|undefined} result - if error is not truthy, then this contains the return value of the call (if any) + */ + +/** + * @typedef {DispatchCallMessage|DispatchResponseMessage} DispatchMessage + * Any message to the dispatch system. + */ + +/** + * The SharedDispatch class is responsible for dispatch features shared by + * {@link CentralDispatch} and {@link WorkerDispatch}. + */ +class SharedDispatch { + constructor () { + /** + * List of callback registrations for promises waiting for a response from a call to a service on another + * worker. A callback registration is an array of [resolve,reject] Promise functions. + * Calls to local services don't enter this list. + * @type {Array.<[Function,Function]>} + */ + this.callbacks = []; + + /** + * The next response ID to be used. + * @type {int} + */ + this.nextResponseId = 0; + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.call('vm', 'setData', 'cat', 42); + * // this finds the worker for the 'vm' service, then on that worker calls: + * vm.setData('cat', 42); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + call (service, method, ...args) { + return this.transferCall(service, method, null, ...args); + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer); + * // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls: + * vm.setData('cat', myArrayBuffer); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + transferCall (service, method, transfer, ...args) { + try { + const {provider, isRemote} = this._getServiceProvider(service); + if (provider) { + if (isRemote) { + return this._remoteTransferCall(provider, service, method, transfer, ...args); + } + + const result = provider[method].apply(provider, args); + return Promise.resolve(result); + } + return Promise.reject(new Error(`Service not found: ${service}`)); + } catch (e) { + return Promise.reject(e); + } + } + + /** + * Like {@link call}, but force the call to be posted through a particular communication channel. + * @param {object} provider - send the call through this object's `postMessage` function. + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + _remoteCall (provider, service, method, ...args) { + return this._remoteTransferCall(provider, service, method, null, ...args); + } + + /** + * Like {@link transferCall}, but force the call to be posted through a particular communication channel. + * @param {object} provider - send the call through this object's `postMessage` function. + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + _remoteTransferCall (provider, service, method, transfer, ...args) { + return new Promise((resolve, reject) => { + const responseId = this._storeCallbacks(resolve, reject); + if (transfer) { + provider.postMessage({service, method, responseId, args}, transfer); + } else { + provider.postMessage({service, method, responseId, args}); + } + }); + } + + /** + * Store callback functions pending a response message. + * @param {Function} resolve - function to call if the service method returns. + * @param {Function} reject - function to call if the service method throws. + * @returns {*} - a unique response ID for this set of callbacks. See {@link _deliverResponse}. + * @protected + */ + _storeCallbacks (resolve, reject) { + const responseId = this.nextResponseId++; + this.callbacks[responseId] = [resolve, reject]; + return responseId; + } + + /** + * Deliver call response from a worker. This should only be called as the result of a message from a worker. + * @param {int} responseId - the response ID of the callback set to call. + * @param {DispatchResponseMessage} message - the message containing the response value(s). + * @protected + */ + _deliverResponse (responseId, message) { + try { + const [resolve, reject] = this.callbacks[responseId]; + delete this.callbacks[responseId]; + if (message.error) { + reject(message.error); + } else { + resolve(message.result); + } + } catch (e) { + log.error(`Dispatch callback failed: ${JSON.stringify(e)}`); + } + } + + /** + * Handle a message event received from a connected worker. + * @param {Worker} worker - the worker which sent the message, or the global object if running in a worker. + * @param {MessageEvent} event - the message event to be handled. + * @protected + */ + _onMessage (worker, event) { + /** @type {DispatchMessage} */ + const message = event.data; + message.args = message.args || []; + let promise; + if (message.service) { + if (message.service === 'dispatch') { + promise = this._onDispatchMessage(worker, message); + } else { + promise = this.call(message.service, message.method, ...message.args); + } + } else if (typeof message.responseId === 'undefined') { + log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`); + } else { + this._deliverResponse(message.responseId, message); + } + if (promise) { + if (typeof message.responseId === 'undefined') { + log.error(`Dispatch message missing required response ID: ${JSON.stringify(event)}`); + } else { + promise.then( + result => worker.postMessage({responseId: message.responseId, result}), + error => worker.postMessage({responseId: message.responseId, error}) + ); + } + } + } + + /** + * Fetch the service provider object for a particular service name. + * @abstract + * @param {string} service - the name of the service to look up + * @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found + * @protected + */ + _getServiceProvider (service) { + throw new Error(`Could not get provider for ${service}: _getServiceProvider not implemented`); + } + + /** + * Handle a call message sent to the dispatch service itself + * @abstract + * @param {Worker} worker - the worker which sent the message. + * @param {DispatchCallMessage} message - the message to be handled. + * @returns {Promise|undefined} - a promise for the results of this operation, if appropriate + * @private + */ + _onDispatchMessage (worker, message) { + throw new Error(`Unimplemented dispatch message handler cannot handle ${message.method} method`); + } +} + +module.exports = SharedDispatch; diff --git a/src/dispatch/worker-dispatch.js b/src/dispatch/worker-dispatch.js index 50328d84c..10542af46 100644 --- a/src/dispatch/worker-dispatch.js +++ b/src/dispatch/worker-dispatch.js @@ -1,3 +1,5 @@ +const SharedDispatch = require('./shared-dispatch'); + const log = require('../util/log'); /** @@ -7,15 +9,9 @@ const log = require('../util/log'); * worker boundaries as needed. * @see {CentralDispatch} */ -class WorkerDispatch { +class WorkerDispatch extends SharedDispatch { constructor () { - /** - * List of callback registrations for promises waiting for a response from a call to a service on another - * worker. A callback registration is an array of [resolve,reject] Promise functions. - * Calls to local services don't enter this list. - * @type {Array.<[Function,Function]>} - */ - this.callbacks = []; + super(); /** * This promise will be resolved when we have successfully connected to central dispatch. @@ -27,12 +23,6 @@ class WorkerDispatch { this._onConnect = resolve; }); - /** - * The next callback ID to be used. - * @type {int} - */ - this.nextCallback = 0; - /** * Map of service name to local service provider. * If a service is not listed here, it is assumed to be provided by another context (another Worker or the main @@ -42,7 +32,7 @@ class WorkerDispatch { */ this.services = {}; - this._onMessage = this._onMessage.bind(this); + this._onMessage = this._onMessage.bind(this, self); if (typeof self !== 'undefined') { self.onmessage = this._onMessage; } @@ -60,61 +50,6 @@ class WorkerDispatch { return this._connectionPromise; } - /** - * Call a particular method on a particular service, regardless of whether that service is provided locally or on - * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone - * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be - * transferred to the worker, and they should not be used after this call. - * @example - * dispatcher.call('vm', 'setData', 'cat', 42); - * // this finds the worker for the 'vm' service, then on that worker calls: - * vm.setData('cat', 42); - * @param {string} service - the name of the service. - * @param {string} method - the name of the method. - * @param {*} [args] - the arguments to be copied to the method, if any. - * @returns {Promise} - a promise for the return value of the service method. - */ - call (service, method, ...args) { - return this.transferCall(service, method, null, ...args); - } - - /** - * Call a particular method on a particular service, regardless of whether that service is provided locally or on - * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone - * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be - * transferred to the worker, and they should not be used after this call. - * @example - * dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer); - * // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls: - * vm.setData('cat', myArrayBuffer); - * @param {string} service - the name of the service. - * @param {string} method - the name of the method. - * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. - * @param {*} [args] - the arguments to be copied to the method, if any. - * @returns {Promise} - a promise for the return value of the service method. - */ - transferCall (service, method, transfer, ...args) { - return new Promise((resolve, reject) => { - try { - if (this.services.hasOwnProperty(service)) { - const provider = this.services[service]; - const result = provider[method].apply(provider, args); - resolve(result); - } else { - const callbackId = this.nextCallback++; - this.callbacks[callbackId] = [resolve, reject]; - if (transfer) { - self.postMessage([service, method, callbackId, ...args], transfer); - } else { - self.postMessage([service, method, callbackId, ...args]); - } - } - } catch (e) { - reject(e); - } - }); - } - /** * Set a local object as the global provider of the specified service. * WARNING: Any method on the provider can be called from any worker within the dispatch system. @@ -124,56 +59,51 @@ class WorkerDispatch { */ setService (service, provider) { if (this.services.hasOwnProperty(service)) { - log.warn(`Replacing existing service provider for ${service}`); + log.warn(`Worker dispatch replacing existing service provider for ${service}`); } this.services[service] = provider; - return this.waitForConnection.then(() => this.call('dispatch', 'setService', service)); + return this.waitForConnection.then(() => this._remoteCall(self, 'dispatch', 'setService', service)); } /** - * Message handler active after the dispatcher handshake. This only handles method calls. - * @param {MessageEvent} event - the message event to be handled. - * @private + * Fetch the service provider object for a particular service name. + * @override + * @param {string} service - the name of the service to look up + * @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found + * @protected */ - _onMessage (event) { - const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data; - if (service === 'dispatch') { - switch (method) { - case '_handshake': - this._onConnect(); - break; - case '_callback': - this._callback(callbackId, ...args); - break; - case '_terminate': - self.close(); - break; - } - } else { - this.call(service, method, ...args).then( - result => { - self.postMessage(['dispatch', '_callback', callbackId, result]); - }, - error => { - self.postMessage(['dispatch', '_callback', callbackId, null, error]); - }); - } + _getServiceProvider (service) { + // if we don't have a local service by this name, contact central dispatch by calling `postMessage` on self + const provider = this.services[service]; + return { + provider: provider || self, + isRemote: !provider + }; } /** - * Handle a callback from a worker. This should only be called as the result of a message from a worker. - * @param {int} callbackId - the ID of the callback to call. - * @param {*} result - if `error` is not truthy, resolve the callback promise with this value. - * @param {*} [error] - if this is truthy, reject the callback promise with this value. - * @private + * Handle a call message sent to the dispatch service itself + * @override + * @param {Worker} worker - the worker which sent the message. + * @param {DispatchCallMessage} message - the message to be handled. + * @returns {Promise|undefined} - a promise for the results of this operation, if appropriate + * @protected */ - _callback (callbackId, result, error) { - const [resolve, reject] = this.callbacks[callbackId]; - if (error) { - reject(error); - } else { - resolve(result); + _onDispatchMessage (worker, message) { + let promise; + switch (message.method) { + case 'handshake': + promise = this._onConnect(); + break; + case 'terminate': + // Don't close until next tick, after sending confirmation back + setTimeout(() => self.close(), 0); + promise = Promise.resolve(); + break; + default: + log.error(`Worker dispatch received message for unknown method: ${message.method}`); } + return promise; } } diff --git a/test/fixtures/dispatch-test-service.js b/test/fixtures/dispatch-test-service.js index 959a87637..3e01e68f5 100644 --- a/test/fixtures/dispatch-test-service.js +++ b/test/fixtures/dispatch-test-service.js @@ -10,11 +10,6 @@ class DispatchTestService { throwException () { throw new Error('This is a test exception thrown by DispatchTest'); } - - close () { - // eslint-disable-next-line no-undef - self.close(); - } } module.exports = DispatchTestService; diff --git a/test/fixtures/dispatch-test-worker.js b/test/fixtures/dispatch-test-worker.js index 4ca16a545..2c57fbb3e 100644 --- a/test/fixtures/dispatch-test-worker.js +++ b/test/fixtures/dispatch-test-worker.js @@ -1,8 +1,11 @@ const dispatch = require('../../src/dispatch/worker-dispatch'); const DispatchTestService = require('./dispatch-test-service'); +const log = require('../../src/util/log'); dispatch.setService('RemoteDispatchTest', new DispatchTestService()); dispatch.waitForConnection.then(() => { - dispatch.call('test', 'onWorkerReady'); + dispatch.call('test', 'onWorkerReady').catch(e => { + log(`Test worker failed to call onWorkerReady: ${JSON.stringify(e)}`); + }); }); diff --git a/test/unit/dispatch.js b/test/unit/dispatch.js index 8e94735e2..fdcbdb3ea 100644 --- a/test/unit/dispatch.js +++ b/test/unit/dispatch.js @@ -12,31 +12,37 @@ dispatch.workerClass = Worker; const runServiceTest = function (serviceName, t) { const promises = []; - promises.push(dispatch.call(serviceName, 'returnFortyTwo').then(x => { - t.equal(x, 42); - })); + promises.push(dispatch.call(serviceName, 'returnFortyTwo') + .then( + x => t.equal(x, 42), + e => t.fail(e) + )); - promises.push(dispatch.call(serviceName, 'doubleArgument', 9).then(x => { - t.equal(x, 18); - })); + promises.push(dispatch.call(serviceName, 'doubleArgument', 9) + .then( + x => t.equal(x, 18), + e => t.fail(e) + )); - promises.push(dispatch.call(serviceName, 'doubleArgument', 123).then(x => { - t.equal(x, 246); - })); + promises.push(dispatch.call(serviceName, 'doubleArgument', 123) + .then( + x => t.equal(x, 246), + e => t.fail(e) + )); // I tried using `t.rejects` here but ran into https://github.com/tapjs/node-tap/issues/384 promises.push(dispatch.call(serviceName, 'throwException') - .then(() => { - t.fail('exception was not propagated as expected'); - }, () => { - t.pass('exception was propagated as expected'); - })); + .then( + () => t.fail('exception was not propagated as expected'), + () => t.pass('exception was propagated as expected') + )); return Promise.all(promises); }; test('local', t => { - dispatch.setService('LocalDispatchTest', new DispatchTestService()); + dispatch.setService('LocalDispatchTest', new DispatchTestService()) + .catch(e => t.fail(e)); return runServiceTest('LocalDispatchTest', t); }); @@ -47,15 +53,11 @@ test('remote', t => { dispatch.addWorker(worker); const waitForWorker = new Promise(resolve => { - dispatch.setService('test', { - onWorkerReady: resolve - }); + dispatch.setService('test', {onWorkerReady: resolve}) + .catch(e => t.fail(e)); }); return waitForWorker - .then(() => runServiceTest('RemoteDispatchTest', t)) - .then(() => { - // Allow some time for the worker to finish, then terminate it - setTimeout(() => dispatch.call('RemoteDispatchTest', 'close'), 10); - }); + .then(() => runServiceTest('RemoteDispatchTest', t), e => t.fail(e)) + .then(() => dispatch._remoteCall(worker, 'dispatch', 'terminate'), e => t.fail(e)); });