From aa83250de9c0144beb63c7be55da48d14c52b3f5 Mon Sep 17 00:00:00 2001 From: Timo Stamm Date: Fri, 5 May 2023 17:49:24 +0200 Subject: [PATCH 1/2] Improve tests for Node.js clients and fix hanging requests --- Makefile | 7 +- packages/connect-node/src/node-error.ts | 83 +- .../src/node-universal-client.spec.ts | 892 ++++++++++++++++-- .../connect-node/src/node-universal-client.ts | 72 +- .../src/use-node-server-helper.spec.ts | 77 ++ 5 files changed, 988 insertions(+), 143 deletions(-) create mode 100644 packages/connect-node/src/use-node-server-helper.spec.ts diff --git a/Makefile b/Makefile index 6cbabe15c..6ec78a86e 100644 --- a/Makefile +++ b/Makefile @@ -177,8 +177,11 @@ testconnectpackage: $(BUILD)/connect npm run -w packages/connect jasmine .PHONY: testconnectnodepackage -testconnectnodepackage: $(BUILD)/connect-node - npm run -w packages/connect-node jasmine +testconnectnodepackage: $(BIN)/node16 $(BIN)/node18 $(BIN)/node19 $(BIN)/node20 $(BUILD)/connect-node + cd packages/connect-node && PATH="$(abspath $(BIN)):$(PATH)" node16 --trace-warnings ../../node_modules/.bin/jasmine --config=jasmine.json + cd packages/connect-node && PATH="$(abspath $(BIN)):$(PATH)" node18 --trace-warnings ../../node_modules/.bin/jasmine --config=jasmine.json + cd packages/connect-node && PATH="$(abspath $(BIN)):$(PATH)" node19 --trace-warnings ../../node_modules/.bin/jasmine --config=jasmine.json + cd packages/connect-node && PATH="$(abspath $(BIN)):$(PATH)" node20 --trace-warnings ../../node_modules/.bin/jasmine --config=jasmine.json .PHONY: testnode testnode: $(BIN)/node16 $(BIN)/node18 $(BIN)/node19 $(BIN)/node20 $(BUILD)/connect-node-test diff --git a/packages/connect-node/src/node-error.ts b/packages/connect-node/src/node-error.ts index 50565a730..a9894e609 100644 --- a/packages/connect-node/src/node-error.ts +++ b/packages/connect-node/src/node-error.ts @@ -37,7 +37,9 @@ export function connectErrorFromNodeReason(reason: unknown): ConnectError { } else if ( chain.some( (p) => - p.code == "ERR_STREAM_DESTROYED" || p.code == "ERR_HTTP2_INVALID_STREAM" + p.code == "ERR_STREAM_DESTROYED" || + p.code == "ERR_HTTP2_INVALID_STREAM" || + p.code == "ECONNRESET" ) ) { // A handler whose stream is suddenly destroyed usually means the client @@ -111,3 +113,82 @@ export function getNodeErrorProps(reason: unknown): { } return props; } + +/** + * Returns a ConnectError for a HTTP/2 error code. + */ +export function connectErrorFromH2ResetCode( + rstCode: number +): ConnectError | undefined { + switch (rstCode) { + case H2Code.PROTOCOL_ERROR: + case H2Code.INTERNAL_ERROR: + case H2Code.FLOW_CONTROL_ERROR: + case H2Code.SETTINGS_TIMEOUT: + case H2Code.FRAME_SIZE_ERROR: + case H2Code.COMPRESSION_ERROR: + case H2Code.CONNECT_ERROR: + return new ConnectError( + `http/2 stream closed with RST code ${ + H2Code[rstCode] + } (0x${rstCode.toString(16)})`, + Code.Internal + ); + case H2Code.REFUSED_STREAM: + return new ConnectError( + `http/2 stream closed with RST code ${ + H2Code[rstCode] + } (0x${rstCode.toString(16)})`, + Code.Unavailable + ); + case H2Code.CANCEL: + return new ConnectError( + `http/2 stream closed with RST code ${ + H2Code[rstCode] + } (0x${rstCode.toString(16)})`, + Code.Canceled + ); + case H2Code.ENHANCE_YOUR_CALM: + return new ConnectError( + `http/2 stream closed with RST code ${ + H2Code[rstCode] + } (0x${rstCode.toString(16)})`, + Code.ResourceExhausted + ); + case H2Code.INADEQUATE_SECURITY: + return new ConnectError( + `http/2 stream closed with RST code ${ + H2Code[rstCode] + } (0x${rstCode.toString(16)})`, + Code.PermissionDenied + ); + case H2Code.HTTP_1_1_REQUIRED: + return new ConnectError( + `http/2 stream closed with RST code ${ + H2Code[rstCode] + } (0x${rstCode.toString(16)})`, + Code.PermissionDenied + ); + case H2Code.STREAM_CLOSED: + default: + // Intentionally not mapping STREAM_CLOSED (0x5), see https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors + break; + } + return undefined; +} + +export enum H2Code { + PROTOCOL_ERROR = 0x1, + INTERNAL_ERROR = 0x2, + FLOW_CONTROL_ERROR = 0x3, + SETTINGS_TIMEOUT = 0x4, + STREAM_CLOSED = 0x5, + FRAME_SIZE_ERROR = 0x6, + REFUSED_STREAM = 0x7, + CANCEL = 0x8, + COMPRESSION_ERROR = 0x9, + CONNECT_ERROR = 0xa, + ENHANCE_YOUR_CALM = 0xb, + INADEQUATE_SECURITY = 0xc, + HTTP_1_1_REQUIRED = 0xd, +} diff --git a/packages/connect-node/src/node-universal-client.spec.ts b/packages/connect-node/src/node-universal-client.spec.ts index b25745977..b053dbacf 100644 --- a/packages/connect-node/src/node-universal-client.spec.ts +++ b/packages/connect-node/src/node-universal-client.spec.ts @@ -13,25 +13,25 @@ // limitations under the License. import * as http2 from "http2"; -import type * as net from "net"; +import * as http from "http"; +import { ConnectError, connectErrorFromReason } from "@bufbuild/connect"; import { createAsyncIterable } from "@bufbuild/connect/protocol"; import { createNodeHttpClient } from "./node-universal-client.js"; -import { encodeEnvelope } from "@bufbuild/connect/protocol"; - -describe("Node.js http2 API", function () { - it("should see reset codes from the client side on the server", async function () { - const server = await startServer(); - await h2RequestWithReset(server.baseUrl, http2.constants.NGHTTP2_CANCEL); - const { rstCode } = await server.stop(); - expect(rstCode).toBe(http2.constants.NGHTTP2_CANCEL); - }); +import { useNodeServer } from "./use-node-server-helper.spec.js"; + +// Polyfill the Headers API for Node versions < 18 +import "./node-headers-polyfill.js"; - /** - * Issues an H2 request, and immediately resets with the given code. - */ - async function h2RequestWithReset(baseUrl: string, rstCode: number) { +describe("node http/2 client closing with RST_STREAM with code CANCEL", function () { + let serverReceivedRstCode: number | undefined; + const server = useNodeServer(() => + http2.createServer().on("stream", (stream) => { + stream.on("close", () => (serverReceivedRstCode = stream.rstCode)); + }) + ); + it("should send RST_STREAM frame to the server", async function () { return new Promise((resolve) => { - http2.connect(baseUrl, (session: http2.ClientHttp2Session) => { + http2.connect(server.getUrl(), (session: http2.ClientHttp2Session) => { const stream = session.request( { ":method": "POST", @@ -40,118 +40,802 @@ describe("Node.js http2 API", function () { {} ); setTimeout(() => { - stream.close(rstCode, () => { - session.close(); - setTimeout(() => resolve(), 0); + stream.close(http2.constants.NGHTTP2_CANCEL, () => { + // We are seeing a race condition in Node v16.20.0, where closing + // the session right after closing a stream with an RST code + // _sometimes_ sends an INTERNAL_ERROR code. + // Simply delaying the session close until the next tick like + // we do here seems to work around the issue. + // We do _not_ guard against this case in the universal client, + // since we were not able to reproduce the issue there. + setTimeout(() => session.close(resolve), 0); }); }, 0); }); }); - } + + while (serverReceivedRstCode === undefined) { + // wait for the server to see the reset code + await new Promise((resolve) => setTimeout(resolve, 1)); + } + expect(serverReceivedRstCode).toBe(http2.constants.NGHTTP2_CANCEL); + }); }); -describe("universal node http2 client", function () { - describe("with a signal that is already aborted", function () { - it("should raise error with code canceled", async function () { - const signal = AbortSignal.abort(); - const client = createNodeHttpClient({ - httpVersion: "2", - baseUrl: "http://example.com", - keepSessionAlive: false, - }); - try { - await client({ - url: "http://example.com", +describe("universal node http client", function () { + describe("against an unresolvable host", function () { + for (const httpVersion of ["2", "1.1"] as const) { + describe(`over http ${httpVersion}`, function () { + it("should raise Code.Unavailable", async function () { + const client = createNodeHttpClient({ + httpVersion, + baseUrl: "https://unresolvable-host.some.domain", + keepSessionAlive: false, + }); + try { + await client({ + url: "https://unresolvable-host.some.domain", + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + }); + } catch (e) { + expect(connectErrorFromReason(e).message).toBe( + "[unavailable] getaddrinfo ENOTFOUND unresolvable-host.some.domain" + ); + } + }); + }); + } + }); + + describe("against a server that closes immediately", function () { + describe("over http/2", function () { + let serverReceivedRequest = false; + const server = useNodeServer(() => + http2.createServer((request, response) => { + serverReceivedRequest = true; + response.stream.close(http2.constants.NGHTTP2_CANCEL); + }) + ); + it("should reject the response promise with Code.Canceled", async function () { + const client = createNodeHttpClient({ + httpVersion: "2", + baseUrl: server.getUrl(), + }); + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] http/2 stream closed with RST code CANCEL (0x8)" + ); + } + expect(serverReceivedRequest).toBeTrue(); + }); + }); + describe("over http/1.1", function () { + let serverReceivedRequest = false; + const server = useNodeServer(() => + http.createServer((req, res) => { + serverReceivedRequest = true; + res.destroy(); + }) + ); + it("should reject the response promise", async function () { + const client = createNodeHttpClient({ + httpVersion: "1.1", + }); + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[aborted] socket hang up" + ); + } + expect(serverReceivedRequest).toBeTrue(); + }); + }); + }); + + describe("against a server that closes before the first response byte", function () { + describe("over http/2", function () { + const server = useNodeServer(() => + http2.createServer((req, res) => { + res.writeHead(200); + // Calling close in the same tick as writeHead appears to prevent + // headers from being sent. The client response promise will reject, + // instead of the response body. + setTimeout(() => res.stream.close(http2.constants.NGHTTP2_CANCEL), 0); + }) + ); + it("should reject the response promise with Code.Canceled", async function () { + const client = createNodeHttpClient({ + httpVersion: "2", + baseUrl: server.getUrl(), + }); + const res = await client({ + url: server.getUrl(), method: "POST", header: new Headers(), body: createAsyncIterable([]), - signal, }); - fail("expected error"); - } catch (e) { - expect((e as Error).message).toBe( - "[canceled] operation was aborted via signal" - ); - } + try { + for await (const chunk of res.body) { + expect(chunk) + .withContext("response body iterable should be empty") + .toBeUndefined(); + } + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] http/2 stream closed with RST code CANCEL (0x8)" + ); + } + }); }); - }); - describe("with a signal aborting mid request", function () { - it("should send RST_STREAM with code CANCEL", async function () { - const server = await startServer(); - - // set up a client that aborts while still streaming the request body - const ac = new AbortController(); - const client = createNodeHttpClient({ - httpVersion: "2", - baseUrl: server.baseUrl, - keepSessionAlive: false, - }); - async function* body() { - await new Promise((resolve) => setTimeout(resolve, 50)); - ac.abort(); - yield encodeEnvelope(0, new Uint8Array(0)); - } - try { - await client({ - url: server.baseUrl, + describe("over http/1.1", function () { + const server = useNodeServer(() => + http.createServer((req, res) => { + res.writeHead(200); + res.flushHeaders(); + res.destroy(); + }) + ); + it("should reject the response promise", async function () { + const client = createNodeHttpClient({ + httpVersion: "1.1", + }); + const res = await client({ + url: server.getUrl(), method: "POST", header: new Headers(), - body: body(), - signal: ac.signal, + body: createAsyncIterable([]), + }); + try { + for await (const chunk of res.body) { + expect(chunk) + .withContext("response body iterable should be empty") + .toBeUndefined(); + } + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe("[aborted] aborted"); + } + }); + }); + }); + + describe("against a server that closes mid request", function () { + describe("over http/2", function () { + let serverReceivedBytes = 0; + const server = useNodeServer(() => + http2.createServer((req, res) => { + void (async () => { + for await (const chunk of req) { + serverReceivedBytes += (chunk as Uint8Array).byteLength; + res.stream.close(http2.constants.NGHTTP2_CANCEL); + break; + } + })(); + }) + ); + it("should reject the response promise with Code.Canceled", async function () { + const client = createNodeHttpClient({ + httpVersion: "2", + baseUrl: server.getUrl(), }); - fail("expected error"); - } catch (e) { - expect((e as Error).message).toBe( - "[canceled] operation was aborted via signal" - ); - } - const { rstCode } = await server.stop(); - expect(rstCode).toBe(http2.constants.NGHTTP2_CANCEL); + async function* body() { + yield new Uint8Array(32); + await new Promise(() => { + // never resolves + }); + } + + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: body(), + }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] http/2 stream closed with RST code CANCEL (0x8)" + ); + } + expect(serverReceivedBytes).toBe(32); + }); + }); + describe("over http/1.1", function () { + let serverReceivedBytes = 0; + const server = useNodeServer(() => + http.createServer((req, res) => { + void (async () => { + for await (const chunk of req) { + serverReceivedBytes += (chunk as Uint8Array).byteLength; + res.destroy(); + break; + } + })(); + }) + ); + it("should reject the response promise", async function () { + const client = createNodeHttpClient({ + httpVersion: "1.1", + }); + + async function* body() { + yield new Uint8Array(32); + await new Promise(() => { + // never resolves + }); + } + + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: body(), + }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[aborted] socket hang up" + ); + } + expect(serverReceivedBytes).toBe(32); + }); }); }); -}); -/** - * Start an H2 server that expects all requests to be closed right away. - * When stopped, waits for all connections to close, then returns the last - * received reset code. - */ -async function startServer(): Promise<{ - baseUrl: string; - stop(): Promise<{ rstCode: number }>; -}> { - const s = http2.createServer({}); - let rstCode = -1; - s.on("stream", (stream) => { - stream.on("close", () => { - rstCode = stream.rstCode; + describe("against a server that closes mid response", function () { + describe("over http/2", function () { + let serverSentBytes = 0; + const server = useNodeServer(() => + http2.createServer((req, res) => { + void (async () => { + res.writeHead(200); + await new Promise((resolve, reject) => + res.write(new Uint8Array(64), (e: Error | undefined) => + e ? reject(e) : resolve() + ) + ); + serverSentBytes += 64; + res.stream.close(http2.constants.NGHTTP2_CANCEL); + })(); + }) + ); + it("should reject the response promise with Code.Canceled", async function () { + const client = createNodeHttpClient({ + httpVersion: "2", + baseUrl: server.getUrl(), + }); + const res = await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + }); + try { + for await (const chunk of res.body) { + expect(chunk.byteLength).toBe(64); + } + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] http/2 stream closed with RST code CANCEL (0x8)" + ); + } + expect(serverSentBytes).toBe(64); + }); + }); + describe("over http/1.1", function () { + let serverSentBytes = 0; + const server = useNodeServer(() => + http.createServer((req, res) => { + void (async () => { + res.writeHead(200); + await new Promise((resolve, reject) => + res.write(new Uint8Array(64), (e) => (e ? reject(e) : resolve())) + ); + serverSentBytes += 64; + res.destroy(); + })(); + }) + ); + it("should reject the response promise", async function () { + const client = createNodeHttpClient({ + httpVersion: "1.1", + }); + const res = await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + }); + try { + for await (const chunk of res.body) { + expect(chunk.byteLength).toBe(64); + } + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe("[aborted] aborted"); + } + expect(serverSentBytes).toBe(64); + }); + }); + }); + + describe("with a signal that is already aborted", function () { + describe("over http/2", function () { + let serverReceivedRequest = false; + const server = useNodeServer(() => + http2.createServer(() => (serverReceivedRequest = true)) + ); + it("should raise error with Code.Canceled and never hit the server", async function () { + const signal = AbortSignal.abort(); + const client = createNodeHttpClient({ + httpVersion: "2", + baseUrl: server.getUrl(), + keepSessionAlive: false, + }); + // client should raise error + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + signal, + }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] This operation was aborted" + ); + } + // request should never hit the server + expect(serverReceivedRequest).toBeFalse(); + }); + }); + describe("over http/1.1", function () { + let serverReceivedRequest = false; + const server = useNodeServer(() => + http.createServer(() => (serverReceivedRequest = true)) + ); + it("should raise error with Code.Canceled and never hit the server", async function () { + const signal = AbortSignal.abort(); + const client = createNodeHttpClient({ + httpVersion: "1.1", + }); + // client should raise error + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + signal, + }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] This operation was aborted" + ); + } + // request should never hit the server + expect(serverReceivedRequest).toBeFalse(); + }); }); }); - await new Promise((resolve) => { - s.listen(0, () => resolve(s)); + + describe("with a signal aborting before first request byte", function () { + describe("over http/2", function () { + let serverReceivedRstCode: number | undefined; + let serverReceivedBytes = 0; + const server = useNodeServer(() => + http2.createServer((req, res) => { + res.stream.on( + "close", + () => (serverReceivedRstCode = res.stream.rstCode) + ); + void (async () => { + for await (const chunk of req) { + serverReceivedBytes += (chunk as Uint8Array).byteLength; + } + })(); + }) + ); + it("should raise error with code canceled and send RST_STREAM with code CANCEL", async function () { + // set up a client that aborts while still streaming the request body + const ac = new AbortController(); + const client = createNodeHttpClient({ + httpVersion: "2", + baseUrl: server.getUrl(), + keepSessionAlive: false, + }); + + async function* body() { + await new Promise((resolve) => setTimeout(resolve, 50)); + ac.abort(); + yield new Uint8Array(32); + } + + // client should raise error + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: body(), + signal: ac.signal, + }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] This operation was aborted" + ); + } + + // server should receive chunks until client cancelled + while (serverReceivedRstCode === undefined) { + // wait for the server to see the reset code + await new Promise((resolve) => setTimeout(resolve, 1)); + } + expect(serverReceivedRstCode).toBe(http2.constants.NGHTTP2_CANCEL); + expect(serverReceivedBytes).toBe(0); + }); + }); + describe("over http/1.1", function () { + let serverReceivedRequest = false; + let serverReceivedBytes = 0; + let serverRequestClosed = false; + let serverResponseClosed = false; + let serverRequestEmittedAborted = false; + let serverRequestEmittedError: (Error & { code?: string }) | undefined; + let serverRequestIterableErrored: (Error & { code?: string }) | undefined; + const server = useNodeServer(() => + http.createServer((req, res) => { + serverReceivedRequest = true; + req.on("aborted", () => (serverRequestEmittedAborted = true)); + req.on("error", (e) => (serverRequestEmittedError = e)); + req.on("close", () => (serverRequestClosed = true)); + res.on("close", () => (serverResponseClosed = true)); + void (async () => { + try { + for await (const chunk of req) { + serverReceivedBytes += (chunk as Uint8Array).byteLength; + } + } catch (e) { + serverRequestIterableErrored = e as Error & { code?: string }; + } + })(); + }) + ); + it("should raise error with code canceled", async function () { + // set up a client that aborts while still streaming the request body + const ac = new AbortController(); + const client = createNodeHttpClient({ + httpVersion: "1.1", + }); + + async function* body() { + await new Promise((resolve) => setTimeout(resolve, 50)); + ac.abort(); + yield new Uint8Array(32); + } + + // client should raise error + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: body(), + signal: ac.signal, + }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] This operation was aborted" + ); + } + + // server should receive chunks until client cancelled + expect(serverReceivedRequest) + .withContext("serverReceivedRequest") + .toBeTrue(); + while (!serverResponseClosed) { + // wait for the server to see the response being closed + await new Promise((resolve) => setTimeout(resolve, 1)); + } + expect(serverRequestClosed).toBeTrue(); + expect(serverResponseClosed).toBeTrue(); + expect(serverRequestEmittedAborted).toBeTrue(); + expect(serverRequestEmittedError?.code).toBe("ECONNRESET"); + expect(serverRequestIterableErrored?.code).toBe("ECONNRESET"); + expect(serverReceivedBytes).toBe(0); + }); + }); }); - return { - baseUrl: `http://localhost:${(s.address() as net.AddressInfo).port}`, - async stop() { - for (;;) { - const count = await new Promise((resolve, reject) => { - s.getConnections((err, count) => { - if (err) { - return reject(err); + + describe("with a signal aborting mid request", function () { + describe("over http/2", function () { + let serverReceivedRstCode: number | undefined; + let serverReceivedBytes = 0; + const server = useNodeServer(() => + http2.createServer((req, res) => { + res.stream.on( + "close", + () => (serverReceivedRstCode = res.stream.rstCode) + ); + void (async () => { + for await (const chunk of req) { + serverReceivedBytes += (chunk as Uint8Array).byteLength; + } + })(); + }) + ); + it("should raise error with code canceled and send RST_STREAM with code CANCEL", async function () { + // set up a client that aborts while still streaming the request body + const ac = new AbortController(); + const client = createNodeHttpClient({ + httpVersion: "2", + baseUrl: server.getUrl(), + keepSessionAlive: false, + }); + + async function* body() { + yield new Uint8Array(32); + await new Promise((resolve) => setTimeout(resolve, 50)); + ac.abort(); + yield new Uint8Array(32); + } + + // client should raise error + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: body(), + signal: ac.signal, + }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] This operation was aborted" + ); + } + + // server should receive chunks until client cancelled + while (serverReceivedRstCode === undefined) { + // wait for the server to see the reset code + await new Promise((resolve) => setTimeout(resolve, 1)); + } + expect(serverReceivedRstCode).toBe(http2.constants.NGHTTP2_CANCEL); + expect(serverReceivedBytes).toBe(32); + }); + }); + describe("over http/1.1", function () { + let serverReceivedRequest = false; + let serverReceivedBytes = 0; + let serverRequestClosed = false; + let serverResponseClosed = false; + let serverRequestEmittedAborted = false; + let serverRequestEmittedError: (Error & { code?: string }) | undefined; + let serverRequestIterableErrored: (Error & { code?: string }) | undefined; + const server = useNodeServer(() => + http.createServer((req, res) => { + serverReceivedRequest = true; + req.on("aborted", () => (serverRequestEmittedAborted = true)); + req.on("error", (e) => (serverRequestEmittedError = e)); + req.on("close", () => (serverRequestClosed = true)); + res.on("close", () => (serverResponseClosed = true)); + void (async () => { + try { + for await (const chunk of req) { + serverReceivedBytes += (chunk as Uint8Array).byteLength; + } + } catch (e) { + serverRequestIterableErrored = e as Error & { code?: string }; } - return resolve(count); + })(); + }) + ); + it("should raise error with code canceled", async function () { + // set up a client that aborts while still streaming the request body + const ac = new AbortController(); + const client = createNodeHttpClient({ + httpVersion: "1.1", + }); + + async function* body() { + yield new Uint8Array(32); + await new Promise((resolve) => setTimeout(resolve, 50)); + ac.abort(); + yield new Uint8Array(32); + } + + // client should raise error + try { + await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: body(), + signal: ac.signal, }); + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] This operation was aborted" + ); + } + + // server should receive chunks until client cancelled + expect(serverReceivedRequest).toBeTrue(); + while (!serverResponseClosed) { + // wait for the server to see the response being closed + await new Promise((resolve) => setTimeout(resolve, 1)); + } + expect(serverRequestClosed).toBeTrue(); + expect(serverResponseClosed).toBeTrue(); + expect(serverRequestEmittedAborted).toBeTrue(); + expect(serverRequestEmittedError?.code).toBe("ECONNRESET"); + expect(serverRequestIterableErrored?.code).toBe("ECONNRESET"); + expect(serverReceivedBytes).toBe(32); + }); + }); + }); + + describe("with a signal aborting mid response", function () { + describe("over http/2", function () { + let serverReceivedRstCode: number | undefined; + let serverSentBytes = 0; + const server = useNodeServer(() => + http2.createServer((req, res) => { + res.stream.on( + "close", + () => (serverReceivedRstCode = res.stream.rstCode) + ); + void (async () => { + res.writeHead(200); + res.write(new Uint8Array(64)); + serverSentBytes += 64; + await new Promise(() => { + // never resolves + }); + })(); + }) + ); + it("should raise error with code canceled and send RST_STREAM with code CANCEL", async function () { + // set up a client that aborts while still streaming the request body + const ac = new AbortController(); + const client = createNodeHttpClient({ + httpVersion: "2", + baseUrl: server.getUrl(), + keepSessionAlive: false, + }); + + const res = await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + signal: ac.signal, }); - if (count === 0) { - break; + + // should raise error with code canceled + try { + for await (const chunk of res.body) { + expect(chunk.byteLength).toBe(64); + ac.abort(); + } + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] This operation was aborted" + ); + } + + // server should receive RST_STREAM with code CANCEL + while (serverReceivedRstCode === undefined) { + // wait for the server to see the reset code + await new Promise((resolve) => setTimeout(resolve, 1)); } - await new Promise((resolve) => setTimeout(resolve, 10)); - } - s.close(); - return Promise.resolve({ - rstCode, + expect(serverReceivedRstCode).toBe(http2.constants.NGHTTP2_CANCEL); + expect(serverSentBytes).toBe(64); }); - }, - }; -} + }); + describe("over http/1.1", function () { + let serverSentBytes = 0; + let serverRequestClosed = false; + let serverResponseClosed = false; + const server = useNodeServer(() => + http.createServer((req, res) => { + req.on("close", () => (serverRequestClosed = true)); + res.on("close", () => (serverResponseClosed = true)); + void (async () => { + res.writeHead(200); + res.write(new Uint8Array(64)); + serverSentBytes += 64; + await new Promise(() => { + // never resolves + }); + })(); + }) + ); + it("should raise error with code canceled", async function () { + // set up a client that aborts while still streaming the request body + const ac = new AbortController(); + const client = createNodeHttpClient({ + httpVersion: "1.1", + }); + + const res = await client({ + url: server.getUrl(), + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + signal: ac.signal, + }); + + // should raise error with code canceled + try { + for await (const chunk of res.body) { + expect(chunk.byteLength).toBe(64); + ac.abort(); + } + fail("expected error"); + } catch (e) { + expect(e).toBeInstanceOf(ConnectError); + expect(connectErrorFromReason(e).message).toBe( + "[canceled] This operation was aborted" + ); + } + + // server should see request close + while (!serverResponseClosed) { + // wait for the server to see the response being closed + await new Promise((resolve) => setTimeout(resolve, 1)); + } + expect(serverRequestClosed).toBeTrue(); + expect(serverResponseClosed).toBeTrue(); + expect(serverSentBytes).toBe(64); + }); + }); + }); +}); diff --git a/packages/connect-node/src/node-universal-client.ts b/packages/connect-node/src/node-universal-client.ts index e00d2ee46..6a1ddf98a 100644 --- a/packages/connect-node/src/node-universal-client.ts +++ b/packages/connect-node/src/node-universal-client.ts @@ -22,8 +22,10 @@ import { webHeaderToNodeHeaders, } from "./node-universal-header.js"; import { + connectErrorFromH2ResetCode, connectErrorFromNodeReason, getNodeErrorProps, + H2Code, unwrapNodeErrorChain, } from "./node-error.js"; import type { @@ -119,7 +121,7 @@ function createNodeHttp1Client( return async function request( req: UniversalClientRequest ): Promise { - const sentinel = createSentinel(); + const sentinel = createSentinel(req.signal); return new Promise((resolve, reject) => { sentinel.catch((e) => { reject(e); @@ -132,7 +134,6 @@ function createNodeHttp1Client( ...httpOptions, headers: webHeaderToNodeHeaders(req.header), method: req.method, - signal: req.signal, }, (request) => { pipeTo(req.body, sinkRequest(sentinel, request), { @@ -140,15 +141,8 @@ function createNodeHttp1Client( }).catch(sentinel.reject); request.on("response", (response) => { response.on("error", sentinel.reject); - response.on("abort", () => - sentinel.reject( - new ConnectError("node response aborted", Code.Aborted) - ) - ); - response.on("timeout", () => - sentinel.reject( - new ConnectError("node response timed out", Code.Aborted) - ) + sentinel.catch((reason) => + response.destroy(connectErrorFromNodeReason(reason)) ); const trailer = new Headers(); resolve({ @@ -202,7 +196,6 @@ function createNodeHttp2Client( req.url, req.method, webHeaderToNodeHeaders(req.header), - req.signal, {}, (stream) => { void pipeTo(req.body, sinkRequest(sentinel, stream), { @@ -226,7 +219,9 @@ function createNodeHttp2Client( function h1Request( sentinel: Sentinel, url: string, - options: http.RequestOptions | https.RequestOptions, + options: + | Omit + | Omit, onRequest: (request: http.ClientRequest) => void ): void { let request: http.ClientRequest; @@ -235,10 +230,12 @@ function h1Request( } else { request = http.request(url, options); } - request.on("error", sentinel.reject); - request.on("abort", () => - sentinel.reject(new ConnectError("node request aborted", Code.Aborted)) + sentinel.catch((reason) => + request.destroy(connectErrorFromNodeReason(reason)) ); + request.flushHeaders(); + + request.on("error", sentinel.reject); request.on("socket", function onRequestSocket(socket: net.Socket) { function onSocketConnect() { socket.off("connect", onSocketConnect); @@ -300,7 +297,6 @@ function h2Request( url: string, method: string, headers: http2.OutgoingHttpHeaders, - signal: AbortSignal | undefined, options: Omit, onStream: (stream: http2.ClientHttp2Stream) => void ): void { @@ -345,22 +341,20 @@ function h2Request( ); sentinel .catch((reason) => { - return new Promise((resolve) => { - if (stream.closed) { - return resolve(); - } - // Node.js http2 streams that are aborted via an AbortSignal close with - // an RST_STREAM with code INTERNAL_ERROR. - // To comply with the mapping between gRPC and HTTP/2 codes, we need to - // close with code CANCEL. - // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors - // See https://www.rfc-editor.org/rfc/rfc7540#section-7 - if (reason instanceof ConnectError && reason.code == Code.Canceled) { - return stream.close(http2.constants.NGHTTP2_CANCEL, resolve); - } - // For other reasons, INTERNAL_ERROR is the best fit. - stream.close(http2.constants.NGHTTP2_INTERNAL_ERROR, resolve); - }); + if (stream.closed) { + return; + } + // Node.js http2 streams that are aborted via an AbortSignal close with + // an RST_STREAM with code INTERNAL_ERROR. + // To comply with the mapping between gRPC and HTTP/2 codes, we need to + // close with code CANCEL. + // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors + // See https://www.rfc-editor.org/rfc/rfc7540#section-7 + const rstCode = + reason instanceof ConnectError && reason.code == Code.Canceled + ? H2Code.CANCEL + : H2Code.INTERNAL_ERROR; + return new Promise((resolve) => stream.close(rstCode, resolve)); }) .finally(() => { session.off("error", sentinel.reject); @@ -383,8 +377,12 @@ function h2Request( } sentinel.reject(e); }); - stream.on("abort", function h2StreamAbort() { - sentinel.reject(new ConnectError("node request aborted", Code.Aborted)); + + stream.on("close", function h2StreamClose() { + const err = connectErrorFromH2ResetCode(stream.rstCode); + if (err) { + sentinel.reject(err); + } }); onStream(stream); } @@ -553,8 +551,10 @@ function createSentinel(signal?: AbortSignal): Sentinel { const s = Object.assign(p, c); function onSignalAbort() { + // AbortSignal.reason was added in Node.js 17.2.0, we cannot rely on it. c.reject( - new ConnectError("operation was aborted via signal", Code.Canceled) + signal?.reason ?? + new ConnectError("This operation was aborted", Code.Canceled) ); } diff --git a/packages/connect-node/src/use-node-server-helper.spec.ts b/packages/connect-node/src/use-node-server-helper.spec.ts new file mode 100644 index 000000000..427d46f21 --- /dev/null +++ b/packages/connect-node/src/use-node-server-helper.spec.ts @@ -0,0 +1,77 @@ +// Copyright 2021-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as http2 from "http2"; +import * as http from "http"; +import * as https from "https"; + +export function useNodeServer( + createServer: () => + | http.Server + | https.Server + | http2.Http2Server + | http2.Http2SecureServer +) { + let server: + | http.Server + | https.Server + | http2.Http2Server + | http2.Http2SecureServer + | undefined; + + beforeAll(function (doneFn) { + server = createServer(); + server.listen(0, function listenCallback() { + doneFn(); + }); + }); + + afterAll(async function () { + if (server === undefined) { + throw new Error("server not defined"); + } + const s = server; + for (;;) { + const count = await new Promise((resolve, reject) => { + s.getConnections((err, count) => { + if (err) { + return reject(err); + } + return resolve(count); + }); + }); + if (count === 0) { + break; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + s.close(); + }); + + return { + getUrl(): string { + if (server === undefined) { + throw new Error("cannot get server port"); + } + const address = server.address(); + if (address == null || typeof address == "string") { + throw new Error("cannot get server port"); + } + const secure = + typeof (server as unknown as Record) + .setSecureContext == "function"; + return `${secure ? "https" : "http"}://localhost:${address.port}`; + }, + }; +} From 17e97494673aa3c98577baa4062c8fbd4846476c Mon Sep 17 00:00:00 2001 From: Timo Stamm Date: Fri, 5 May 2023 19:04:24 +0200 Subject: [PATCH 2/2] Add explanatory comment for flushHeaders --- packages/connect-node/src/node-universal-client.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/connect-node/src/node-universal-client.ts b/packages/connect-node/src/node-universal-client.ts index 6a1ddf98a..a84c46a09 100644 --- a/packages/connect-node/src/node-universal-client.ts +++ b/packages/connect-node/src/node-universal-client.ts @@ -233,6 +233,9 @@ function h1Request( sentinel.catch((reason) => request.destroy(connectErrorFromNodeReason(reason)) ); + // Node.js will only send headers with the first request body byte by default. + // We force it to send headers right away for consistent behavior between + // HTTP/1.1 and HTTP/2.2 clients. request.flushHeaders(); request.on("error", sentinel.reject);