stream: fix writable.end callback behavior

Changes so that the end() callback behaves the same way in relation
to _final as write() does to _write/_writev.

PR-URL: https://github.com/nodejs/node/pull/34101
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
Robert Nagy 2020-06-28 17:44:07 +02:00
parent e2b468eb5c
commit c7e55c6b72
8 changed files with 40 additions and 42 deletions

View File

@ -412,6 +412,9 @@ Is `true` after [`writable.destroy()`][writable-destroy] has been called.
<!-- YAML <!-- YAML
added: v0.9.4 added: v0.9.4
changes: changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/34101
description: The `callback` is invoked before 'finish' or on error.
- version: v14.0.0 - version: v14.0.0
pr-url: https://github.com/nodejs/node/pull/29747 pr-url: https://github.com/nodejs/node/pull/29747
description: The `callback` is invoked if 'finish' or 'error' is emitted. description: The `callback` is invoked if 'finish' or 'error' is emitted.
@ -428,15 +431,13 @@ changes:
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`. other than `null`.
* `encoding` {string} The encoding if `chunk` is a string * `encoding` {string} The encoding if `chunk` is a string
* `callback` {Function} Optional callback for when the stream finishes * `callback` {Function} Callback for when the stream is finished.
or errors
* Returns: {this} * Returns: {this}
Calling the `writable.end()` method signals that no more data will be written Calling the `writable.end()` method signals that no more data will be written
to the [`Writable`][]. The optional `chunk` and `encoding` arguments allow one to the [`Writable`][]. The optional `chunk` and `encoding` arguments allow one
final additional chunk of data to be written immediately before closing the final additional chunk of data to be written immediately before closing the
stream. If provided, the optional `callback` function is attached as a listener stream.
for the [`'finish'`][] and the `'error'` event.
Calling the [`stream.write()`][stream-write] method after calling Calling the [`stream.write()`][stream-write] method after calling
[`stream.end()`][stream-end] will raise an error. [`stream.end()`][stream-end] will raise an error.
@ -592,7 +593,7 @@ changes:
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`. other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string. **Default:** `'utf8'` * `encoding` {string} The encoding, if `chunk` is a string. **Default:** `'utf8'`
* `callback` {Function} Callback for when this chunk of data is flushed * `callback` {Function} Callback for when this chunk of data is flushed.
* Returns: {boolean} `false` if the stream wishes for the calling code to * Returns: {boolean} `false` if the stream wishes for the calling code to
wait for the `'drain'` event to be emitted before continuing to write wait for the `'drain'` event to be emitted before continuing to write
additional data; otherwise `true`. additional data; otherwise `true`.

View File

