Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: support generic Duplex streams #16267

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions doc/api/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -797,11 +797,14 @@ added: v0.1.0

* `socket` {net.Socket}

When a new TCP stream is established. `socket` is an object of type
[`net.Socket`][]. Usually users will not want to access this event. In
particular, the socket will not emit `'readable'` events because of how
the protocol parser attaches to the socket. The `socket` can also be
accessed at `request.connection`.
This event is emitted when a new TCP stream is established. `socket` is
typically an object of type [`net.Socket`][]. Usually users will not want to
access this event. In particular, the socket will not emit `'readable'` events
because of how the protocol parser attaches to the socket. The `socket` can
also be accessed at `request.connection`.

*Note*: This event can also be explicitly emitted by users to inject connections
into the HTTP server. In that case, any [`Duplex`][] stream can be passed.

### Event: 'request'
<!-- YAML
Expand Down Expand Up @@ -1769,7 +1772,7 @@ changes:
use for the request when the `agent` option is not used. This can be used to
avoid creating a custom `Agent` class just to override the default
`createConnection` function. See [`agent.createConnection()`][] for more
details.
details. Any [`Duplex`][] stream is a valid return value.
* `timeout` {number}: A number specifying the socket timeout in milliseconds.
This will set the timeout before the socket is connected.
* `callback` {Function}
Expand Down Expand Up @@ -1869,6 +1872,7 @@ const req = http.request(options, (res) => {
[`'request'`]: #http_event_request
[`'response'`]: #http_event_response
[`Agent`]: #http_class_http_agent
[`Duplex`]: stream.html#stream_class_stream_duplex
[`EventEmitter`]: events.html#events_class_eventemitter
[`TypeError`]: errors.html#errors_class_typeerror
[`URL`]: url.html#url_the_whatwg_url_api
Expand Down
5 changes: 4 additions & 1 deletion lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,10 @@ function responseKeepAlive(res, req) {
if (!req.shouldKeepAlive) {
if (socket.writable) {
debug('AGENT socket.destroySoon()');
socket.destroySoon();
if (typeof socket.destroySoon === 'function')
socket.destroySoon();
else
socket.end();
}
assert(!socket.writable);
} else {
Expand Down
22 changes: 14 additions & 8 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ function connectionListener(socket) {
// If the user has added a listener to the server,
// request, or response, then it's their responsibility.
// otherwise, destroy on timeout by default
if (this.timeout)
if (this.timeout && typeof socket.setTimeout === 'function')
socket.setTimeout(this.timeout);
socket.on('timeout', socketOnTimeout);

Expand Down Expand Up @@ -354,11 +354,13 @@ function connectionListener(socket) {
socket.on = socketOnWrap;

// We only consume the socket if it has never been consumed before.
var external = socket._handle._externalStream;
if (!socket._handle._consumed && external) {
parser._consumed = true;
socket._handle._consumed = true;
parser.consume(external);
if (socket._handle) {
var external = socket._handle._externalStream;
if (!socket._handle._consumed && external) {
parser._consumed = true;
socket._handle._consumed = true;
parser.consume(external);
}
}
parser[kOnExecute] =
onParserExecute.bind(undefined, this, socket, parser, state);
Expand Down Expand Up @@ -533,9 +535,13 @@ function resOnFinish(req, res, socket, state, server) {
res.detachSocket(socket);

if (res._last) {
socket.destroySoon();
if (typeof socket.destroySoon === 'function') {
socket.destroySoon();
} else {
socket.end();
}
} else if (state.outgoing.length === 0) {
if (server.keepAliveTimeout) {
if (server.keepAliveTimeout && typeof socket.setTimeout === 'function') {
socket.setTimeout(0);
socket.setTimeout(server.keepAliveTimeout);
state.keepAliveTimeoutSet = true;
Expand Down
9 changes: 9 additions & 0 deletions test/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This directory contains modules used to test the Node.js implementation.
* [Common module API](#common-module-api)
* [Countdown module](#countdown-module)
* [DNS module](#dns-module)
* [Duplex pair helper](#duplex-pair-helper)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#duplex-pair-module?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. :)

* [Fixtures module](#fixtures-module)
* [WPT module](#wpt-module)

Expand Down Expand Up @@ -458,6 +459,14 @@ Reads a Domain String and returns a Buffer containing the domain.
Takes in a parsed Object and writes its fields to a DNS packet as a Buffer
object.

## Duplex pair helper

The `common/duplexpair` module exports a single function `makeDuplexPair`,
which returns an object `{ clientSide, serverSide }` where each side is a
`Duplex` stream connected to the other side.

There is no difference between client or server side beyond their names.

## Fixtures Module

The `common/fixtures` module provides convenience methods for working with
Expand Down
45 changes: 45 additions & 0 deletions test/common/duplexpair.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/* eslint-disable required-modules */
'use strict';
const { Duplex } = require('stream');
const assert = require('assert');

const kCallback = Symbol('Callback');
const kOtherSide = Symbol('Other');

class DuplexSocket extends Duplex {
constructor() {
super();
this[kCallback] = null;
this[kOtherSide] = null;
}

_read() {
const callback = this[kCallback];
if (callback) {
this[kCallback] = null;
callback();
}
}

_write(chunk, encoding, callback) {
assert.notStrictEqual(this[kOtherSide], null);
assert.strictEqual(this[kOtherSide][kCallback], null);
this[kOtherSide][kCallback] = callback;
this[kOtherSide].push(chunk);
}

_final(callback) {
this[kOtherSide].on('end', callback);
this[kOtherSide].push(null);
}
}

function makeDuplexPair() {
const clientSide = new DuplexSocket();
const serverSide = new DuplexSocket();
clientSide[kOtherSide] = serverSide;
serverSide[kOtherSide] = clientSide;
return { clientSide, serverSide };
}

module.exports = makeDuplexPair;
60 changes: 60 additions & 0 deletions test/parallel/test-http-generic-streams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');

// Test 1: Simple HTTP test, no keep-alive.
{
const testData = 'Hello, World!\n';
const server = http.createServer(common.mustCall((req, res) => {
res.statusCode = 200;
res.setHeader('Content-Type', 'text/plain');
res.end(testData);
}));

const { clientSide, serverSide } = MakeDuplexPair();
server.emit('connection', serverSide);

const req = http.request({
createConnection: common.mustCall(() => clientSide)
}, common.mustCall((res) => {
res.setEncoding('utf8');
res.on('data', common.mustCall((data) => {
assert.strictEqual(data, testData);
}));
}));
req.end();
}

// Test 2: Keep-alive for 2 requests.
{
const testData = 'Hello, World!\n';
const server = http.createServer(common.mustCall((req, res) => {
res.statusCode = 200;
res.setHeader('Content-Type', 'text/plain');
res.end(testData);
}, 2));

const { clientSide, serverSide } = MakeDuplexPair();
server.emit('connection', serverSide);

function doRequest(cb) {
const req = http.request({
createConnection: common.mustCall(() => clientSide),
headers: { Connection: 'keep-alive' }
}, common.mustCall((res) => {
res.setEncoding('utf8');
res.on('data', common.mustCall((data) => {
assert.strictEqual(data, testData);
}));
res.on('end', common.mustCall(cb));
}));
req.shouldKeepAlive = true;
req.end();
}

doRequest(() => {
doRequest();
});
}