Skip to content

Commit

Permalink
stream: move common async iteration logic to base_async_iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
prog1dev committed May 5, 2018
1 parent f574c2b commit 2f5c558
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 270 deletions.
5 changes: 3 additions & 2 deletions doc/api/readline.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,9 @@ async function processLineByLine(readable) {
processLineByLine(fs.createReadStream('file')).catch(console.error);
```

If the loop terminates with a `break` or a `throw`, the stream will be destroyed.
In other terms, iterating over a stream will consume the stream fully.
If the loop terminates with a `break` or a `throw`, the stream will be
destroyed. In other terms, iterating over a stream will consume the stream
fully.

## readline.clearLine(stream, dir)
<!-- YAML
Expand Down
2 changes: 1 addition & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const {
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
} = require('internal/errors').codes;
const { ReadableAsyncIterator } = require('internal/streams/async_iterator');
const ReadableAsyncIterator = require('internal/streams/async_iterator');
const { emitExperimentalWarning } = require('internal/util');
var StringDecoder;

Expand Down
211 changes: 91 additions & 120 deletions lib/internal/readline/async_iterator.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
'use strict';

const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
const kEnded = Symbol('ended');
const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');
const kReadlineInterface = Symbol('readlineInterface');
const { AsyncIteratorRecord } = require('internal/streams/async_iterator');
const {
kLastResolve,
kLastReject,
kError,
kEnded,
kLastPromise,
kHandlePromise,
kStream,
onEnd,
onError,
wrapForNext,
AsyncIteratorRecord,
BaseAsyncIterator
} = require('internal/streams/base_async_iterator');

function readAndResolve(iter) {
const resolve = iter[kLastResolve];
Expand Down Expand Up @@ -38,129 +44,94 @@ function onReadable(iter) {
process.nextTick(writeToBuffer, iter);
}

function onEnd(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(null, true));
}
iter[kEnded] = true;
}

function onError(iter, err) {
const reject = iter[kLastReject];
// reject if we are waiting for data in the Promise
// returned by next() and store the error
if (reject !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
reject(err);
}
iter.error = err;
}
const ReadlineAsyncIterator =
class ReadlineAsyncIterator extends BaseAsyncIterator {
constructor(readline_interface) {
super();
this[kReadlineInterface] = readline_interface;
this[kStream] = readline_interface.input;
this[kLastResolve] = null;
this[kLastReject] = null;
this[kError] = null;
this[kEnded] = false;
this[kLastPromise] = null;

this[kStream].on('readable', onReadable.bind(null, this));
this[kStream].on('end', onEnd.bind(null, this));
this[kStream].on('error', onError.bind(null, this));

// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
this[kHandlePromise] = (resolve, reject) => {
const data = this[kReadlineInterface].read();
if (data) {
this[kLastPromise] = null;
this[kLastResolve] = null;
this[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
} else {
this[kLastResolve] = resolve;
this[kLastReject] = reject;
}
};
}

function wrapForNext(lastPromise, iter) {
return function(resolve, reject) {
lastPromise.then(function() {
iter[kHandlePromise](resolve, reject);
}, reject);
};
}
get stream() {
return this[kStream];
}

const ReadlineAsyncIterator = class ReadlineAsyncIterator {
constructor(readline_interface) {
this[kReadlineInterface] = readline_interface;
this[kStream] = readline_interface.input;
this[kLastResolve] = null;
this[kLastReject] = null;
this[kError] = null;
this[kEnded] = false;
this[kLastPromise] = null;

this[kStream].on('readable', onReadable.bind(null, this));
this[kStream].on('end', onEnd.bind(null, this));
this[kStream].on('error', onError.bind(null, this));

// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
this[kHandlePromise] = (resolve, reject) => {
const data = this[kReadlineInterface].read();
console.log('iterator qwe 2');
console.log(data);
// throw new Error("Something unexpected has occurred.");
if (data) {
this[kLastPromise] = null;
this[kLastResolve] = null;
this[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
} else {
this[kLastResolve] = resolve;
this[kLastReject] = reject;
next() {
// if we have detected an error in the meanwhile
// reject straight away
const error = this[kError];
if (error !== null) {
return Promise.reject(error);
}
};
}

get stream() {
return this[kStream];
}
if (this[kEnded]) {
return Promise.resolve(new AsyncIteratorRecord(null, true));
}

next() {
// if we have detected an error in the meanwhile
// reject straight away
const error = this[kError];
if (error !== null) {
return Promise.reject(error);
}
// if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
// where next() is only called once at a time
const lastPromise = this[kLastPromise];
let promise;

if (this[kEnded]) {
return Promise.resolve(new AsyncIteratorRecord(null, true));
}
if (lastPromise) {
promise = new Promise(wrapForNext(lastPromise, this));
} else {
// fast path needed to support multiple this.push()
// without triggering the next() queue
const data = this[kReadlineInterface].read();
if (data !== null) {
return Promise.resolve(new AsyncIteratorRecord(data, false));
}

// if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
// where next() is only called once at a time
const lastPromise = this[kLastPromise];
let promise;

if (lastPromise) {
promise = new Promise(wrapForNext(lastPromise, this));
console.log('iterator qwe 4');
console.log(promise);
} else {
// fast path needed to support multiple this.push()
// without triggering the next() queue
const data = this[kReadlineInterface].read();
if (data !== null) {
return Promise.resolve(new AsyncIteratorRecord(data, false));
promise = new Promise(this[kHandlePromise]);
}

promise = new Promise(this[kHandlePromise]);
}

this[kLastPromise] = promise;
this[kLastPromise] = promise;

return promise;
}
return promise;
}

return() {
// destroy(err, cb) is a private API
// we can guarantee we have that here, because we control the
// Readable class this is attached to
return new Promise((resolve, reject) => {
this[kStream].destroy(null, (err) => {
if (err) {
reject(err);
return;
}
resolve(new AsyncIteratorRecord(null, true));
return() {
// destroy(err, cb) is a private API
// we can guarantee we have that here, because we control the
// Readable class this is attached to
return new Promise((resolve, reject) => {
this[kStream].destroy(null, (err) => {
if (err) {
reject(err);
return;
}
resolve(new AsyncIteratorRecord(null, true));
});
});
});
}
};
}
};

module.exports = ReadlineAsyncIterator;
Loading

0 comments on commit 2f5c558

Please sign in to comment.