From b4c0cfe94083e05954feeddbb3bfcf2dbe8cb42f Mon Sep 17 00:00:00 2001
From: Christopher Willis-Ford <cwillisf@media.mit.edu>
Date: Fri, 4 Aug 2017 23:52:31 -0700
Subject: [PATCH] Refactor common code in {central,worker}-dispatch

Central dispatch and worker dispatch share most of their code now by
inheriting from a new shared dispatch class.

Also, the format of passed messages has been altered to make it easier
to understand in a debugger.
---
 src/dispatch/central-dispatch.js       | 163 +++++--------------
 src/dispatch/shared-dispatch.js        | 217 +++++++++++++++++++++++++
 src/dispatch/worker-dispatch.js        | 148 +++++------------
 test/fixtures/dispatch-test-service.js |   5 -
 test/fixtures/dispatch-test-worker.js  |   5 +-
 test/unit/dispatch.js                  |  48 +++---
 6 files changed, 328 insertions(+), 258 deletions(-)
 create mode 100644 src/dispatch/shared-dispatch.js

diff --git a/src/dispatch/central-dispatch.js b/src/dispatch/central-dispatch.js
index 98bb2805a..6dd236255 100644
--- a/src/dispatch/central-dispatch.js
+++ b/src/dispatch/central-dispatch.js
@@ -1,3 +1,5 @@
+const SharedDispatch = require('./shared-dispatch');
+
 const log = require('../util/log');
 
 /**
@@ -7,21 +9,9 @@ const log = require('../util/log');
  * context. The dispatch system will forward function arguments and return values across worker boundaries as needed.
  * @see {WorkerDispatch}
  */
