mirror of
https://github.com/scratchfoundation/scratch-vm.git
synced 2024-12-23 14:32:59 -05:00
Implement message dispatch system
This message dispatch system allows a "service" to register itself under a particular name, and then anyone can call any method on that service from any context (the main "window" thread or any worker). Return values are passed back through promise resolution.
This commit is contained in:
parent
e249c5c252
commit
09d3fe330f
2 changed files with 363 additions and 0 deletions
173
src/dispatch/central-dispatch.js
Normal file
173
src/dispatch/central-dispatch.js
Normal file
|
@ -0,0 +1,173 @@
|
|||
const log = require('../util/log');
|
||||
|
||||
/**
|
||||
* This class serves as the central broker for message dispatch. It expects to operate on the main thread / Window and
|
||||
* it must be informed of any Worker threads which will participate in the messaging system. From any context in the
|
||||
* messaging system, the dispatcher's "call" method can call any method on any "service" provided in any participating
|
||||
* context. The dispatch system will forward function arguments and return values across worker boundaries as needed.
|
||||
* @see {WorkerDispatch}
|
||||
*/
|
||||
class CentralDispatch {
|
||||
constructor () {
|
||||
/**
|
||||
* List of callback registrations for promises waiting for a response from a call to a service on another
|
||||
* worker. A callback registration is an array of [resolve,reject] Promise functions.
|
||||
* Calls to services on this worker don't enter this list.
|
||||
* @type {Array.<[Function,Function]>}
|
||||
*/
|
||||
this.callbacks = [];
|
||||
|
||||
/**
|
||||
* The next callback ID to be used.
|
||||
* @type {int}
|
||||
*/
|
||||
this.nextCallback = 0;
|
||||
|
||||
/**
|
||||
* Map of channel name to worker or local service provider.
|
||||
* If the entry is a Worker, the service is provided by an object on that worker.
|
||||
* Otherwise, the service is provided locally and methods on the service will be called directly.
|
||||
* @see {setService}
|
||||
* @type {object.<Worker|object>}
|
||||
*/
|
||||
this.services = {};
|
||||
|
||||
/**
|
||||
* List of workers attached to this dispatcher.
|
||||
* @type {Array}
|
||||
*/
|
||||
this.workers = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.call('vm', 'setData', 'cat', 42);
|
||||
* // this finds the worker for the 'vm' service, then on that worker calls:
|
||||
* vm.setData('cat', 42);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
call (service, method, ...args) {
|
||||
return this.transferCall(service, method, null, ...args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
|
||||
* // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
|
||||
* vm.setData('cat', myArrayBuffer);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
transferCall (service, method, transfer, ...args) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (this.services.hasOwnProperty(service)) {
|
||||
const provider = this.services[service];
|
||||
if (provider instanceof Worker) {
|
||||
const callbackId = this.nextCallback++;
|
||||
this.callbacks[callbackId] = [resolve, reject];
|
||||
if (transfer) {
|
||||
provider.postMessage([service, method, callbackId, args], transfer);
|
||||
} else {
|
||||
provider.postMessage([service, method, callbackId, args]);
|
||||
}
|
||||
} else {
|
||||
const result = provider[method].apply(provider, args);
|
||||
resolve(result);
|
||||
}
|
||||
} else {
|
||||
reject(new Error(`Service not found: ${service}`));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a local object as the global provider of the specified service.
|
||||
* @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'.
|
||||
* @param {object} provider - a local object which provides this service.
|
||||
* WARNING: Any method on the provider can be called from any worker within the dispatch system.
|
||||
*/
|
||||
setService (service, provider) {
|
||||
if (this.services.hasOwnProperty(service)) {
|
||||
log.warn(`Replacing existing service provider for ${service}`);
|
||||
}
|
||||
this.services[service] = provider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a worker to the message dispatch system. The worker must implement a compatible message dispatch framework.
|
||||
* The dispatcher will immediately attempt to "handshake" with the worker.
|
||||
* @param {Worker} worker - the worker to add into the dispatch system.
|
||||
*/
|
||||
addWorker (worker) {
|
||||
if (this.workers.indexOf(worker) === -1) {
|
||||
this.workers.push(worker);
|
||||
worker.onmessage = this._onMessage.bind(this);
|
||||
worker.postMessage('dispatch-handshake');
|
||||
} else {
|
||||
log.warn('Ignoring attempt to add duplicate worker');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a message event received from a connected worker.
|
||||
* @param {MessageEvent} event - the message event to be handled.
|
||||
* @private
|
||||
*/
|
||||
_onMessage (event) {
|
||||
const worker = event.target;
|
||||
const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data;
|
||||
if (service === 'dispatch') {
|
||||
switch (method) {
|
||||
case '_callback':
|
||||
this._callback(callbackId, ...args);
|
||||
break;
|
||||
case 'setService':
|
||||
this.setService(args[0], worker);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
this.call(service, method, ...args).then(
|
||||
result => {
|
||||
worker.postMessage(['dispatch', '_callback', callbackId, result]);
|
||||
},
|
||||
error => {
|
||||
worker.postMessage(['dispatch', '_callback', callbackId, null, error]);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a callback from a worker. This should only be called as the result of a message from a worker.
|
||||
* @param {int} callbackId - the ID of the callback to call.
|
||||
* @param {*} result - if `error` is not truthy, resolve the callback promise with this value.
|
||||
* @param {*} [error] - if this is truthy, reject the callback promise with this value.
|
||||
* @private
|
||||
*/
|
||||
_callback (callbackId, result, error) {
|
||||
const [resolve, reject] = this.callbacks[callbackId];
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const dispatch = new CentralDispatch();
|
||||
module.exports = dispatch;
|
||||
self.Scratch = self.Scratch || {};
|
||||
self.Scratch.dispatch = dispatch;
|
190
src/dispatch/worker-dispatch.js
Normal file
190
src/dispatch/worker-dispatch.js
Normal file
|
@ -0,0 +1,190 @@
|
|||
const log = require('../util/log');
|
||||
|
||||
/**
|
||||
* This class provides a Worker with the means to participate in the message dispatch system managed by CentralDispatch.
|
||||
* From any context in the messaging system, the dispatcher's "call" method can call any method on any "service"
|
||||
* provided in any participating context. The dispatch system will forward function arguments and return values across
|
||||
* worker boundaries as needed.
|
||||
* @see {CentralDispatch}
|
||||
*/
|
||||
class WorkerDispatch {
|
||||
constructor () {
|
||||
/**
|
||||
* List of callback registrations for promises waiting for a response from a call to a service on another
|
||||
* worker. A callback registration is an array of [resolve,reject] Promise functions.
|
||||
* Calls to services on this worker don't enter this list.
|
||||
* @type {Array.<[Function,Function]>}
|
||||
*/
|
||||
this.callbacks = [];
|
||||
|
||||
/**
|
||||
* This promise will be resolved when we have successfully connected to central dispatch.
|
||||
* @type {Promise}
|
||||
* @see {waitForConnection}
|
||||
* @private
|
||||
*/
|
||||
this._connectionPromise = new Promise(resolve => {
|
||||
this._onConnect = resolve;
|
||||
}).then(() => {
|
||||
self.onmessage = this._onMessage.bind(this);
|
||||
});
|
||||
|
||||
/**
|
||||
* The next callback ID to be used.
|
||||
* @type {int}
|
||||
*/
|
||||
this.nextCallback = 0;
|
||||
|
||||
/**
|
||||
* Map of service name to local service provider.
|
||||
* If a service is not listed here, it is assumed to be provided by another context (another Worker or the main
|
||||
* thread).
|
||||
* @see {setService}
|
||||
* @type {object}
|
||||
*/
|
||||
this.services = {};
|
||||
|
||||
self.onmessage = this._onHandshake.bind(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* @returns {Promise} a promise which will resolve upon connection to central dispatch. If you need to make a call
|
||||
* immediately on "startup" you can attach a 'then' to this promise.
|
||||
* @example
|
||||
* dispatch.waitForConnection.then(() => {
|
||||
* dispatch.call('myService', 'hello');
|
||||
* })
|
||||
*/
|
||||
get waitForConnection () {
|
||||
return this._connectionPromise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.call('vm', 'setData', 'cat', 42);
|
||||
* // this finds the worker for the 'vm' service, then on that worker calls:
|
||||
* vm.setData('cat', 42);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
call (service, method, ...args) {
|
||||
return this.transferCall(service, method, null, ...args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call a particular method on a particular service, regardless of whether that service is provided locally or on
|
||||
* a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
|
||||
* algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
|
||||
* transferred to the worker, and they should not be used after this call.
|
||||
* @example
|
||||
* dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
|
||||
* // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
|
||||
* vm.setData('cat', myArrayBuffer);
|
||||
* @param {string} service - the name of the service.
|
||||
* @param {string} method - the name of the method.
|
||||
* @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
|
||||
* @param {*} [args] - the arguments to be copied to the method, if any.
|
||||
* @returns {Promise} - a promise for the return value of the service method.
|
||||
*/
|
||||
transferCall (service, method, transfer, ...args) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (this.services.hasOwnProperty(service)) {
|
||||
const provider = this.services[service];
|
||||
const result = provider[method].apply(provider, args);
|
||||
resolve(result);
|
||||
} else {
|
||||
const callbackId = this.nextCallback++;
|
||||
this.callbacks[callbackId] = [resolve, reject];
|
||||
if (transfer) {
|
||||
self.postMessage([service, method, callbackId, ...args], transfer);
|
||||
} else {
|
||||
self.postMessage([service, method, callbackId, ...args]);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a local object as the global provider of the specified service.
|
||||
* @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'.
|
||||
* @param {object} provider - a local object which provides this service.
|
||||
* WARNING: Any method on the provider can be called from any worker within the dispatch system.
|
||||
*/
|
||||
setService (service, provider) {
|
||||
if (this.services.hasOwnProperty(service)) {
|
||||
log.warn(`Replacing existing service provider for ${service}`);
|
||||
}
|
||||
this.services[service] = provider;
|
||||
this.waitForConnection.then(() => {
|
||||
this.call('dispatch', 'setService', service);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Message handler active until the dispatcher handshake arrives.
|
||||
* @param {MessageEvent} event - the message event to be handled.
|
||||
* @private
|
||||
*/
|
||||
_onHandshake (event) {
|
||||
const message = event.data;
|
||||
if (message === 'dispatch-handshake') {
|
||||
this._onConnect();
|
||||
} else {
|
||||
log.error(`WorkerDispatch received unexpected message before handshake: ${JSON.stringify(message)}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Message handler active after the dispatcher handshake. This only handles method calls.
|
||||
* @param {MessageEvent} event - the message event to be handled.
|
||||
* @private
|
||||
*/
|
||||
_onMessage (event) {
|
||||
const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data;
|
||||
if (service === 'dispatch') {
|
||||
switch (method) {
|
||||
case '_callback':
|
||||
this._callback(callbackId, ...args);
|
||||
break;
|
||||
case '_terminate':
|
||||
self.close();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
this.call(service, method, ...args).then(
|
||||
result => {
|
||||
self.postMessage(['dispatch', '_callback', callbackId, result]);
|
||||
},
|
||||
error => {
|
||||
self.postMessage(['dispatch', '_callback', callbackId, null, error]);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a callback from a worker. This should only be called as the result of a message from a worker.
|
||||
* @param {int} callbackId - the ID of the callback to call.
|
||||
* @param {*} result - if `error` is not truthy, resolve the callback promise with this value.
|
||||
* @param {*} [error] - if this is truthy, reject the callback promise with this value.
|
||||
* @private
|
||||
*/
|
||||
_callback (callbackId, result, error) {
|
||||
const [resolve, reject] = this.callbacks[callbackId];
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const dispatch = new WorkerDispatch();
|
||||
module.exports = dispatch;
|
||||
self.Scratch = self.Scratch || {};
|
||||
self.Scratch.dispatch = dispatch;
|
Loading…
Reference in a new issue