fs: implement byob mode for readableWebStream()
Fixes: https://github.com/nodejs/node/issues/45853 PR-URL: https://github.com/nodejs/node/pull/46933 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
a57b8ddd9a
commit
a9c3d9244f
@ -446,14 +446,22 @@ Reads data from the file and stores that in the given buffer.
|
||||
If the file is not modified concurrently, the end-of-file is reached when the
|
||||
number of bytes read is zero.
|
||||
|
||||
#### `filehandle.readableWebStream()`
|
||||
#### `filehandle.readableWebStream(options)`
|
||||
|
||||
<!-- YAML
|
||||
added: v17.0.0
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/46933
|
||||
description: Added option to create a 'bytes' stream.
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `options` {Object}
|
||||
* `type` {string|undefined} Whether to open a normal or a `'bytes'` stream.
|
||||
**Default:** `undefined`
|
||||
|
||||
* Returns: {ReadableStream}
|
||||
|
||||
Returns a `ReadableStream` that may be used to read the files data.
|
||||
|
@ -14,6 +14,7 @@ const {
|
||||
SafePromisePrototypeFinally,
|
||||
Symbol,
|
||||
Uint8Array,
|
||||
FunctionPrototypeBind,
|
||||
} = primordials;
|
||||
|
||||
const { fs: constants } = internalBinding('constants');
|
||||
@ -249,7 +250,7 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
|
||||
* } ReadableStream
|
||||
* @returns {ReadableStream}
|
||||
*/
|
||||
readableWebStream() {
|
||||
readableWebStream(options = kEmptyObject) {
|
||||
if (this[kFd] === -1)
|
||||
throw new ERR_INVALID_STATE('The FileHandle is closed');
|
||||
if (this[kClosePromise])
|
||||
@ -257,21 +258,65 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
|
||||
if (this[kLocked])
|
||||
throw new ERR_INVALID_STATE('The FileHandle is locked');
|
||||
this[kLocked] = true;
|
||||
const {
|
||||
newReadableStreamFromStreamBase,
|
||||
} = require('internal/webstreams/adapters');
|
||||
const readable = newReadableStreamFromStreamBase(
|
||||
this[kHandle],
|
||||
undefined,
|
||||
{ ondone: () => this[kUnref]() });
|
||||
|
||||
const {
|
||||
readableStreamCancel,
|
||||
} = require('internal/webstreams/readablestream');
|
||||
this[kRef]();
|
||||
this.once('close', () => {
|
||||
readableStreamCancel(readable);
|
||||
});
|
||||
if (options.type !== undefined) {
|
||||
validateString(options.type, 'options.type');
|
||||
}
|
||||
|
||||
let readable;
|
||||
|
||||
if (options.type !== 'bytes') {
|
||||
const {
|
||||
newReadableStreamFromStreamBase,
|
||||
} = require('internal/webstreams/adapters');
|
||||
readable = newReadableStreamFromStreamBase(
|
||||
this[kHandle],
|
||||
undefined,
|
||||
{ ondone: () => this[kUnref]() });
|
||||
|
||||
const {
|
||||
readableStreamCancel,
|
||||
} = require('internal/webstreams/readablestream');
|
||||
this[kRef]();
|
||||
this.once('close', () => {
|
||||
readableStreamCancel(readable);
|
||||
});
|
||||
} else {
|
||||
const {
|
||||
readableStreamCancel,
|
||||
ReadableStream,
|
||||
} = require('internal/webstreams/readablestream');
|
||||
|
||||
const readFn = FunctionPrototypeBind(this.read, this);
|
||||
const ondone = FunctionPrototypeBind(this[kUnref], this);
|
||||
|
||||
readable = new ReadableStream({
|
||||
type: 'bytes',
|
||||
autoAllocateChunkSize: 16384,
|
||||
|
||||
async pull(controller) {
|
||||
const view = controller.byobRequest.view;
|
||||
const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength);
|
||||
|
||||
if (bytesRead === 0) {
|
||||
ondone();
|
||||
controller.close();
|
||||
}
|
||||
|
||||
controller.byobRequest.respond(bytesRead);
|
||||
},
|
||||
|
||||
cancel() {
|
||||
ondone();
|
||||
},
|
||||
});
|
||||
|
||||
this[kRef]();
|
||||
|
||||
this.once('close', () => {
|
||||
readableStreamCancel(readable);
|
||||
});
|
||||
}
|
||||
|
||||
return readable;
|
||||
}
|
||||
|
@ -86,3 +86,87 @@ const check = readFileSync(__filename, { encoding: 'utf8' });
|
||||
mc.port1.close();
|
||||
await file.close();
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Make sure 'bytes' stream works
|
||||
(async () => {
|
||||
const file = await open(__filename);
|
||||
const dec = new TextDecoder();
|
||||
const readable = file.readableWebStream({ type: 'bytes' });
|
||||
const reader = readable.getReader({ mode: 'byob' });
|
||||
|
||||
let data = '';
|
||||
let result;
|
||||
do {
|
||||
const buff = new ArrayBuffer(100);
|
||||
result = await reader.read(new DataView(buff));
|
||||
if (result.value !== undefined) {
|
||||
data += dec.decode(result.value);
|
||||
assert.ok(result.value.byteLength <= 100);
|
||||
}
|
||||
} while (!result.done);
|
||||
|
||||
assert.strictEqual(check, data);
|
||||
|
||||
assert.throws(() => file.readableWebStream(), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
|
||||
await file.close();
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Make sure that acquiring a ReadableStream 'bytes' stream
|
||||
// fails if the FileHandle is already closed.
|
||||
(async () => {
|
||||
const file = await open(__filename);
|
||||
await file.close();
|
||||
|
||||
assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Make sure that acquiring a ReadableStream 'bytes' stream
|
||||
// fails if the FileHandle is already closing.
|
||||
(async () => {
|
||||
const file = await open(__filename);
|
||||
file.close();
|
||||
|
||||
assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
|
||||
code: 'ERR_INVALID_STATE',
|
||||
});
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Make sure the 'bytes' ReadableStream is closed when the underlying
|
||||
// FileHandle is closed.
|
||||
(async () => {
|
||||
const file = await open(__filename);
|
||||
const readable = file.readableWebStream({ type: 'bytes' });
|
||||
const reader = readable.getReader({ mode: 'byob' });
|
||||
file.close();
|
||||
await reader.closed;
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Make sure the 'bytes' ReadableStream is closed when the underlying
|
||||
// FileHandle is closed.
|
||||
(async () => {
|
||||
const file = await open(__filename);
|
||||
const readable = file.readableWebStream({ type: 'bytes' });
|
||||
file.close();
|
||||
const reader = readable.getReader({ mode: 'byob' });
|
||||
await reader.closed;
|
||||
})().then(common.mustCall());
|
||||
|
||||
// Make sure that the FileHandle is properly marked "in use"
|
||||
// when a 'bytes' ReadableStream has been acquired for it.
|
||||
(async () => {
|
||||
const file = await open(__filename);
|
||||
file.readableWebStream({ type: 'bytes' });
|
||||
const mc = new MessageChannel();
|
||||
mc.port1.onmessage = common.mustNotCall();
|
||||
assert.throws(() => mc.port2.postMessage(file, [file]), {
|
||||
code: 25,
|
||||
name: 'DataCloneError',
|
||||
});
|
||||
mc.port1.close();
|
||||
await file.close();
|
||||
})().then(common.mustCall());
|
||||
|
Loading…
x
Reference in New Issue
Block a user