stream: ensure finish is emitted in next tick
When using end() it was possible for 'finish' to be emitted synchronously. PR-URL: https://github.com/nodejs/node/pull/30733 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
parent
8dea6dc2fb
commit
e13a37e49d
@ -698,30 +698,40 @@ function prefinish(stream, state) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function finishMaybe(stream, state) {
|
function finishMaybe(stream, state, sync) {
|
||||||
const need = needFinish(state);
|
const need = needFinish(state);
|
||||||
if (need) {
|
if (need) {
|
||||||
prefinish(stream, state);
|
prefinish(stream, state);
|
||||||
if (state.pendingcb === 0) {
|
if (state.pendingcb === 0) {
|
||||||
state.finished = true;
|
state.pendingcb++;
|
||||||
stream.emit('finish');
|
if (sync) {
|
||||||
|
process.nextTick(finish, stream, state);
|
||||||
if (state.autoDestroy) {
|
} else {
|
||||||
// In case of duplex streams we need a way to detect
|
finish(stream, state);
|
||||||
// if the readable side is ready for autoDestroy as well
|
|
||||||
const rState = stream._readableState;
|
|
||||||
if (!rState || (rState.autoDestroy && rState.endEmitted)) {
|
|
||||||
stream.destroy();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return need;
|
return need;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function finish(stream, state) {
|
||||||
|
state.pendingcb--;
|
||||||
|
state.finished = true;
|
||||||
|
stream.emit('finish');
|
||||||
|
|
||||||
|
if (state.autoDestroy) {
|
||||||
|
// In case of duplex streams we need a way to detect
|
||||||
|
// if the readable side is ready for autoDestroy as well
|
||||||
|
const rState = stream._readableState;
|
||||||
|
if (!rState || (rState.autoDestroy && rState.endEmitted)) {
|
||||||
|
stream.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function endWritable(stream, state, cb) {
|
function endWritable(stream, state, cb) {
|
||||||
state.ending = true;
|
state.ending = true;
|
||||||
finishMaybe(stream, state);
|
finishMaybe(stream, state, true);
|
||||||
if (cb) {
|
if (cb) {
|
||||||
if (state.finished)
|
if (state.finished)
|
||||||
process.nextTick(cb);
|
process.nextTick(cb);
|
||||||
|
@ -70,5 +70,7 @@ const filename = path.join(tmpdir.path, 'sync-write-stream.txt');
|
|||||||
assert.strictEqual(stream.fd, fd);
|
assert.strictEqual(stream.fd, fd);
|
||||||
|
|
||||||
stream.end();
|
stream.end();
|
||||||
assert.strictEqual(stream.fd, null);
|
stream.on('close', common.mustCall(() => {
|
||||||
|
assert.strictEqual(stream.fd, null);
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
@ -238,13 +238,15 @@ const assert = require('assert');
|
|||||||
// called again.
|
// called again.
|
||||||
const write = new Writable({
|
const write = new Writable({
|
||||||
write: common.mustNotCall(),
|
write: common.mustNotCall(),
|
||||||
final: common.mustCall((cb) => cb(), 2)
|
final: common.mustCall((cb) => cb(), 2),
|
||||||
|
autoDestroy: true
|
||||||
});
|
});
|
||||||
|
|
||||||
write.end();
|
write.end();
|
||||||
write.destroy();
|
write.once('close', common.mustCall(() => {
|
||||||
write._undestroy();
|
write._undestroy();
|
||||||
write.end();
|
write.end();
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -28,3 +28,16 @@ const assert = require('assert');
|
|||||||
assert.strictEqual(writable.writableFinished, true);
|
assert.strictEqual(writable.writableFinished, true);
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Emit finish asynchronously
|
||||||
|
|
||||||
|
const w = new Writable({
|
||||||
|
write(chunk, encoding, cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
w.end();
|
||||||
|
w.on('finish', common.mustCall());
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user