diff --git a/packages/connect-express/src/express-connect-middleware.ts b/packages/connect-express/src/express-connect-middleware.ts index f01c5dbe3..df05dfb18 100644 --- a/packages/connect-express/src/express-connect-middleware.ts +++ b/packages/connect-express/src/express-connect-middleware.ts @@ -87,6 +87,7 @@ export function expressConnectMiddleware( } const uReq = universalRequestFromNodeRequest( req, + res, getPreparsedBody(req), options.contextValues?.(req), ); diff --git a/packages/connect-fastify/src/fastify-connect-plugin.ts b/packages/connect-fastify/src/fastify-connect-plugin.ts index c75b2763d..a2c45acfc 100644 --- a/packages/connect-fastify/src/fastify-connect-plugin.ts +++ b/packages/connect-fastify/src/fastify-connect-plugin.ts @@ -115,6 +115,7 @@ export function fastifyConnectPlugin( const uRes = await uHandler( universalRequestFromNodeRequest( req.raw, + reply.raw, req.body as JsonValue | undefined, opts.contextValues?.(req), ), diff --git a/packages/connect-next/src/connect-nextjs-adapter.ts b/packages/connect-next/src/connect-nextjs-adapter.ts index d5618225a..2fe6d1463 100644 --- a/packages/connect-next/src/connect-nextjs-adapter.ts +++ b/packages/connect-next/src/connect-nextjs-adapter.ts @@ -95,6 +95,7 @@ export function nextJsApiRouter(options: NextJsApiRouterOptions): ApiRoute { const uRes = await uHandler( universalRequestFromNodeRequest( req, + res, req.body as JsonValue | undefined, options.contextValues?.(req), ), diff --git a/packages/connect-node/src/connect-node-adapter.ts b/packages/connect-node/src/connect-node-adapter.ts index 68a389bff..ec2394b17 100644 --- a/packages/connect-node/src/connect-node-adapter.ts +++ b/packages/connect-node/src/connect-node-adapter.ts @@ -96,6 +96,7 @@ export function connectNodeAdapter( } const uReq = universalRequestFromNodeRequest( req, + res, undefined, options.contextValues?.(req), ); diff --git a/packages/connect-node/src/node-universal-handler.spec.ts b/packages/connect-node/src/node-universal-handler.spec.ts index 47f35de7d..510fa8d3a 100644 --- a/packages/connect-node/src/node-universal-handler.spec.ts +++ b/packages/connect-node/src/node-universal-handler.spec.ts @@ -27,14 +27,15 @@ import type { UniversalServerRequest } from "@connectrpc/connect/protocol"; // Polyfill the Headers API for Node versions < 18 import "./node-headers-polyfill.js"; -describe("universalRequestFromNodeRequest()", function () { +describe("universalRequestFromNodeResponse()", function () { describe("with HTTP/2 stream closed with an RST code", function () { let serverRequest: UniversalServerRequest | undefined; const server = useNodeServer(() => { serverRequest = undefined; - return http2.createServer(function (request) { + return http2.createServer(function (request, response) { serverRequest = universalRequestFromNodeRequest( request, + response, undefined, undefined, ); @@ -176,9 +177,10 @@ describe("universalRequestFromNodeRequest()", function () { connectionsCheckingInterval: 1, requestTimeout: 0, }, - function (request) { + function (request, response) { serverRequest = universalRequestFromNodeRequest( request, + response, undefined, undefined, ); @@ -269,6 +271,7 @@ describe("universalRequestFromNodeRequest()", function () { function (request, response) { serverRequest = universalRequestFromNodeRequest( request, + response, undefined, undefined, ); @@ -322,4 +325,49 @@ describe("universalRequestFromNodeRequest()", function () { } }); }); + describe("with HTTP/1.1", function () { + let serverRequest: UniversalServerRequest | undefined; + let serverNodeResponse: + | http.ServerResponse + | undefined; + const server = useNodeServer(() => + http.createServer(function (request, response) { + serverRequest = universalRequestFromNodeRequest( + request, + response, + undefined, + undefined, + ); + response.on("error", fail); + serverNodeResponse = response; + void readAllBytes( + serverRequest.body as AsyncIterable, + Number.MAX_SAFE_INTEGER, + ).then(() => { + response.writeHead(200); + response.flushHeaders(); + }); + }), + ); + it("signal should not be aborted on start", async function () { + await new Promise((resolve) => { + const request = http.request(server.getUrl(), { + method: "POST", + // close TCP connection after we're done so that the server shuts down cleanly + agent: new http.Agent({ keepAlive: false }), + }); + request.on("error", fail); + request.flushHeaders(); + request.end(); + request.on("response", (response) => { + expect(serverRequest).toBeDefined(); + expect(serverRequest?.signal.aborted).toBeFalse(); + serverNodeResponse?.end(); + void readAllBytes(response, Number.MAX_SAFE_INTEGER).then(() => + resolve(), + ); + }); + }); + }); + }); }); diff --git a/packages/connect-node/src/node-universal-handler.ts b/packages/connect-node/src/node-universal-handler.ts index 15d9e027e..997f8779f 100644 --- a/packages/connect-node/src/node-universal-handler.ts +++ b/packages/connect-node/src/node-universal-handler.ts @@ -66,16 +66,44 @@ export type NodeServerResponse = ( }; /** - * Converts a UniversalServerRequest to a Node.js server request. + * Converts a Node.js server request to a UniversalServerRequest. * This function helps to implement adapters to server frameworks running * on Node.js. Please be careful using this function in your own code, as we * may have to make changes to it in the future. */ +export function universalRequestFromNodeRequest( + nodeRequest: NodeServerRequest, + nodeResponse: NodeServerResponse, + parsedJsonBody: JsonValue | undefined, + contextValues: ContextValues | undefined, +): UniversalServerRequest; +/** + * @deprecated + */ export function universalRequestFromNodeRequest( nodeRequest: NodeServerRequest, parsedJsonBody: JsonValue | undefined, contextValues: ContextValues | undefined, +): UniversalServerRequest; +export function universalRequestFromNodeRequest( + nodeRequest: NodeServerRequest, + ...rest: + | [ + nodeResponse: NodeServerResponse, + parsedJsonBody: JsonValue | undefined, + contextValues: ContextValues | undefined, + ] + | [ + parsedJsonBody: JsonValue | undefined, + contextValues: ContextValues | undefined, + ] ): UniversalServerRequest { + const nodeResponse: NodeServerResponse | undefined = + rest.length === 3 ? rest[0] : undefined; + const parsedJsonBody: JsonValue | undefined = + rest.length === 3 ? rest[1] : rest[0]; + const contextValues: ContextValues | undefined = + rest.length === 3 ? rest[2] : rest[1]; const encrypted = "encrypted" in nodeRequest.socket && nodeRequest.socket.encrypted; const protocol = encrypted ? "https" : "http"; @@ -107,18 +135,28 @@ export function universalRequestFromNodeRequest( }); } else { // HTTP/1.1 does not have error codes, but Node.js has ECONNRESET + const nodeResponsOrRequest = nodeResponse ?? nodeRequest; const onH1Error = (e: Error) => { nodeRequest.off("error", onH1Error); - nodeRequest.off("close", onH1Close); + nodeResponsOrRequest.off("close", onH1Close); abortController.abort(connectErrorFromNodeReason(e)); }; const onH1Close = () => { nodeRequest.off("error", onH1Error); - nodeRequest.off("close", onH1Close); - abortController.abort(); + nodeResponsOrRequest.off("close", onH1Close); + // When subscribed to the response, this can get called before "error" + abortController.abort( + nodeRequest.errored + ? connectErrorFromNodeReason(nodeRequest.errored) + : undefined, + ); }; nodeRequest.once("error", onH1Error); - nodeRequest.once("close", onH1Close); + // Node emits close on the request as soon as all data is read. + // We instead subscribe to the response (if available) + // + // Ref: https://github.com/nodejs/node/issues/40775 + nodeResponsOrRequest.once("close", onH1Close); } return { httpVersion: nodeRequest.httpVersion,