From 37557e3f7e5b225a17da496472018309f9da83e3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 30 Jul 2021 10:32:50 +0200 Subject: [PATCH] fixup: tests --- docs/api/Dispatcher.md | 2 +- lib/api/api-request.js | 2 +- lib/{api => node}/readable.js | 116 +++++----- package.json | 2 +- test/body-readable.js | 387 ++++++++++++++++++++++++++++++++++ 5 files changed, 453 insertions(+), 56 deletions(-) rename lib/{api => node}/readable.js (76%) create mode 100644 test/body-readable.js diff --git a/docs/api/Dispatcher.md b/docs/api/Dispatcher.md index ef58e8e185c..0074f65d4a9 100644 --- a/docs/api/Dispatcher.md +++ b/docs/api/Dispatcher.md @@ -418,7 +418,7 @@ The `RequestOptions.method` property should not be value `'CONNECT'`. * **statusCode** `number` * **headers** `http.IncomingHttpHeaders` -* **body** `stream.Readable` +* **body** `stream.Readable` which also implements [the body mixin from the Fetch Standard](https://fetch.spec.whatwg.org/#body-mixin). * **trailers** `Record` - This object starts out as empty and will be mutated to contain trailers after `body` has emitted `'end'`. * **opaque** `unknown` diff --git a/lib/api/api-request.js b/lib/api/api-request.js index fa1543d55cd..9dac354a602 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -1,6 +1,6 @@ 'use strict' -const Readable = require('./readable') +const Readable = require('../node/readable') const { InvalidArgumentError, RequestAbortedError diff --git a/lib/api/readable.js b/lib/node/readable.js similarity index 76% rename from lib/api/readable.js rename to lib/node/readable.js index e374a0e8c37..f1f9e51468d 100644 --- a/lib/api/readable.js +++ b/lib/node/readable.js @@ -1,29 +1,24 @@ +/* istanbul ignore file: TODO add more coverage */ + +// Ported from https://github.com/nodejs/undici/pull/907 + 'use strict' +const assert = require('assert') const { Readable } = require('stream') -const { InvalidArgumentError } = require('../core/errors') +const { InvalidArgumentError, RequestAbortedError } = require('../core/errors') let StringDecoder let Blob const kConsume = Symbol('kConsume') const kReading = Symbol('kReading') +const kBody = Symbol('kBody') -const kWebStreamType = 1 -const kTextType = 2 -const kBlobType = 3 -const kArrayBufferType = 4 -const kJSONType = 5 - -class AbortError extends Error { - constructor (message) { - super(message) - Error.captureStackTrace(this, AbortError) - this.name = 'AbortError' - this.message = 'aborted' - this.code = 'UND_ERR_ABORTED' - } -} +const kTextType = 1 +const kBlobType = 2 +const kArrayBufferType = 3 +const kJSONType = 4 module.exports = class BodyReadable extends Readable { constructor (opts) { @@ -32,12 +27,24 @@ module.exports = class BodyReadable extends Readable { this._readableState.dataEmitted = false this[kConsume] = null + this[kBody] = null this[kReading] = false // Is stream being consumed through Readable API? } + destroy (err) { + if (this.destroyed) { + // Node < 16 + return this + } + return super.destroy(err) + } + emit (ev, ...args) { if (ev === 'data') { this._readableState.dataEmitted = true + } else if (ev === 'error') { + // Node < 16 + this._readableState.errorEmitted = true } return super.emit(ev, ...args) } @@ -49,6 +56,13 @@ module.exports = class BodyReadable extends Readable { return super.on(ev, ...args) } + addListener (ev, ...args) { + if (ev === 'data' || ev === 'readable') { + this[kReading] = true + } + return super.addListener(ev, ...args) + } + push (chunk, encoding) { if (this[kConsume] && chunk !== null && !this[kReading]) { // Fast path. @@ -90,12 +104,16 @@ module.exports = class BodyReadable extends Readable { return isDisturbed(this) } + // https://fetch.spec.whatwg.org/#dom-body-body get body () { - if (this[kConsume] && this[kConsume].type === kWebStreamType) { - return this[kConsume].stream + if (!this[kBody]) { + if (isUnusable(this)) { + throw new TypeError('unusable') + } + this[kBody] = Readable.toWeb(this) } - return consume(this, kWebStreamType) + return this[kBody] } } @@ -123,32 +141,17 @@ function isUnusable (self) { return isDisturbed(self) || isLocked(self) } -async function consume (parent, type) { - if (isUnusable(parent)) { - // eslint-disable-next-line no-restricted-syntax - throw new TypeError('unusable') - } - - if (parent[kConsume]) { - // TODO: Should multiple consume in same tick be possible? - // eslint-disable-next-line no-restricted-syntax +async function consume (stream, type) { + if (isUnusable(stream)) { throw new TypeError('unusable') } - if (type === kWebStreamType) { - const consume = parent[kConsume] = { - type, - // TODO: Optimized implementation for web streams. - stream: Readable.toWeb(parent) - } - - return consume.stream - } + assert(!stream[kConsume]) return new Promise((resolve, reject) => { - parent[kConsume] = { + stream[kConsume] = { type, - parent, + stream, resolve, reject, length: 0, @@ -159,17 +162,18 @@ async function consume (parent, type) { ended: false } - parent + stream .once('error', function (err) { consumeFinish(this[kConsume], err) }) .once('close', function () { if (this[kConsume].body !== null) { - consumeFinish(this[kConsume], new AbortError()) + // TODO: Use Node error? + consumeFinish(this[kConsume], new RequestAbortedError()) } }) - process.nextTick(consumeStart, parent[kConsume]) + process.nextTick(consumeStart, stream[kConsume]) }) } @@ -178,7 +182,7 @@ function consumeStart (consume) { return } - const { _readableState: state } = consume.parent + const { _readableState: state } = consume.stream for (const chunk of state.buffer) { consumePush(consume, chunk) @@ -187,20 +191,20 @@ function consumeStart (consume) { if (state.endEmitted) { consumeEnd(this[kConsume]) } else { - consume.parent.once('end', function () { + consume.stream.once('end', function () { consumeEnd(this[kConsume]) }) } - if (consume.parent.isPaused()) { - consume.parent.resume() + if (consume.stream.isPaused()) { + consume.stream.resume() } - while (consume.parent.read() != null); + while (consume.stream.read() != null); } function consumeEnd (consume) { - const { type, body, resolve, decoder, parent, length } = consume + const { type, body, resolve, decoder, stream, length } = consume try { if (type === kTextType) { @@ -226,7 +230,7 @@ function consumeEnd (consume) { consumeFinish(consume) } catch (err) { - parent.destroy(err) + stream.destroy(err) } } @@ -241,7 +245,7 @@ function consumePush (consume, chunk, encoding) { if (chunk === null) { consume.ended = true - consume.parent.read() + consume.stream.read() return false } @@ -253,7 +257,7 @@ function consumePush (consume, chunk, encoding) { consumePushBuffer(consume, chunk, encoding) } - if (!consume.parent[kReading] && !consume.reading) { + if (!consume.stream[kReading] && !consume.reading) { consume.reading = true process.nextTick(consumeReadMore, consume) } @@ -283,6 +287,7 @@ function consumePushString (consume, chunk, encoding) { } else { // TODO: What if objectMode? Should we just fail consume // or throw? + // TODO: Use Node error? throw new InvalidArgumentError('chunk') } @@ -300,6 +305,7 @@ function consumePushBuffer (consume, chunk, encoding) { } else if (!ArrayBuffer.isView(chunk)) { // TODO: What if objectMode? Should we just fail consume // or throw? + // TODO: Use Node error? throw new InvalidArgumentError('chunk') } @@ -308,7 +314,7 @@ function consumePushBuffer (consume, chunk, encoding) { } function consumeReadMore (consume) { - if (consume.parent[kReading]) { + if (consume.stream[kReading]) { consume.reading = false return } @@ -316,13 +322,17 @@ function consumeReadMore (consume) { consume.pushed = true while (consume.pushed) { consume.pushed = false - consume.parent._read(consume.parent) + consume.stream._read(consume.stream) } consume.reading = false } function consumeFinish (consume, err) { + if (consume.body === null) { + return + } + if (err) { consume.reject(err) } else { diff --git a/package.json b/package.json index e7d329a8166..21f6f7dfd67 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "undici", - "version": "4.3.0", + "version": "4.2.2", "description": "An HTTP/1.1 client, written from scratch for Node.js", "homepage": "https://undici.nodejs.org", "bugs": { diff --git a/test/body-readable.js b/test/body-readable.js new file mode 100644 index 00000000000..05661023749 --- /dev/null +++ b/test/body-readable.js @@ -0,0 +1,387 @@ +'use strict' +const Readable = require('../lib/node/readable') +const { test } = require('tap') + +test('body readable', t => { + // Ported from https://github.com/nodejs/node/pull/39520. + + let counter = 0 + const assert = Object.assign(t.ok.bind(t), { + strictEqual: t.equal, + deepStrictEqual: t.strictSame + }) + const common = { + mustCall (fn, count = 1) { + counter++ + return (...args) => { + count -= 1 + if (count < 0) { + t.fail() + } else if (count === 0) { + counter -= 1 + queueMicrotask(() => { + if (counter === 0) { + t.end() + counter = null + } + }) + } + return fn && fn(...args) + } + }, + mustNotCall () { + return () => ( + t.fail() + ) + } + } + + { + const r = new Readable({ + read () { + this.push('asd') + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.on('end', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + r.text().then(common.mustCall((val) => { + assert.strictEqual(r.bodyUsed, true) + assert.strictEqual(val, 'asd') + })) + process.nextTick(() => { + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const n = ['a', 's', 'd', null] + const r = new Readable({ + read () { + this.push(n.shift()) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.on('end', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + r.text().then(common.mustCall((val) => { + assert.strictEqual(r.bodyUsed, true) + assert.strictEqual(val, 'asd') + })) + process.nextTick(() => { + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const r = new Readable({ + read () { + this.push('asd') + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.on('error', common.mustCall(() => { + assert.strictEqual(r.bodyUsed, true) + })) + // r.on('close', common.mustCall()) Node < 16 + const _err = new Error() + r.text().catch(common.mustCall((err) => assert.strictEqual(err, _err))) + r.destroy(_err) + process.nextTick(() => { + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const obj = { asd: '123' } + const r = new Readable({ + read () { + this.push(JSON.stringify(obj)) + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.on('end', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + r.json().then(common.mustCall((val) => { + assert.strictEqual(r.bodyUsed, true) + assert.deepStrictEqual(val, obj) + })) + process.nextTick(() => { + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const r = new Readable({ + read () { + this.push('asd') + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.on('error', common.mustCall(() => { + assert.strictEqual(r.bodyUsed, true) + })) + // r.on('close', common.mustCall()) Node < 16 + r.json() + .catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'Unexpected token a in JSON at position 0') + assert.strictEqual(r.bodyUsed, true) + })) + process.nextTick(() => { + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const r = new Readable({ + read () { + this.push('asd') + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.on('error', common.mustCall(() => { + assert.strictEqual(r.bodyUsed, true) + })) + // r.on('close', common.mustCall()) Node < 16 + r.json().catch(common.mustCall((err) => { + assert.strictEqual(r.bodyUsed, true) + assert(err) + })) + process.nextTick(() => { + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const buf = Uint8Array.from('asd') + const r = new Readable({ + read () { + this.push(buf) + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.on('end', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + r.arrayBuffer() + .then(common.mustCall((val) => assert.deepStrictEqual(val, buf))) + process.nextTick(() => { + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const r = new Readable({ + read () { + this.push('asd') + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.pause() + assert.strictEqual(r.bodyUsed, false) + r.on('data', common.mustCall(() => { + assert.strictEqual(r.bodyUsed, true) + })) + r.on('end', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + r.text().then(common.mustCall((val) => assert.strictEqual(val, 'asd'))) + process.nextTick(() => { + assert.strictEqual(r.bodyUsed, true) + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const n = ['a', 's', 'd', null] + const r = new Readable({ + read () { + this.push(n.shift()) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.pause() + assert.strictEqual(r.bodyUsed, false) + r.on('data', common.mustCall(null, 3)) + r.on('end', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + r.text().then(common.mustCall((val) => assert.strictEqual(val, 'asd'))) + process.nextTick(() => { + assert.strictEqual(r.bodyUsed, true) + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const r = new Readable({ + read () { + this.push('asd') + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.pause() + assert.strictEqual(r.bodyUsed, false) + r.on('data', common.mustNotCall()) + r.on('error', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + const _err = new Error() + r.text().catch(common.mustCall((err) => assert.strictEqual(err, _err))) + r.destroy(_err) + process.nextTick(() => { + assert.strictEqual(r.bodyUsed, true) + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const obj = { asd: '123' } + const r = new Readable({ + read () { + this.push(JSON.stringify(obj)) + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.pause() + assert.strictEqual(r.bodyUsed, false) + r.on('data', common.mustCall(() => { + assert.strictEqual(r.bodyUsed, true) + })) + r.on('end', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + r.json().then(common.mustCall((val) => assert.deepStrictEqual(val, obj))) + process.nextTick(() => { + assert.strictEqual(r.bodyUsed, true) + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const r = new Readable({ + read () { + this.push('asd') + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.pause() + assert.strictEqual(r.bodyUsed, false) + r.on('data', common.mustCall(() => { + assert.strictEqual(r.bodyUsed, true) + })) + r.on('error', common.mustCall(() => { + assert.strictEqual(r.bodyUsed, true) + })) + // r.on('close', common.mustCall()) Node < 16 + r.json() + .catch(common.mustCall((err) => assert.strictEqual( + err.message, 'Unexpected token a in JSON at position 0'))) + process.nextTick(() => { + assert.strictEqual(r.bodyUsed, true) + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const buf = Uint8Array.from('asd') + const r = new Readable({ + read () { + this.push(buf) + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + r.pause() + assert.strictEqual(r.bodyUsed, false) + r.on('data', common.mustCall()) + r.on('end', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + r.arrayBuffer() + .then(common.mustCall((val) => assert.deepStrictEqual(val, buf))) + process.nextTick(() => { + assert.strictEqual(r.bodyUsed, true) + assert.strictEqual(r.isPaused(), false) + }) + } + + { + const buf = Uint8Array.from('asd') + const r = new Readable({ + read () { + this.push(buf) + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + assert.strictEqual(r.bodyUsed, false) + r.on('data', common.mustCall(() => { + assert.strictEqual(r.bodyUsed, true) + r.json() + .catch(common.mustCall((err) => assert( + err instanceof TypeError))) + r.arrayBuffer() + .catch(common.mustCall((err) => assert( + err instanceof TypeError))) + r.blob() + .catch(common.mustCall((err) => assert( + err instanceof TypeError))) + r.text() + .catch(common.mustCall((err) => assert( + err instanceof TypeError))) + })) + r.on('error', common.mustNotCall()) + r.on('end', common.mustCall()) + // r.on('close', common.mustCall()) Node < 16 + } + + { + const r = new Readable({ + read () { + this.push(null) + } + }) + assert.strictEqual(r.bodyUsed, false) + assert.strictEqual(r.bodyUsed, false) + r.on('end', common.mustCall(() => { + assert.strictEqual(r.bodyUsed, true) + r.json() + .catch(common.mustCall((err) => assert( + err instanceof TypeError))) + r.arrayBuffer() + .catch(common.mustCall((err) => assert( + err instanceof TypeError))) + r.blob() + .catch(common.mustCall((err) => assert( + err instanceof TypeError))) + r.text() + .catch(common.mustCall((err) => assert( + err instanceof TypeError))) + })) + r.on('error', common.mustNotCall()) + r.on('data', common.mustNotCall()) + // r.on('close', common.mustCall()) Node < 16 + } + + for (const key of ['text', 'json', 'arrayBuffer', 'blob']) { + const r = new Readable({ + read () { + } + }) + assert.strictEqual(r.bodyUsed, false) + assert.strictEqual(r.bodyUsed, false) + r[key]() + .catch(common.mustCall((err) => assert.strictEqual( + err.name, 'RequestAbortedError'))) + r.destroy() + r.on('error', common.mustNotCall()) + r.on('end', common.mustNotCall()) + r.on('data', common.mustNotCall()) + // r.on('close', common.mustCall()) Node < 16 + } +})