-class CentralDispatch {
+class CentralDispatch extends SharedDispatch {
     constructor () {
-        /**
-         * List of callback registrations for promises waiting for a response from a call to a service on another
-         * worker. A callback registration is an array of [resolve,reject] Promise functions.
-         * Calls to local services don't enter this list.
-         * @type {Array.<[Function,Function]>}
-         */
-        this.callbacks = [];
-
-        /**
-         * The next callback ID to be used.
-         * @type {int}
-         */
-        this.nextCallback = 0;
+        super();
 
         /**
          * Map of channel name to worker or local service provider.
@@ -45,65 +35,6 @@ class CentralDispatch {
         this.workers = [];
     }
 
-    /**
-     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
-     * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
-     * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
-     * transferred to the worker, and they should not be used after this call.
-     * @example
-     *      dispatcher.call('vm', 'setData', 'cat', 42);
-     *      // this finds the worker for the 'vm' service, then on that worker calls:
-     *      vm.setData('cat', 42);
-     * @param {string} service - the name of the service.
-     * @param {string} method - the name of the method.
-     * @param {*} [args] - the arguments to be copied to the method, if any.
-     * @returns {Promise} - a promise for the return value of the service method.
-     */
-    call (service, method, ...args) {
-        return this.transferCall(service, method, null, ...args);
-    }
-
-    /**
-     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
-     * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
-     * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
-     * transferred to the worker, and they should not be used after this call.
-     * @example
-     *      dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
-     *      // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
-     *      vm.setData('cat', myArrayBuffer);
-     * @param {string} service - the name of the service.
-     * @param {string} method - the name of the method.
-     * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
-     * @param {*} [args] - the arguments to be copied to the method, if any.
-     * @returns {Promise} - a promise for the return value of the service method.
-     */
-    transferCall (service, method, transfer, ...args) {
-        return new Promise((resolve, reject) => {
-            try {
-                if (this.services.hasOwnProperty(service)) {
-                    const provider = this.services[service];
-                    if (provider instanceof this.workerClass) {
-                        const callbackId = this.nextCallback++;
-                        this.callbacks[callbackId] = [resolve, reject];
-                        if (transfer) {
-                            provider.postMessage([service, method, callbackId, args], transfer);
-                        } else {
-                            provider.postMessage([service, method, callbackId, args]);
-                        }
-                    } else {
-                        const result = provider[method].apply(provider, args);
-                        resolve(result);
-                    }
-                } else {
-                    reject(new Error(`Service not found: ${service}`));
-                }
-            } catch (e) {
-                reject(e);
-            }
-        });
-    }
-
     /**
      * Set a local object as the global provider of the specified service.
      * WARNING: Any method on the provider can be called from any worker within the dispatch system.
@@ -112,13 +43,16 @@ class CentralDispatch {
      * @returns {Promise} - a promise which will resolve once the service is registered.
      */
     setService (service, provider) {
-        return new Promise(resolve => {
+        /** Return a promise for consistency with {@link WorkerDispatch#setService} */
+        try {
             if (this.services.hasOwnProperty(service)) {
-                log.warn(`Replacing existing service provider for ${service}`);
+                log.warn(`Central dispatch replacing existing service provider for ${service}`);
             }
             this.services[service] = provider;
-            resolve();
-        });
+            return Promise.resolve();
+        } catch (e) {
+            return Promise.reject(e);
+        }
     }
 
     /**
@@ -130,58 +64,47 @@ class CentralDispatch {
         if (this.workers.indexOf(worker) === -1) {
             this.workers.push(worker);
             worker.onmessage = this._onMessage.bind(this, worker);
-            worker.postMessage(['dispatch', '_handshake']);
+            this._remoteCall(worker, 'dispatch', 'handshake').catch(e => {
+                log.error(`Could not handshake with worker: ${JSON.stringify(e)}`);
+            });
         } else {
-            log.warn('Ignoring attempt to add duplicate worker');
+            log.warn('Central dispatch ignoring attempt to add duplicate worker');
         }
     }
 
     /**
-     * Handle a message event received from a connected worker.
+     * Fetch the service provider object for a particular service name.
+     * @override
+     * @param {string} service - the name of the service to look up
+     * @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
+     * @protected
+     */
+    _getServiceProvider (service) {
+        const provider = this.services[service];
+        return provider && {
+            provider,
+            isRemote: provider instanceof this.workerClass
+        };
+    }
+
+    /**
+     * Handle a call message sent to the dispatch service itself
+     * @override
      * @param {Worker} worker - the worker which sent the message.
-     * @param {MessageEvent} event - the message event to be handled.
-     * @private
+     * @param {DispatchCallMessage} message - the message to be handled.
+     * @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
+     * @protected
      */
-    _onMessage (worker, event) {
-        const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data;
+    _onDispatchMessage (worker, message) {
         let promise;
-        if (service === 'dispatch') {
-            switch (method) {
-            case '_callback':
-                this._callback(callbackId, ...args);
-                break;
-            case 'setService':
-                promise = this.setService(args[0], worker);
-                break;
-            }
-        } else {
-            promise = this.call(service, method, ...args);
-        }
-        if (promise) {
-            promise.then(
-                result => {
-                    worker.postMessage(['dispatch', '_callback', callbackId, result]);
-                },
-                error => {
-                    worker.postMessage(['dispatch', '_callback', callbackId, null, error]);
-                });
-        }
-    }
-
-    /**
-     * Handle a callback from a worker. This should only be called as the result of a message from a worker.
-     * @param {int} callbackId - the ID of the callback to call.
-     * @param {*} result - if `error` is not truthy, resolve the callback promise with this value.
-     * @param {*} [error] - if this is truthy, reject the callback promise with this value.
-     * @private
-     */
-    _callback (callbackId, result, error) {
-        const [resolve, reject] = this.callbacks[callbackId];
-        if (error) {
-            reject(error);
-        } else {
-            resolve(result);
+        switch (message.method) {
+        case 'setService':
+            promise = this.setService(message.args[0], worker);
+            break;
+        default:
+            log.error(`Central dispatch received message for unknown method: ${message.method}`);
         }
+        return promise;
     }
 }
 
diff --git a/src/dispatch/shared-dispatch.js b/src/dispatch/shared-dispatch.js
new file mode 100644
index 000000000..d582dc170
--- /dev/null
+++ b/src/dispatch/shared-dispatch.js
@@ -0,0 +1,217 @@
+const log = require('../util/log');
+
+/**
+ * @typedef {object} DispatchCallMessage - a message to the dispatch system representing a service method call
+ * @property {*} responseId - send a response message with this response ID. See {@link DispatchResponseMessage}
+ * @property {string} service - the name of the service to be called
+ * @property {string} method - the name of the method to be called
+ * @property {Array|undefined} args - the arguments to be passed to the method
+ */
+
+/**
+ * @typedef {object} DispatchResponseMessage - a message to the dispatch system representing the results of a call
+ * @property {*} responseId - a copy of the response ID from the call which generated this response
+ * @property {*|undefined} error - if this is truthy, then it contains results from a failed call (such as an exception)
+ * @property {*|undefined} result - if error is not truthy, then this contains the return value of the call (if any)
+ */
+
+/**
+ * @typedef {DispatchCallMessage|DispatchResponseMessage} DispatchMessage
+ * Any message to the dispatch system.
+ */
+
+/**
+ * The SharedDispatch class is responsible for dispatch features shared by
+ * {@link CentralDispatch} and {@link WorkerDispatch}.
+ */
+class SharedDispatch {
+    constructor () {
+        /**
+         * List of callback registrations for promises waiting for a response from a call to a service on another
+         * worker. A callback registration is an array of [resolve,reject] Promise functions.
+         * Calls to local services don't enter this list.
+         * @type {Array.<[Function,Function]>}
+         */
+        this.callbacks = [];
+
+        /**
+         * The next response ID to be used.
+         * @type {int}
+         */
+        this.nextResponseId = 0;
+    }
+
+    /**
+     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
+     * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
+     * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
+     * transferred to the worker, and they should not be used after this call.
+     * @example
+     *      dispatcher.call('vm', 'setData', 'cat', 42);
+     *      // this finds the worker for the 'vm' service, then on that worker calls:
+     *      vm.setData('cat', 42);
+     * @param {string} service - the name of the service.
+     * @param {string} method - the name of the method.
+     * @param {*} [args] - the arguments to be copied to the method, if any.
+     * @returns {Promise} - a promise for the return value of the service method.
+     */
+    call (service, method, ...args) {
+        return this.transferCall(service, method, null, ...args);
+    }
+
+    /**
+     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
+     * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
+     * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
+     * transferred to the worker, and they should not be used after this call.
+     * @example
+     *      dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
+     *      // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
+     *      vm.setData('cat', myArrayBuffer);
+     * @param {string} service - the name of the service.
+     * @param {string} method - the name of the method.
+     * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
+     * @param {*} [args] - the arguments to be copied to the method, if any.
+     * @returns {Promise} - a promise for the return value of the service method.
+     */
+    transferCall (service, method, transfer, ...args) {
+        try {
+            const {provider, isRemote} = this._getServiceProvider(service);
+            if (provider) {
+                if (isRemote) {
+                    return this._remoteTransferCall(provider, service, method, transfer, ...args);
+                }
+
+                const result = provider[method].apply(provider, args);
+                return Promise.resolve(result);
+            }
+            return Promise.reject(new Error(`Service not found: ${service}`));
+        } catch (e) {
+            return Promise.reject(e);
+        }
+    }
+
+    /**
+     * Like {@link call}, but force the call to be posted through a particular communication channel.
+     * @param {object} provider - send the call through this object's `postMessage` function.
+     * @param {string} service - the name of the service.
+     * @param {string} method - the name of the method.
+     * @param {*} [args] - the arguments to be copied to the method, if any.
+     * @returns {Promise} - a promise for the return value of the service method.
+     */
+    _remoteCall (provider, service, method, ...args) {
+        return this._remoteTransferCall(provider, service, method, null, ...args);
+    }
+
+    /**
+     * Like {@link transferCall}, but force the call to be posted through a particular communication channel.
+     * @param {object} provider - send the call through this object's `postMessage` function.
+     * @param {string} service - the name of the service.
+     * @param {string} method - the name of the method.
+     * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
+     * @param {*} [args] - the arguments to be copied to the method, if any.
+     * @returns {Promise} - a promise for the return value of the service method.
+     */
+    _remoteTransferCall (provider, service, method, transfer, ...args) {
+        return new Promise((resolve, reject) => {
+            const responseId = this._storeCallbacks(resolve, reject);
+            if (transfer) {
+                provider.postMessage({service, method, responseId, args}, transfer);
+            } else {
+                provider.postMessage({service, method, responseId, args});
+            }
+        });
+    }
+
+    /**
+     * Store callback functions pending a response message.
+     * @param {Function} resolve - function to call if the service method returns.
+     * @param {Function} reject - function to call if the service method throws.
+     * @returns {*} - a unique response ID for this set of callbacks. See {@link _deliverResponse}.
+     * @protected
+     */
+    _storeCallbacks (resolve, reject) {
+        const responseId = this.nextResponseId++;
+        this.callbacks[responseId] = [resolve, reject];
+        return responseId;
+    }
+
+    /**
+     * Deliver call response from a worker. This should only be called as the result of a message from a worker.
+     * @param {int} responseId - the response ID of the callback set to call.
+     * @param {DispatchResponseMessage} message - the message containing the response value(s).
+     * @protected
+     */
+    _deliverResponse (responseId, message) {
+        try {
+            const [resolve, reject] = this.callbacks[responseId];
+            delete this.callbacks[responseId];
+            if (message.error) {
+                reject(message.error);
+            } else {
+                resolve(message.result);
+            }
+        } catch (e) {
+            log.error(`Dispatch callback failed: ${JSON.stringify(e)}`);
+        }
+    }
+
+    /**
+     * Handle a message event received from a connected worker.
+     * @param {Worker} worker - the worker which sent the message, or the global object if running in a worker.
+     * @param {MessageEvent} event - the message event to be handled.
+     * @protected
+     */
+    _onMessage (worker, event) {
+        /** @type {DispatchMessage} */
+        const message = event.data;
+        message.args = message.args || [];
+        let promise;
+        if (message.service) {
+            if (message.service === 'dispatch') {
+                promise = this._onDispatchMessage(worker, message);
+            } else {
+                promise = this.call(message.service, message.method, ...message.args);
+            }
+        } else if (typeof message.responseId === 'undefined') {
+            log.error(`Dispatch caught malformed message from a worker: ${JSON.stringify(event)}`);
+        } else {
+            this._deliverResponse(message.responseId, message);
+        }
+        if (promise) {
+            if (typeof message.responseId === 'undefined') {
+                log.error(`Dispatch message missing required response ID: ${JSON.stringify(event)}`);
+            } else {
+                promise.then(
+                    result => worker.postMessage({responseId: message.responseId, result}),
+                    error => worker.postMessage({responseId: message.responseId, error})
+                );
+            }
+        }
+    }
+
+    /**
+     * Fetch the service provider object for a particular service name.
+     * @abstract
+     * @param {string} service - the name of the service to look up
+     * @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
+     * @protected
+     */
+    _getServiceProvider (service) {
+        throw new Error(`Could not get provider for ${service}: _getServiceProvider not implemented`);
+    }
+
+    /**
+     * Handle a call message sent to the dispatch service itself
+     * @abstract
+     * @param {Worker} worker - the worker which sent the message.
+     * @param {DispatchCallMessage} message - the message to be handled.
+     * @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
+     * @private
+     */
+    _onDispatchMessage (worker, message) {
+        throw new Error(`Unimplemented dispatch message handler cannot handle ${message.method} method`);
+    }
+}
+
+module.exports = SharedDispatch;
diff --git a/src/dispatch/worker-dispatch.js b/src/dispatch/worker-dispatch.js
index 50328d84c..10542af46 100644
--- a/src/dispatch/worker-dispatch.js
+++ b/src/dispatch/worker-dispatch.js
@@ -1,3 +1,5 @@
+const SharedDispatch = require('./shared-dispatch');
+
 const log = require('../util/log');
 
 /**
@@ -7,15 +9,9 @@ const log = require('../util/log');
  * worker boundaries as needed.
  * @see {CentralDispatch}
  */
-class WorkerDispatch {
+class WorkerDispatch extends SharedDispatch {
     constructor () {
-        /**
-         * List of callback registrations for promises waiting for a response from a call to a service on another
-         * worker. A callback registration is an array of [resolve,reject] Promise functions.
-         * Calls to local services don't enter this list.
-         * @type {Array.<[Function,Function]>}
-         */
-        this.callbacks = [];
+        super();
 
         /**
          * This promise will be resolved when we have successfully connected to central dispatch.
@@ -27,12 +23,6 @@ class WorkerDispatch {
             this._onConnect = resolve;
         });
 
-        /**
-         * The next callback ID to be used.
-         * @type {int}
-         */
-        this.nextCallback = 0;
-
         /**
          * Map of service name to local service provider.
          * If a service is not listed here, it is assumed to be provided by another context (another Worker or the main
@@ -42,7 +32,7 @@ class WorkerDispatch {
          */
         this.services = {};
 
-        this._onMessage = this._onMessage.bind(this);
+        this._onMessage = this._onMessage.bind(this, self);
         if (typeof self !== 'undefined') {
             self.onmessage = this._onMessage;
         }
@@ -60,61 +50,6 @@ class WorkerDispatch {
         return this._connectionPromise;
     }
 
-    /**
-     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
-     * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
-     * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
-     * transferred to the worker, and they should not be used after this call.
-     * @example
-     *      dispatcher.call('vm', 'setData', 'cat', 42);
-     *      // this finds the worker for the 'vm' service, then on that worker calls:
-     *      vm.setData('cat', 42);
-     * @param {string} service - the name of the service.
-     * @param {string} method - the name of the method.
-     * @param {*} [args] - the arguments to be copied to the method, if any.
-     * @returns {Promise} - a promise for the return value of the service method.
-     */
-    call (service, method, ...args) {
-        return this.transferCall(service, method, null, ...args);
-    }
-
-    /**
-     * Call a particular method on a particular service, regardless of whether that service is provided locally or on
-     * a worker. If the service is provided by a worker, the `args` will be copied using the Structured Clone
-     * algorithm, except for any items which are also in the `transfer` list. Ownership of those items will be
-     * transferred to the worker, and they should not be used after this call.
-     * @example
-     *      dispatcher.transferCall('vm', 'setData', [myArrayBuffer], 'cat', myArrayBuffer);
-     *      // this finds the worker for the 'vm' service, transfers `myArrayBuffer` to it, then on that worker calls:
-     *      vm.setData('cat', myArrayBuffer);
-     * @param {string} service - the name of the service.
-     * @param {string} method - the name of the method.
-     * @param {Array} [transfer] - objects to be transferred instead of copied. Must be present in `args` to be useful.
-     * @param {*} [args] - the arguments to be copied to the method, if any.
-     * @returns {Promise} - a promise for the return value of the service method.
-     */
-    transferCall (service, method, transfer, ...args) {
-        return new Promise((resolve, reject) => {
-            try {
-                if (this.services.hasOwnProperty(service)) {
-                    const provider = this.services[service];
-                    const result = provider[method].apply(provider, args);
-                    resolve(result);
-                } else {
-                    const callbackId = this.nextCallback++;
-                    this.callbacks[callbackId] = [resolve, reject];
-                    if (transfer) {
-                        self.postMessage([service, method, callbackId, ...args], transfer);
-                    } else {
-                        self.postMessage([service, method, callbackId, ...args]);
-                    }
-                }
-            } catch (e) {
-                reject(e);
-            }
-        });
-    }
-
     /**
      * Set a local object as the global provider of the specified service.
      * WARNING: Any method on the provider can be called from any worker within the dispatch system.
@@ -124,56 +59,51 @@ class WorkerDispatch {
      */
     setService (service, provider) {
         if (this.services.hasOwnProperty(service)) {
-            log.warn(`Replacing existing service provider for ${service}`);
+            log.warn(`Worker dispatch replacing existing service provider for ${service}`);
         }
         this.services[service] = provider;
-        return this.waitForConnection.then(() => this.call('dispatch', 'setService', service));
+        return this.waitForConnection.then(() => this._remoteCall(self, 'dispatch', 'setService', service));
     }
 
     /**
-     * Message handler active after the dispatcher handshake. This only handles method calls.
-     * @param {MessageEvent} event - the message event to be handled.
-     * @private
+     * Fetch the service provider object for a particular service name.
+     * @override
+     * @param {string} service - the name of the service to look up
+     * @returns {{provider:(object|Worker), isRemote:boolean}} - the means to contact the service, if found
+     * @protected
      */
-    _onMessage (event) {
-        const [service, method, callbackId, ...args] = /** @type {[string, string, *]} */ event.data;
-        if (service === 'dispatch') {
-            switch (method) {
-            case '_handshake':
-                this._onConnect();
-                break;
-            case '_callback':
-                this._callback(callbackId, ...args);
-                break;
-            case '_terminate':
-                self.close();
-                break;
-            }
-        } else {
-            this.call(service, method, ...args).then(
-                result => {
-                    self.postMessage(['dispatch', '_callback', callbackId, result]);
-                },
-                error => {
-                    self.postMessage(['dispatch', '_callback', callbackId, null, error]);
-                });
-        }
+    _getServiceProvider (service) {
+        // if we don't have a local service by this name, contact central dispatch by calling `postMessage` on self
+        const provider = this.services[service];
+        return {
+            provider: provider || self,
+            isRemote: !provider
+        };
     }
 
     /**
-     * Handle a callback from a worker. This should only be called as the result of a message from a worker.
-     * @param {int} callbackId - the ID of the callback to call.
-     * @param {*} result - if `error` is not truthy, resolve the callback promise with this value.
-     * @param {*} [error] - if this is truthy, reject the callback promise with this value.
-     * @private
+     * Handle a call message sent to the dispatch service itself
+     * @override
+     * @param {Worker} worker - the worker which sent the message.
+     * @param {DispatchCallMessage} message - the message to be handled.
+     * @returns {Promise|undefined} - a promise for the results of this operation, if appropriate
+     * @protected
      */
-    _callback (callbackId, result, error) {
-        const [resolve, reject] = this.callbacks[callbackId];
-        if (error) {
-            reject(error);
-        } else {
-            resolve(result);
+    _onDispatchMessage (worker, message) {
+        let promise;
+        switch (message.method) {
+        case 'handshake':
+            promise = this._onConnect();
+            break;
+        case 'terminate':
+            // Don't close until next tick, after sending confirmation back
+            setTimeout(() => self.close(), 0);
+            promise = Promise.resolve();
+            break;
+        default:
+            log.error(`Worker dispatch received message for unknown method: ${message.method}`);
         }
+        return promise;
     }
 }
 
diff --git a/test/fixtures/dispatch-test-service.js b/test/fixtures/dispatch-test-service.js
index 959a87637..3e01e68f5 100644
--- a/test/fixtures/dispatch-test-service.js
+++ b/test/fixtures/dispatch-test-service.js
@@ -10,11 +10,6 @@ class DispatchTestService {
     throwException () {
         throw new Error('This is a test exception thrown by DispatchTest');
     }
-
-    close () {
-        // eslint-disable-next-line no-undef
-        self.close();
-    }
 }
 
 module.exports = DispatchTestService;
diff --git a/test/fixtures/dispatch-test-worker.js b/test/fixtures/dispatch-test-worker.js
index 4ca16a545..2c57fbb3e 100644
--- a/test/fixtures/dispatch-test-worker.js
+++ b/test/fixtures/dispatch-test-worker.js
@@ -1,8 +1,11 @@
 const dispatch = require('../../src/dispatch/worker-dispatch');
 const DispatchTestService = require('./dispatch-test-service');
+const log = require('../../src/util/log');
 
 dispatch.setService('RemoteDispatchTest', new DispatchTestService());
 
 dispatch.waitForConnection.then(() => {
-    dispatch.call('test', 'onWorkerReady');
+    dispatch.call('test', 'onWorkerReady').catch(e => {
+        log(`Test worker failed to call onWorkerReady: ${JSON.stringify(e)}`);
+    });
 });
diff --git a/test/unit/dispatch.js b/test/unit/dispatch.js
index 8e94735e2..fdcbdb3ea 100644
--- a/test/unit/dispatch.js
+++ b/test/unit/dispatch.js
@@ -12,31 +12,37 @@ dispatch.workerClass = Worker;
 const runServiceTest = function (serviceName, t) {
     const promises = [];
 
-    promises.push(dispatch.call(serviceName, 'returnFortyTwo').then(x => {
-        t.equal(x, 42);
-    }));
+    promises.push(dispatch.call(serviceName, 'returnFortyTwo')
+        .then(
+            x => t.equal(x, 42),
+            e => t.fail(e)
+        ));
 
-    promises.push(dispatch.call(serviceName, 'doubleArgument', 9).then(x => {
-        t.equal(x, 18);
-    }));
+    promises.push(dispatch.call(serviceName, 'doubleArgument', 9)
+        .then(
+            x => t.equal(x, 18),
+            e => t.fail(e)
+        ));
 
-    promises.push(dispatch.call(serviceName, 'doubleArgument', 123).then(x => {
-        t.equal(x, 246);
-    }));
+    promises.push(dispatch.call(serviceName, 'doubleArgument', 123)
+        .then(
+            x => t.equal(x, 246),
+            e => t.fail(e)
+        ));
 
     // I tried using `t.rejects` here but ran into https://github.com/tapjs/node-tap/issues/384
     promises.push(dispatch.call(serviceName, 'throwException')
-        .then(() => {
-            t.fail('exception was not propagated as expected');
-        }, () => {
-            t.pass('exception was propagated as expected');
-        }));
+        .then(
+            () => t.fail('exception was not propagated as expected'),
+            () => t.pass('exception was propagated as expected')
+        ));
 
     return Promise.all(promises);
 };
 
 test('local', t => {
-    dispatch.setService('LocalDispatchTest', new DispatchTestService());
+    dispatch.setService('LocalDispatchTest', new DispatchTestService())
+        .catch(e => t.fail(e));
 
     return runServiceTest('LocalDispatchTest', t);
 });
@@ -47,15 +53,11 @@ test('remote', t => {
     dispatch.addWorker(worker);
 
     const waitForWorker = new Promise(resolve => {
-        dispatch.setService('test', {
-            onWorkerReady: resolve
-        });
+        dispatch.setService('test', {onWorkerReady: resolve})
+            .catch(e => t.fail(e));
     });
 
     return waitForWorker
-        .then(() => runServiceTest('RemoteDispatchTest', t))
-        .then(() => {
-            // Allow some time for the worker to finish, then terminate it
-            setTimeout(() => dispatch.call('RemoteDispatchTest', 'close'), 10);
-        });
+        .then(() => runServiceTest('RemoteDispatchTest', t), e => t.fail(e))
+        .then(() => dispatch._remoteCall(worker, 'dispatch', 'terminate'), e => t.fail(e));
 });