diff --git a/doc/api/errors.md b/doc/api/errors.md index 0a49757da3e39e..972d0971c4b059 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1313,6 +1313,13 @@ but not provided in the `transferList` for that call. An [ES6 module][] could not be resolved. + +### ERR_MISSING_PLATFORM_FOR_WORKER + +The V8 platform used by this instance of Node.js does not support creating +Workers. This is caused by lack of embedder support for Workers. In particular, +this error will not occur with standard builds of Node.js. + ### ERR_MODULE_RESOLUTION_LEGACY @@ -1722,6 +1729,22 @@ The fulfilled value of a linking promise is not a `vm.Module` object. The current module's status does not allow for this operation. The specific meaning of the error depends on the specific function. + +### ERR_WORKER_NEED_ABSOLUTE_PATH + +The path for the main script of a worker is not an absolute path. + + +### ERR_WORKER_UNSERIALIZABLE_ERROR + +All attempts at serializing an uncaught exception from a worker thread failed. + + +### ERR_WORKER_UNSUPPORTED_EXTENSION + +The pathname used for the main script of a worker has an +unknown file extension. + ### ERR_ZLIB_INITIALIZATION_FAILED diff --git a/doc/api/process.md b/doc/api/process.md index 8205efbd780300..0a9c52a44c3e23 100644 --- a/doc/api/process.md +++ b/doc/api/process.md @@ -410,6 +410,8 @@ added: v0.7.0 The `process.abort()` method causes the Node.js process to exit immediately and generate a core file. +This feature is not available in [`Worker`][] threads. + ## process.arch + +* {boolean} + +Is `true` if this code is not running inside of a [`Worker`][] thread. + +## worker.parentPort + + +* {null|MessagePort} + +If this thread was spawned as a [`Worker`][], this will be a [`MessagePort`][] +allowing communication with the parent thread. Messages sent using +`parentPort.postMessage()` will be available in the parent thread +using `worker.on('message')`, and messages sent from the parent thread +using `worker.postMessage()` will be available in this thread using +`parentPort.on('message')`. + +## worker.threadId + + +* {integer} + +An integer identifier for the current thread. On the corresponding worker object +(if there is any), it is available as [`worker.threadId`][]. + +## worker.workerData + + +An arbitrary JavaScript value that contains a clone of the data passed +to this thread’s `Worker` constructor. + ## Class: MessageChannel + +The `Worker` class represents an independent JavaScript execution thread. +Most Node.js APIs are available inside of it. + +Notable differences inside a Worker environment are: + +- The [`process.stdin`][], [`process.stdout`][] and [`process.stderr`][] + properties are set to `null`. +- The [`require('worker').isMainThread`][] property is set to `false`. +- The [`require('worker').postMessage()`][] method is available and the + [`require('worker').on('workerMessage')`][] event will be emitted. +- [`process.exit()`][] does not stop the whole program, just the single thread, + and [`process.abort()`][] is not available. +- [`process.chdir()`][] and `process` methods that set group or user ids + are not available. +- [`process.env`][] is a read-only reference to the environment variables. +- [`process.title`][] cannot be modified. +- Signals will not be delivered through [`process.on('...')`][Signals events]. +- Execution may stop at any point as a result of [`worker.terminate()`][] + being invoked. +- IPC channels from parent processes are not accessible. + +Currently, the following differences also exist until they are addressed: + +- The [`inspector`][] module is not available yet. +- Native addons are not supported yet. + +Creating `Worker` instances inside of other `Worker`s is possible. + +Like [Web Workers][] and the [`cluster` module][], two-way communication can be +achieved through inter-thread message passing. Internally, a `Worker` has a +built-in pair of [`MessagePort`][]s that are already associated with each other +when the `Worker` is created. While the `MessagePort` objects are not directly +exposed, their functionalities are exposed through [`worker.postMessage()`][] +and the [`worker.on('message')`][] event on the `Worker` object for the parent +thread, and [`require('worker').postMessage()`][] and the +[`require('worker').on('workerMessage')`][] on `require('worker')` for the +child thread. + +To create custom messaging channels (which is encouraged over using the default +global channel because it facilitates separation of concerns), users can create +a `MessageChannel` object on either thread and pass one of the +`MessagePort`s on that `MessageChannel` to the other thread through a +pre-existing channel, such as the global one. + +See [`port.postMessage()`][] for more information on how messages are passed, +and what kind of JavaScript values can be successfully transported through +the thread barrier. + +For example: + +```js +const assert = require('assert'); +const { Worker, MessageChannel, MessagePort, isMainThread } = require('worker'); +if (isMainThread) { + const worker = new Worker(__filename); + const subChannel = new MessageChannel(); + worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]); + subChannel.port2.on('message', (value) => { + console.log('received:', value); + }); +} else { + require('worker').once('workerMessage', (value) => { + assert(value.hereIsYourPort instanceof MessagePort); + value.hereIsYourPort.postMessage('the worker is sending this'); + value.hereIsYourPort.close(); + }); +} +``` + +### new Worker(filename, options) + +* `filename` {string} The absolute path to the Worker’s main script. + If `options.eval` is true, this is a string containing JavaScript code rather + than a path. +* `options` {Object} + * `eval` {boolean} If true, interpret the first argument to the constructor + as a script that is executed once the worker is online. + * `data` {any} Any JavaScript value that will be cloned and made + available as [`require('worker').workerData`][]. The cloning will occur as + described in the [HTML structured clone algorithm][], and an error will be + thrown if the object cannot be cloned (e.g. because it contains + `function`s). + +### Event: 'error' + + +* `err` {Error} + +The `'error'` event is emitted if the worker thread throws an uncaught +exception. In that case, the worker will be terminated. + +### Event: 'exit' + + +* `exitCode` {integer} + +The `'exit'` event is emitted once the worker has stopped. If the worker +exited by calling [`process.exit()`][], the `exitCode` parameter will be the +passed exit code. If the worker was terminated, the `exitCode` parameter will +be `1`. + +### Event: 'message' + + +* `value` {any} The transmitted value + +The `'message'` event is emitted when the worker thread has invoked +[`require('worker').postMessage()`][]. See the [`port.on('message')`][] event +for more details. + +### Event: 'online' + + +The `'online'` event is emitted when the worker thread has started executing +JavaScript code. + +### worker.postMessage(value[, transferList]) + + +* `value` {any} +* `transferList` {Object[]} + +Send a message to the worker that will be received via +[`require('worker').on('workerMessage')`][]. See [`port.postMessage()`][] for +more details. + +### worker.ref() + + +Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker will +*not* let the program exit if it's the only active handle left (the default +behavior). If the worker is `ref()`ed, calling `ref()` again will have +no effect. + +### worker.terminate([callback]) + + +* `callback` {Function} + +Stop all JavaScript execution in the worker thread as soon as possible. +`callback` is an optional function that is invoked once this operation is known +to have completed. + +**Warning**: Currently, not all code in the internals of Node.js is prepared to +expect termination at arbitrary points in time and may crash if it encounters +that condition. Consequently, you should currently only call `.terminate()` if +it is known that the Worker thread is not accessing Node.js core modules other +than what is exposed in the `worker` module. + +### worker.threadId + + +* {integer} + +An integer identifier for the referenced thread. Inside the worker thread, +it is available as [`require('worker').threadId`][]. + +### worker.unref() + + +Calling `unref()` on a worker will allow the thread to exit if this is the only +active handle in the event system. If the worker is already `unref()`ed calling +`unref()` again will have no effect. + [`Buffer`]: buffer.html -[child processes]: child_process.html [`EventEmitter`]: events.html [`MessagePort`]: #worker_class_messageport [`port.postMessage()`]: #worker_port_postmessage_value_transferlist +[`Worker`]: #worker_class_worker +[`worker.terminate()`]: #worker_worker_terminate_callback +[`worker.postMessage()`]: #worker_worker_postmessage_value_transferlist_1 +[`worker.on('message')`]: #worker_event_message_1 +[`worker.threadId`]: #worker_worker_threadid_1 +[`port.on('message')`]: #worker_event_message +[`process.exit()`]: process.html#process_process_exit_code +[`process.abort()`]: process.html#process_process_abort +[`process.chdir()`]: process.html#process_process_chdir_directory +[`process.env`]: process.html#process_process_env +[`process.stdin`]: process.html#process_process_stdin +[`process.stderr`]: process.html#process_process_stderr +[`process.stdout`]: process.html#process_process_stdout +[`process.title`]: process.html#process_process_title +[`require('worker').workerData`]: #worker_worker_workerdata +[`require('worker').on('workerMessage')`]: #worker_event_workermessage +[`require('worker').postMessage()`]: #worker_worker_postmessage_value_transferlist +[`require('worker').isMainThread`]: #worker_worker_ismainthread +[`require('worker').threadId`]: #worker_worker_threadid +[`cluster` module]: cluster.html +[`inspector`]: inspector.html [v8.serdes]: v8.html#v8_serialization_api [`SharedArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer +[Signals events]: process.html#process_signal_events [`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array [browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort +[child processes]: child_process.html [HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm +[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API diff --git a/lib/inspector.js b/lib/inspector.js index 3285c1040a7132..f4ec71fd6c2105 100644 --- a/lib/inspector.js +++ b/lib/inspector.js @@ -12,7 +12,7 @@ const { const util = require('util'); const { Connection, open, url } = process.binding('inspector'); -if (!Connection) +if (!Connection || !require('internal/worker').isMainThread) throw new ERR_INSPECTOR_NOT_AVAILABLE(); const connectionSymbol = Symbol('connectionProperty'); diff --git a/lib/internal/bootstrap/node.js b/lib/internal/bootstrap/node.js index 6477c2d8282f43..4817ec110a99e5 100644 --- a/lib/internal/bootstrap/node.js +++ b/lib/internal/bootstrap/node.js @@ -24,6 +24,7 @@ _shouldAbortOnUncaughtToggle }, { internalBinding, NativeModule }) { const exceptionHandlerState = { captureFn: null }; + const isMainThread = internalBinding('worker').threadId === 0; function startup() { const EventEmitter = NativeModule.require('events'); @@ -100,7 +101,9 @@ NativeModule.require('internal/inspector_async_hook').setup(); } - _process.setupChannel(); + if (isMainThread) + _process.setupChannel(); + _process.setupRawDebug(_rawDebug); const browserGlobals = !process._noBrowserGlobals; @@ -175,8 +178,11 @@ // are running from a script and running the REPL - but there are a few // others like the debugger or running --eval arguments. Here we decide // which mode we run in. - - if (NativeModule.exists('_third_party_main')) { + if (internalBinding('worker').getEnvMessagePort() !== undefined) { + // This means we are in a Worker context, and any script execution + // will be directed by the worker module. + NativeModule.require('internal/worker').setupChild(evalScript); + } else if (NativeModule.exists('_third_party_main')) { // To allow people to extend Node in different ways, this hook allows // one to drop a file lib/_third_party_main.js into the build // directory which will be executed instead of Node's normal loading. @@ -542,7 +548,7 @@ return `process.binding('inspector').callAndPauseOnStart(${fn}, {})`; } - function evalScript(name) { + function evalScript(name, body = wrapForBreakOnFirstLine(process._eval)) { const CJSModule = NativeModule.require('internal/modules/cjs/loader'); const path = NativeModule.require('path'); const cwd = tryGetCwd(path); @@ -550,7 +556,6 @@ const module = new CJSModule(name); module.filename = path.join(cwd, name); module.paths = CJSModule._nodeModulePaths(cwd); - const body = wrapForBreakOnFirstLine(process._eval); const script = `global.__filename = ${JSON.stringify(name)};\n` + 'global.exports = exports;\n' + 'global.module = module;\n' + diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 89c0139f8b6fde..d59531debbd042 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -844,4 +844,9 @@ E('ERR_VM_MODULE_NOT_LINKED', E('ERR_VM_MODULE_NOT_MODULE', 'Provided module is not an instance of Module', Error); E('ERR_VM_MODULE_STATUS', 'Module status %s', Error); +E('ERR_WORKER_NEED_ABSOLUTE_PATH', + 'The worker script filename must be an absolute path. Received "%s"', + TypeError); +E('ERR_WORKER_UNSERIALIZABLE_ERROR', + 'Serializing an uncaught exception failed', Error); E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error); diff --git a/lib/internal/process.js b/lib/internal/process.js index 0f0e40d6a0cdbc..f01be32be4b6a3 100644 --- a/lib/internal/process.js +++ b/lib/internal/process.js @@ -16,6 +16,7 @@ const util = require('util'); const constants = process.binding('constants').os.signals; const assert = require('assert').strict; const { deprecate } = require('internal/util'); +const { isMainThread } = require('internal/worker'); process.assert = deprecate( function(x, msg) { @@ -186,6 +187,11 @@ function setupKillAndExit() { function setupSignalHandlers() { + if (!isMainThread) { + // Worker threads don't receive signals. + return; + } + const signalWraps = Object.create(null); let Signal; diff --git a/lib/internal/process/methods.js b/lib/internal/process/methods.js index 91aca398b346d4..9a954f6a9b93cf 100644 --- a/lib/internal/process/methods.js +++ b/lib/internal/process/methods.js @@ -8,11 +8,18 @@ const { validateMode, validateUint32 } = require('internal/validators'); +const { + isMainThread +} = require('internal/worker'); function setupProcessMethods(_chdir, _cpuUsage, _hrtime, _memoryUsage, _rawDebug, _umask, _initgroups, _setegid, _seteuid, _setgid, _setuid, _setgroups) { // Non-POSIX platforms like Windows don't have certain methods. + // Workers also lack these methods since they change process-global state. + if (!isMainThread) + return; + if (_setgid !== undefined) { setupPosixMethods(_initgroups, _setegid, _seteuid, _setgid, _setuid, _setgroups); diff --git a/lib/internal/process/stdio.js b/lib/internal/process/stdio.js index eaba4dfca13a47..76e6ab85140535 100644 --- a/lib/internal/process/stdio.js +++ b/lib/internal/process/stdio.js @@ -6,6 +6,7 @@ const { ERR_UNKNOWN_STDIN_TYPE, ERR_UNKNOWN_STREAM_TYPE } = require('internal/errors').codes; +const { isMainThread } = require('internal/worker'); exports.setup = setupStdio; @@ -16,6 +17,8 @@ function setupStdio() { function getStdout() { if (stdout) return stdout; + if (!isMainThread) + return new (require('stream').Writable)({ write(b, e, cb) { cb(); } }); stdout = createWritableStdioStream(1); stdout.destroySoon = stdout.destroy; stdout._destroy = function(er, cb) { @@ -31,6 +34,8 @@ function setupStdio() { function getStderr() { if (stderr) return stderr; + if (!isMainThread) + return new (require('stream').Writable)({ write(b, e, cb) { cb(); } }); stderr = createWritableStdioStream(2); stderr.destroySoon = stderr.destroy; stderr._destroy = function(er, cb) { @@ -46,6 +51,8 @@ function setupStdio() { function getStdin() { if (stdin) return stdin; + if (!isMainThread) + return new (require('stream').Readable)({ read() { this.push(null); } }); const tty_wrap = process.binding('tty_wrap'); const fd = 0; diff --git a/lib/internal/util/inspector.js b/lib/internal/util/inspector.js index 634d3302333584..3dd73415ded862 100644 --- a/lib/internal/util/inspector.js +++ b/lib/internal/util/inspector.js @@ -1,6 +1,8 @@ 'use strict'; -const hasInspector = process.config.variables.v8_enable_inspector === 1; +// TODO(addaleax): Figure out how to integrate the inspector with workers. +const hasInspector = process.config.variables.v8_enable_inspector === 1 && + require('internal/worker').isMainThread; const inspector = hasInspector ? require('inspector') : undefined; let session; diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 73f7525aa73cc2..c982478b9334e8 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -1,24 +1,49 @@ 'use strict'; +const Buffer = require('buffer').Buffer; const EventEmitter = require('events'); +const assert = require('assert'); +const path = require('path'); const util = require('util'); +const { + ERR_INVALID_ARG_TYPE, + ERR_WORKER_NEED_ABSOLUTE_PATH, + ERR_WORKER_UNSERIALIZABLE_ERROR +} = require('internal/errors').codes; const { internalBinding } = require('internal/bootstrap/loaders'); const { MessagePort, MessageChannel } = internalBinding('messaging'); const { handle_onclose } = internalBinding('symbols'); +const { clearAsyncIdStack } = require('internal/async_hooks'); util.inherits(MessagePort, EventEmitter); +const { + Worker: WorkerImpl, + getEnvMessagePort, + threadId +} = internalBinding('worker'); + +const isMainThread = threadId === 0; + const kOnMessageListener = Symbol('kOnMessageListener'); +const kHandle = Symbol('kHandle'); +const kPort = Symbol('kPort'); +const kPublicPort = Symbol('kPublicPort'); +const kDispose = Symbol('kDispose'); +const kOnExit = Symbol('kOnExit'); +const kOnMessage = Symbol('kOnMessage'); +const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr'); +const kOnErrorMessage = Symbol('kOnErrorMessage'); const debug = util.debuglog('worker'); -// A MessagePort consists of a handle (that wraps around an +// A communication channel consisting of a handle (that wraps around an // uv_async_t) which can receive information from other threads and emits // .onmessage events, and a function used for sending data to a MessagePort // in some other thread. MessagePort.prototype[kOnMessageListener] = function onmessage(payload) { - debug('received message', payload); + debug(`[${threadId}] received message`, payload); // Emit the deserialized object to userland. this.emit('message', payload); }; @@ -79,6 +104,9 @@ MessagePort.prototype.close = function(cb) { originalClose.call(this); }; +const drainMessagePort = MessagePort.prototype.drain; +delete MessagePort.prototype.drain; + function setupPortReferencing(port, eventEmitter, eventName) { // Keep track of whether there are any workerMessage listeners: // If there are some, ref() the channel so it keeps the event loop alive. @@ -99,7 +127,194 @@ function setupPortReferencing(port, eventEmitter, eventName) { }); } + +class Worker extends EventEmitter { + constructor(filename, options = {}) { + super(); + debug(`[${threadId}] create new worker`, filename, options); + if (typeof filename !== 'string') { + throw new ERR_INVALID_ARG_TYPE('filename', 'string', filename); + } + + if (!options.eval && !path.isAbsolute(filename)) { + throw new ERR_WORKER_NEED_ABSOLUTE_PATH(filename); + } + + // Set up the C++ handle for the worker, as well as some internal wiring. + this[kHandle] = new WorkerImpl(); + this[kHandle].onexit = (code) => this[kOnExit](code); + this[kPort] = this[kHandle].messagePort; + this[kPort].on('message', (data) => this[kOnMessage](data)); + this[kPort].start(); + this[kPort].unref(); + debug(`[${threadId}] created Worker with ID ${this.threadId}`); + + const { port1, port2 } = new MessageChannel(); + this[kPublicPort] = port1; + this[kPublicPort].on('message', (message) => this.emit('message', message)); + setupPortReferencing(this[kPublicPort], this, 'message'); + this[kPort].postMessage({ + type: 'loadScript', + filename, + doEval: !!options.eval, + workerData: options.workerData, + publicPort: port2 + }, [port2]); + // Actually start the new thread now that everything is in place. + this[kHandle].startThread(); + } + + [kOnExit](code) { + debug(`[${threadId}] hears end event for Worker ${this.threadId}`); + drainMessagePort.call(this[kPublicPort]); + this[kDispose](); + this.emit('exit', code); + this.removeAllListeners(); + } + + [kOnCouldNotSerializeErr]() { + this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR()); + } + + [kOnErrorMessage](serialized) { + // This is what is called for uncaught exceptions. + const error = deserializeError(serialized); + this.emit('error', error); + } + + [kOnMessage](message) { + switch (message.type) { + case 'upAndRunning': + return this.emit('online'); + case 'couldNotSerializeError': + return this[kOnCouldNotSerializeErr](); + case 'errorMessage': + return this[kOnErrorMessage](message.error); + } + + assert.fail(`Unknown worker message type ${message.type}`); + } + + [kDispose]() { + this[kHandle].onexit = null; + this[kHandle] = null; + this[kPort] = null; + this[kPublicPort] = null; + } + + postMessage(...args) { + this[kPublicPort].postMessage(...args); + } + + terminate(callback) { + if (this[kHandle] === null) return; + + debug(`[${threadId}] terminates Worker with ID ${this.threadId}`); + + if (typeof callback !== 'undefined') + this.once('exit', (exitCode) => callback(null, exitCode)); + + this[kHandle].stopThread(); + } + + ref() { + if (this[kHandle] === null) return; + + this[kHandle].ref(); + this[kPublicPort].ref(); + } + + unref() { + if (this[kHandle] === null) return; + + this[kHandle].unref(); + this[kPublicPort].unref(); + } + + get threadId() { + if (this[kHandle] === null) return -1; + + return this[kHandle].threadId; + } +} + +let originalFatalException; + +function setupChild(evalScript) { + // Called during bootstrap to set up worker script execution. + debug(`[${threadId}] is setting up worker child environment`); + const port = getEnvMessagePort(); + + const publicWorker = require('worker'); + + port.on('message', (message) => { + if (message.type === 'loadScript') { + const { filename, doEval, workerData, publicPort } = message; + publicWorker.parentPort = publicPort; + setupPortReferencing(publicPort, publicPort, 'message'); + publicWorker.workerData = workerData; + debug(`[${threadId}] starts worker script ${filename} ` + + `(eval = ${eval}) at cwd = ${process.cwd()}`); + port.unref(); + port.postMessage({ type: 'upAndRunning' }); + if (doEval) { + evalScript('[worker eval]', filename); + } else { + process.argv[1] = filename; // script filename + require('module').runMain(); + } + return; + } + + assert.fail(`Unknown worker message type ${message.type}`); + }); + + port.start(); + + originalFatalException = process._fatalException; + process._fatalException = fatalException; + + function fatalException(error) { + debug(`[${threadId}] gets fatal exception`); + let caught = false; + try { + caught = originalFatalException.call(this, error); + } catch (e) { + error = e; + } + debug(`[${threadId}] fatal exception caught = ${caught}`); + + if (!caught) { + let serialized; + try { + serialized = serializeError(error); + } catch {} + debug(`[${threadId}] fatal exception serialized = ${!!serialized}`); + if (serialized) + port.postMessage({ type: 'errorMessage', error: serialized }); + else + port.postMessage({ type: 'couldNotSerializeError' }); + clearAsyncIdStack(); + } + } +} + +// TODO(addaleax): These can be improved a lot. +function serializeError(error) { + return Buffer.from(util.inspect(error), 'utf8'); +} + +function deserializeError(error) { + return Buffer.from(error.buffer, + error.byteOffset, + error.byteLength).toString('utf8'); +} + module.exports = { MessagePort, - MessageChannel + MessageChannel, + threadId, + Worker, + setupChild, + isMainThread }; diff --git a/lib/worker.js b/lib/worker.js index d67fb4efe40a33..0609650cd5731d 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -1,5 +1,18 @@ 'use strict'; -const { MessagePort, MessageChannel } = require('internal/worker'); +const { + isMainThread, + MessagePort, + MessageChannel, + threadId, + Worker +} = require('internal/worker'); -module.exports = { MessagePort, MessageChannel }; +module.exports = { + isMainThread, + MessagePort, + MessageChannel, + threadId, + Worker, + parentPort: null +}; diff --git a/node.gyp b/node.gyp index 804c102a5b7eab..e9a2cb46a851da 100644 --- a/node.gyp +++ b/node.gyp @@ -349,6 +349,7 @@ 'src/node_v8.cc', 'src/node_stat_watcher.cc', 'src/node_watchdog.cc', + 'src/node_worker.cc', 'src/node_zlib.cc', 'src/node_i18n.cc', 'src/pipe_wrap.cc', @@ -407,6 +408,7 @@ 'src/node_wrap.h', 'src/node_revert.h', 'src/node_i18n.h', + 'src/node_worker.h', 'src/pipe_wrap.h', 'src/tty_wrap.h', 'src/tcp_wrap.h', diff --git a/src/async_wrap.h b/src/async_wrap.h index cf269a4c1f5e1e..b2f96477b490e0 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -67,6 +67,7 @@ namespace node { V(TTYWRAP) \ V(UDPSENDWRAP) \ V(UDPWRAP) \ + V(WORKER) \ V(WRITEWRAP) \ V(ZLIB) diff --git a/src/base_object-inl.h b/src/base_object-inl.h index 3bd854639b2c6d..06a29223973c5d 100644 --- a/src/base_object-inl.h +++ b/src/base_object-inl.h @@ -65,6 +65,14 @@ v8::Local BaseObject::object() { return PersistentToLocal(env_->isolate(), persistent_handle_); } +v8::Local BaseObject::object(v8::Isolate* isolate) { + v8::Local handle = object(); +#ifdef DEBUG + CHECK_EQ(handle->CreationContext()->GetIsolate(), isolate); + CHECK_EQ(env_->isolate(), isolate); +#endif + return handle; +} Environment* BaseObject::env() const { return env_; diff --git a/src/base_object.h b/src/base_object.h index e0b60843401681..38291d598feb1c 100644 --- a/src/base_object.h +++ b/src/base_object.h @@ -43,6 +43,10 @@ class BaseObject { // persistent.IsEmpty() is true. inline v8::Local object(); + // Same as the above, except it additionally verifies that this object + // is associated with the passed Isolate in debug mode. + inline v8::Local object(v8::Isolate* isolate); + inline Persistent& persistent(); inline Environment* env() const; diff --git a/src/bootstrapper.cc b/src/bootstrapper.cc index 35c7c4dc696ebd..f9db02562d9c8a 100644 --- a/src/bootstrapper.cc +++ b/src/bootstrapper.cc @@ -114,12 +114,14 @@ void SetupBootstrapObject(Environment* env, BOOTSTRAP_METHOD(_umask, Umask); #if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__) - BOOTSTRAP_METHOD(_initgroups, InitGroups); - BOOTSTRAP_METHOD(_setegid, SetEGid); - BOOTSTRAP_METHOD(_seteuid, SetEUid); - BOOTSTRAP_METHOD(_setgid, SetGid); - BOOTSTRAP_METHOD(_setuid, SetUid); - BOOTSTRAP_METHOD(_setgroups, SetGroups); + if (env->is_main_thread()) { + BOOTSTRAP_METHOD(_initgroups, InitGroups); + BOOTSTRAP_METHOD(_setegid, SetEGid); + BOOTSTRAP_METHOD(_seteuid, SetEUid); + BOOTSTRAP_METHOD(_setgid, SetGid); + BOOTSTRAP_METHOD(_setuid, SetUid); + BOOTSTRAP_METHOD(_setgroups, SetGroups); + } #endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__) Local should_abort_on_uncaught_toggle = diff --git a/src/callback_scope.cc b/src/callback_scope.cc index 9eac7beb038a26..23e6d5b0632f2c 100644 --- a/src/callback_scope.cc +++ b/src/callback_scope.cc @@ -79,6 +79,11 @@ void InternalCallbackScope::Close() { closed_ = true; HandleScope handle_scope(env_->isolate()); + if (!env_->can_call_into_js()) return; + if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) { + env_->async_hooks()->clear_async_id_stack(); + } + if (pushed_ids_) env_->async_hooks()->pop_async_id(async_context_.async_id); diff --git a/src/env-inl.h b/src/env-inl.h index 50328bd77c1a89..eeb419b4a0fad2 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -582,13 +582,42 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb, } inline bool Environment::can_call_into_js() const { - return can_call_into_js_; + return can_call_into_js_ && (is_main_thread() || !is_stopping_worker()); } inline void Environment::set_can_call_into_js(bool can_call_into_js) { can_call_into_js_ = can_call_into_js; } +inline bool Environment::is_main_thread() const { + return thread_id_ == 0; +} + +inline double Environment::thread_id() const { + return thread_id_; +} + +inline void Environment::set_thread_id(double id) { + thread_id_ = id; +} + +inline worker::Worker* Environment::worker_context() const { + return worker_context_; +} + +inline void Environment::set_worker_context(worker::Worker* context) { + CHECK_EQ(worker_context_, nullptr); // Should be set only once. + worker_context_ = context; +} + +inline void Environment::add_sub_worker_context(worker::Worker* context) { + sub_worker_contexts_.insert(context); +} + +inline void Environment::remove_sub_worker_context(worker::Worker* context) { + sub_worker_contexts_.erase(context); +} + inline performance::performance_state* Environment::performance_state() { return performance_state_.get(); } diff --git a/src/env.cc b/src/env.cc index 090b43968bf665..8df59d1546dbdd 100644 --- a/src/env.cc +++ b/src/env.cc @@ -4,6 +4,7 @@ #include "node_buffer.h" #include "node_platform.h" #include "node_file.h" +#include "node_worker.h" #include "tracing/agent.h" #include @@ -25,6 +26,7 @@ using v8::StackTrace; using v8::String; using v8::Symbol; using v8::Value; +using worker::Worker; IsolateData::IsolateData(Isolate* isolate, uv_loop_t* event_loop, @@ -444,7 +446,9 @@ void Environment::RunAndClearNativeImmediates() { if (it->refed_) ref_count++; if (UNLIKELY(try_catch.HasCaught())) { - FatalException(isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(isolate(), try_catch); + // Bail out, remove the already executed callbacks from list // and set up a new TryCatch for the other pending callbacks. std::move_backward(it, list.end(), list.begin() + (list.end() - it)); @@ -632,4 +636,25 @@ void Environment::AsyncHooks::grow_async_ids_stack() { uv_key_t Environment::thread_local_env = {}; +void Environment::Exit(int exit_code) { + if (is_main_thread()) + exit(exit_code); + else + worker_context_->Exit(exit_code); +} + +void Environment::stop_sub_worker_contexts() { + while (!sub_worker_contexts_.empty()) { + Worker* w = *sub_worker_contexts_.begin(); + remove_sub_worker_context(w); + w->Exit(1); + w->JoinThread(); + } +} + +bool Environment::is_stopping_worker() const { + CHECK(!is_main_thread()); + return worker_context_->is_stopped(); +} + } // namespace node diff --git a/src/env.h b/src/env.h index cdb592732a4264..cf6873e5fe7c6a 100644 --- a/src/env.h +++ b/src/env.h @@ -55,6 +55,10 @@ namespace performance { class performance_state; } +namespace worker { +class Worker; +} + namespace loader { class ModuleWrap; @@ -193,7 +197,10 @@ struct PackageConfig { V(mac_string, "mac") \ V(main_string, "main") \ V(max_buffer_string, "maxBuffer") \ + V(max_semi_space_size_string, "maxSemiSpaceSize") \ + V(max_old_space_size_string, "maxOldSpaceSize") \ V(message_string, "message") \ + V(message_port_string, "messagePort") \ V(message_port_constructor_string, "MessagePort") \ V(minttl_string, "minttl") \ V(modulus_string, "modulus") \ @@ -280,6 +287,7 @@ struct PackageConfig { V(subject_string, "subject") \ V(subjectaltname_string, "subjectaltname") \ V(syscall_string, "syscall") \ + V(thread_id_string, "threadId") \ V(ticketkeycallback_string, "onticketkeycallback") \ V(timeout_string, "timeout") \ V(tls_ticket_string, "tlsTicket") \ @@ -328,6 +336,7 @@ struct PackageConfig { V(http2stream_constructor_template, v8::ObjectTemplate) \ V(immediate_callback_function, v8::Function) \ V(inspector_console_api_object, v8::Object) \ + V(message_port, v8::Object) \ V(message_port_constructor_template, v8::FunctionTemplate) \ V(pbkdf2_constructor_template, v8::ObjectTemplate) \ V(pipe_constructor_template, v8::FunctionTemplate) \ @@ -601,6 +610,7 @@ class Environment { void RegisterHandleCleanups(); void CleanupHandles(); + void Exit(int code); // Register clean-up cb to be called on environment destruction. inline void RegisterHandleCleanup(uv_handle_t* handle, @@ -714,6 +724,18 @@ class Environment { inline bool can_call_into_js() const; inline void set_can_call_into_js(bool can_call_into_js); + // TODO(addaleax): This should be inline. + bool is_stopping_worker() const; + + inline bool is_main_thread() const; + inline double thread_id() const; + inline void set_thread_id(double id); + inline worker::Worker* worker_context() const; + inline void set_worker_context(worker::Worker* context); + inline void add_sub_worker_context(worker::Worker* context); + inline void remove_sub_worker_context(worker::Worker* context); + void stop_sub_worker_contexts(); + inline void ThrowError(const char* errmsg); inline void ThrowTypeError(const char* errmsg); inline void ThrowRangeError(const char* errmsg); @@ -855,12 +877,15 @@ class Environment { std::vector destroy_async_id_list_; AliasedBuffer should_abort_on_uncaught_toggle_; - int should_not_abort_scope_counter_ = 0; std::unique_ptr performance_state_; std::unordered_map performance_marks_; + bool can_call_into_js_ = true; + double thread_id_ = 0; + std::unordered_set sub_worker_contexts_; + #if HAVE_INSPECTOR std::unique_ptr inspector_agent_; @@ -893,6 +918,8 @@ class Environment { std::vector> file_handle_read_wrap_freelist_; + worker::Worker* worker_context_ = nullptr; + struct ExitCallback { void (*cb_)(void* arg); void* arg_; diff --git a/src/js_stream.cc b/src/js_stream.cc index c766c322e3017a..e562a62f3d1bb2 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -44,7 +44,8 @@ bool JSStream::IsClosing() { TryCatch try_catch(env()->isolate()); Local value; if (!MakeCallback(env()->isclosing_string(), 0, nullptr).ToLocal(&value)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); return true; } return value->IsTrue(); @@ -59,7 +60,8 @@ int JSStream::ReadStart() { int value_int = UV_EPROTO; if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) || !value->Int32Value(env()->context()).To(&value_int)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); } return value_int; } @@ -73,7 +75,8 @@ int JSStream::ReadStop() { int value_int = UV_EPROTO; if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) || !value->Int32Value(env()->context()).To(&value_int)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); } return value_int; } @@ -94,7 +97,8 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) { arraysize(argv), argv).ToLocal(&value) || !value->Int32Value(env()->context()).To(&value_int)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); } return value_int; } @@ -128,7 +132,8 @@ int JSStream::DoWrite(WriteWrap* w, arraysize(argv), argv).ToLocal(&value) || !value->Int32Value(env()->context()).To(&value_int)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); } return value_int; } diff --git a/src/node.cc b/src/node.cc index baa97281b064a7..663e4a222eba91 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1021,9 +1021,9 @@ void AppendExceptionLine(Environment* env, } -static void ReportException(Environment* env, - Local er, - Local message) { +void ReportException(Environment* env, + Local er, + Local message) { CHECK(!er.IsEmpty()); HandleScope scope(env->isolate()); @@ -1110,9 +1110,9 @@ static void ReportException(Environment* env, const TryCatch& try_catch) { // Executes a str within the current v8 context. -static Local ExecuteString(Environment* env, - Local source, - Local filename) { +static MaybeLocal ExecuteString(Environment* env, + Local source, + Local filename) { EscapableHandleScope scope(env->isolate()); TryCatch try_catch(env->isolate()); @@ -1125,13 +1125,19 @@ static Local ExecuteString(Environment* env, v8::Script::Compile(env->context(), source, &origin); if (script.IsEmpty()) { ReportException(env, try_catch); - exit(3); + env->Exit(3); + return MaybeLocal(); } MaybeLocal result = script.ToLocalChecked()->Run(env->context()); if (result.IsEmpty()) { + if (try_catch.HasTerminated()) { + env->isolate()->CancelTerminateExecution(); + return MaybeLocal(); + } ReportException(env, try_catch); - exit(4); + env->Exit(4); + return MaybeLocal(); } return scope.Escape(result.ToLocalChecked()); @@ -1230,6 +1236,7 @@ static void Abort(const FunctionCallbackInfo& args) { void Chdir(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsString()); @@ -1411,6 +1418,7 @@ static void GetEGid(const FunctionCallbackInfo& args) { void SetGid(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsUint32() || args[0]->IsString()); @@ -1430,6 +1438,7 @@ void SetGid(const FunctionCallbackInfo& args) { void SetEGid(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsUint32() || args[0]->IsString()); @@ -1449,6 +1458,7 @@ void SetEGid(const FunctionCallbackInfo& args) { void SetUid(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsUint32() || args[0]->IsString()); @@ -1468,6 +1478,7 @@ void SetUid(const FunctionCallbackInfo& args) { void SetEUid(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsUint32() || args[0]->IsString()); @@ -1629,9 +1640,10 @@ static void WaitForInspectorDisconnect(Environment* env) { static void Exit(const FunctionCallbackInfo& args) { - WaitForInspectorDisconnect(Environment::GetCurrent(args)); + Environment* env = Environment::GetCurrent(args); + WaitForInspectorDisconnect(env); v8_platform.StopTracingAgent(); - exit(args[0]->Int32Value()); + env->Exit(args[0]->Int32Value()); } @@ -2040,6 +2052,9 @@ void FatalException(Isolate* isolate, Local caught = fatal_exception_function->Call(process_object, 1, &error); + if (fatal_try_catch.HasTerminated()) + return; + if (fatal_try_catch.HasCaught()) { // The fatal exception function threw, so we must exit ReportException(env, fatal_try_catch); @@ -2053,6 +2068,12 @@ void FatalException(Isolate* isolate, void FatalException(Isolate* isolate, const TryCatch& try_catch) { + // If we try to print out a termination exception, we'd just get 'null', + // so just crashing here with that information seems like a better idea, + // and in particular it seems like we should handle terminations at the call + // site for this function rather than by printing them out somewhere. + CHECK(!try_catch.HasTerminated()); + HandleScope scope(isolate); if (!try_catch.IsVerbose()) { FatalException(isolate, try_catch.Exception(), try_catch.Message()); @@ -2574,11 +2595,12 @@ void SetupProcessObject(Environment* env, Local process = env->process_object(); auto title_string = FIXED_ONE_BYTE_STRING(env->isolate(), "title"); - CHECK(process->SetAccessor(env->context(), - title_string, - ProcessTitleGetter, - ProcessTitleSetter, - env->as_external()).FromJust()); + CHECK(process->SetAccessor( + env->context(), + title_string, + ProcessTitleGetter, + env->is_main_thread() ? ProcessTitleSetter : nullptr, + env->as_external()).FromJust()); // process.version READONLY_PROPERTY(process, @@ -2862,25 +2884,27 @@ void SetupProcessObject(Environment* env, CHECK(process->SetAccessor(env->context(), debug_port_string, DebugPortGetter, - DebugPortSetter, + env->is_main_thread() ? DebugPortSetter : nullptr, env->as_external()).FromJust()); // define various internal methods - env->SetMethod(process, - "_startProfilerIdleNotifier", - StartProfilerIdleNotifier); - env->SetMethod(process, - "_stopProfilerIdleNotifier", - StopProfilerIdleNotifier); + if (env->is_main_thread()) { + env->SetMethod(process, + "_startProfilerIdleNotifier", + StartProfilerIdleNotifier); + env->SetMethod(process, + "_stopProfilerIdleNotifier", + StopProfilerIdleNotifier); + env->SetMethod(process, "abort", Abort); + env->SetMethod(process, "chdir", Chdir); + env->SetMethod(process, "umask", Umask); + } + env->SetMethod(process, "_getActiveRequests", GetActiveRequests); env->SetMethod(process, "_getActiveHandles", GetActiveHandles); env->SetMethod(process, "reallyExit", Exit); - env->SetMethod(process, "abort", Abort); - env->SetMethod(process, "chdir", Chdir); env->SetMethod(process, "cwd", Cwd); - env->SetMethod(process, "umask", Umask); - #if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__) env->SetMethod(process, "getuid", GetUid); env->SetMethod(process, "geteuid", GetEUid); @@ -2890,16 +2914,17 @@ void SetupProcessObject(Environment* env, #endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__) env->SetMethod(process, "_kill", Kill); + env->SetMethod(process, "dlopen", DLOpen); - env->SetMethod(process, "_debugProcess", DebugProcess); - env->SetMethod(process, "_debugEnd", DebugEnd); + if (env->is_main_thread()) { + env->SetMethod(process, "_debugProcess", DebugProcess); + env->SetMethod(process, "_debugEnd", DebugEnd); + } env->SetMethod(process, "hrtime", Hrtime); env->SetMethod(process, "cpuUsage", CPUUsage); - env->SetMethod(process, "dlopen", DLOpen); - env->SetMethod(process, "uptime", Uptime); env->SetMethod(process, "memoryUsage", MemoryUsage); } @@ -2935,8 +2960,10 @@ void RawDebug(const FunctionCallbackInfo& args) { } -static Local GetBootstrapper(Environment* env, Local source, - Local script_name) { +static MaybeLocal GetBootstrapper( + Environment* env, + Local source, + Local script_name) { EscapableHandleScope scope(env->isolate()); TryCatch try_catch(env->isolate()); @@ -2947,16 +2974,17 @@ static Local GetBootstrapper(Environment* env, Local source, try_catch.SetVerbose(false); // Execute the bootstrapper javascript file - Local bootstrapper_v = ExecuteString(env, source, script_name); + MaybeLocal bootstrapper_v = ExecuteString(env, source, script_name); + if (bootstrapper_v.IsEmpty()) // This happens when execution was interrupted. + return MaybeLocal(); + if (try_catch.HasCaught()) { ReportException(env, try_catch); exit(10); } - CHECK(bootstrapper_v->IsFunction()); - Local bootstrapper = Local::Cast(bootstrapper_v); - - return scope.Escape(bootstrapper); + CHECK(bootstrapper_v.ToLocalChecked()->IsFunction()); + return scope.Escape(bootstrapper_v.ToLocalChecked().As()); } static bool ExecuteBootstrapper(Environment* env, Local bootstrapper, @@ -2995,13 +3023,18 @@ void LoadEnvironment(Environment* env) { // node_js2c. Local loaders_name = FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/loaders.js"); - Local loaders_bootstrapper = + MaybeLocal loaders_bootstrapper = GetBootstrapper(env, LoadersBootstrapperSource(env), loaders_name); Local node_name = FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/node.js"); - Local node_bootstrapper = + MaybeLocal node_bootstrapper = GetBootstrapper(env, NodeBootstrapperSource(env), node_name); + if (loaders_bootstrapper.IsEmpty() || node_bootstrapper.IsEmpty()) { + // Execution was interrupted. + return; + } + // Add a reference to the global object Local global = env->context()->Global(); @@ -3049,7 +3082,7 @@ void LoadEnvironment(Environment* env) { // Bootstrap internal loaders Local bootstrapped_loaders; - if (!ExecuteBootstrapper(env, loaders_bootstrapper, + if (!ExecuteBootstrapper(env, loaders_bootstrapper.ToLocalChecked(), arraysize(loaders_bootstrapper_args), loaders_bootstrapper_args, &bootstrapped_loaders)) { @@ -3065,7 +3098,7 @@ void LoadEnvironment(Environment* env) { bootstrapper, bootstrapped_loaders }; - if (!ExecuteBootstrapper(env, node_bootstrapper, + if (!ExecuteBootstrapper(env, node_bootstrapper.ToLocalChecked(), arraysize(node_bootstrapper_args), node_bootstrapper_args, &bootstrapped_node)) { @@ -4279,6 +4312,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data, WaitForInspectorDisconnect(&env); env.set_can_call_into_js(false); + env.stop_sub_worker_contexts(); env.RunCleanup(); RunAtExit(&env); diff --git a/src/node_errors.h b/src/node_errors.h index 931ce7b8fdbf33..2c97088cc553b4 100644 --- a/src/node_errors.h +++ b/src/node_errors.h @@ -34,6 +34,7 @@ namespace node { V(ERR_MISSING_ARGS, TypeError) \ V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \ V(ERR_MISSING_MODULE, Error) \ + V(ERR_MISSING_PLATFORM_FOR_WORKER, Error) \ V(ERR_SCRIPT_EXECUTION_INTERRUPTED, Error) \ V(ERR_SCRIPT_EXECUTION_TIMEOUT, Error) \ V(ERR_STRING_TOO_LONG, Error) \ @@ -68,6 +69,9 @@ namespace node { V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \ V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \ "MessagePort was found in message but not listed in transferList") \ + V(ERR_MISSING_PLATFORM_FOR_WORKER, \ + "The V8 platform used by this instance of Node does not support " \ + "creating Workers") \ V(ERR_SCRIPT_EXECUTION_INTERRUPTED, \ "Script execution was interrupted by `SIGINT`") \ V(ERR_TRANSFERRING_EXTERNALIZED_SHAREDARRAYBUFFER, \ diff --git a/src/node_internals.h b/src/node_internals.h index a5d8ed0e5d3ad7..7760eb26c6c15c 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -137,6 +137,7 @@ struct sockaddr; V(util) \ V(uv) \ V(v8) \ + V(worker) \ V(zlib) #define NODE_BUILTIN_MODULES(V) \ @@ -314,6 +315,10 @@ class FatalTryCatch : public v8::TryCatch { Environment* env_; }; +void ReportException(Environment* env, + v8::Local er, + v8::Local message); + v8::Maybe ProcessEmitWarning(Environment* env, const char* fmt, ...); v8::Maybe ProcessEmitDeprecationWarning(Environment* env, const char* warning, diff --git a/src/node_messaging.cc b/src/node_messaging.cc index b56cef2d7767cf..352749ea48f483 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -57,7 +57,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate { if (!deserializer->ReadUint32(&id)) return MaybeLocal(); CHECK_LE(id, message_ports_.size()); - return message_ports_[id]->object(); + return message_ports_[id]->object(isolate); }; MaybeLocal GetSharedArrayBufferFromId( @@ -436,7 +436,7 @@ MessagePort* MessagePort::New( void MessagePort::OnMessage() { HandleScope handle_scope(env()->isolate()); - Local context = object()->CreationContext(); + Local context = object(env()->isolate())->CreationContext(); // data_ can only ever be modified by the owner thread, so no need to lock. // However, the message port may be transferred while it is processing @@ -447,6 +447,13 @@ void MessagePort::OnMessage() { { // Get the head of the message queue. Mutex::ScopedLock lock(data_->mutex_); + + if (stop_event_loop_) { + CHECK(!data_->receiving_messages_); + uv_stop(env()->event_loop()); + break; + } + if (!data_->receiving_messages_) break; if (data_->incoming_messages_.empty()) @@ -514,8 +521,9 @@ void MessagePort::Send(Message&& message) { void MessagePort::Send(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); + Local context = object(env->isolate())->CreationContext(); Message msg; - if (msg.Serialize(env, object()->CreationContext(), args[0], args[1]) + if (msg.Serialize(env, context, args[0], args[1]) .IsNothing()) { return; } @@ -548,6 +556,14 @@ void MessagePort::Stop() { data_->receiving_messages_ = false; } +void MessagePort::StopEventLoop() { + Mutex::ScopedLock lock(data_->mutex_); + data_->receiving_messages_ = false; + stop_event_loop_ = true; + + TriggerAsync(); +} + void MessagePort::Start(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); MessagePort* port; @@ -570,6 +586,12 @@ void MessagePort::Stop(const FunctionCallbackInfo& args) { port->Stop(); } +void MessagePort::Drain(const FunctionCallbackInfo& args) { + MessagePort* port; + ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); + port->OnMessage(); +} + size_t MessagePort::self_size() const { Mutex::ScopedLock lock(data_->mutex_); size_t sz = sizeof(*this) + sizeof(*data_); @@ -604,6 +626,7 @@ MaybeLocal GetMessagePortConstructor( env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage); env->SetProtoMethod(m, "start", MessagePort::Start); env->SetProtoMethod(m, "stop", MessagePort::Stop); + env->SetProtoMethod(m, "drain", MessagePort::Drain); env->SetProtoMethod(m, "close", HandleWrap::Close); env->SetProtoMethod(m, "unref", HandleWrap::Unref); env->SetProtoMethod(m, "ref", HandleWrap::Ref); diff --git a/src/node_messaging.h b/src/node_messaging.h index ff8fcc72439e9f..9a13437d19a331 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -133,11 +133,15 @@ class MessagePort : public HandleWrap { void Start(); // Stop processing messages on this port as a receiving end. void Stop(); + // Stop processing messages on this port as a receiving end, + // and stop the event loop that this port is associated with. + void StopEventLoop(); static void New(const v8::FunctionCallbackInfo& args); static void PostMessage(const v8::FunctionCallbackInfo& args); static void Start(const v8::FunctionCallbackInfo& args); static void Stop(const v8::FunctionCallbackInfo& args); + static void Drain(const v8::FunctionCallbackInfo& args); // Turns `a` and `b` into siblings, i.e. connects the sending side of one // to the receiving side of the other. This is not thread-safe. @@ -160,6 +164,7 @@ class MessagePort : public HandleWrap { inline uv_async_t* async(); std::unique_ptr data_ = nullptr; + bool stop_event_loop_ = false; friend class MessagePortData; }; diff --git a/src/node_worker.cc b/src/node_worker.cc new file mode 100644 index 00000000000000..4d9e1ee98dca17 --- /dev/null +++ b/src/node_worker.cc @@ -0,0 +1,427 @@ +#include "node_worker.h" +#include "node_errors.h" +#include "node_internals.h" +#include "node_buffer.h" +#include "node_perf.h" +#include "util.h" +#include "util-inl.h" +#include "async_wrap.h" +#include "async_wrap-inl.h" + +using v8::ArrayBuffer; +using v8::Context; +using v8::Function; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::HandleScope; +using v8::Integer; +using v8::Isolate; +using v8::Local; +using v8::Locker; +using v8::Number; +using v8::Object; +using v8::SealHandleScope; +using v8::String; +using v8::Value; + +namespace node { +namespace worker { + +namespace { + +double next_thread_id = 1; +Mutex next_thread_id_mutex; + +} // anonymous namespace + +Worker::Worker(Environment* env, Local wrap) + : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER) { + // Generate a new thread id. + { + Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex); + thread_id_ = next_thread_id++; + } + wrap->Set(env->context(), + env->thread_id_string(), + Number::New(env->isolate(), thread_id_)).FromJust(); + + // Set up everything that needs to be set up in the parent environment. + parent_port_ = MessagePort::New(env, env->context()); + if (parent_port_ == nullptr) { + // This can happen e.g. because execution is terminating. + return; + } + + child_port_data_.reset(new MessagePortData(nullptr)); + MessagePort::Entangle(parent_port_, child_port_data_.get()); + + object()->Set(env->context(), + env->message_port_string(), + parent_port_->object()).FromJust(); + + array_buffer_allocator_.reset(CreateArrayBufferAllocator()); + + isolate_ = NewIsolate(array_buffer_allocator_.get()); + CHECK_NE(isolate_, nullptr); + CHECK_EQ(uv_loop_init(&loop_), 0); + + thread_exit_async_.reset(new uv_async_t); + thread_exit_async_->data = this; + CHECK_EQ(uv_async_init(env->event_loop(), + thread_exit_async_.get(), + [](uv_async_t* handle) { + static_cast(handle->data)->OnThreadStopped(); + }), 0); + + { + // Enter an environment capable of executing code in the child Isolate + // (and only in it). + Locker locker(isolate_); + Isolate::Scope isolate_scope(isolate_); + HandleScope handle_scope(isolate_); + Local context = NewContext(isolate_); + Context::Scope context_scope(context); + + isolate_data_.reset(CreateIsolateData(isolate_, + &loop_, + env->isolate_data()->platform(), + array_buffer_allocator_.get())); + CHECK(isolate_data_); + + // TODO(addaleax): Use CreateEnvironment(), or generally another public API. + env_.reset(new Environment(isolate_data_.get(), + context, + nullptr)); + CHECK_NE(env_, nullptr); + env_->set_abort_on_uncaught_exception(false); + env_->set_worker_context(this); + env_->set_thread_id(thread_id_); + + env_->Start(0, nullptr, 0, nullptr, env->profiler_idle_notifier_started()); + } + + // The new isolate won't be bothered on this thread again. + isolate_->DiscardThreadSpecificMetadata(); +} + +bool Worker::is_stopped() const { + Mutex::ScopedLock stopped_lock(stopped_mutex_); + return stopped_; +} + +void Worker::Run() { + MultiIsolatePlatform* platform = isolate_data_->platform(); + CHECK_NE(platform, nullptr); + + { + Locker locker(isolate_); + Isolate::Scope isolate_scope(isolate_); + SealHandleScope outer_seal(isolate_); + + { + Context::Scope context_scope(env_->context()); + HandleScope handle_scope(isolate_); + + { + HandleScope handle_scope(isolate_); + Mutex::ScopedLock lock(mutex_); + // Set up the message channel for receiving messages in the child. + child_port_ = MessagePort::New(env_.get(), + env_->context(), + std::move(child_port_data_)); + // MessagePort::New() may return nullptr if execution is terminated + // within it. + if (child_port_ != nullptr) + env_->set_message_port(child_port_->object(isolate_)); + } + + if (!is_stopped()) { + HandleScope handle_scope(isolate_); + Environment::AsyncCallbackScope callback_scope(env_.get()); + env_->async_hooks()->push_async_ids(1, 0); + // This loads the Node bootstrapping code. + LoadEnvironment(env_.get()); + env_->async_hooks()->pop_async_id(1); + } + + { + SealHandleScope seal(isolate_); + bool more; + env_->performance_state()->Mark( + node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START); + do { + if (is_stopped()) break; + uv_run(&loop_, UV_RUN_DEFAULT); + if (is_stopped()) break; + + platform->DrainBackgroundTasks(isolate_); + + more = uv_loop_alive(&loop_); + if (more && !is_stopped()) + continue; + + EmitBeforeExit(env_.get()); + + // Emit `beforeExit` if the loop became alive either after emitting + // event, or after running some callbacks. + more = uv_loop_alive(&loop_); + } while (more == true); + env_->performance_state()->Mark( + node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT); + } + } + + { + int exit_code; + bool stopped = is_stopped(); + if (!stopped) + exit_code = EmitExit(env_.get()); + Mutex::ScopedLock lock(mutex_); + if (exit_code_ == 0 && !stopped) + exit_code_ = exit_code; + } + + env_->set_can_call_into_js(false); + Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_, + Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE); + + // Grab the parent-to-child channel and render is unusable. + MessagePort* child_port; + { + Mutex::ScopedLock lock(mutex_); + child_port = child_port_; + child_port_ = nullptr; + } + + { + Context::Scope context_scope(env_->context()); + child_port->Close(); + env_->stop_sub_worker_contexts(); + env_->RunCleanup(); + RunAtExit(env_.get()); + + { + Mutex::ScopedLock stopped_lock(stopped_mutex_); + stopped_ = true; + } + + env_->RunCleanup(); + + // This call needs to be made while the `Environment` is still alive + // because we assume that it is available for async tracking in the + // NodePlatform implementation. + platform->DrainBackgroundTasks(isolate_); + } + + env_.reset(); + } + + DisposeIsolate(); + + // Need to run the loop one more time to close the platform's uv_async_t + uv_run(&loop_, UV_RUN_ONCE); + + { + Mutex::ScopedLock lock(mutex_); + CHECK(thread_exit_async_); + scheduled_on_thread_stopped_ = true; + uv_async_send(thread_exit_async_.get()); + } +} + +void Worker::DisposeIsolate() { + if (isolate_ == nullptr) + return; + + isolate_->Dispose(); + + CHECK(isolate_data_); + MultiIsolatePlatform* platform = isolate_data_->platform(); + platform->CancelPendingDelayedTasks(isolate_); + + isolate_data_.reset(); + isolate_ = nullptr; +} + +void Worker::JoinThread() { + if (thread_joined_) + return; + CHECK_EQ(uv_thread_join(&tid_), 0); + thread_joined_ = true; + + env()->remove_sub_worker_context(this); + + if (thread_exit_async_) { + env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) { + delete async; + }); + + if (scheduled_on_thread_stopped_) + OnThreadStopped(); + } +} + +void Worker::OnThreadStopped() { + Mutex::ScopedLock lock(mutex_); + scheduled_on_thread_stopped_ = false; + + { + Mutex::ScopedLock stopped_lock(stopped_mutex_); + CHECK(stopped_); + } + + CHECK_EQ(child_port_, nullptr); + parent_port_ = nullptr; + + // It's okay to join the thread while holding the mutex because + // OnThreadStopped means it's no longer doing any work that might grab it + // and really just silently exiting. + JoinThread(); + + { + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + + // Reset the parent port as we're closing it now anyway. + object()->Set(env()->context(), + env()->message_port_string(), + Undefined(env()->isolate())).FromJust(); + + Local code = Integer::New(env()->isolate(), exit_code_); + MakeCallback(env()->onexit_string(), 1, &code); + } + + // JoinThread() cleared all libuv handles bound to this Worker, + // the C++ object is no longer needed for anything now. + MakeWeak(); +} + +Worker::~Worker() { + Mutex::ScopedLock lock(mutex_); + JoinThread(); + + CHECK(stopped_); + CHECK(thread_joined_); + CHECK_EQ(child_port_, nullptr); + CHECK_EQ(uv_loop_close(&loop_), 0); + + // This has most likely already happened within the worker thread -- this + // is just in case Worker creation failed early. + DisposeIsolate(); +} + +void Worker::New(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + + CHECK(args.IsConstructCall()); + + if (env->isolate_data()->platform() == nullptr) { + THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env); + return; + } + + new Worker(env, args.This()); +} + +void Worker::StartThread(const FunctionCallbackInfo& args) { + Worker* w; + ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); + Mutex::ScopedLock lock(w->mutex_); + + w->env()->add_sub_worker_context(w); + w->stopped_ = false; + CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) { + static_cast(arg)->Run(); + }, static_cast(w)), 0); + w->thread_joined_ = false; +} + +void Worker::StopThread(const FunctionCallbackInfo& args) { + Worker* w; + ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); + + w->Exit(1); + w->JoinThread(); +} + +void Worker::Ref(const FunctionCallbackInfo& args) { + Worker* w; + ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); + if (w->thread_exit_async_) + uv_ref(reinterpret_cast(w->thread_exit_async_.get())); +} + +void Worker::Unref(const FunctionCallbackInfo& args) { + Worker* w; + ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); + if (w->thread_exit_async_) + uv_unref(reinterpret_cast(w->thread_exit_async_.get())); +} + +void Worker::Exit(int code) { + Mutex::ScopedLock lock(mutex_); + Mutex::ScopedLock stopped_lock(stopped_mutex_); + if (!stopped_) { + CHECK_NE(env_, nullptr); + stopped_ = true; + exit_code_ = code; + if (child_port_ != nullptr) + child_port_->StopEventLoop(); + isolate_->TerminateExecution(); + } +} + +size_t Worker::self_size() const { + return sizeof(*this); +} + +namespace { + +// Return the MessagePort that is global for this Environment and communicates +// with the internal [kPort] port of the JS Worker class in the parent thread. +void GetEnvMessagePort(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + Local port = env->message_port(); + if (!port.IsEmpty()) { + CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate()); + args.GetReturnValue().Set(port); + } +} + +void InitWorker(Local target, + Local unused, + Local context, + void* priv) { + Environment* env = Environment::GetCurrent(context); + + { + Local w = env->NewFunctionTemplate(Worker::New); + + w->InstanceTemplate()->SetInternalFieldCount(1); + + AsyncWrap::AddWrapMethods(env, w); + env->SetProtoMethod(w, "startThread", Worker::StartThread); + env->SetProtoMethod(w, "stopThread", Worker::StopThread); + env->SetProtoMethod(w, "ref", Worker::Ref); + env->SetProtoMethod(w, "unref", Worker::Unref); + + Local workerString = + FIXED_ONE_BYTE_STRING(env->isolate(), "Worker"); + w->SetClassName(workerString); + target->Set(workerString, w->GetFunction()); + } + + env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort); + + auto thread_id_string = FIXED_ONE_BYTE_STRING(env->isolate(), "threadId"); + target->Set(env->context(), + thread_id_string, + Number::New(env->isolate(), env->thread_id())).FromJust(); +} + +} // anonymous namespace + +} // namespace worker +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker) diff --git a/src/node_worker.h b/src/node_worker.h new file mode 100644 index 00000000000000..0a98d2f11ef00f --- /dev/null +++ b/src/node_worker.h @@ -0,0 +1,83 @@ +#ifndef SRC_NODE_WORKER_H_ +#define SRC_NODE_WORKER_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "node_messaging.h" +#include + +namespace node { +namespace worker { + +// A worker thread, as represented in its parent thread. +class Worker : public AsyncWrap { + public: + Worker(Environment* env, v8::Local wrap); + ~Worker(); + + // Run the worker. This is only called from the worker thread. + void Run(); + + // Forcibly exit the thread with a specified exit code. This may be called + // from any thread. + void Exit(int code); + + // Wait for the worker thread to stop (in a blocking manner). + void JoinThread(); + + size_t self_size() const override; + bool is_stopped() const; + + static void New(const v8::FunctionCallbackInfo& args); + static void StartThread(const v8::FunctionCallbackInfo& args); + static void StopThread(const v8::FunctionCallbackInfo& args); + static void GetMessagePort(const v8::FunctionCallbackInfo& args); + static void Ref(const v8::FunctionCallbackInfo& args); + static void Unref(const v8::FunctionCallbackInfo& args); + + private: + void OnThreadStopped(); + void DisposeIsolate(); + + uv_loop_t loop_; + DeleteFnPtr isolate_data_; + DeleteFnPtr env_; + v8::Isolate* isolate_ = nullptr; + DeleteFnPtr + array_buffer_allocator_; + uv_thread_t tid_; + + // This mutex protects access to all variables listed below it. + mutable Mutex mutex_; + + // Currently only used for telling the parent thread that the child + // thread exited. + std::unique_ptr thread_exit_async_; + bool scheduled_on_thread_stopped_ = false; + + // This mutex only protects stopped_. If both locks are acquired, this needs + // to be the latter one. + mutable Mutex stopped_mutex_; + bool stopped_ = true; + + bool thread_joined_ = true; + int exit_code_ = 0; + double thread_id_ = -1; + + std::unique_ptr child_port_data_; + + // The child port is always kept alive by the child Environment's persistent + // handle to it. + MessagePort* child_port_ = nullptr; + // This is always kept alive because the JS object associated with the Worker + // instance refers to it via its [kPort] property. + MessagePort* parent_port_ = nullptr; +}; + +} // namespace worker +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + + +#endif // SRC_NODE_WORKER_H_ diff --git a/test/fixtures/worker-script.mjs b/test/fixtures/worker-script.mjs new file mode 100644 index 00000000000000..b712248b2788e8 --- /dev/null +++ b/test/fixtures/worker-script.mjs @@ -0,0 +1,3 @@ +import worker from 'worker'; + +worker.parentPort.postMessage('Hello, world!'); diff --git a/test/parallel/test-message-channel-sharedarraybuffer.js b/test/parallel/test-message-channel-sharedarraybuffer.js new file mode 100644 index 00000000000000..7ae922adbc4e40 --- /dev/null +++ b/test/parallel/test-message-channel-sharedarraybuffer.js @@ -0,0 +1,28 @@ +// Flags: --expose-gc --experimental-worker +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { Worker } = require('worker'); + +{ + const sharedArrayBuffer = new SharedArrayBuffer(12); + const local = Buffer.from(sharedArrayBuffer); + + const w = new Worker(` + const { parentPort } = require('worker'); + parentPort.on('message', ({ sharedArrayBuffer }) => { + const local = Buffer.from(sharedArrayBuffer); + local.write('world!', 6); + parentPort.postMessage('written!'); + }); + `, { eval: true }); + w.on('message', common.mustCall(() => { + assert.strictEqual(local.toString(), 'Hello world!'); + global.gc(); + w.terminate(); + })); + w.postMessage({ sharedArrayBuffer }); + // This would be a race condition if the memory regions were overlapping + local.write('Hello '); +} diff --git a/test/parallel/test-message-channel.js b/test/parallel/test-message-channel.js index 0facaa1d835ea8..eb13fa57c6aa0f 100644 --- a/test/parallel/test-message-channel.js +++ b/test/parallel/test-message-channel.js @@ -2,7 +2,7 @@ 'use strict'; const common = require('../common'); const assert = require('assert'); -const { MessageChannel } = require('worker'); +const { MessageChannel, MessagePort, Worker } = require('worker'); { const channel = new MessageChannel(); @@ -24,3 +24,23 @@ const { MessageChannel } = require('worker'); channel.port2.on('close', common.mustCall()); channel.port2.close(); } + +{ + const channel = new MessageChannel(); + + const w = new Worker(` + const { MessagePort } = require('worker'); + const assert = require('assert'); + require('worker').parentPort.on('message', ({ port }) => { + assert(port instanceof MessagePort); + port.postMessage('works'); + }); + `, { eval: true }); + w.postMessage({ port: channel.port2 }, [ channel.port2 ]); + assert(channel.port1 instanceof MessagePort); + assert(channel.port2 instanceof MessagePort); + channel.port1.on('message', common.mustCall((message) => { + assert.strictEqual(message, 'works'); + w.terminate(); + })); +} diff --git a/test/parallel/test-worker-cleanup-handles.js b/test/parallel/test-worker-cleanup-handles.js new file mode 100644 index 00000000000000..ba4f6aa51a9d41 --- /dev/null +++ b/test/parallel/test-worker-cleanup-handles.js @@ -0,0 +1,30 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { Worker, isMainThread, parentPort } = require('worker'); +const { Server } = require('net'); +const fs = require('fs'); + +if (isMainThread) { + const w = new Worker(__filename); + let fd = null; + w.on('message', common.mustCall((fd_) => { + assert.strictEqual(typeof fd_, 'number'); + fd = fd_; + })); + w.on('exit', common.mustCall((code) => { + if (fd === -1) { + // This happens when server sockets don’t have file descriptors, + // i.e. on Windows. + return; + } + common.expectsError(() => fs.fstatSync(fd), + { code: 'EBADF' }); + })); +} else { + const server = new Server(); + server.listen(0); + parentPort.postMessage(server._handle.fd); + server.unref(); +} diff --git a/test/parallel/test-worker-dns-terminate.js b/test/parallel/test-worker-dns-terminate.js new file mode 100644 index 00000000000000..079a29d52e09a3 --- /dev/null +++ b/test/parallel/test-worker-dns-terminate.js @@ -0,0 +1,15 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const { Worker } = require('worker'); + +const w = new Worker(` +const dns = require('dns'); +dns.lookup('nonexistent.org', () => {}); +require('worker').parentPort.postMessage('0'); +`, { eval: true }); + +w.on('message', common.mustCall(() => { + // This should not crash the worker during a DNS request. + w.terminate(common.mustCall()); +})); diff --git a/test/parallel/test-worker-esmodule.js b/test/parallel/test-worker-esmodule.js new file mode 100644 index 00000000000000..4189eeca3f8908 --- /dev/null +++ b/test/parallel/test-worker-esmodule.js @@ -0,0 +1,11 @@ +// Flags: --experimental-worker --experimental-modules +'use strict'; +const common = require('../common'); +const fixtures = require('../common/fixtures'); +const assert = require('assert'); +const { Worker } = require('worker'); + +const w = new Worker(fixtures.path('worker-script.mjs')); +w.on('message', common.mustCall((message) => { + assert.strictEqual(message, 'Hello, world!'); +})); diff --git a/test/parallel/test-worker-memory.js b/test/parallel/test-worker-memory.js new file mode 100644 index 00000000000000..34b1e0acaf2f2f --- /dev/null +++ b/test/parallel/test-worker-memory.js @@ -0,0 +1,41 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const util = require('util'); +const { Worker } = require('worker'); + +const numWorkers = +process.env.JOBS || require('os').cpus().length; + +// Verify that a Worker's memory isn't kept in memory after the thread finishes. + +function run(n, done) { + if (n <= 0) + return done(); + const worker = new Worker( + 'require(\'worker\').parentPort.postMessage(2 + 2)', + { eval: true }); + worker.on('message', common.mustCall((value) => { + assert.strictEqual(value, 4); + })); + worker.on('exit', common.mustCall(() => { + run(n - 1, done); + })); +} + +const startStats = process.memoryUsage(); +let finished = 0; +for (let i = 0; i < numWorkers; ++i) { + run(60 / numWorkers, () => { + if (++finished === numWorkers) { + const finishStats = process.memoryUsage(); + // A typical value for this ratio would be ~1.15. + // 5 as a upper limit is generous, but the main point is that we + // don't have the memory of 50 Isolates/Node.js environments just lying + // around somewhere. + assert.ok(finishStats.rss / startStats.rss < 5, + 'Unexpected memory overhead: ' + + util.inspect([startStats, finishStats])); + } + }); +} diff --git a/test/parallel/test-worker-nexttick-terminate.js b/test/parallel/test-worker-nexttick-terminate.js new file mode 100644 index 00000000000000..b010a7dbe5727f --- /dev/null +++ b/test/parallel/test-worker-nexttick-terminate.js @@ -0,0 +1,20 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const { Worker } = require('worker'); + +// Checks that terminating in the middle of `process.nextTick()` does not +// Crash the process. + +const w = new Worker(` +require('worker').parentPort.postMessage('0'); +process.nextTick(() => { + while(1); +}); +`, { eval: true }); + +w.on('message', common.mustCall(() => { + setTimeout(() => { + w.terminate(common.mustCall()); + }, 1); +})); diff --git a/test/parallel/test-worker-syntax-error-file.js b/test/parallel/test-worker-syntax-error-file.js new file mode 100644 index 00000000000000..37798f334387d8 --- /dev/null +++ b/test/parallel/test-worker-syntax-error-file.js @@ -0,0 +1,18 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const fixtures = require('../common/fixtures'); +const assert = require('assert'); +const { Worker } = require('worker'); + +// Do not use isMainThread so that this test itself can be run inside a Worker. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = 1; + const w = new Worker(fixtures.path('syntax', 'bad_syntax.js')); + w.on('message', common.mustNotCall()); + w.on('error', common.mustCall((err) => { + assert(/SyntaxError/.test(err)); + })); +} else { + throw new Error('foo'); +} diff --git a/test/parallel/test-worker-syntax-error.js b/test/parallel/test-worker-syntax-error.js new file mode 100644 index 00000000000000..8f9812a721132b --- /dev/null +++ b/test/parallel/test-worker-syntax-error.js @@ -0,0 +1,17 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { Worker } = require('worker'); + +// Do not use isMainThread so that this test itself can be run inside a Worker. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = 1; + const w = new Worker('abc)', { eval: true }); + w.on('message', common.mustNotCall()); + w.on('error', common.mustCall((err) => { + assert(/SyntaxError/.test(err)); + })); +} else { + throw new Error('foo'); +} diff --git a/test/parallel/test-worker-uncaught-exception-async.js b/test/parallel/test-worker-uncaught-exception-async.js new file mode 100644 index 00000000000000..c1d2a5f4fcab16 --- /dev/null +++ b/test/parallel/test-worker-uncaught-exception-async.js @@ -0,0 +1,20 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { Worker } = require('worker'); + +// Do not use isMainThread so that this test itself can be run inside a Worker. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = 1; + const w = new Worker(__filename); + w.on('message', common.mustNotCall()); + w.on('error', common.mustCall((err) => { + // TODO(addaleax): be more specific here + assert(/foo/.test(err)); + })); +} else { + setImmediate(() => { + throw new Error('foo'); + }); +} diff --git a/test/parallel/test-worker-uncaught-exception.js b/test/parallel/test-worker-uncaught-exception.js new file mode 100644 index 00000000000000..b0e3ad11fae839 --- /dev/null +++ b/test/parallel/test-worker-uncaught-exception.js @@ -0,0 +1,18 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { Worker } = require('worker'); + +// Do not use isMainThread so that this test itself can be run inside a Worker. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = 1; + const w = new Worker(__filename); + w.on('message', common.mustNotCall()); + w.on('error', common.mustCall((err) => { + // TODO(addaleax): be more specific here + assert(/foo/.test(err)); + })); +} else { + throw new Error('foo'); +} diff --git a/test/parallel/test-worker.js b/test/parallel/test-worker.js new file mode 100644 index 00000000000000..3fa6e67a347b37 --- /dev/null +++ b/test/parallel/test-worker.js @@ -0,0 +1,18 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { Worker, isMainThread, parentPort } = require('worker'); + +if (isMainThread) { + const w = new Worker(__filename); + w.on('message', common.mustCall((message) => { + assert.strictEqual(message, 'Hello, world!'); + })); +} else { + setImmediate(() => { + process.nextTick(() => { + parentPort.postMessage('Hello, world!'); + }); + }); +} diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 84a3e3b1f4dc05..af08d7b6567018 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -38,6 +38,7 @@ common.crashOnUnhandledRejection(); // TODO(addaleax): Test for these delete providers.STREAMPIPE; delete providers.MESSAGEPORT; + delete providers.WORKER; const objKeys = Object.keys(providers); if (objKeys.length > 0)