lib: fix blob.stream() causing hanging promises
Refs: https://github.com/nodejs/node/issues/47993#issuecomment-1546901936 PR-URL: https://github.com/nodejs/node/pull/48232 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
parent
2c6698b4db
commit
8cc14387a2
@ -329,34 +329,45 @@ class Blob {
|
||||
pull(c) {
|
||||
const { promise, resolve, reject } = createDeferredPromise();
|
||||
this.pendingPulls.push({ resolve, reject });
|
||||
reader.pull((status, buffer) => {
|
||||
// If pendingPulls is empty here, the stream had to have
|
||||
// been canceled, and we don't really care about the result.
|
||||
// we can simply exit.
|
||||
if (this.pendingPulls.length === 0) {
|
||||
return;
|
||||
}
|
||||
const pending = this.pendingPulls.shift();
|
||||
if (status === 0) {
|
||||
// EOS
|
||||
c.close();
|
||||
pending.resolve();
|
||||
return;
|
||||
} else if (status < 0) {
|
||||
// The read could fail for many different reasons when reading
|
||||
// from a non-memory resident blob part (e.g. file-backed blob).
|
||||
// The error details the system error code.
|
||||
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
|
||||
|
||||
c.error(error);
|
||||
pending.reject(error);
|
||||
return;
|
||||
}
|
||||
if (buffer !== undefined) {
|
||||
c.enqueue(new Uint8Array(buffer));
|
||||
}
|
||||
pending.resolve();
|
||||
});
|
||||
const readNext = () => {
|
||||
reader.pull((status, buffer) => {
|
||||
// If pendingPulls is empty here, the stream had to have
|
||||
// been canceled, and we don't really care about the result.
|
||||
// We can simply exit.
|
||||
if (this.pendingPulls.length === 0) {
|
||||
return;
|
||||
}
|
||||
if (status === 0) {
|
||||
// EOS
|
||||
c.close();
|
||||
const pending = this.pendingPulls.shift();
|
||||
pending.resolve();
|
||||
return;
|
||||
} else if (status < 0) {
|
||||
// The read could fail for many different reasons when reading
|
||||
// from a non-memory resident blob part (e.g. file-backed blob).
|
||||
// The error details the system error code.
|
||||
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
|
||||
const pending = this.pendingPulls.shift();
|
||||
c.error(error);
|
||||
pending.reject(error);
|
||||
return;
|
||||
}
|
||||
if (buffer !== undefined) {
|
||||
c.enqueue(new Uint8Array(buffer));
|
||||
}
|
||||
// We keep reading until we either reach EOS, some error, or we
|
||||
// hit the flow rate of the stream (c.desiredSize).
|
||||
queueMicrotask(() => {
|
||||
if (c.desiredSize <= 0) {
|
||||
// A manual backpressure check.
|
||||
return;
|
||||
}
|
||||
readNext();
|
||||
});
|
||||
});
|
||||
};
|
||||
readNext();
|
||||
return promise;
|
||||
},
|
||||
cancel(reason) {
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Flags: --no-warnings
|
||||
// Flags: --no-warnings --expose-internals
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
@ -6,6 +6,7 @@ const assert = require('assert');
|
||||
const { Blob } = require('buffer');
|
||||
const { inspect } = require('util');
|
||||
const { EOL } = require('os');
|
||||
const { kState } = require('internal/webstreams/util');
|
||||
|
||||
{
|
||||
const b = new Blob();
|
||||
@ -237,6 +238,50 @@ assert.throws(() => new Blob({}), {
|
||||
assert(res.done);
|
||||
})().then(common.mustCall());
|
||||
|
||||
(async () => {
|
||||
const b = new Blob(Array(10).fill('hello'));
|
||||
const reader = b.stream().getReader();
|
||||
const chunks = [];
|
||||
while (true) {
|
||||
const res = await reader.read();
|
||||
if (res.done) break;
|
||||
assert.strictEqual(res.value.byteLength, 5);
|
||||
chunks.push(res.value);
|
||||
}
|
||||
assert.strictEqual(chunks.length, 10);
|
||||
})().then(common.mustCall());
|
||||
|
||||
(async () => {
|
||||
const b = new Blob(Array(10).fill('hello'));
|
||||
const reader = b.stream().getReader();
|
||||
const chunks = [];
|
||||
while (true) {
|
||||
const res = await reader.read();
|
||||
if (chunks.length === 5) {
|
||||
reader.cancel('boom');
|
||||
break;
|
||||
}
|
||||
if (res.done) break;
|
||||
assert.strictEqual(res.value.byteLength, 5);
|
||||
chunks.push(res.value);
|
||||
}
|
||||
assert.strictEqual(chunks.length, 5);
|
||||
reader.closed.then(common.mustCall());
|
||||
})().then(common.mustCall());
|
||||
|
||||
(async () => {
|
||||
const b = new Blob(Array(10).fill('hello'));
|
||||
const stream = b.stream();
|
||||
const reader = stream.getReader();
|
||||
assert.strictEqual(stream[kState].controller.desiredSize, 1);
|
||||
const { value, done } = await reader.read();
|
||||
assert.strictEqual(value.byteLength, 5);
|
||||
assert(!done);
|
||||
setTimeout(() => {
|
||||
assert.strictEqual(stream[kState].controller.desiredSize, 0);
|
||||
}, 0);
|
||||
})().then(common.mustCall());
|
||||
|
||||
{
|
||||
const b = new Blob(['hello\n'], { endings: 'native' });
|
||||
assert.strictEqual(b.size, EOL.length + 5);
|
||||
|
Loading…
x
Reference in New Issue
Block a user