stream: destroy readable on read error

PR-URL: https://github.com/nodejs/node/pull/39342
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Robert Nagy 2021-07-10 15:03:02 +02:00
parent a5dec3a470
commit 09d8c0c8d2
4 changed files with 29 additions and 85 deletions

View File

@ -1525,9 +1525,6 @@ added: v16.3.0
* `destroyOnReturn` {boolean} When set to `false`, calling `return` on the * `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
async iterator, or exiting a `for await...of` iteration using a `break`, async iterator, or exiting a `for await...of` iteration using a `break`,
`return`, or `throw` will not destroy the stream. **Default:** `true`. `return`, or `throw` will not destroy the stream. **Default:** `true`.
* `destroyOnError` {boolean} When set to `false`, if the stream emits an
error while it's being iterated, the iterator will not destroy the stream.
**Default:** `true`.
* Returns: {AsyncIterator} to consume the stream. * Returns: {AsyncIterator} to consume the stream.
The iterator created by this method gives users the option to cancel the The iterator created by this method gives users the option to cancel the

View File

@ -486,7 +486,22 @@ Readable.prototype.read = function(n) {
state.needReadable = true; state.needReadable = true;
// Call internal read method // Call internal read method
this._read(state.highWaterMark); try {
const result = this._read(state.highWaterMark);
if (result != null) {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
nop,
function(err) {
errorOrDestroy(this, err);
});
}
}
} catch (err) {
errorOrDestroy(this, err);
}
state.sync = false; state.sync = false;
// If _read pushed data synchronously, then `reading` will be false, // If _read pushed data synchronously, then `reading` will be false,
@ -1131,14 +1146,11 @@ async function* createAsyncIterator(stream, options) {
error = aggregateTwoErrors(error, err); error = aggregateTwoErrors(error, err);
throw error; throw error;
} finally { } finally {
if (error) { if (
if (options?.destroyOnError !== false) { (error || options?.destroyOnReturn !== false) &&
destroyImpl.destroyer(stream, error); (error === undefined || stream._readableState.autoDestroy)
} ) {
} else if (options?.destroyOnReturn !== false) { destroyImpl.destroyer(stream, null);
if (error === undefined || stream._readableState.autoDestroy) {
destroyImpl.destroyer(stream, null);
}
} }
} }
} }

View File

@ -750,22 +750,6 @@ async function tests() {
})()); })());
} }
function createErrorReadable() {
const opts = { read() { throw new Error('inner'); } };
return new Readable(opts);
}
// Check default destroys on return
(async function() {
const readable = createReadable();
for await (const chunk of readable.iterator()) {
assert.strictEqual(chunk, 5);
break;
}
assert.ok(readable.destroyed);
})().then(common.mustCall());
// Check explicit destroying on return // Check explicit destroying on return
(async function() { (async function() {
const readable = createReadable(); const readable = createReadable();
@ -777,50 +761,6 @@ async function tests() {
assert.ok(readable.destroyed); assert.ok(readable.destroyed);
})().then(common.mustCall()); })().then(common.mustCall());
// Check default destroys on error
(async function() {
const readable = createErrorReadable();
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}
assert.ok(readable.destroyed);
})().then(common.mustCall());
// Check explicit destroys on error
(async function() {
const readable = createErrorReadable();
const opts = { destroyOnError: true, destroyOnReturn: false };
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable.iterator(opts)) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}
assert.ok(readable.destroyed);
})().then(common.mustCall());
// Check explicit non-destroy with return true
(async function() {
const readable = createErrorReadable();
const opts = { destroyOnError: false, destroyOnReturn: true };
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable.iterator(opts)) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}
assert.ok(!readable.destroyed);
})().then(common.mustCall());
// Check explicit non-destroy with return true // Check explicit non-destroy with return true
(async function() { (async function() {
const readable = createReadable(); const readable = createReadable();

View File

@ -1,18 +1,13 @@
'use strict'; 'use strict';
require('../common'); const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream'); const { Readable } = require('stream');
const readable = new Readable(); const readable = new Readable();
assert.throws( readable.read();
() => { readable.on('error', common.expectsError({
readable.read(); code: 'ERR_METHOD_NOT_IMPLEMENTED',
}, name: 'Error',
{ message: 'The _read() method is not implemented'
code: 'ERR_METHOD_NOT_IMPLEMENTED', }));
name: 'Error', readable.on('close', common.mustCall());
message: 'The _read() method is not implemented'
}
);