Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: destroying stream without error is abort #40806

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ This is a destructive and immediate way to destroy a stream. Previous calls to
Use `end()` instead of destroy if data should flush before close, or wait for
the `'drain'` event before destroying the stream.

If `.destroy()` is called without an `error` and `autoDestroy` is
enabled, then if the stream has not completed it will be
automatically destroyed with an `AbortError`.

```cjs
const { Writable } = require('stream');

Expand Down Expand Up @@ -1101,6 +1105,10 @@ further errors except from `_destroy()` may be emitted as `'error'`.
Implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].

If `.destroy()` is called without an `error` and `autoDestroy` is
enabled, then if the stream has not completed it will be
automatically destroyed with an `AbortError`.

##### `readable.closed`

<!-- YAML
Expand Down Expand Up @@ -1805,6 +1813,10 @@ unless `emitClose` is set in false.
Once `destroy()` has been called, any further calls will be a no-op and no
further errors except from `_destroy()` may be emitted as `'error'`.

If `.destroy()` is called without an `error` and `autoDestroy` is
enabled, then if the stream has not completed it will be
automatically destroyed with an `AbortError`.

### `stream.finished(stream[, options], callback)`

<!-- YAML
Expand Down Expand Up @@ -2508,6 +2520,8 @@ changes:
[`stream._construct()`][writable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.
* `autoAbort` {boolean} Whether this stream should automatically
error if `.destroy()` is called without an error before the stream has emitted `'finish'`.
* `signal` {AbortSignal} A signal representing possible cancellation.

<!-- eslint-disable no-useless-constructor -->
Expand Down Expand Up @@ -2865,6 +2879,8 @@ changes:
[`stream._construct()`][readable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.
* `autoAbort` {boolean} Whether this stream should automatically
error if `.destroy()` is called without an error before the stream has emitted `'end'`.
* `signal` {AbortSignal} A signal representing possible cancellation.

<!-- eslint-disable no-useless-constructor -->
Expand Down
12 changes: 11 additions & 1 deletion lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ const {
kDestroyed,
isDestroyed,
isFinished,
isServerRequest
isServerRequest,
isReadableFinished,
isWritableEnded
} = require('internal/streams/utils');

const kDestroy = Symbol('kDestroy');
Expand Down Expand Up @@ -86,6 +88,14 @@ function _destroy(self, err, cb) {
const r = self._readableState;
const w = self._writableState;

if (!err) {
if (r?.autoAbort && !isReadableFinished(self)) {
err = new AbortError();
} else if (w?.autoAbort && !isWritableEnded(self)) {
err = new AbortError();
}
}

checkError(err, w, r);

if (w) {
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ function ReadableState(options, stream, isDuplex) {

this.dataEmitted = false;

this.autoAbort = options?.autoAbort ?? false;

this.decoder = null;
this.encoding = null;
if (options && options.encoding) {
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ function WritableState(options, stream, isDuplex) {
// depending on emitClose.
this.closeEmitted = false;

this.autoAbort = options?.autoAbort ?? false;

this[kOnFinished] = [];
}

Expand Down
1 change: 1 addition & 0 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ function Socket(options) {
// For backwards compat do not emit close on destroy.
options.emitClose = false;
options.autoDestroy = true;
options.autoAbort = false;
// Handle strings directly.
options.decodeStrings = false;
stream.Duplex.call(this, options);
Expand Down
29 changes: 29 additions & 0 deletions test/parallel/test-stream-auto-abort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const common = require('../common');
const { Readable, Writable } = require('stream');
const assert = require('assert');

{
const w = new Writable({
write() {

}
});
w.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
w.destroy();
}

{
const r = new Readable({
read() {

}
});
r.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
r.destroy();
}