Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

readline: add support for async iteration #23916

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions doc/api/readline.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,43 @@ rl.write(null, { ctrl: true, name: 'u' });
The `rl.write()` method will write the data to the `readline` `Interface`'s
`input` *as if it were provided by the user*.

### rl\[Symbol.asyncIterator\]()
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* Returns: {AsyncIterator}

Create an `AsyncIterator` object that iterates through each line in the input
stream as a string. This method allows asynchronous iteration of
`readline.Interface` objects through `for`-`await`-`of` loops.

Errors in the input stream are not forwarded.

If the loop is terminated with `break`, `throw`, or `return`,
[`rl.close()`][] will be called. In other words, iterating over a
`readline.Interface` will always consume the input stream fully.

A caveat with using this experimental API is that the performance is
currently not on par with the traditional `'line'` event API, and thus it is
not recommended for performance-sensitive applications. We expect this
situation to improve in the future.

```js
async function processLineByLine() {
const rl = readline.createInterface({
// ...
});

for await (const line of rl) {
// Each line in the readline input will be successively available here as
// `line`.
}
}
```

## readline.clearLine(stream, dir)
<!-- YAML
added: v0.7.7
Expand Down Expand Up @@ -517,12 +554,38 @@ rl.on('line', (line) => {

## Example: Read File Stream Line-by-Line

A common use case for `readline` is to consume input from a filesystem
[Readable][] stream one line at a time:
A common use case for `readline` is to consume an input file one line at a
time. The easiest way to do so is leveraging the [`fs.ReadStream`][] API as
well as a `for`-`await`-`of` loop:

```js
const fs = require('fs');
const readline = require('readline');

async function processLineByLine() {
const fileStream = fs.createReadStream('input.txt');

const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// Note: we use the crlfDelay option to recognize all instances of CR LF
// ('\r\n') in input.txt as a single line break.

for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}

processLineByLine();
```

Alternatively, one could use the [`'line'`][] event:

```js
const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
input: fs.createReadStream('sample.txt'),
Expand All @@ -536,8 +599,11 @@ rl.on('line', (line) => {

[`'SIGCONT'`]: readline.html#readline_event_sigcont
[`'SIGTSTP'`]: readline.html#readline_event_sigtstp
[`'line'`]: #readline_event_line
[`fs.ReadStream`]: fs.html#fs_class_fs_readstream
[`process.stdin`]: process.html#process_process_stdin
[`process.stdout`]: process.html#process_process_stdout
[`rl.close()`]: #readline_rl_close
[Readable]: stream.html#stream_readable_streams
[TTY]: tty.html
[Writable]: stream.html#stream_writable_streams
Expand Down
43 changes: 43 additions & 0 deletions lib/readline.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const {
ERR_INVALID_OPT_VALUE
} = require('internal/errors').codes;
const { debug, inherits } = require('util');
const { emitExperimentalWarning } = require('internal/util');
const { Buffer } = require('buffer');
const EventEmitter = require('events');
const {
Expand All @@ -54,11 +55,16 @@ const {
// Lazy load StringDecoder for startup performance.
let StringDecoder;

// Lazy load Readable for startup performance.
let Readable;

const kHistorySize = 30;
const kMincrlfDelay = 100;
// \r\n, \n, or \r followed by something other than \n
const lineEnding = /\r?\n|\r(?!\n)/;

const kLineObjectStream = Symbol('line object stream');

const KEYPRESS_DECODER = Symbol('keypress-decoder');
const ESCAPE_DECODER = Symbol('escape-decoder');

Expand Down Expand Up @@ -190,6 +196,8 @@ function Interface(input, output, completer, terminal) {
self._refreshLine();
}

this[kLineObjectStream] = undefined;

if (!this.terminal) {
function onSelfCloseWithoutTerminal() {
input.removeListener('data', ondata);
Expand Down Expand Up @@ -1019,6 +1027,41 @@ Interface.prototype._ttyWrite = function(s, key) {
}
};

Interface.prototype[Symbol.asyncIterator] = function() {
emitExperimentalWarning('readline Interface [Symbol.asyncIterator]');

if (this[kLineObjectStream] === undefined) {
if (Readable === undefined) {
Readable = require('stream').Readable;
}
const readable = new Readable({
objectMode: true,
read: () => {
this.resume();
},
destroy: (err, cb) => {
this.off('line', lineListener);
this.off('close', closeListener);
this.close();
cb(err);
}
});
const lineListener = (input) => {
if (!readable.push(input)) {
this.pause();
}
};
const closeListener = () => {
readable.push(null);
};
this.on('line', lineListener);
this.on('close', closeListener);
this[kLineObjectStream] = readable;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense expose this as a separate method? converting to a stream might be an issue for multiple people.

Copy link
Member Author

@TimothyGu TimothyGu Nov 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consider this an implementation detail of @@asyncIterator method. A major reason of why the performance of this method isn't up to par to 'line' event, as you have noted in #23916 (comment), is because of the double buffering necessitated by the intermediate stream, so I'd rather not expose the stream at the moment.

}

return this[kLineObjectStream][Symbol.asyncIterator]();
};

/**
* accepts a readable Stream instance and makes it emit "keypress" events
*/
Expand Down
48 changes: 48 additions & 0 deletions test/parallel/test-readline-async-iterators-backpressure.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');
const readline = require('readline');

const CONTENT = 'content';
const TOTAL_LINES = 18;

(async () => {
const readable = new Readable({ read() {} });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this now (to solve another bug) I think this backpressure behaviour is confusing since other consumers can listen to line on the stream and it's surprising we pause() it for them.

readable.push(`${CONTENT}\n`.repeat(TOTAL_LINES));

const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const it = rli[Symbol.asyncIterator]();
const highWaterMark = it.stream.readableHighWaterMark;

// For this test to work, we have to queue up more than the number of
// highWaterMark items in rli. Make sure that is the case.
assert(TOTAL_LINES > highWaterMark);

let iterations = 0;
let readableEnded = false;
for await (const line of it) {
assert.strictEqual(readableEnded, false);

assert.strictEqual(line, CONTENT);

const expectedPaused = TOTAL_LINES - iterations > highWaterMark;
assert.strictEqual(readable.isPaused(), expectedPaused);

iterations += 1;

// We have to end the input stream asynchronously for back pressure to work.
// Only end when we have reached the final line.
if (iterations === TOTAL_LINES) {
readable.push(null);
readableEnded = true;
}
}

assert.strictEqual(iterations, TOTAL_LINES);
})().then(common.mustCall());
78 changes: 78 additions & 0 deletions test/parallel/test-readline-async-iterators-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
'use strict';

const common = require('../common');
const fs = require('fs');
const { join } = require('path');
const readline = require('readline');
const assert = require('assert');

const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

const filename = join(tmpdir.path, 'test.txt');

const testContents = [
'',
'\n',
'line 1',
'line 1\nline 2 南越国是前203年至前111年存在于岭南地区的一个国家\nline 3\ntrailing',
'line 1\nline 2\nline 3 ends with newline\n'
];

async function testSimpleDestroy() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);

const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const iteratedLines = [];
for await (const k of rli) {
iteratedLines.push(k);
break;
}

const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
expectedLines.splice(1);

assert.deepStrictEqual(iteratedLines, expectedLines);
}
}

async function testMutualDestroy() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);

const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
expectedLines.splice(2);

const iteratedLines = [];
for await (const k of rli) {
iteratedLines.push(k);
for await (const l of rli) {
iteratedLines.push(l);
break;
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}

assert.deepStrictEqual(iteratedLines, expectedLines);
}
}

testSimpleDestroy().then(testMutualDestroy).then(common.mustCall());
77 changes: 77 additions & 0 deletions test/parallel/test-readline-async-iterators.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
'use strict';

const common = require('../common');
const fs = require('fs');
const { join } = require('path');
const readline = require('readline');
const assert = require('assert');

const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

const filename = join(tmpdir.path, 'test.txt');

const testContents = [
'',
'\n',
'line 1',
'line 1\nline 2 南越国是前203年至前111年存在于岭南地区的一个国家\nline 3\ntrailing',
'line 1\nline 2\nline 3 ends with newline\n'
];

async function testSimple() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);

const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const iteratedLines = [];
for await (const k of rli) {
iteratedLines.push(k);
}

const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
assert.deepStrictEqual(iteratedLines, expectedLines);
assert.strictEqual(iteratedLines.join(''), fileContent.replace(/\n/gm, ''));
}
}

async function testMutual() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);

const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});

const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
const iteratedLines = [];
let iterated = false;
for await (const k of rli) {
// This outer loop should only iterate once.
assert.strictEqual(iterated, false);
iterated = true;

iteratedLines.push(k);
for await (const l of rli) {
iteratedLines.push(l);
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}
}

testSimple().then(testMutual).then(common.mustCall());
2 changes: 1 addition & 1 deletion tools/doc/type-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const customTypesMap = {

'this': `${jsDocPrefix}Reference/Operators/this`,

'AsyncIterator': 'https://github.com/tc39/proposal-async-iteration',
'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface',

'bigint': 'https://github.com/tc39/proposal-bigint',

Expand Down