From 09d3fe330f9eecf193c26d11e166c5767e823a30 Mon Sep 17 00:00:00 2001 From: Christopher Willis-Ford Date: Wed, 12 Jul 2017 18:48:36 -0400 Subject: [PATCH] Implement message dispatch system This message dispatch system allows a "service" to register itself under a particular name, and then anyone can call any method on that service from any context (the main "window" thread or any worker). Return values are passed back through promise resolution. --- src/dispatch/central-dispatch.js | 173 ++++++++++++++++++++++++++++ src/dispatch/worker-dispatch.js | 190 +++++++++++++++++++++++++++++++ 2 files changed, 363 insertions(+) create mode 100644 src/dispatch/central-dispatch.js create mode 100644 src/dispatch/worker-dispatch.js diff --git a/src/dispatch/central-dispatch.js b/src/dispatch/central-dispatch.js new file mode 100644 index 000000000..2eba7ecc8 --- /dev/null +++ b/src/dispatch/central-dispatch.js @@ -0,0 +1,173 @@ +const log = require('../util/log'); + +/** + * This class serves as the central broker for message dispatch. It expects to operate on the main thread / Window and + * it must be informed of any Worker threads which will participate in the messaging system. From any context in the + * messaging system, the dispatcher's "call" method can call any method on any "service" provided in any participating + * context. The dispatch system will forward function arguments and return values across worker boundaries as needed. + * @see {WorkerDispatch} + */ +class CentralDispatch { + constructor () { + /** + * List of callback registrations for promises waiting for a response from a call to a service on another + * worker. A callback registration is an array of [resolve,reject] Promise functions. + * Calls to services on this worker don't enter this list. + * @type {Array.<[Function,Function]>} + */ + this.callbacks = []; + + /** + * The next callback ID to be used. + * @type {int} + */ + this.nextCallback = 0; + + /** + * Map of channel name to worker or local service provider. + * If the entry is a Worker, the service is provided by an object on that worker. + * Otherwise, the service is provided locally and methods on the service will be called directly. + * @see {setService} + * @type {object.} + */ + this.services = {}; + + /** + * List of workers attached to this dispatcher. + * @type {Array} + */ + this.workers = []; + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.call('vm', 'setData', 'cat', 42); + * // this finds the worker for the 'vm' service, then on that worker calls: + * vm.setData('cat', 42); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + call (service, method, ...args) { + return this.transferCall(service, method, null, ...args); + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer); + * // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls: + * vm.setData('cat', myArrayBuffer); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + transferCall (service, method, transfer, ...args) { + return new Promise((resolve, reject) => { + if (this.services.hasOwnProperty(service)) { + const provider = this.services[service]; + if (provider instanceof Worker) { + const callbackId = this.nextCallback++; + this.callbacks[callbackId] = [resolve, reject]; + if (transfer) { + provider.postMessage([service, method, callbackId, args], transfer); + } else { + provider.postMessage([service, method, callbackId, args]); + } + } else { + const result = provider[method].apply(provider, args); + resolve(result); + } + } else { + reject(new Error(`Service not found: ${service}`)); + } + }); + } + + /** + * Set a local object as the global provider of the specified service. + * @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'. + * @param {object} provider - a local object which provides this service. + * WARNING: Any method on the provider can be called from any worker within the dispatch system. + */ + setService (service, provider) { + if (this.services.hasOwnProperty(service)) { + log.warn(`Replacing existing service provider for ${service}`); + } + this.services[service] = provider; + } + + /** + * Add a worker to the message dispatch system. The worker must implement a compatible message dispatch framework. + * The dispatcher will immediately attempt to "handshake" with the worker. + * @param {Worker} worker - the worker to add into the dispatch system. + */ + addWorker (worker) { + if (this.workers.indexOf(worker) === -1) { + this.workers.push(worker); + worker.onmessage = this._onMessage.bind(this); + worker.postMessage('dispatch-handshake'); + } else { + log.warn('Ignoring attempt to add duplicate worker'); + } + } + + /** + * Handle a message event received from a connected worker. + * @param {MessageEvent} event - the message event to be handled. + * @private + */ + _onMessage (event) { + const worker = event.target; + const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data; + if (service === 'dispatch') { + switch (method) { + case '_callback': + this._callback(callbackId, ...args); + break; + case 'setService': + this.setService(args[0], worker); + break; + } + } else { + this.call(service, method, ...args).then( + result => { + worker.postMessage(['dispatch', '_callback', callbackId, result]); + }, + error => { + worker.postMessage(['dispatch', '_callback', callbackId, null, error]); + }); + } + } + + /** + * Handle a callback from a worker. This should only be called as the result of a message from a worker. + * @param {int} callbackId - the ID of the callback to call. + * @param {*} result - if `error` is not truthy, resolve the callback promise with this value. + * @param {*} [error] - if this is truthy, reject the callback promise with this value. + * @private + */ + _callback (callbackId, result, error) { + const [resolve, reject] = this.callbacks[callbackId]; + if (error) { + reject(error); + } else { + resolve(result); + } + } +} + +const dispatch = new CentralDispatch(); +module.exports = dispatch; +self.Scratch = self.Scratch || {}; +self.Scratch.dispatch = dispatch; diff --git a/src/dispatch/worker-dispatch.js b/src/dispatch/worker-dispatch.js new file mode 100644 index 000000000..e8563faff --- /dev/null +++ b/src/dispatch/worker-dispatch.js @@ -0,0 +1,190 @@ +const log = require('../util/log'); + +/** + * This class provides a Worker with the means to participate in the message dispatch system managed by CentralDispatch. + * From any context in the messaging system, the dispatcher's "call" method can call any method on any "service" + * provided in any participating context. The dispatch system will forward function arguments and return values across + * worker boundaries as needed. + * @see {CentralDispatch} + */ +class WorkerDispatch { + constructor () { + /** + * List of callback registrations for promises waiting for a response from a call to a service on another + * worker. A callback registration is an array of [resolve,reject] Promise functions. + * Calls to services on this worker don't enter this list. + * @type {Array.<[Function,Function]>} + */ + this.callbacks = []; + + /** + * This promise will be resolved when we have successfully connected to central dispatch. + * @type {Promise} + * @see {waitForConnection} + * @private + */ + this._connectionPromise = new Promise(resolve => { + this._onConnect = resolve; + }).then(() => { + self.onmessage = this._onMessage.bind(this); + }); + + /** + * The next callback ID to be used. + * @type {int} + */ + this.nextCallback = 0; + + /** + * Map of service name to local service provider. + * If a service is not listed here, it is assumed to be provided by another context (another Worker or the main + * thread). + * @see {setService} + * @type {object} + */ + this.services = {}; + + self.onmessage = this._onHandshake.bind(this); + } + + /** + * @returns {Promise} a promise which will resolve upon connection to central dispatch. If you need to make a call + * immediately on "startup" you can attach a 'then' to this promise. + * @example + * dispatch.waitForConnection.then(() => { + * dispatch.call('myService', 'hello'); + * }) + */ + get waitForConnection () { + return this._connectionPromise; + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.call('vm', 'setData', 'cat', 42); + * // this finds the worker for the 'vm' service, then on that worker calls: + * vm.setData('cat', 42); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + call (service, method, ...args) { + return this.transferCall(service, method, null, ...args); + } + + /** + * Call a particular method on a particular service, regardless of whether that service is provided locally or on + * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone + * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be + * transferred to the worker, and they should not be used after this call. + * @example + * dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer); + * // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls: + * vm.setData('cat', myArrayBuffer); + * @param {string} service - the name of the service. + * @param {string} method - the name of the method. + * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful. + * @param {*} [args] - the arguments to be copied to the method, if any. + * @returns {Promise} - a promise for the return value of the service method. + */ + transferCall (service, method, transfer, ...args) { + return new Promise((resolve, reject) => { + if (this.services.hasOwnProperty(service)) { + const provider = this.services[service]; + const result = provider[method].apply(provider, args); + resolve(result); + } else { + const callbackId = this.nextCallback++; + this.callbacks[callbackId] = [resolve, reject]; + if (transfer) { + self.postMessage([service, method, callbackId, ...args], transfer); + } else { + self.postMessage([service, method, callbackId, ...args]); + } + } + }); + } + + /** + * Set a local object as the global provider of the specified service. + * @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'. + * @param {object} provider - a local object which provides this service. + * WARNING: Any method on the provider can be called from any worker within the dispatch system. + */ + setService (service, provider) { + if (this.services.hasOwnProperty(service)) { + log.warn(`Replacing existing service provider for ${service}`); + } + this.services[service] = provider; + this.waitForConnection.then(() => { + this.call('dispatch', 'setService', service); + }); + } + + /** + * Message handler active until the dispatcher handshake arrives. + * @param {MessageEvent} event - the message event to be handled. + * @private + */ + _onHandshake (event) { + const message = event.data; + if (message === 'dispatch-handshake') { + this._onConnect(); + } else { + log.error(`WorkerDispatch received unexpected message before handshake: ${JSON.stringify(message)}`); + } + } + + /** + * Message handler active after the dispatcher handshake. This only handles method calls. + * @param {MessageEvent} event - the message event to be handled. + * @private + */ + _onMessage (event) { + const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data; + if (service === 'dispatch') { + switch (method) { + case '_callback': + this._callback(callbackId, ...args); + break; + case '_terminate': + self.close(); + break; + } + } else { + this.call(service, method, ...args).then( + result => { + self.postMessage(['dispatch', '_callback', callbackId, result]); + }, + error => { + self.postMessage(['dispatch', '_callback', callbackId, null, error]); + }); + } + } + + /** + * Handle a callback from a worker. This should only be called as the result of a message from a worker. + * @param {int} callbackId - the ID of the callback to call. + * @param {*} result - if `error` is not truthy, resolve the callback promise with this value. + * @param {*} [error] - if this is truthy, reject the callback promise with this value. + * @private + */ + _callback (callbackId, result, error) { + const [resolve, reject] = this.callbacks[callbackId]; + if (error) { + reject(error); + } else { + resolve(result); + } + } +} + +const dispatch = new WorkerDispatch(); +module.exports = dispatch; +self.Scratch = self.Scratch || {}; +self.Scratch.dispatch = dispatch;