From 7e4cc619cead2096243298c129e5e971c7098b06 Mon Sep 17 00:00:00 2001 From: Khafra Date: Mon, 12 Feb 2024 05:28:24 -0500 Subject: [PATCH] simplify formData body parsing (#2735) * simplify formData body parsing * perf: don't copy all headers * fixup --- lib/fetch/body.js | 63 ++++++++++++++--------------------------------- 1 file changed, 19 insertions(+), 44 deletions(-) diff --git a/lib/fetch/body.js b/lib/fetch/body.js index 9ceb094ef8d..2781a7b90d9 100644 --- a/lib/fetch/body.js +++ b/lib/fetch/body.js @@ -15,12 +15,12 @@ const { FormData } = require('./formdata') const { kState } = require('./symbols') const { webidl } = require('./webidl') const { Blob, File: NativeFile } = require('node:buffer') -const { kBodyUsed } = require('../core/symbols') const assert = require('node:assert') const { isErrored } = require('../core/util') -const { isUint8Array, isArrayBuffer } = require('util/types') +const { isArrayBuffer } = require('util/types') const { File: UndiciFile } = require('./file') const { serializeAMimeType } = require('./dataURL') +const { Readable } = require('node:stream') /** @type {globalThis['File']} */ const File = NativeFile ?? UndiciFile @@ -291,29 +291,6 @@ function cloneBody (body) { } } -async function * consumeBody (body) { - if (body) { - if (isUint8Array(body)) { - yield body - } else { - const stream = body.stream - - if (util.isDisturbed(stream)) { - throw new TypeError('The body has already been consumed.') - } - - if (stream.locked) { - throw new TypeError('The stream is locked.') - } - - // Compat. - stream[kBodyUsed] = true - - yield * stream - } - } -} - function throwIfAborted (state) { if (state.aborted) { throw new DOMException('The operation was aborted.', 'AbortError') @@ -328,7 +305,7 @@ function bodyMixinMethods (instance) { // given a byte sequence bytes: return a Blob whose // contents are bytes and whose type attribute is this’s // MIME type. - return specConsumeBody(this, (bytes) => { + return consumeBody(this, (bytes) => { let mimeType = bodyMimeType(this) if (mimeType === null) { @@ -348,7 +325,7 @@ function bodyMixinMethods (instance) { // of running consume body with this and the following step // given a byte sequence bytes: return a new ArrayBuffer // whose contents are bytes. - return specConsumeBody(this, (bytes) => { + return consumeBody(this, (bytes) => { return new Uint8Array(bytes).buffer }, instance) }, @@ -356,13 +333,13 @@ function bodyMixinMethods (instance) { text () { // The text() method steps are to return the result of running // consume body with this and UTF-8 decode. - return specConsumeBody(this, utf8DecodeBytes, instance) + return consumeBody(this, utf8DecodeBytes, instance) }, json () { // The json() method steps are to return the result of running // consume body with this and parse JSON from bytes. - return specConsumeBody(this, parseJSONFromBytes, instance) + return consumeBody(this, parseJSONFromBytes, instance) }, async formData () { @@ -375,16 +352,15 @@ function bodyMixinMethods (instance) { // If mimeType’s essence is "multipart/form-data", then: if (mimeType !== null && mimeType.essence === 'multipart/form-data') { - const headers = {} - for (const [key, value] of this.headers) headers[key] = value - const responseFormData = new FormData() let busboy try { busboy = new Busboy({ - headers, + headers: { + 'content-type': serializeAMimeType(mimeType) + }, preservePath: true }) } catch (err) { @@ -427,8 +403,10 @@ function bodyMixinMethods (instance) { busboy.on('error', (err) => reject(new TypeError(err))) }) - if (this.body !== null) for await (const chunk of consumeBody(this[kState].body)) busboy.write(chunk) - busboy.end() + if (this.body !== null) { + Readable.from(this[kState].body.stream).pipe(busboy) + } + await busboyResolve return responseFormData @@ -442,20 +420,17 @@ function bodyMixinMethods (instance) { // application/x-www-form-urlencoded parser will keep the BOM. // https://url.spec.whatwg.org/#concept-urlencoded-parser // Note that streaming decoder is stateful and cannot be reused - const streamingDecoder = new TextDecoder('utf-8', { ignoreBOM: true }) + const stream = this[kState].body.stream.pipeThrough(new TextDecoderStream('utf-8', { ignoreBOM: true })) - for await (const chunk of consumeBody(this[kState].body)) { - if (!isUint8Array(chunk)) { - throw new TypeError('Expected Uint8Array chunk') - } - text += streamingDecoder.decode(chunk, { stream: true }) + for await (const chunk of stream) { + text += chunk } - text += streamingDecoder.decode() + entries = new URLSearchParams(text) } catch (err) { // istanbul ignore next: Unclear when new URLSearchParams can fail on a string. // 2. If entries is failure, then throw a TypeError. - throw new TypeError(undefined, { cause: err }) + throw new TypeError(err) } // 3. Return a new FormData object whose entries are entries. @@ -493,7 +468,7 @@ function mixinBody (prototype) { * @param {(value: unknown) => unknown} convertBytesToJSValue * @param {Response|Request} instance */ -async function specConsumeBody (object, convertBytesToJSValue, instance) { +async function consumeBody (object, convertBytesToJSValue, instance) { webidl.brandCheck(object, instance) throwIfAborted(object[kState])