readline: add support for async iteration

Co-authored-by: Ivan Filenko <ivan.filenko@protonmail.com>
Fixes: https://github.com/nodejs/node/issues/18603
Refs: https://github.com/nodejs/node/pull/18904
PR-URL: https://github.com/nodejs/node/pull/23916
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
This commit is contained in:
Timothy Gu 2018-10-26 17:53:04 -07:00 committed by Rich Trott
parent 2742f3869a
commit 2a7432dade
6 changed files with 316 additions and 4 deletions

View File

@ -309,6 +309,43 @@ rl.write(null, { ctrl: true, name: 'u' });
The `rl.write()` method will write the data to the `readline` `Interface`'s
`input` *as if it were provided by the user*.
### rl\[Symbol.asyncIterator\]()
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* Returns: {AsyncIterator}
Create an `AsyncIterator` object that iterates through each line in the input
stream as a string. This method allows asynchronous iteration of
`readline.Interface` objects through `for`-`await`-`of` loops.
Errors in the input stream are not forwarded.
If the loop is terminated with `break`, `throw`, or `return`,
[`rl.close()`][] will be called. In other words, iterating over a
`readline.Interface` will always consume the input stream fully.
A caveat with using this experimental API is that the performance is
currently not on par with the traditional `'line'` event API, and thus it is
not recommended for performance-sensitive applications. We expect this
situation to improve in the future.
```js
async function processLineByLine() {
const rl = readline.createInterface({
// ...
});
for await (const line of rl) {
// Each line in the readline input will be successively available here as
// `line`.
}
}
```
## readline.clearLine(stream, dir)
<!-- YAML
added: v0.7.7
@ -517,12 +554,38 @@ rl.on('line', (line) => {
## Example: Read File Stream Line-by-Line
A common use case for `readline` is to consume input from a filesystem
[Readable][] stream one line at a time:
A common use case for `readline` is to consume an input file one line at a
time. The easiest way to do so is leveraging the [`fs.ReadStream`][] API as
well as a `for`-`await`-`of` loop:
```js
const readline = require('readline');
const fs = require('fs');
const readline = require('readline');
async function processLineByLine() {
const fileStream = fs.createReadStream('input.txt');
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// Note: we use the crlfDelay option to recognize all instances of CR LF
// ('\r\n') in input.txt as a single line break.
for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}
processLineByLine();
```
Alternatively, one could use the [`'line'`][] event:
```js
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('sample.txt'),
@ -536,8 +599,11 @@ rl.on('line', (line) => {
[`'SIGCONT'`]: readline.html#readline_event_sigcont
[`'SIGTSTP'`]: readline.html#readline_event_sigtstp
[`'line'`]: #readline_event_line
[`fs.ReadStream`]: fs.html#fs_class_fs_readstream
[`process.stdin`]: process.html#process_process_stdin
[`process.stdout`]: process.html#process_process_stdout
[`rl.close()`]: #readline_rl_close
[Readable]: stream.html#stream_readable_streams
[TTY]: tty.html
[Writable]: stream.html#stream_writable_streams

View File

@ -33,6 +33,7 @@ const {
ERR_INVALID_OPT_VALUE
} = require('internal/errors').codes;
const { debug, inherits } = require('util');
const { emitExperimentalWarning } = require('internal/util');
const { Buffer } = require('buffer');
const EventEmitter = require('events');
const {
@ -54,11 +55,16 @@ const {
// Lazy load StringDecoder for startup performance.
let StringDecoder;
// Lazy load Readable for startup performance.
let Readable;
const kHistorySize = 30;
const kMincrlfDelay = 100;
// \r\n, \n, or \r followed by something other than \n
const lineEnding = /\r?\n|\r(?!\n)/;
const kLineObjectStream = Symbol('line object stream');
const KEYPRESS_DECODER = Symbol('keypress-decoder');
const ESCAPE_DECODER = Symbol('escape-decoder');
@ -190,6 +196,8 @@ function Interface(input, output, completer, terminal) {
self._refreshLine();
}
this[kLineObjectStream] = undefined;
if (!this.terminal) {
function onSelfCloseWithoutTerminal() {
input.removeListener('data', ondata);
@ -1019,6 +1027,41 @@ Interface.prototype._ttyWrite = function(s, key) {
}
};
Interface.prototype[Symbol.asyncIterator] = function() {
emitExperimentalWarning('readline Interface [Symbol.asyncIterator]');
if (this[kLineObjectStream] === undefined) {
if (Readable === undefined) {
Readable = require('stream').Readable;
}
const readable = new Readable({
objectMode: true,
read: () => {
this.resume();
},
destroy: (err, cb) => {
this.off('line', lineListener);
this.off('close', closeListener);
this.close();
cb(err);
}
});
const lineListener = (input) => {
if (!readable.push(input)) {
this.pause();
}
};
const closeListener = () => {
readable.push(null);
};
this.on('line', lineListener);
this.on('close', closeListener);
this[kLineObjectStream] = readable;
}
return this[kLineObjectStream][Symbol.asyncIterator]();
};
/**
* accepts a readable Stream instance and makes it emit "keypress" events
*/

View File

@ -0,0 +1,48 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');
const readline = require('readline');
const CONTENT = 'content';
const TOTAL_LINES = 18;
(async () => {
const readable = new Readable({ read() {} });
readable.push(`${CONTENT}\n`.repeat(TOTAL_LINES));
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});
const it = rli[Symbol.asyncIterator]();
const highWaterMark = it.stream.readableHighWaterMark;
// For this test to work, we have to queue up more than the number of
// highWaterMark items in rli. Make sure that is the case.
assert(TOTAL_LINES > highWaterMark);
let iterations = 0;
let readableEnded = false;
for await (const line of it) {
assert.strictEqual(readableEnded, false);
assert.strictEqual(line, CONTENT);
const expectedPaused = TOTAL_LINES - iterations > highWaterMark;
assert.strictEqual(readable.isPaused(), expectedPaused);
iterations += 1;
// We have to end the input stream asynchronously for back pressure to work.
// Only end when we have reached the final line.
if (iterations === TOTAL_LINES) {
readable.push(null);
readableEnded = true;
}
}
assert.strictEqual(iterations, TOTAL_LINES);
})().then(common.mustCall());

View File

@ -0,0 +1,78 @@
'use strict';
const common = require('../common');
const fs = require('fs');
const { join } = require('path');
const readline = require('readline');
const assert = require('assert');
const tmpdir = require('../common/tmpdir');
tmpdir.refresh();
const filename = join(tmpdir.path, 'test.txt');
const testContents = [
'',
'\n',
'line 1',
'line 1\nline 2 南越国是前203年至前111年存在于岭南地区的一个国家\nline 3\ntrailing',
'line 1\nline 2\nline 3 ends with newline\n'
];
async function testSimpleDestroy() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);
const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});
const iteratedLines = [];
for await (const k of rli) {
iteratedLines.push(k);
break;
}
const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
expectedLines.splice(1);
assert.deepStrictEqual(iteratedLines, expectedLines);
}
}
async function testMutualDestroy() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);
const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});
const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
expectedLines.splice(2);
const iteratedLines = [];
for await (const k of rli) {
iteratedLines.push(k);
for await (const l of rli) {
iteratedLines.push(l);
break;
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}
}
testSimpleDestroy().then(testMutualDestroy).then(common.mustCall());

View File

@ -0,0 +1,77 @@
'use strict';
const common = require('../common');
const fs = require('fs');
const { join } = require('path');
const readline = require('readline');
const assert = require('assert');
const tmpdir = require('../common/tmpdir');
tmpdir.refresh();
const filename = join(tmpdir.path, 'test.txt');
const testContents = [
'',
'\n',
'line 1',
'line 1\nline 2 南越国是前203年至前111年存在于岭南地区的一个国家\nline 3\ntrailing',
'line 1\nline 2\nline 3 ends with newline\n'
];
async function testSimple() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);
const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});
const iteratedLines = [];
for await (const k of rli) {
iteratedLines.push(k);
}
const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
assert.deepStrictEqual(iteratedLines, expectedLines);
assert.strictEqual(iteratedLines.join(''), fileContent.replace(/\n/gm, ''));
}
}
async function testMutual() {
for (const fileContent of testContents) {
fs.writeFileSync(filename, fileContent);
const readable = fs.createReadStream(filename);
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});
const expectedLines = fileContent.split('\n');
if (expectedLines[expectedLines.length - 1] === '') {
expectedLines.pop();
}
const iteratedLines = [];
let iterated = false;
for await (const k of rli) {
// This outer loop should only iterate once.
assert.strictEqual(iterated, false);
iterated = true;
iteratedLines.push(k);
for await (const l of rli) {
iteratedLines.push(l);
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}
assert.deepStrictEqual(iteratedLines, expectedLines);
}
}
testSimple().then(testMutual).then(common.mustCall());

View File

@ -26,7 +26,7 @@ const customTypesMap = {
'this': `${jsDocPrefix}Reference/Operators/this`,
'AsyncIterator': 'https://github.com/tc39/proposal-async-iteration',
'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface',
'bigint': 'https://github.com/tc39/proposal-bigint',