Skip to content

Commit

Permalink
stream: lazy allocate back pressure buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Oct 2, 2023
1 parent 85c09f1 commit fe4064a
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const kWriteCb = 1 << 26;
const kExpectWriteCb = 1 << 27;
const kAfterWriteTickInfo = 1 << 28;
const kAfterWritePending = 1 << 29;
const kHasBuffer = 1 << 30;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down Expand Up @@ -337,19 +338,20 @@ function WritableState(options, stream, isDuplex) {
}

function resetBuffer(state) {
state.buffered = [];
state.buffered = null;
state.bufferedIndex = 0;
state.state |= kAllBuffers | kAllNoop;
state.state &= ~kHasBuffer;
}

WritableState.prototype.getBuffer = function getBuffer() {
return ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
return ArrayPrototypeSlice(this.buffered ?? [], this.bufferedIndex);
};

ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
__proto__: null,
get() {
return this.buffered.length - this.bufferedIndex;
return (this.state & kHasBuffer) === 0 ? 0 : this.buffered.length - this.bufferedIndex;
},
});

Expand Down Expand Up @@ -522,6 +524,11 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
}

if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
if ((state.state & kHasBuffer) === 0) {
state.state |= kHasBuffer;
state.buffered = [];
}

state.buffered.push({ chunk, encoding, callback });
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
state.state &= ~kAllBuffers;
Expand Down Expand Up @@ -607,7 +614,7 @@ function onwrite(stream, er) {
onwriteError(stream, state, er, cb);
}
} else {
if (state.buffered.length > state.bufferedIndex) {
if ((state.state & kHasBuffer) !== 0) {
clearBuffer(stream, state);
}

Expand Down Expand Up @@ -677,11 +684,13 @@ function errorBuffer(state) {
return;
}

for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
const { chunk, callback } = state.buffered[n];
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
state.length -= len;
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
if ((state.state & kHasBuffer) !== 0) {
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
const { chunk, callback } = state.buffered[n];
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
state.length -= len;
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
}
}


Expand All @@ -692,8 +701,7 @@ function errorBuffer(state) {

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
(state.state & kConstructed) === 0) {
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kHasBuffer)) !== kHasBuffer) {
return;
}

Expand Down Expand Up @@ -828,10 +836,9 @@ function needFinish(state) {
kWriting |
kErrorEmitted |
kCloseEmitted |
kErrored
)) === (kEnding | kConstructed) &&
state.length === 0 &&
state.buffered.length === 0);
kErrored |
kHasBuffer
)) === (kEnding | kConstructed) && state.length === 0);
}

function callFinal(stream, state) {
Expand Down Expand Up @@ -1073,9 +1080,7 @@ Writable.prototype.destroy = function(err, cb) {
const state = this._writableState;

// Invoke pending callbacks.
if ((state.state & kDestroyed) === 0 &&
(state.bufferedIndex < state.buffered.length ||
(state.state & kOnFinished) !== 0)) {
if ((state.state & (kHasBuffer | kOnFinished | kDestroyed)) !== kDestroyed) {
process.nextTick(errorBuffer, state);
}

Expand Down

0 comments on commit fe4064a

Please sign in to comment.