Add an init watchdog for extension workers

If an extension worker does not complete initialization in a reasonable
amount of time (currently 15 seconds) then the worker will be
terminated. A complete initialization includes registering an extension.
This commit is contained in:
Christopher Willis-Ford 2017-10-31 11:28:56 -07:00
parent ceb47bd765
commit e4381b4693
3 changed files with 117 additions and 10 deletions

View file

@ -38,7 +38,7 @@ class CentralDispatch extends SharedDispatch {
/** /**
* Set a local object as the global provider of the specified service. * Set a local object as the global provider of the specified service.
* WARNING: Any method on the provider can be called from any worker within the dispatch system. * WARNING: Any method on the provider can be called from any worker within the dispatch system.
* @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui', 'extension9'. * @param {string} service - a globally unique string identifying this service. Examples: 'vm', 'gui'.
* @param {object} provider - a local object which provides this service. * @param {object} provider - a local object which provides this service.
* @returns {Promise} - a promise which will resolve once the service is registered. * @returns {Promise} - a promise which will resolve once the service is registered.
*/ */
@ -72,6 +72,25 @@ class CentralDispatch extends SharedDispatch {
} }
} }
/**
* Remove a service provider from the central dispatch service, un-registering any services it provides.
* If the provider is a worker its dispatch service will be shut down.
* The worker itself will NOT be terminated by this call.
* @param {Worker|object} provider - the worker or object instance to be removed.
*/
removeProvider (provider) {
const workerIndex = this.workers.indexOf(provider);
if (workerIndex !== -1) {
/** @TODO Should we ask the worker to shut down? If so, owner must wait before terminating the worker. */
this.workers.splice(workerIndex, 1);
}
for (const serviceName in Object.keys(this.services)) {
if (this.services[serviceName] === provider) {
delete this.services[serviceName];
}
}
}
/** /**
* Fetch the service provider object for a particular service name. * Fetch the service provider object for a particular service name.
* @override * @override

View file

@ -39,6 +39,24 @@ class SharedDispatch {
* @type {int} * @type {int}
*/ */
this.nextResponseId = 0; this.nextResponseId = 0;
/**
* While processing a call in _onMessage, this will be set to the originating object.
* At all other times it will be null.
* @type {object}
* @private
*/
this._callingWorker = null;
}
/**
* Central dispatch: while processing a call from a worker, this will be set to the worker originating that call.
* Worker dispatch: while processing a call from the main thread, this will be set to the global object.
* At all other times it will be null.
* @returns {object} - the worker originating the current call, if any.
*/
get callingWorker () {
return this._callingWorker;
} }
/** /**
@ -184,10 +202,15 @@ class SharedDispatch {
message.args = message.args || []; message.args = message.args || [];
let promise; let promise;
if (message.service) { if (message.service) {
if (message.service === 'dispatch') { try {
promise = this._onDispatchMessage(worker, message); this._callingWorker = worker;
} else { if (message.service === 'dispatch') {
promise = this.call(message.service, message.method, ...message.args); promise = this._onDispatchMessage(worker, message);
} else {
promise = this.call(message.service, message.method, ...message.args);
}
} finally {
this._callingWorker = null;
} }
} else if (typeof message.responseId === 'undefined') { } else if (typeof message.responseId === 'undefined') {
log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`); log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`);

View file

@ -70,6 +70,14 @@ class ExtensionManager {
*/ */
this.pendingWorkers = []; this.pendingWorkers = [];
/**
* Set of workers currently being monitored by `_startWorkerWatchdog`.
* @see {_startWorkerWatchdog}
* @type {Set.<object>}
* @private
*/
this._activeWatchdogs = new Set();
/** /**
* Keep a reference to the runtime so we can construct internal extension objects. * Keep a reference to the runtime so we can construct internal extension objects.
* TODO: remove this in favor of extensions accessing the runtime as a service. * TODO: remove this in favor of extensions accessing the runtime as a service.
@ -99,7 +107,10 @@ class ExtensionManager {
const ExtensionWorker = require('worker-loader?name=extension-worker.js!./extension-worker'); const ExtensionWorker = require('worker-loader?name=extension-worker.js!./extension-worker');
this.pendingExtensions.push({extensionURL, resolve, reject}); this.pendingExtensions.push({extensionURL, resolve, reject});
dispatch.addWorker(new ExtensionWorker());
const worker = new ExtensionWorker();
dispatch.addWorker(worker);
this._startWorkerWatchdog(worker);
}); });
} }
@ -110,16 +121,28 @@ class ExtensionManager {
return [id, workerInfo.extensionURL]; return [id, workerInfo.extensionURL];
} }
/**
* Collect extension metadata from the specified service and begin the extension registration process.
* @param {string} serviceName - the name of the service hosting the extension.
*/
registerExtensionService (serviceName) { registerExtensionService (serviceName) {
dispatch.call(serviceName, 'getInfo').then(info => { dispatch.call(serviceName, 'getInfo').then(info => {
this._registerExtensionInfo(serviceName, info); this._registerExtensionInfo(serviceName, info);
}); });
} }
/**
* Called by an extension worker to indicate that the worker has finished initialization.
* @param {int} id - the worker ID.
* @param {*?} e - the error encountered during initialization, if any.
*/
onWorkerInit (id, e) { onWorkerInit (id, e) {
const workerInfo = this.pendingWorkers[id]; const workerInfo = this.pendingWorkers[id];
delete this.pendingWorkers[id]; delete this.pendingWorkers[id];
if (e) { if (e) {
log.warn(`Extension manager forcibly terminating worker for extension with ID ${id}, URL ${
workerInfo.extensionURL} due to error during initialization`);
workerInfo.worker.terminate();
workerInfo.reject(e); workerInfo.reject(e);
} else { } else {
workerInfo.resolve(id); workerInfo.resolve(id);
@ -133,16 +156,29 @@ class ExtensionManager {
*/ */
_registerInternalExtension (extensionObject) { _registerInternalExtension (extensionObject) {
const extensionInfo = extensionObject.getInfo(); const extensionInfo = extensionObject.getInfo();
const serviceName = `extension.internal.${extensionInfo.id}`; const fakeWorkerId = this.nextExtensionWorker++;
const serviceName = `extension.${fakeWorkerId}.${extensionInfo.id}`;
return dispatch.setService(serviceName, extensionObject) return dispatch.setService(serviceName, extensionObject)
.then(() => dispatch.call('extensions', 'registerExtensionService', serviceName)); .then(() => dispatch.call('extensions', 'registerExtensionService', serviceName));
} }
/**
* Sanitize extension info then register its primitives with the VM.
* @param {string} serviceName - the name of the service hosting the extension
* @param {ExtensionInfo} extensionInfo - the extension's metadata
* @private
*/
_registerExtensionInfo (serviceName, extensionInfo) { _registerExtensionInfo (serviceName, extensionInfo) {
extensionInfo = this._prepareExtensionInfo(serviceName, extensionInfo); extensionInfo = this._prepareExtensionInfo(serviceName, extensionInfo);
dispatch.call('runtime', '_registerExtensionPrimitives', extensionInfo).catch(e => { dispatch.call('runtime', '_registerExtensionPrimitives', extensionInfo).then(
log.error(`Failed to register primitives for extension on service ${serviceName}: ${JSON.stringify(e)}`); () => {
}); if (dispatch.callingWorker) {
this._stopWorkerWatchdog(dispatch.callingWorker);
}
},
e => {
log.error(`Failed to register primitives for extension "${extensionInfo.id}": ${e.message}`);
});
} }
/** /**
@ -211,6 +247,35 @@ class ExtensionManager {
} }
return blockInfo; return blockInfo;
} }
/**
* Start a watchdog to terminate this worker unless it registers an extension before the timeout.
* @param {object} worker - the worker to watch.
* @private
*/
_startWorkerWatchdog (worker) {
const timeout = 5 * 1000; // 5 seconds
this._activeWatchdogs.add(worker);
setTimeout(() => {
if (this._activeWatchdogs.has(worker)) {
this._activeWatchdogs.delete(worker);
log.warn(`Worker watchdog timed out. Terminating worker.`);
dispatch.removeProvider(worker);
worker.terminate();
}
}, timeout);
}
/**
* Mark this worker as "safe" from the watchdog.
* @param {object} worker - the worker to mark as safe.
* @see {_startWorkerWatchdog}
* @private
*/
_stopWorkerWatchdog (worker) {
this._activeWatchdogs.delete(worker);
}
} }
module.exports = ExtensionManager; module.exports = ExtensionManager;