stream: fix pipeline callback not called on ended stream

Fixes: https://github.com/nodejs/node/issues/46595
PR-URL: https://github.com/nodejs/node/pull/46600
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
This commit is contained in:
Debadree Chatterjee 2023-02-19 12:30:07 +05:30 committed by GitHub
parent ec7edfbdb7
commit b4a962d0e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 46 additions and 3 deletions

View File

@ -38,6 +38,7 @@ const {
isTransformStream,
isWebStream,
isReadableStream,
isReadableEnded,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');
@ -417,10 +418,17 @@ function pipe(src, dst, finish, { end }) {
// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
src.once('end', () => {
function endFn() {
ended = true;
dst.end();
});
}
if (isReadableEnded(src)) { // End the destination if the source has already ended.
process.nextTick(endFn);
} else {
src.once('end', endFn);
}
} else {
finish();
}

View File

@ -1591,3 +1591,28 @@ const tsp = require('timers/promises');
assert.strictEqual(writable.endCount, 1);
}));
}
{
const readable = new Readable({
read() {}
});
readable.on('end', common.mustCall(() => {
pipeline(readable, new PassThrough(), common.mustSucceed());
}));
readable.push(null);
readable.read();
}
{
const dup = new Duplex({
read() {},
write(chunk, enc, cb) {
cb();
}
});
dup.on('end', common.mustCall(() => {
pipeline(dup, new PassThrough(), common.mustSucceed());
}));
dup.push(null);
dup.read();
}

View File

@ -2,7 +2,7 @@
const common = require('../common');
const assert = require('assert');
const { Readable, Writable, Transform, pipeline } = require('stream');
const { Readable, Writable, Transform, pipeline, PassThrough } = require('stream');
const { pipeline: pipelinePromise } = require('stream/promises');
const { ReadableStream, WritableStream, TransformStream } = require('stream/web');
const http = require('http');
@ -410,3 +410,13 @@ const http = require('http');
}
c.close();
}
{
const rs = new ReadableStream({
start(controller) {
controller.close();
}
});
pipeline(rs, new PassThrough(), common.mustSucceed());
}