Skip to content

Commit

Permalink
stream: throw on premature close in Readable[AsyncIterator]
Browse files Browse the repository at this point in the history
Fixes: #39086
  • Loading branch information
RaisinTen committed Jul 5, 2021
1 parent 0738a2b commit 4244f9e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 19 deletions.
8 changes: 4 additions & 4 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ const {

const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
ERR_STREAM_PREMATURE_CLOSE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
} = require('internal/errors').codes;
const { validateObject } = require('internal/validators');

Expand Down Expand Up @@ -1138,7 +1139,7 @@ async function* createAsyncIterator(stream, options) {
} else if (endEmitted) {
break;
} else if (closeEmitted) {
break;
throw new ERR_STREAM_PREMATURE_CLOSE();
} else {
await new Promise(next);
}
Expand All @@ -1152,7 +1153,6 @@ async function* createAsyncIterator(stream, options) {
} finally {
if (!errorThrown && opts.destroyOnReturn) {
if (state.autoDestroy || !endEmitted) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
destroyImpl.destroyer(stream, null);
}
}
Expand Down
75 changes: 60 additions & 15 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
} = require('stream');
const assert = require('assert');
const http = require('http');
const fs = require('fs');

async function tests() {
{
Expand Down Expand Up @@ -338,11 +339,17 @@ async function tests() {
process.nextTick(async () => {
readable.on('close', common.mustNotCall());
let received = 0;
for await (const k of readable) {
// Just make linting pass. This should never run.
assert.strictEqual(k, 'hello');
received++;
let err = null;
try {
for await (const k of readable) {
// Just make linting pass. This should never run.
assert.strictEqual(k, 'hello');
received++;
}
} catch (_err) {
err = _err;
}
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
assert.strictEqual(received, 0);
});
}
Expand Down Expand Up @@ -412,8 +419,13 @@ async function tests() {

readable.destroy();

const { done } = await readable[Symbol.asyncIterator]().next();
assert.strictEqual(done, true);
const it = await readable[Symbol.asyncIterator]();
const next = it.next();
next
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
}

{
Expand Down Expand Up @@ -458,7 +470,7 @@ async function tests() {
}

{
console.log('destroy mid-stream does not error');
console.log('destroy mid-stream errors');
const r = new Readable({
objectMode: true,
read() {
Expand All @@ -467,10 +479,16 @@ async function tests() {
}
});

// eslint-disable-next-line no-unused-vars
for await (const a of r) {
r.destroy(null);
let err = null;
try {
// eslint-disable-next-line no-unused-vars
for await (const a of r) {
r.destroy(null);
}
} catch (_err) {
err = _err;
}
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}

{
Expand Down Expand Up @@ -514,7 +532,7 @@ async function tests() {
}

{
console.log('all next promises must be resolved on destroy');
console.log('all next promises must be rejected on destroy');
const r = new Readable({
objectMode: true,
read() {
Expand All @@ -525,7 +543,11 @@ async function tests() {
const c = b.next();
const d = b.next();
r.destroy();
assert.deepStrictEqual(await c, { done: true, value: undefined });
c
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
assert.deepStrictEqual(await d, { done: true, value: undefined });
}

Expand Down Expand Up @@ -675,7 +697,7 @@ async function tests() {
}

{
// AsyncIterator should finish correctly if destroyed.
// AsyncIterator should not finish correctly if destroyed.

const r = new Readable({
objectMode: true,
Expand All @@ -688,11 +710,34 @@ async function tests() {
const it = r[Symbol.asyncIterator]();
const next = it.next();
next
.then(common.mustCall(({ done }) => assert.strictEqual(done, true)))
.catch(common.mustNotCall());
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
});
}

{
// AsyncIterator should throw if prematurely closed
// before end has been emitted.
(async function() {
const readable = fs.createReadStream(__filename);

try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable) {
readable.close();
}

assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}

assert.ok(readable.destroyed);
})().then(common.mustCall());
}

// AsyncIterator non-destroying iterator
{
function createReadable() {
Expand Down

0 comments on commit 4244f9e

Please sign in to comment.