Revert "stream: simpler and faster Readable async iterator"
This reverts commit 4bb4007.
richardlau committed Sep 9, 2020
1 parent 2d52bdf commit 0f94c6b
Showing 7 changed files with 269 additions and 121 deletions.
38 changes: 0 additions & 38 deletions benchmark/streams/readable-async-iterator.js

This file was deleted.

66 changes: 5 additions & 61 deletions lib/_stream_readable.js
@@ -27,7 +27,6 @@ const {
NumberIsNaN,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
Set,
SymbolAsyncIterator,
Symbol
@@ -60,11 +59,11 @@ const kPaused = Symbol('kPaused');

// Lazy loaded to improve the startup performance.
let StringDecoder;
let createReadableStreamAsyncIterator;
let from;

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
function nop() {}

const { errorOrDestroy } = destroyImpl;

@@ -1076,68 +1075,13 @@ Readable.prototype.wrap = function(stream) {
};

Readable.prototype[SymbolAsyncIterator] = function() {
let stream = this;

if (typeof stream.read !== 'function') {
// v1 stream
const src = stream;
stream = new Readable({
objectMode: true,
destroy(err, callback) {
destroyImpl.destroyer(src, err);
callback();
}
}).wrap(src);
if (createReadableStreamAsyncIterator === undefined) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}

const iter = createAsyncIterator(stream);
iter.stream = stream;
return iter;
return createReadableStreamAsyncIterator(this);
};

async function* createAsyncIterator(stream) {
let callback = nop;

function next(resolve) {
if (this === stream) {
callback();
callback = nop;
} else {
callback = resolve;
}
}

stream
.on('readable', next)
.on('error', next)
.on('end', next)
.on('close', next);

try {
const state = stream._readableState;
while (true) {
const chunk = stream.read();
if (chunk !== null) {
yield chunk;
} else if (state.errored) {
throw state.errored;
} else if (state.ended) {
break;
} else if (state.closed) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
break;
} else {
await new Promise(next);
}
}
} catch (err) {
destroyImpl.destroyer(stream, err);
throw err;
} finally {
destroyImpl.destroyer(stream, null);
}
}

// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
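Both the removed inline async generator and the restored internal/streams/async_iterator module serve the same public surface: a `for await...of` loop over a Readable. A minimal consumption sketch, not part of this commit, assuming `Readable.from()` is available (Node.js 12.3+):

'use strict';
// Minimal sketch of the public behaviour both implementations provide.
const { Readable } = require('stream');

async function main() {
  const readable = Readable.from(['a', 'b', 'c']);

  // for await...of calls readable[Symbol.asyncIterator](), which after this
  // revert lazily loads internal/streams/async_iterator and delegates to it.
  for await (const chunk of readable) {
    console.log(chunk); // 'a', then 'b', then 'c'
  }
}

main().catch(console.error);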
221 changes: 221 additions & 0 deletions lib/internal/streams/async_iterator.js
@@ -0,0 +1,221 @@
'use strict';

const {
ObjectCreate,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
PromiseReject,
PromiseResolve,
Symbol,
} = primordials;

const finished = require('internal/streams/end-of-stream');
const destroyImpl = require('internal/streams/destroy');

const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
const kEnded = Symbol('ended');
const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');

let Readable;

function createIterResult(value, done) {
return { value, done };
}

function readAndResolve(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
const data = iter[kStream].read();
// We defer if data is null. We can be expecting either 'end' or 'error'.
if (data !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(createIterResult(data, false));
}
}
}

function onReadable(iter) {
// We wait for the next tick, because it might
// emit an error with `process.nextTick()`.
process.nextTick(readAndResolve, iter);
}

function wrapForNext(lastPromise, iter) {
return (resolve, reject) => {
lastPromise.then(() => {
if (iter[kEnded]) {
resolve(createIterResult(undefined, true));
return;
}

iter[kHandlePromise](resolve, reject);
}, reject);
};
}

const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

function finish(self, err) {
return new Promise((resolve, reject) => {
const stream = self[kStream];

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
} else {
resolve(createIterResult(undefined, true));
}
});
destroyImpl.destroyer(stream, err);
});
}

const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
get stream() {
return this[kStream];
},

next() {
// If we have detected an error in the meanwhile
// reject straight away.
const error = this[kError];
if (error !== null) {
return PromiseReject(error);
}

if (this[kEnded]) {
return PromiseResolve(createIterResult(undefined, true));
}

if (this[kStream].destroyed) {
return new Promise((resolve, reject) => {
if (this[kError]) {
reject(this[kError]);
} else if (this[kEnded]) {
resolve(createIterResult(undefined, true));
} else {
finished(this[kStream], (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
} else {
resolve(createIterResult(undefined, true));
}
});
}
});
}

// If we have multiple next() calls we will wait for the previous Promise to
// finish. This logic is optimized to support for await loops, where next()
// is only called once at a time.
const lastPromise = this[kLastPromise];
let promise;

if (lastPromise) {
promise = new Promise(wrapForNext(lastPromise, this));
} else {
// Fast path needed to support multiple this.push()
// without triggering the next() queue.
const data = this[kStream].read();
if (data !== null) {
return PromiseResolve(createIterResult(data, false));
}

promise = new Promise(this[kHandlePromise]);
}

this[kLastPromise] = promise;

return promise;
},

return() {
return finish(this);
},

throw(err) {
return finish(this, err);
},
}, AsyncIteratorPrototype);

const createReadableStreamAsyncIterator = (stream) => {
if (typeof stream.read !== 'function') {
// v1 stream

if (!Readable) {
Readable = require('_stream_readable');
}

const src = stream;
stream = new Readable({ objectMode: true }).wrap(src);
finished(stream, (err) => destroyImpl.destroyer(src, err));
}

const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
[kStream]: { value: stream, writable: true },
[kLastResolve]: { value: null, writable: true },
[kLastReject]: { value: null, writable: true },
[kError]: { value: null, writable: true },
[kEnded]: {
value: stream.readableEnded || stream._readableState.endEmitted,
writable: true
},
// The function passed to new Promise is cached so we avoid allocating a new
// closure at every run.
[kHandlePromise]: {
value: (resolve, reject) => {
const data = iterator[kStream].read();
if (data) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(data, false));
} else {
iterator[kLastResolve] = resolve;
iterator[kLastReject] = reject;
}
},
writable: true,
},
});
iterator[kLastPromise] = null;

finished(stream, { writable: false }, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
const reject = iterator[kLastReject];
// Reject if we are waiting for data in the Promise returned by next() and
// store the error.
if (reject !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
reject(err);
}
iterator[kError] = err;
return;
}

const resolve = iterator[kLastResolve];
if (resolve !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(undefined, true));
}
iterator[kEnded] = true;
});

stream.on('readable', onReadable.bind(null, iterator));

return iterator;
};

module.exports = createReadableStreamAsyncIterator;
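The restored module settles the promise returned by next() off the 'readable' event: next() either reads a buffered chunk immediately (the fast path) or parks its resolve/reject callbacks in kLastResolve/kLastReject, and a later 'readable' event or end-of-stream settles them. A stripped-down standalone illustration of that handoff pattern follows; `naiveAsyncIterator` is a hypothetical helper, not the module above, and it omits the kError, destroyed, return() and concurrent-next() bookkeeping the real implementation performs:

'use strict';
// Standalone sketch of the resolve-handoff pattern used above; illustration
// only, without error/close handling or concurrent next() support.
function naiveAsyncIterator(stream) {
  let pendingResolve = null;
  let ended = false;

  stream.on('readable', () => {
    // Mirrors readAndResolve(): only settle if a next() call is waiting and a
    // chunk is actually available; otherwise keep waiting for 'end'.
    if (pendingResolve === null) return;
    const chunk = stream.read();
    if (chunk !== null) {
      const resolve = pendingResolve;
      pendingResolve = null;
      resolve({ value: chunk, done: false });
    }
  });

  stream.on('end', () => {
    ended = true;
    if (pendingResolve !== null) {
      const resolve = pendingResolve;
      pendingResolve = null;
      resolve({ value: undefined, done: true });
    }
  });

  return {
    next() {
      if (ended) return Promise.resolve({ value: undefined, done: true });
      // Fast path: a chunk is already buffered, resolve synchronously.
      const chunk = stream.read();
      if (chunk !== null) return Promise.resolve({ value: chunk, done: false });
      // Slow path: park the resolver until 'readable' or 'end' fires.
      return new Promise((resolve) => { pendingResolve = resolve; });
    },
    [Symbol.asyncIterator]() {
      return this;
    }
  };
}

// Usage sketch:
// for await (const chunk of naiveAsyncIterator(someReadable)) {
//   console.log(chunk);
// }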
10 changes: 5 additions & 5 deletions lib/internal/streams/pipeline.js
@@ -23,7 +23,7 @@ const {

let EE;
let PassThrough;
let Readable;
let createReadableStreamAsyncIterator;

function destroyer(stream, reading, writing, callback) {
callback = once(callback);
@@ -113,11 +113,11 @@ function makeAsyncIterable(val) {
}

async function* fromReadable(val) {
if (!Readable) {
Readable = require('_stream_readable');
if (!createReadableStreamAsyncIterator) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}

yield* Readable.prototype[SymbolAsyncIterator].call(val);
yield* createReadableStreamAsyncIterator(val);
}

async function pump(iterable, writable, finish) {
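fromReadable() exists so pipeline() can hand a Readable to generator-style transforms as an async iterable; after this revert it goes through createReadableStreamAsyncIterator directly rather than Readable.prototype[SymbolAsyncIterator]. A sketch of the public-API behaviour this supports, using only documented stream APIs (generator transforms in stream.pipeline, available in contemporary Node.js releases), not this commit's code:

'use strict';
// Illustration: pipeline() converts the source Readable into an async
// iterable (via fromReadable internally) before handing it to the generator.
const { pipeline, Readable, Writable } = require('stream');

const source = Readable.from(['a', 'b', 'c']);

const sink = new Writable({
  write(chunk, encoding, callback) {
    console.log(String(chunk)); // 'A', then 'B', then 'C'
    callback();
  }
});

pipeline(
  source,
  async function* (src) {
    // `src` is the async iterable wrapper around `source`.
    for await (const chunk of src) {
      yield chunk.toUpperCase();
    }
  },
  sink,
  (err) => {
    if (err) console.error('pipeline failed:', err);
    else console.log('pipeline done');
  }
);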
1 change: 1 addition & 0 deletions node.gyp
@@ -222,6 +222,7 @@
'lib/internal/worker/js_transferable.js',
'lib/internal/watchdog.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',
'lib/internal/streams/duplexpair.js',
'lib/internal/streams/from.js',
1 change: 0 additions & 1 deletion test/parallel/test-readline-async-iterators-destroy.js
@@ -75,7 +75,6 @@ async function testMutualDestroy() {
break;
}
assert.deepStrictEqual(iteratedLines, expectedLines);
break;
}

assert.deepStrictEqual(iteratedLines, expectedLines);