@ -64,6 +64,8 @@ ObjectSetPrototypeOf(Writable, Stream);
function nop() {} function nop() {}
const kOnFinished = Symbol('kOnFinished');
function WritableState(options, stream, isDuplex) { function WritableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share // Duplex streams are both readable and writable, but share
// the same options object. // the same options object.
@ -185,6 +187,8 @@ function WritableState(options, stream, isDuplex) {
// True if close has been emitted or would have been emitted // True if close has been emitted or would have been emitted
// depending on emitClose. // depending on emitClose.
this.closeEmitted = false; this.closeEmitted = false;
this[kOnFinished] = [];
} }
function resetBuffer(state) { function resetBuffer(state) {
@ -411,7 +415,7 @@ function onwriteError(stream, state, er, cb) {
// not enabled. Passing `er` here doesn't make sense since // not enabled. Passing `er` here doesn't make sense since
// it's related to one specific write, not to the buffered // it's related to one specific write, not to the buffered
// writes. // writes.
errorBuffer(state, new ERR_STREAM_DESTROYED('write')); errorBuffer(state);
// This can emit error, but error must always follow cb. // This can emit error, but error must always follow cb.
errorOrDestroy(stream, er); errorOrDestroy(stream, er);
} }
@ -487,14 +491,14 @@ function afterWrite(stream, state, count, cb) {
} }
if (state.destroyed) { if (state.destroyed) {
errorBuffer(state, new ERR_STREAM_DESTROYED('write')); errorBuffer(state);
} }
finishMaybe(stream, state); finishMaybe(stream, state);
} }
// If there's something in the buffer waiting, then invoke callbacks. // If there's something in the buffer waiting, then invoke callbacks.
function errorBuffer(state, err) { function errorBuffer(state) {
if (state.writing) { if (state.writing) {
return; return;
} }
@ -503,7 +507,11 @@ function errorBuffer(state, err) {
const { chunk, callback } = state.buffered[n]; const { chunk, callback } = state.buffered[n];
const len = state.objectMode ? 1 : chunk.length; const len = state.objectMode ? 1 : chunk.length;
state.length -= len; state.length -= len;
callback(err); callback(new ERR_STREAM_DESTROYED('write'));
}
for (const callback of state[kOnFinished].splice(0)) {
callback(new ERR_STREAM_DESTROYED('end'));
} }
resetBuffer(state); resetBuffer(state);
@ -611,10 +619,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
} }
if (typeof cb === 'function') { if (typeof cb === 'function') {
if (err || state.finished) if (err || state.finished) {
process.nextTick(cb, err); process.nextTick(cb, err);
else } else {
onFinished(this, cb); state[kOnFinished].push(cb);
}
} }
return this; return this;
@ -636,6 +645,9 @@ function callFinal(stream, state) {
stream._final((err) => { stream._final((err) => {
state.pendingcb--; state.pendingcb--;
if (err) { if (err) {
for (const callback of state[kOnFinished].splice(0)) {
callback(err);
}
errorOrDestroy(stream, err, state.sync); errorOrDestroy(stream, err, state.sync);
} else if (needFinish(state)) { } else if (needFinish(state)) {
state.prefinished = true; state.prefinished = true;
@ -683,6 +695,11 @@ function finish(stream, state) {
return; return;
state.finished = true; state.finished = true;
for (const callback of state[kOnFinished].splice(0)) {
callback();
}
stream.emit('finish'); stream.emit('finish');
if (state.autoDestroy) { if (state.autoDestroy) {
@ -701,26 +718,6 @@ function finish(stream, state) {
} }
} }
// TODO(ronag): Avoid using events to implement internal logic.
function onFinished(stream, cb) {
function onerror(err) {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb(err);
if (stream.listenerCount('error') === 0) {
stream.emit('error', err);
}
}
function onfinish() {
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb();
}
stream.on('finish', onfinish);
stream.prependListener('error', onerror);
}
ObjectDefineProperties(Writable.prototype, { ObjectDefineProperties(Writable.prototype, {
destroyed: { destroyed: {
@ -800,7 +797,7 @@ const destroy = destroyImpl.destroy;
Writable.prototype.destroy = function(err, cb) { Writable.prototype.destroy = function(err, cb) {
const state = this._writableState; const state = this._writableState;
if (!state.destroyed) { if (!state.destroyed) {
process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write')); process.nextTick(errorBuffer, state);
} }
destroy.call(this, err, cb); destroy.call(this, err, cb);
return this; return this;

View File

@ -90,7 +90,7 @@ const t = new stream.Transform({
t.on('finish', common.mustCall(function() { t.on('finish', common.mustCall(function() {
state++; state++;
// finishListener // finishListener
assert.strictEqual(state, 14); assert.strictEqual(state, 15);
}, 1)); }, 1));
t.on('end', common.mustCall(function() { t.on('end', common.mustCall(function() {
state++; state++;
@ -106,5 +106,5 @@ t.write(4);
t.end(7, common.mustCall(function() { t.end(7, common.mustCall(function() {
state++; state++;
// endMethodCallback // endMethodCallback
assert.strictEqual(state, 15); assert.strictEqual(state, 14);
}, 1)); }, 1));

View File

@ -92,7 +92,7 @@ const t = new stream.Transform({
t.on('finish', common.mustCall(function() { t.on('finish', common.mustCall(function() {
state++; state++;
// finishListener // finishListener
assert.strictEqual(state, 14); assert.strictEqual(state, 15);
}, 1)); }, 1));
t.on('end', common.mustCall(function() { t.on('end', common.mustCall(function() {
state++; state++;
@ -108,5 +108,5 @@ t.write(4);
t.end(7, common.mustCall(function() { t.end(7, common.mustCall(function() {
state++; state++;
// endMethodCallback // endMethodCallback
assert.strictEqual(state, 15); assert.strictEqual(state, 14);
}, 1)); }, 1));

View File

@ -354,7 +354,7 @@ const assert = require('assert');
assert.strictEqual(err.message, 'asd'); assert.strictEqual(err.message, 'asd');
})); }));
write.end('asd', common.mustCall((err) => { write.end('asd', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd'); assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
})); }));
write.destroy(new Error('asd')); write.destroy(new Error('asd'));
} }

View File

@ -17,10 +17,10 @@ const stream = require('stream');
})); }));
writable.write('asd'); writable.write('asd');
writable.end(common.mustCall((err) => { writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
})); }));
writable.end(common.mustCall((err) => { writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
})); }));
} }

View File

@ -19,5 +19,5 @@ writable._final = (cb) => {
writable.write('asd'); writable.write('asd');
writable.end(common.mustCall((err) => { writable.end(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom'); assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
})); }));

View File

@ -57,7 +57,7 @@ for (const withPendingData of [ false, true ]) {
w.destroy(); w.destroy();
assert.strictEqual(chunksWritten, 1); assert.strictEqual(chunksWritten, 1);
callbacks.shift()(); callbacks.shift()();
assert.strictEqual(chunksWritten, 2); assert.strictEqual(chunksWritten, useEnd && !withPendingData ? 1 : 2);
assert.strictEqual(callbacks.length, 0); assert.strictEqual(callbacks.length, 0);
assert.strictEqual(drains, 1); assert.strictEqual(drains, 1);