From 221d07d38a6e9238a3745a9781510dae2dc8a526 Mon Sep 17 00:00:00 2001 From: bcoll Date: Wed, 22 Mar 2023 17:05:34 +0000 Subject: [PATCH] Fix cloning of `Response`s constructed with byte streams, closes #527 This applies the same fix for `Request` bodies from #510 to `Response`s. Notably, byte streams are returned from lots of Workers runtime APIs (e.g. KV, R2) to support BYOB reads. It's likely these are then used in `Response`s and cloned for caching. --- packages/core/src/standards/http.ts | 20 ++++++++-- packages/core/test/standards/http.spec.ts | 46 +++++++++++++++-------- 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/packages/core/src/standards/http.ts b/packages/core/src/standards/http.ts index dbb4dcc03..03400cb4d 100644 --- a/packages/core/src/standards/http.ts +++ b/packages/core/src/standards/http.ts @@ -570,6 +570,13 @@ export class Response< [kWaitUntil]?: Promise; constructor(body?: BodyInit, init?: ResponseInit | Response | BaseResponse) { + // If body is a FixedLengthStream, set Content-Length to its expected + // length. We may replace `body` later on with a different stream, so + // extract `contentLength` now. + const contentLength: number | undefined = ( + body as { [kContentLength]?: number } + )?.[kContentLength]; + let encodeBody: string | undefined; let status: number | undefined; let webSocket: WebSocket | undefined; @@ -582,6 +589,15 @@ export class Response< if (body instanceof ArrayBuffer) { body = body.slice(0); } + if (body instanceof ReadableStream && _isByteStream(body)) { + // Since `undici@5.12.0`, cloning a body now invokes `structuredClone` + // on the underlying stream (https://github.com/nodejs/undici/pull/1697). + // Unfortunately, due to a bug in Node, byte streams cannot be + // `structuredClone`d (https://github.com/nodejs/undici/issues/1873, + // https://github.com/nodejs/node/pull/45955), leading to issues when + // constructing bodies with byte streams. + body = convertToRegularStream(body); + } if (init instanceof Response) { encodeBody = init.#encodeBody; @@ -628,10 +644,6 @@ export class Response< this.#status = status; this.#webSocket = webSocket; - // If body is a FixedLengthStream, set Content-Length to its expected length - const contentLength: number | undefined = ( - body as { [kContentLength]?: number } - )?.[kContentLength]; if (contentLength !== undefined) { this.headers.set("content-length", contentLength.toString()); } diff --git a/packages/core/test/standards/http.spec.ts b/packages/core/test/standards/http.spec.ts index 0c1bdd751..d555e12d6 100644 --- a/packages/core/test/standards/http.spec.ts +++ b/packages/core/test/standards/http.spec.ts @@ -128,21 +128,6 @@ test("Body: same body instance is always returned", (t) => { t.not(body.body, null); t.is(body.body, body.body); }); -test("Body: reuses byte stream if not input gated", (t) => { - const bodyStream = new ReadableStream({ - type: "bytes", - pull(controller) { - controller.enqueue(utf8Encode("chunk")); - controller.close(); - }, - }); - - let res = new Response(bodyStream); - t.is(res.body, bodyStream); - - res = withInputGating(new Response(bodyStream)); - t.not(res.body, bodyStream); -}); test("Body: body isn't locked until read from", async (t) => { const res = new Response("body"); // noinspection SuspiciousTypeOfGuard @@ -845,6 +830,37 @@ test("Response: clones non-standard properties", async (t) => { t.is(await res2.text(), "body"); t.is(await res3.text(), "body"); }); +test("Response: clones stream bodies", async (t) => { + let stream = new ReadableStream({ + start(controller) { + controller.enqueue(utf8Encode("chunk1")); + controller.close(); + }, + }); + let res = new Response(stream); + let clone = res.clone(); + assert(res.body !== null && clone.body !== null); + t.true(_isByteStream(res.body)); + t.true(_isByteStream(clone.body)); + t.is(await res.text(), "chunk1"); + t.is(await clone.text(), "chunk1"); + + // Check again with byte stream + stream = new ReadableStream({ + type: "bytes", + start(controller) { + controller.enqueue(utf8Encode("chunk2")); + controller.close(); + }, + }); + res = new Response(stream); + clone = res.clone(); + assert(res.body !== null && clone.body !== null); + t.true(_isByteStream(res.body)); + t.true(_isByteStream(clone.body)); + t.is(await res.text(), "chunk2"); + t.is(await clone.text(), "chunk2"); +}); test("Response: constructing from Response copies non-standard properties", (t) => { const pair = new WebSocketPair(); const res = new Response("body1", {