diff --git a/lib/api/readable.js b/lib/api/readable.js index 5269dfae50c..473e3ac38e0 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -62,6 +62,18 @@ module.exports = class BodyReadable extends Readable { return super.destroy(err) } + _destroy (err, callback) { + // Workaround for Node "bug". If the stream is destroyed in same + // tick as it is created, then a user who is waiting for a + // promise (i.e micro tick) for installing a 'error' listener will + // never get a chance and will always encounter an unhandled exception. + // - tick => process.nextTick(fn) + // - micro tick => queueMicrotask(fn) + queueMicrotask(() => { + callback(err) + }) + } + emit (ev, ...args) { if (ev === 'data') { // Node < 16.7 @@ -166,7 +178,7 @@ module.exports = class BodyReadable extends Readable { } } - if (this.closed) { + if (this._readableState.closeEmitted) { return Promise.resolve(null) } @@ -210,10 +222,6 @@ function isUnusable (self) { } async function consume (stream, type) { - if (isUnusable(stream)) { - throw new TypeError('unusable') - } - assert(!stream[kConsume]) return new Promise((resolve, reject) => { @@ -226,17 +234,32 @@ async function consume (stream, type) { body: [] } - stream - .on('error', function (err) { - consumeFinish(this[kConsume], err) - }) - .on('close', function () { - if (this[kConsume].body !== null) { - consumeFinish(this[kConsume], new RequestAbortedError()) - } - }) + if (isUnusable(stream)) { + const rState = stream._readableState + if (rState.destroyed && rState.closeEmitted === false) { + stream + .on('error', err => { + reject(err) + }) + .on('close', () => { + reject(new TypeError('unusable')) + }) + } else { + reject(new TypeError('unusable')) + } + } else { + stream + .on('error', function (err) { + consumeFinish(this[kConsume], err) + }) + .on('close', function () { + if (this[kConsume].body !== null) { + consumeFinish(this[kConsume], new RequestAbortedError()) + } + }) - process.nextTick(consumeStart, stream[kConsume]) + queueMicrotask(() => consumeStart(stream[kConsume])) + } }) } diff --git a/lib/client.js b/lib/client.js index af16fd16d3e..2fec533faf1 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1963,12 +1963,19 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength, body.resume() } } - const onAbort = function () { - if (finished) { - return + const onClose = function () { + // 'close' might be emitted *before* 'error' for + // broken streams. Wait a tick to avoid this case. + queueMicrotask(() => { + // It's only safe to remove 'error' listener after + // 'close'. + body.removeListener('error', onFinished) + }) + + if (!finished) { + const err = new RequestAbortedError() + queueMicrotask(() => onFinished(err)) } - const err = new RequestAbortedError() - queueMicrotask(() => onFinished(err)) } const onFinished = function (err) { if (finished) { @@ -1986,8 +1993,7 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength, body .removeListener('data', onData) .removeListener('end', onFinished) - .removeListener('error', onFinished) - .removeListener('close', onAbort) + .removeListener('close', onClose) if (!err) { try { @@ -2010,7 +2016,7 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength, .on('data', onData) .on('end', onFinished) .on('error', onFinished) - .on('close', onAbort) + .on('close', onClose) if (body.resume) { body.resume() diff --git a/test/readable.test.js b/test/readable.test.js index 3f4f7939f94..535ecb66baa 100644 --- a/test/readable.test.js +++ b/test/readable.test.js @@ -21,3 +21,40 @@ test('avoid body reordering', async function (t) { t.equal(text, 'helloworld') }) + +test('destroy timing text', async function (t) { + t.plan(1) + + function resume () { + } + function abort () { + } + const _err = new Error('kaboom') + const r = new Readable({ resume, abort }) + r.destroy(_err) + try { + await r.text() + } catch (err) { + t.same(err, _err) + } +}) + +test('destroy timing promise', async function (t) { + t.plan(1) + + function resume () { + } + function abort () { + } + const r = await new Promise(resolve => { + const r = new Readable({ resume, abort }) + r.destroy(new Error('kaboom')) + resolve(r) + }) + await new Promise(resolve => { + r.on('error', err => { + t.ok(err) + resolve(null) + }) + }) +})