Skip to content

Commit

Permalink
streams: refactor ReadableStream asyncIterator creation and a few fixes
Browse files Browse the repository at this point in the history
Closes: #23041

- Rewrite `ReadableAsyncIterator` class into
`ReadableStreamAsyncIteratorPrototype` which contains no constructor and
inherits from `%AsyncIteratorPrototype%`.

- Rewrite `AsyncIteratorRecord` into dumb function.

PR-URL: #23042
Fixes: #23041
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
  • Loading branch information
devsnek authored and targos committed Oct 10, 2018
1 parent 18cbde5 commit 2b77b94
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 48 deletions.
10 changes: 6 additions & 4 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const { emitExperimentalWarning } = require('internal/util');

// Lazy loaded to improve the startup performance.
let StringDecoder;
let ReadableAsyncIterator;
let createReadableStreamAsyncIterator;

util.inherits(Readable, Stream);

Expand Down Expand Up @@ -990,9 +990,11 @@ Readable.prototype.wrap = function(stream) {

Readable.prototype[Symbol.asyncIterator] = function() {
emitExperimentalWarning('Readable[Symbol.asyncIterator]');
if (ReadableAsyncIterator === undefined)
ReadableAsyncIterator = require('internal/streams/async_iterator');
return new ReadableAsyncIterator(this);
if (createReadableStreamAsyncIterator === undefined) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}
return createReadableStreamAsyncIterator(this);
};

Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
Expand Down
94 changes: 50 additions & 44 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');

const AsyncIteratorRecord = class AsyncIteratorRecord {
constructor(value, done) {
this.done = done;
this.value = value;
}
};
function createIterResult(value, done) {
return { value, done };
}

function readAndResolve(iter) {
const resolve = iter[kLastResolve];
Expand All @@ -26,7 +23,7 @@ function readAndResolve(iter) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
resolve(createIterResult(data, false));
}
}
}
Expand All @@ -43,7 +40,7 @@ function onEnd(iter) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(null, true));
resolve(createIterResult(null, true));
}
iter[kEnded] = true;
}
Expand All @@ -69,39 +66,13 @@ function wrapForNext(lastPromise, iter) {
};
}

const ReadableAsyncIterator = class ReadableAsyncIterator {
constructor(stream) {
this[kStream] = stream;
this[kLastResolve] = null;
this[kLastReject] = null;
this[kError] = null;
this[kEnded] = false;
this[kLastPromise] = null;

stream.on('readable', onReadable.bind(null, this));
stream.on('end', onEnd.bind(null, this));
stream.on('error', onError.bind(null, this));

// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
this[kHandlePromise] = (resolve, reject) => {
const data = this[kStream].read();
if (data) {
this[kLastPromise] = null;
this[kLastResolve] = null;
this[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
} else {
this[kLastResolve] = resolve;
this[kLastReject] = reject;
}
};
}
const AsyncIteratorPrototype = Object.getPrototypeOf(
Object.getPrototypeOf(async function* () {}).prototype);

const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
get stream() {
return this[kStream];
}
},

next() {
// if we have detected an error in the meanwhile
Expand All @@ -112,7 +83,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
}

if (this[kEnded]) {
return Promise.resolve(new AsyncIteratorRecord(null, true));
return Promise.resolve(createIterResult(null, true));
}

// if we have multiple next() calls
Expand All @@ -129,7 +100,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
// without triggering the next() queue
const data = this[kStream].read();
if (data !== null) {
return Promise.resolve(new AsyncIteratorRecord(data, false));
return Promise.resolve(createIterResult(data, false));
}

promise = new Promise(this[kHandlePromise]);
Expand All @@ -138,7 +109,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
this[kLastPromise] = promise;

return promise;
}
},

return() {
// destroy(err, cb) is a private API
Expand All @@ -150,10 +121,45 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
reject(err);
return;
}
resolve(new AsyncIteratorRecord(null, true));
resolve(createIterResult(null, true));
});
});
}
},
}, AsyncIteratorPrototype);

const createReadableStreamAsyncIterator = (stream) => {
const iterator = Object.create(ReadableStreamAsyncIteratorPrototype, {
[kStream]: { value: stream, writable: true },
[kLastResolve]: { value: null, writable: true },
[kLastReject]: { value: null, writable: true },
[kError]: { value: null, writable: true },
[kEnded]: { value: false, writable: true },
[kLastPromise]: { value: null, writable: true },
// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
[kHandlePromise]: {
value: (resolve, reject) => {
const data = iterator[kStream].read();
if (data) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(data, false));
} else {
iterator[kLastResolve] = resolve;
iterator[kLastReject] = reject;
}
},
writable: true,
},
});

stream.on('readable', onReadable.bind(null, iterator));
stream.on('end', onEnd.bind(null, iterator));
stream.on('error', onError.bind(null, iterator));

return iterator;
};

module.exports = ReadableAsyncIterator;
module.exports = createReadableStreamAsyncIterator;
22 changes: 22 additions & 0 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,28 @@ const { Readable } = require('stream');
const assert = require('assert');

async function tests() {
{
const AsyncIteratorPrototype = Object.getPrototypeOf(
Object.getPrototypeOf(async function* () {}).prototype);
const rs = new Readable({});
assert.strictEqual(
Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())),
AsyncIteratorPrototype);
}

await (async function() {
const readable = new Readable({ objectMode: true, read() {} });
readable.push(0);
readable.push(1);
readable.push(null);

const iter = readable[Symbol.asyncIterator]();
assert.strictEqual((await iter.next()).value, 0);
for await (const d of iter) {
assert.strictEqual(d, 1);
}
})();

await (async function() {
console.log('read without for..await');
const max = 5;
Expand Down

0 comments on commit 2b77b94

Please sign in to comment.