Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: add postMessageToThread #53682

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -3104,6 +3104,54 @@ The `Worker` instance terminated because it reached its memory limit.
The path for the main script of a worker is neither an absolute path
nor a relative path starting with `./` or `../`.

<a id="ERR_WORKER_MESSAGING_ERRORED"></a>

### `ERR_WORKER_MESSAGING_ERRORED`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development

The destination thread threw an error while processing a message sent via [`postMessageToThread()`][].

<a id="ERR_WORKER_MESSAGING_FAILED"></a>

### `ERR_WORKER_MESSAGING_FAILED`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development

The thread requested in [`postMessageToThread()`][] is invalid or has no `workerMessage` listener.

<a id="ERR_WORKER_MESSAGING_SAME_THREAD"></a>

### `ERR_WORKER_MESSAGING_SAME_THREAD`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development

The thread id requested in [`postMessageToThread()`][] is the current thread id.

<a id="ERR_WORKER_MESSAGING_TIMEOUT"></a>

### `ERR_WORKER_MESSAGING_TIMEOUT`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development

Sending a message via [`postMessageToThread()`][] timed out.

<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>

### `ERR_WORKER_UNSERIALIZABLE_ERROR`
Expand Down Expand Up @@ -4027,6 +4075,7 @@ An error occurred trying to allocate memory. This should never happen.
[`new URLSearchParams(iterable)`]: url.md#new-urlsearchparamsiterable
[`package.json`]: packages.md#nodejs-packagejson-field-definitions
[`postMessage()`]: worker_threads.md#portpostmessagevalue-transferlist
[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout
[`process.on('exit')`]: process.md#event-exit
[`process.send()`]: process.md#processsendmessage-sendhandle-options-callback
[`process.setUncaughtExceptionCaptureCallback()`]: process.md#processsetuncaughtexceptioncapturecallbackfn
Expand Down
13 changes: 13 additions & 0 deletions doc/api/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,18 @@ possible to record such errors in an error log, either periodically (which is
likely best for long-running application) or upon process exit (which is likely
most convenient for scripts).

### Event: `'workerMessage'`

<!-- YAML
added: REPLACEME
-->

* `value` {any} A value transmitted using [`postMessageToThread()`][].
* `source` {number} The transmitting worker thread ID or `0` for the main thread.

The `'workerMessage'` event is emitted for any incoming message send by the other
party by using [`postMessageToThread()`][].

### Event: `'uncaughtException'`

<!-- YAML
Expand Down Expand Up @@ -4073,6 +4085,7 @@ cases:
[`net.Server`]: net.md#class-netserver
[`net.Socket`]: net.md#class-netsocket
[`os.constants.dlopen`]: os.md#dlopen-constants
[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout
[`process.argv`]: #processargv
[`process.config`]: #processconfig
[`process.execPath`]: #processexecpath
Expand Down
116 changes: 116 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,118 @@ if (isMainThread) {
}
```

## `worker.postMessageToThread(threadId, value[, transferList][, timeout])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development

* `destination` {number} The target thread ID. If the thread ID is invalid, a
[`ERR_WORKER_MESSAGING_FAILED`][] error will be thrown. If the target thread ID is the current thread ID,
a [`ERR_WORKER_MESSAGING_SAME_THREAD`][] error will be thrown.
* `value` {any} The value to send.
* `transferList` {Object\[]} If one or more `MessagePort`-like objects are passed in `value`,
a `transferList` is required for those items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] is thrown.
See [`port.postMessage()`][] for more information.
* `timeout` {number} Time to wait for the message to be delivered in milliseconds.
By default it's `undefined`, which means wait forever. If the operation times out,
a [`ERR_WORKER_MESSAGING_TIMEOUT`][] error is thrown.
* Returns: {Promise} A promise which is fulfilled if the message was successfully processed by destination thread.

Sends a value to another worker, identified by its thread ID.

If the target thread has no listener for the `workerMessage` event, then the operation will throw
a [`ERR_WORKER_MESSAGING_FAILED`][] error.

If the target thread threw an error while processing the `workerMessage` event, then the operation will throw
a [`ERR_WORKER_MESSAGING_ERRORED`][] error.

This method should be used when the target thread is not the direct
parent or child of the current thread.
If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][]
and the [`worker.postMessage()`][] to let the threads communicate.

The example below shows the use of of `postMessageToThread`: it creates 10 nested threads,
the last one will try to communicate with the main thread.

```mjs
import { fileURLToPath } from 'node:url';
import { once } from 'node:events';
import process from 'node:process';
import {
isMainThread,
postMessageToThread,
threadId,
workerData,
Worker,
} from 'node:worker_threads';

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
const worker = new Worker(fileURLToPath(import.meta.url), {
workerData: { level: level + 1 },
});
}

if (level === 0) {
process.on('workerMessage', (value, source) => {
console.log(`${source} -> ${threadId}:`, value);
postMessageToThread(source, { message: 'pong' });
});
} else if (level === 10) {
process.on('workerMessage', (value, source) => {
console.log(`${source} -> ${threadId}:`, value);
channel.postMessage('done');
channel.close();
});

await postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;
```

