Skip to content

Commit

Permalink
Fix signal in handler always aborted in HTTP/1.1 (#1218)
Browse files Browse the repository at this point in the history
Signed-off-by: Sri Krishna Paritala <[email protected]>
  • Loading branch information
srikrsna-buf authored Sep 9, 2024
1 parent 575bd85 commit 3a67300
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 8 deletions.
1 change: 1 addition & 0 deletions packages/connect-express/src/express-connect-middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export function expressConnectMiddleware(
}
const uReq = universalRequestFromNodeRequest(
req,
res,
getPreparsedBody(req),
options.contextValues?.(req),
);
Expand Down
1 change: 1 addition & 0 deletions packages/connect-fastify/src/fastify-connect-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export function fastifyConnectPlugin(
const uRes = await uHandler(
universalRequestFromNodeRequest(
req.raw,
reply.raw,
req.body as JsonValue | undefined,
opts.contextValues?.(req),
),
Expand Down
1 change: 1 addition & 0 deletions packages/connect-next/src/connect-nextjs-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ export function nextJsApiRouter(options: NextJsApiRouterOptions): ApiRoute {
const uRes = await uHandler(
universalRequestFromNodeRequest(
req,
res,
req.body as JsonValue | undefined,
options.contextValues?.(req),
),
Expand Down
1 change: 1 addition & 0 deletions packages/connect-node/src/connect-node-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export function connectNodeAdapter(
}
const uReq = universalRequestFromNodeRequest(
req,
res,
undefined,
options.contextValues?.(req),
);
Expand Down
54 changes: 51 additions & 3 deletions packages/connect-node/src/node-universal-handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import type { UniversalServerRequest } from "@connectrpc/connect/protocol";
// Polyfill the Headers API for Node versions < 18
import "./node-headers-polyfill.js";

describe("universalRequestFromNodeRequest()", function () {
describe("universalRequestFromNodeResponse()", function () {
describe("with HTTP/2 stream closed with an RST code", function () {
let serverRequest: UniversalServerRequest | undefined;
const server = useNodeServer(() => {
serverRequest = undefined;
return http2.createServer(function (request) {
return http2.createServer(function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
Expand Down Expand Up @@ -176,9 +177,10 @@ describe("universalRequestFromNodeRequest()", function () {
connectionsCheckingInterval: 1,
requestTimeout: 0,
},
function (request) {
function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
Expand Down Expand Up @@ -269,6 +271,7 @@ describe("universalRequestFromNodeRequest()", function () {
function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
Expand Down Expand Up @@ -322,4 +325,49 @@ describe("universalRequestFromNodeRequest()", function () {
}
});
});
describe("with HTTP/1.1", function () {
let serverRequest: UniversalServerRequest | undefined;
let serverNodeResponse:
| http.ServerResponse<http.IncomingMessage>
| undefined;
const server = useNodeServer(() =>
http.createServer(function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
response.on("error", fail);
serverNodeResponse = response;
void readAllBytes(
serverRequest.body as AsyncIterable<Uint8Array>,
Number.MAX_SAFE_INTEGER,
).then(() => {
response.writeHead(200);
response.flushHeaders();
});
}),
);
it("signal should not be aborted on start", async function () {
await new Promise<void>((resolve) => {
const request = http.request(server.getUrl(), {
method: "POST",
// close TCP connection after we're done so that the server shuts down cleanly
agent: new http.Agent({ keepAlive: false }),
});
request.on("error", fail);
request.flushHeaders();
request.end();
request.on("response", (response) => {
expect(serverRequest).toBeDefined();
expect(serverRequest?.signal.aborted).toBeFalse();
serverNodeResponse?.end();
void readAllBytes(response, Number.MAX_SAFE_INTEGER).then(() =>
resolve(),
);
});
});
});
});
});
48 changes: 43 additions & 5 deletions packages/connect-node/src/node-universal-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,44 @@ export type NodeServerResponse = (
};

/**
* Converts a UniversalServerRequest to a Node.js server request.
* Converts a Node.js server request to a UniversalServerRequest.
* This function helps to implement adapters to server frameworks running
* on Node.js. Please be careful using this function in your own code, as we
* may have to make changes to it in the future.
*/
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
nodeResponse: NodeServerResponse,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
): UniversalServerRequest;
/**
* @deprecated
*/
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
): UniversalServerRequest;
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
...rest:
| [
nodeResponse: NodeServerResponse,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
]
| [
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
]
): UniversalServerRequest {
const nodeResponse: NodeServerResponse | undefined =
rest.length === 3 ? rest[0] : undefined;
const parsedJsonBody: JsonValue | undefined =
rest.length === 3 ? rest[1] : rest[0];
const contextValues: ContextValues | undefined =
rest.length === 3 ? rest[2] : rest[1];
const encrypted =
"encrypted" in nodeRequest.socket && nodeRequest.socket.encrypted;
const protocol = encrypted ? "https" : "http";
Expand Down Expand Up @@ -107,18 +135,28 @@ export function universalRequestFromNodeRequest(
});
} else {
// HTTP/1.1 does not have error codes, but Node.js has ECONNRESET
const nodeResponsOrRequest = nodeResponse ?? nodeRequest;
const onH1Error = (e: Error) => {
nodeRequest.off("error", onH1Error);
nodeRequest.off("close", onH1Close);
nodeResponsOrRequest.off("close", onH1Close);
abortController.abort(connectErrorFromNodeReason(e));
};
const onH1Close = () => {
nodeRequest.off("error", onH1Error);
nodeRequest.off("close", onH1Close);
abortController.abort();
nodeResponsOrRequest.off("close", onH1Close);
// When subscribed to the response, this can get called before "error"
abortController.abort(
nodeRequest.errored
? connectErrorFromNodeReason(nodeRequest.errored)
: undefined,
);
};
nodeRequest.once("error", onH1Error);
nodeRequest.once("close", onH1Close);
// Node emits close on the request as soon as all data is read.
// We instead subscribe to the response (if available)
//
// Ref: https:/nodejs/node/issues/40775
nodeResponsOrRequest.once("close", onH1Close);
}
return {
httpVersion: nodeRequest.httpVersion,
Expand Down

0 comments on commit 3a67300

Please sign in to comment.