stream: don't emit finish after destroy

PR-URL: https://github.com/nodejs/node/pull/40852
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Robert Nagy 2021-11-18 07:50:29 +01:00
parent f1658bdc8f
commit 48e784043d
8 changed files with 25 additions and 26 deletions

View File

@ -650,6 +650,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
function needFinish(state) { function needFinish(state) {
return (state.ending && return (state.ending &&
!state.destroyed &&
state.constructed && state.constructed &&
state.length === 0 && state.length === 0 &&
!state.errored && !state.errored &&
@ -732,11 +733,18 @@ function prefinish(stream, state) {
function finishMaybe(stream, state, sync) { function finishMaybe(stream, state, sync) {
if (needFinish(state)) { if (needFinish(state)) {
prefinish(stream, state); prefinish(stream, state);
if (state.pendingcb === 0 && needFinish(state)) { if (state.pendingcb === 0) {
state.pendingcb++;
if (sync) { if (sync) {
process.nextTick(finish, stream, state); state.pendingcb++;
process.nextTick((stream, state) => {
if (needFinish(state)) {
finish(stream, state);
} else { } else {
state.pendingcb--;
}
}, stream, state);
} else if (needFinish(state)) {
state.pendingcb++;
finish(stream, state); finish(stream, state);
} }
} }

View File

@ -22,7 +22,7 @@ fs.readFile(loc, common.mustSucceed((data) => {
const server = http2.createServer(); const server = http2.createServer();
let client; let client;
const countdown = new Countdown(3, () => { const countdown = new Countdown(2, () => {
server.close(); server.close();
client.close(); client.close();
}); });
@ -50,7 +50,6 @@ fs.readFile(loc, common.mustSucceed((data) => {
req.resume(); req.resume();
req.on('end', common.mustCall()); req.on('end', common.mustCall());
req.on('finish', () => countdown.dec());
const str = fs.createReadStream(loc); const str = fs.createReadStream(loc);
str.on('end', common.mustCall()); str.on('end', common.mustCall());
str.on('close', () => countdown.dec()); str.on('close', () => countdown.dec());

View File

@ -73,16 +73,6 @@ server.on('request', common.mustCall(function(request, response) {
assert.throws(() => request.socket.pause = noop, errMsg); assert.throws(() => request.socket.pause = noop, errMsg);
assert.throws(() => request.socket.resume = noop, errMsg); assert.throws(() => request.socket.resume = noop, errMsg);
request.stream.on('finish', common.mustCall(() => {
setImmediate(() => {
request.socket.setTimeout = noop;
assert.strictEqual(request.stream.setTimeout, noop);
assert.strictEqual(request.stream._isProcessing, undefined);
request.socket._isProcessing = true;
assert.strictEqual(request.stream._isProcessing, true);
});
}));
response.stream.destroy(); response.stream.destroy();
})); }));

View File

@ -125,7 +125,7 @@ const assert = require('assert');
duplex.removeListener('end', fail); duplex.removeListener('end', fail);
duplex.removeListener('finish', fail); duplex.removeListener('finish', fail);
duplex.on('end', common.mustNotCall()); duplex.on('end', common.mustNotCall());
duplex.on('finish', common.mustCall()); duplex.on('finish', common.mustNotCall());
assert.strictEqual(duplex.destroyed, true); assert.strictEqual(duplex.destroyed, true);
} }

View File

@ -117,7 +117,7 @@ const assert = require('assert');
transform.removeListener('end', fail); transform.removeListener('end', fail);
transform.removeListener('finish', fail); transform.removeListener('finish', fail);
transform.on('end', common.mustCall()); transform.on('end', common.mustCall());
transform.on('finish', common.mustCall()); transform.on('finish', common.mustNotCall());
} }
{ {

View File

@ -124,8 +124,6 @@ const assert = require('assert');
write.destroy(); write.destroy();
write.removeListener('finish', fail);
write.on('finish', common.mustCall());
assert.strictEqual(write.destroyed, true); assert.strictEqual(write.destroyed, true);
} }

View File

@ -31,3 +31,13 @@ const { Writable } = require('stream');
w.write('asd'); w.write('asd');
w.destroy(); w.destroy();
} }
{
const w = new Writable({
write() {
}
});
w.on('finish', common.mustNotCall());
w.end();
w.destroy();
}

View File

@ -20,9 +20,7 @@ for (const withPendingData of [ false, true ]) {
let chunksWritten = 0; let chunksWritten = 0;
let drains = 0; let drains = 0;
let finished = false;
w.on('drain', () => drains++); w.on('drain', () => drains++);
w.on('finish', () => finished = true);
function onWrite(err) { function onWrite(err) {
if (err) { if (err) {
@ -60,9 +58,5 @@ for (const withPendingData of [ false, true ]) {
assert.strictEqual(chunksWritten, useEnd && !withPendingData ? 1 : 2); assert.strictEqual(chunksWritten, useEnd && !withPendingData ? 1 : 2);
assert.strictEqual(callbacks.length, 0); assert.strictEqual(callbacks.length, 0);
assert.strictEqual(drains, 1); assert.strictEqual(drains, 1);
// When we used `.end()`, we see the 'finished' event if and only if
// we actually finished processing the write queue.
assert.strictEqual(finished, !withPendingData && useEnd);
} }
} }