Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix signal in handler always aborted in HTTP/1.1 #1218

Merged
merged 3 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/connect-express/src/express-connect-middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export function expressConnectMiddleware(
}
const uReq = universalRequestFromNodeRequest(
req,
res,
getPreparsedBody(req),
options.contextValues?.(req),
);
Expand Down
1 change: 1 addition & 0 deletions packages/connect-fastify/src/fastify-connect-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export function fastifyConnectPlugin(
const uRes = await uHandler(
universalRequestFromNodeRequest(
req.raw,
reply.raw,
req.body as JsonValue | undefined,
opts.contextValues?.(req),
),
Expand Down
1 change: 1 addition & 0 deletions packages/connect-next/src/connect-nextjs-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ export function nextJsApiRouter(options: NextJsApiRouterOptions): ApiRoute {
const uRes = await uHandler(
universalRequestFromNodeRequest(
req,
res,
req.body as JsonValue | undefined,
options.contextValues?.(req),
),
Expand Down
1 change: 1 addition & 0 deletions packages/connect-node/src/connect-node-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export function connectNodeAdapter(
}
const uReq = universalRequestFromNodeRequest(
req,
res,
undefined,
options.contextValues?.(req),
);
Expand Down
54 changes: 51 additions & 3 deletions packages/connect-node/src/node-universal-handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import type { UniversalServerRequest } from "@connectrpc/connect/protocol";
// Polyfill the Headers API for Node versions < 18
import "./node-headers-polyfill.js";

describe("universalRequestFromNodeRequest()", function () {
describe("universalRequestFromNodeResponse()", function () {
describe("with HTTP/2 stream closed with an RST code", function () {
let serverRequest: UniversalServerRequest | undefined;
const server = useNodeServer(() => {
serverRequest = undefined;
return http2.createServer(function (request) {
return http2.createServer(function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
Expand Down Expand Up @@ -176,9 +177,10 @@ describe("universalRequestFromNodeRequest()", function () {
connectionsCheckingInterval: 1,
requestTimeout: 0,
},
function (request) {
function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
Expand Down Expand Up @@ -269,6 +271,7 @@ describe("universalRequestFromNodeRequest()", function () {
function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
Expand Down Expand Up @@ -322,4 +325,49 @@ describe("universalRequestFromNodeRequest()", function () {
}
});
});
describe("with HTTP/1.1", function () {
let serverRequest: UniversalServerRequest | undefined;
let serverNodeResponse:
| http.ServerResponse<http.IncomingMessage>
| undefined;
const server = useNodeServer(() =>
http.createServer(function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
response.on("error", fail);
serverNodeResponse = response;
void readAllBytes(
serverRequest.body as AsyncIterable<Uint8Array>,
Number.MAX_SAFE_INTEGER,
).then(() => {
response.writeHead(200);
response.flushHeaders();
});
}),
);
it("signal should not be aborted on start", async function () {
await new Promise<void>((resolve) => {
const request = http.request(server.getUrl(), {
method: "POST",
// close TCP connection after we're done so that the server shuts down cleanly
agent: new http.Agent({ keepAlive: false }),
});
request.on("error", fail);
request.flushHeaders();
request.end();
request.on("response", (response) => {
expect(serverRequest).toBeDefined();
expect(serverRequest?.signal.aborted).toBeFalse();
serverNodeResponse?.end();
void readAllBytes(response, Number.MAX_SAFE_INTEGER).then(() =>
resolve(),
);
});
});
});
});
});
48 changes: 43 additions & 5 deletions packages/connect-node/src/node-universal-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,44 @@ export type NodeServerResponse = (
};

/**
* Converts a UniversalServerRequest to a Node.js server request.
* Converts a Node.js server request to a UniversalServerRequest.
* This function helps to implement adapters to server frameworks running
* on Node.js. Please be careful using this function in your own code, as we
* may have to make changes to it in the future.
*/
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
nodeResponse: NodeServerResponse,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
): UniversalServerRequest;
/**
* @deprecated
*/
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
): UniversalServerRequest;
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
...rest:
| [
nodeResponse: NodeServerResponse,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
]
| [
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
]
): UniversalServerRequest {
const nodeResponse: NodeServerResponse | undefined =
rest.length === 3 ? rest[0] : undefined;
const parsedJsonBody: JsonValue | undefined =
rest.length === 3 ? rest[1] : rest[0];
const contextValues: ContextValues | undefined =
rest.length === 3 ? rest[2] : rest[1];
const encrypted =
"encrypted" in nodeRequest.socket && nodeRequest.socket.encrypted;
const protocol = encrypted ? "https" : "http";
Expand Down Expand Up @@ -107,18 +135,28 @@ export function universalRequestFromNodeRequest(
});
} else {
// HTTP/1.1 does not have error codes, but Node.js has ECONNRESET
const nodeResponsOrRequest = nodeResponse ?? nodeRequest;
const onH1Error = (e: Error) => {
nodeRequest.off("error", onH1Error);
nodeRequest.off("close", onH1Close);
nodeResponsOrRequest.off("close", onH1Close);
abortController.abort(connectErrorFromNodeReason(e));
};
const onH1Close = () => {
nodeRequest.off("error", onH1Error);
nodeRequest.off("close", onH1Close);
abortController.abort();
nodeResponsOrRequest.off("close", onH1Close);
// When subscribed to the response, this can get called before "error"
abortController.abort(
nodeRequest.errored
? connectErrorFromNodeReason(nodeRequest.errored)
: undefined,
);
};
nodeRequest.once("error", onH1Error);
nodeRequest.once("close", onH1Close);
// Node emits close on the request as soon as all data is read.
// We instead subscribe to the response (if available)
//
// Ref: https:/nodejs/node/issues/40775
nodeResponsOrRequest.once("close", onH1Close);
}
return {
httpVersion: nodeRequest.httpVersion,
Expand Down