stream: normalize async iterator stream destroy
PR-URL: https://github.com/nodejs/node/pull/31316 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
This commit is contained in:
parent
90e6e18f33
commit
a6d63c44a2
@ -22,6 +22,15 @@ const kStream = Symbol('stream');
|
||||
|
||||
let Readable;
|
||||
|
||||
function destroy(stream, err) {
|
||||
// request.destroy just do .end - .abort is what we want
|
||||
if (typeof stream.abort === 'function') return stream.abort();
|
||||
if (stream.req &&
|
||||
typeof stream.req.abort === 'function') return stream.req.abort();
|
||||
if (typeof stream.destroy === 'function') return stream.destroy(err);
|
||||
if (typeof stream.close === 'function') return stream.close();
|
||||
}
|
||||
|
||||
function createIterResult(value, done) {
|
||||
return { value, done };
|
||||
}
|
||||
@ -141,7 +150,7 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
|
||||
resolve(createIterResult(undefined, true));
|
||||
}
|
||||
});
|
||||
stream.destroy();
|
||||
destroy(stream);
|
||||
});
|
||||
},
|
||||
}, AsyncIteratorPrototype);
|
||||
@ -156,11 +165,7 @@ const createReadableStreamAsyncIterator = (stream) => {
|
||||
|
||||
const src = stream;
|
||||
stream = new Readable({ objectMode: true }).wrap(src);
|
||||
finished(stream, (err) => {
|
||||
if (typeof src.destroy === 'function') {
|
||||
src.destroy(err);
|
||||
}
|
||||
});
|
||||
finished(stream, (err) => destroy(src, err));
|
||||
}
|
||||
|
||||
const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
|
||||
|
@ -56,6 +56,25 @@ async function tests() {
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
// Non standard stream cleanup
|
||||
|
||||
const readable = new Readable({ autoDestroy: false, read() {} });
|
||||
readable.push('asd');
|
||||
readable.push('asd');
|
||||
readable.destroy = null;
|
||||
readable.close = common.mustCall(() => {
|
||||
readable.emit('close');
|
||||
});
|
||||
|
||||
await (async () => {
|
||||
for await (const d of readable) {
|
||||
d;
|
||||
return;
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
{
|
||||
const readable = new Readable({ objectMode: true, read() {} });
|
||||
readable.push(0);
|
||||
|
Loading…
x
Reference in New Issue
Block a user