```cjs
const { once } = require('node:events');
const {
isMainThread,
postMessageToThread,
threadId,
workerData,
Worker,
} = require('node:worker_threads');

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
const worker = new Worker(__filename, {
workerData: { level: level + 1 },
});
}

if (level === 0) {
process.on('workerMessage', (value, source) => {
console.log(`${source} -> ${threadId}:`, value);
postMessageToThread(source, { message: 'pong' });
});
} else if (level === 10) {
process.on('workerMessage', (value, source) => {
console.log(`${source} -> ${threadId}:`, value);
channel.postMessage('done');
channel.close();
});

postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;
```

## `worker.receiveMessageOnPort(port)`

<!-- YAML
Expand Down Expand Up @@ -1399,6 +1511,10 @@ thread spawned will spawn another until the application crashes.
[`Buffer.allocUnsafe()`]: buffer.md#static-method-bufferallocunsafesize
[`Buffer`]: buffer.md
[`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`]: errors.md#err_missing_message_port_in_transfer_list
[`ERR_WORKER_MESSAGING_ERRORED`]: errors.md#err_worker_messaging_errored
[`ERR_WORKER_MESSAGING_FAILED`]: errors.md#err_worker_messaging_failed
[`ERR_WORKER_MESSAGING_SAME_THREAD`]: errors.md#err_worker_messaging_same_thread
[`ERR_WORKER_MESSAGING_TIMEOUT`]: errors.md#err_worker_messaging_timeout
[`ERR_WORKER_NOT_RUNNING`]: errors.md#err_worker_not_running
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
[`FileHandle`]: fs.md#class-filehandle
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1862,6 +1862,10 @@ E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
Error);
E('ERR_WORKER_MESSAGING_ERRORED', 'The destination thread threw an error while processing the message', Error);
E('ERR_WORKER_MESSAGING_FAILED', 'Cannot find the destination thread or listener', Error);
E('ERR_WORKER_MESSAGING_SAME_THREAD', 'Cannot sent a message to the same thread', Error);
E('ERR_WORKER_MESSAGING_TIMEOUT', 'Sending a message to another thread timed out', Error);
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
E('ERR_WORKER_OUT_OF_MEMORY',
'Worker terminated due to reaching memory limit: %s', Error);
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const {
kStdioWantsMoreDataCallback,
} = workerIo;

const { setupMainThreadPort } = require('internal/worker/messaging');

const {
onGlobalUncaughtException,
} = require('internal/process/execution');
Expand Down Expand Up @@ -96,6 +98,7 @@ port.on('message', (message) => {
hasStdin,
publicPort,
workerData,
mainThreadPort,
} = message;

if (doEval !== 'internal') {
Expand All @@ -109,6 +112,7 @@ port.on('message', (message) => {
}

require('internal/worker').assignEnvironmentData(environmentData);
setupMainThreadPort(mainThreadPort);

if (SharedArrayBuffer !== undefined) {
// The counter is only passed to the workers created by the main thread,
Expand Down
15 changes: 11 additions & 4 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const {
ReadableWorkerStdio,
WritableWorkerStdio,
} = workerIo;
const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker/messaging');
const { deserializeError } = require('internal/error_serdes');
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
const { kEmptyObject } = require('internal/util');
Expand Down Expand Up @@ -250,14 +251,18 @@ class Worker extends EventEmitter {

this[kParentSideStdio] = { stdin, stdout, stderr };

const { port1, port2 } = new MessageChannel();
const transferList = [port2];
const mainThreadPortToWorker = createMainThreadPort(this.threadId);
const {
port1: publicPortToParent,
port2: publicPortToWorker,
} = new MessageChannel();
const transferList = [mainThreadPortToWorker, publicPortToWorker];
// If transferList is provided.
if (options.transferList)
ArrayPrototypePush(transferList,
...new SafeArrayIterator(options.transferList));

this[kPublicPort] = port1;
this[kPublicPort] = publicPortToParent;
ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
this[kPublicPort].on(event, (message) => this.emit(event, message));
});
Expand All @@ -271,8 +276,9 @@ class Worker extends EventEmitter {
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
workerData: options.workerData,
environmentData,
publicPort: port2,
hasStdin: !!options.stdin,
publicPort: publicPortToWorker,
mainThreadPort: mainThreadPortToWorker,
}, transferList);
// Use this to cache the Worker's loopStart value once available.
this[kLoopStartTime] = -1;
Expand All @@ -295,6 +301,7 @@ class Worker extends EventEmitter {
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
drainMessagePort(this[kPublicPort]);
drainMessagePort(this[kPort]);
destroyMainThreadPort(this.threadId);
this.removeAllListeners('message');
this.removeAllListeners('messageerrors');
this[kPublicPort].unref();
Expand Down
Loading
Loading