stream: cleanup eos

PR-URL: https://github.com/nodejs/node/pull/40998
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Robert Nagy 2021-12-01 02:50:24 +01:00 committed by GitHub
parent d81f328aa5
commit 40a52bde95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -51,14 +51,10 @@ function eos(stream, options, callback) {
callback = once(callback); callback = once(callback);
const readable = options.readable || const readable = options.readable ?? isReadableNodeStream(stream);
(options.readable !== false && isReadableNodeStream(stream)); const writable = options.writable ?? isWritableNodeStream(stream);
const writable = options.writable ||
(options.writable !== false && isWritableNodeStream(stream));
if (isNodeStream(stream)) { if (!isNodeStream(stream)) {
// Do nothing...
} else {
// TODO: Webstreams. // TODO: Webstreams.
// TODO: Throw INVALID_ARG_TYPE. // TODO: Throw INVALID_ARG_TYPE.
} }
@ -67,7 +63,9 @@ function eos(stream, options, callback) {
const rState = stream._readableState; const rState = stream._readableState;
const onlegacyfinish = () => { const onlegacyfinish = () => {
if (!stream.writable) onfinish(); if (!stream.writable) {
onfinish();
}
}; };
// TODO (ronag): Improve soft detection to include core modules and // TODO (ronag): Improve soft detection to include core modules and
@ -85,10 +83,17 @@ function eos(stream, options, callback) {
// Stream should not be destroyed here. If it is that // Stream should not be destroyed here. If it is that
// means that user space is doing something differently and // means that user space is doing something differently and
// we cannot trust willEmitClose. // we cannot trust willEmitClose.
if (stream.destroyed) willEmitClose = false; if (stream.destroyed) {
willEmitClose = false;
}
if (willEmitClose && (!stream.readable || readable)) return; if (willEmitClose && (!stream.readable || readable)) {
if (!readable || readableFinished) callback.call(stream); return;
}
if (!readable || readableFinished) {
callback.call(stream);
}
}; };
let readableFinished = isReadableFinished(stream, false); let readableFinished = isReadableFinished(stream, false);
@ -97,10 +102,17 @@ function eos(stream, options, callback) {
// Stream should not be destroyed here. If it is that // Stream should not be destroyed here. If it is that
// means that user space is doing something differently and // means that user space is doing something differently and
// we cannot trust willEmitClose. // we cannot trust willEmitClose.
if (stream.destroyed) willEmitClose = false; if (stream.destroyed) {
willEmitClose = false;
}
if (willEmitClose && (!stream.writable || writable)) return; if (willEmitClose && (!stream.writable || writable)) {
if (!writable || writableFinished) callback.call(stream); return;
}
if (!writable || writableFinished) {
callback.call(stream);
}
}; };
const onerror = (err) => { const onerror = (err) => {
@ -141,8 +153,11 @@ function eos(stream, options, callback) {
if (!willEmitClose) { if (!willEmitClose) {
stream.on('abort', onclose); stream.on('abort', onclose);
} }
if (stream.req) onrequest(); if (stream.req) {
else stream.on('request', onrequest); onrequest();
} else {
stream.on('request', onrequest);
}
} else if (writable && !wState) { // legacy streams } else if (writable && !wState) { // legacy streams
stream.on('end', onlegacyfinish); stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish); stream.on('close', onlegacyfinish);
@ -155,7 +170,9 @@ function eos(stream, options, callback) {
stream.on('end', onend); stream.on('end', onend);
stream.on('finish', onfinish); stream.on('finish', onfinish);
if (options.error !== false) stream.on('error', onerror); if (options.error !== false) {
stream.on('error', onerror);
}
stream.on('close', onclose); stream.on('close', onclose);
if (closed) { if (closed) {