stream: fix .end() error propagation
PR-URL: https://github.com/nodejs/node/pull/36817 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
7809c4ff80
commit
a4fce32eab
@ -27,6 +27,7 @@
|
|||||||
|
|
||||||
const {
|
const {
|
||||||
FunctionPrototype,
|
FunctionPrototype,
|
||||||
|
Error,
|
||||||
ObjectDefineProperty,
|
ObjectDefineProperty,
|
||||||
ObjectDefineProperties,
|
ObjectDefineProperties,
|
||||||
ObjectSetPrototypeOf,
|
ObjectSetPrototypeOf,
|
||||||
@ -290,8 +291,8 @@ Writable.prototype.pipe = function() {
|
|||||||
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
|
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
|
||||||
};
|
};
|
||||||
|
|
||||||
Writable.prototype.write = function(chunk, encoding, cb) {
|
function _write(stream, chunk, encoding, cb) {
|
||||||
const state = this._writableState;
|
const state = stream._writableState;
|
||||||
|
|
||||||
if (typeof encoding === 'function') {
|
if (typeof encoding === 'function') {
|
||||||
cb = encoding;
|
cb = encoding;
|
||||||
@ -333,11 +334,15 @@ Writable.prototype.write = function(chunk, encoding, cb) {
|
|||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
process.nextTick(cb, err);
|
process.nextTick(cb, err);
|
||||||
errorOrDestroy(this, err, true);
|
errorOrDestroy(stream, err, true);
|
||||||
return false;
|
return err;
|
||||||
}
|
}
|
||||||
state.pendingcb++;
|
state.pendingcb++;
|
||||||
return writeOrBuffer(this, state, chunk, encoding, cb);
|
return writeOrBuffer(stream, state, chunk, encoding, cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
Writable.prototype.write = function(chunk, encoding, cb) {
|
||||||
|
return _write(this, chunk, encoding, cb) === true;
|
||||||
};
|
};
|
||||||
|
|
||||||
Writable.prototype.cork = function() {
|
Writable.prototype.cork = function() {
|
||||||
@ -607,8 +612,14 @@ Writable.prototype.end = function(chunk, encoding, cb) {
|
|||||||
encoding = null;
|
encoding = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chunk !== null && chunk !== undefined)
|
let err;
|
||||||
this.write(chunk, encoding);
|
|
||||||
|
if (chunk !== null && chunk !== undefined) {
|
||||||
|
const ret = _write(this, chunk, encoding);
|
||||||
|
if (ret instanceof Error) {
|
||||||
|
err = ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// .end() fully uncorks.
|
// .end() fully uncorks.
|
||||||
if (state.corked) {
|
if (state.corked) {
|
||||||
@ -616,12 +627,15 @@ Writable.prototype.end = function(chunk, encoding, cb) {
|
|||||||
this.uncork();
|
this.uncork();
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is forgiving in terms of unnecessary calls to end() and can hide
|
if (err) {
|
||||||
// logic errors. However, usually such errors are harmless and causing a
|
// Do nothing...
|
||||||
// hard error can be disproportionately destructive. It is not always
|
} else if (!state.errored && !state.ending) {
|
||||||
// trivial for the user to determine whether end() needs to be called or not.
|
// This is forgiving in terms of unnecessary calls to end() and can hide
|
||||||
let err;
|
// logic errors. However, usually such errors are harmless and causing a
|
||||||
if (!state.errored && !state.ending) {
|
// hard error can be disproportionately destructive. It is not always
|
||||||
|
// trivial for the user to determine whether end() needs to be called
|
||||||
|
// or not.
|
||||||
|
|
||||||
state.ending = true;
|
state.ending = true;
|
||||||
finishMaybe(this, state, true);
|
finishMaybe(this, state, true);
|
||||||
state.ended = true;
|
state.ended = true;
|
||||||
|
@ -46,3 +46,38 @@ const stream = require('stream');
|
|||||||
writable.emit('error', new Error('kaboom'));
|
writable.emit('error', new Error('kaboom'));
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const w = new stream.Writable({
|
||||||
|
write(chunk, encoding, callback) {
|
||||||
|
setImmediate(callback);
|
||||||
|
},
|
||||||
|
finish(callback) {
|
||||||
|
setImmediate(callback);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
w.end('testing ended state', common.mustCall((err) => {
|
||||||
|
// This errors since .destroy(err), which is invoked by errors
|
||||||
|
// in same tick below, will error all pending callbacks.
|
||||||
|
// Does this make sense? Not sure.
|
||||||
|
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
|
||||||
|
}));
|
||||||
|
assert.strictEqual(w.destroyed, false);
|
||||||
|
assert.strictEqual(w.writableEnded, true);
|
||||||
|
w.end(common.mustCall((err) => {
|
||||||
|
// This errors since .destroy(err), which is invoked by errors
|
||||||
|
// in same tick below, will error all pending callbacks.
|
||||||
|
// Does this make sense? Not sure.
|
||||||
|
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
|
||||||
|
}));
|
||||||
|
assert.strictEqual(w.destroyed, false);
|
||||||
|
assert.strictEqual(w.writableEnded, true);
|
||||||
|
w.end('end', common.mustCall((err) => {
|
||||||
|
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
|
||||||
|
}));
|
||||||
|
assert.strictEqual(w.destroyed, true);
|
||||||
|
w.on('error', common.mustCall((err) => {
|
||||||
|
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
|
||||||
|
}));
|
||||||
|
w.on('finish', common.mustNotCall());
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user