Revert "stream: handle generator destruction from Duplex.from()"

This reverts commit 55413004c8ab489a03793b80c20d2ec6552668c0.

PR-URL: https://github.com/nodejs/node/pull/56278
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
jakecastelli 2024-12-17 15:12:08 +10:30 committed by Node.js GitHub Bot
parent a9e65f61d4
commit 6156f8a6d5
2 changed files with 7 additions and 243 deletions

View File

@ -83,19 +83,15 @@ module.exports = function duplexify(body, name) {
}
if (typeof body === 'function') {
let d;
const { value, write, final, destroy } = fromAsyncGen(body, () => {
destroyer(d);
});
const { value, write, final, destroy } = fromAsyncGen(body);
// Body might be a constructor function instead of an async generator function.
if (isDuplexNodeStream(value)) {
return d = value;
return value;
}
if (isIterable(value)) {
return d = from(Duplexify, value, {
return from(Duplexify, value, {
// TODO (ronag): highWaterMark?
objectMode: true,
write,
@ -106,16 +102,12 @@ module.exports = function duplexify(body, name) {
const then = value?.then;
if (typeof then === 'function') {
let finalized = false;
let d;
const promise = FunctionPrototypeCall(
then,
value,
(val) => {
// The function returned without (fully) consuming the generator.
if (!finalized) {
destroyer(d);
}
if (val != null) {
throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
}
@ -131,7 +123,6 @@ module.exports = function duplexify(body, name) {
readable: false,
write,
final(cb) {
finalized = true;
final(async () => {
try {
await promise;
@ -217,12 +208,11 @@ module.exports = function duplexify(body, name) {
body);
};
function fromAsyncGen(fn, destructor) {
function fromAsyncGen(fn) {
let { promise, resolve } = PromiseWithResolvers();
const ac = new AbortController();
const signal = ac.signal;
const asyncGenerator = (async function* () {
const value = fn(async function*() {
while (true) {
const _promise = promise;
promise = null;
@ -232,44 +222,9 @@ function fromAsyncGen(fn, destructor) {
if (signal.aborted)
throw new AbortError(undefined, { cause: signal.reason });
({ promise, resolve } = PromiseWithResolvers());
// Next line will "break" the loop if the generator is returned/thrown.
yield chunk;
}
})();
const originalReturn = asyncGenerator.return;
asyncGenerator.return = async function(value) {
try {
return await originalReturn.call(this, value);
} finally {
if (promise) {
const _promise = promise;
promise = null;
const { cb } = await _promise;
process.nextTick(cb);
process.nextTick(destructor);
}
}
};
const originalThrow = asyncGenerator.throw;
asyncGenerator.throw = async function(err) {
try {
return await originalThrow.call(this, err);
} finally {
if (promise) {
const _promise = promise;
promise = null;
const { cb } = await _promise;
// asyncGenerator.throw(undefined) should cause a callback error
process.nextTick(cb, err ?? new AbortError());
}
}
};
const value = fn(asyncGenerator, { signal });
}(), { signal });
return {
value,

View File

@ -5,7 +5,6 @@ const assert = require('assert');
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
const { ReadableStream, WritableStream } = require('stream/web');
const { Blob } = require('buffer');
const sleep = require('util').promisify(setTimeout);
{
const d = Duplex.from({
@ -402,193 +401,3 @@ function makeATestWritableStream(writeFunc) {
assert.strictEqual(d.writable, false);
}));
}
{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
const values = await Array.fromAsync(asyncGenerator);
assert.deepStrictEqual(values, ['foo', 'bar', 'baz']);
await asyncGenerator.return();
await asyncGenerator.return();
await asyncGenerator.return();
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
// eslint-disable-next-line no-unused-vars
for await (const _ of asyncGenerator) break;
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
const a = await asyncGenerator.next();
assert.strictEqual(a.done, false);
assert.strictEqual(a.value.toString(), 'foo');
const b = await asyncGenerator.return();
assert.strictEqual(b.done, true);
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
// Note: the generator is not even started at this point
await asyncGenerator.return();
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
// Same as before, with a delay
await sleep(100);
await asyncGenerator.return();
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {}),
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
await sleep(100);
}),
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Readable.from(['foo', 'bar', 'baz']);
const d = Duplex.from(async function(asyncGenerator) {
while (!(await asyncGenerator.next()).done) await sleep(100);
});
setTimeout(() => d.destroy(), 150);
pipeline(
r,
d,
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Duplex.from(async function* () {
for (const value of ['foo', 'bar', 'baz']) {
await sleep(50);
yield value;
}
});
const d = Duplex.from(async function(asyncGenerator) {
while (!(await asyncGenerator.next()).done);
});
setTimeout(() => r.destroy(), 75);
pipeline(
r,
d,
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
assert.strictEqual(r.destroyed, true);
assert.strictEqual(d.destroyed, true);
})
);
}
{
const r = Readable.from(['foo']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
await asyncGenerator.throw(new Error('my error'));
}),
common.mustCall((err) => {
assert.strictEqual(err.message, 'my error');
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Readable.from(['foo', 'bar']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
await asyncGenerator.next();
await asyncGenerator.throw(new Error('my error'));
}),
common.mustCall((err) => {
assert.strictEqual(err.message, 'my error');
assert.strictEqual(r.destroyed, true);
})
);
}
{
const r = Readable.from(['foo', 'bar']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
await asyncGenerator.next();
await asyncGenerator.throw();
}),
common.mustCall((err) => {
assert.strictEqual(err.code, 'ABORT_ERR');
assert.strictEqual(r.destroyed, true);
})
);
}