stream: initial approach to include strategy options on Readable.toWeb()

PR-URL: https://github.com/nodejs/node/pull/43515
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
txxnano 2022-07-13 07:14:38 -03:00 committed by GitHub
parent 90bc773fb8
commit 99b109f7f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 109 additions and 12 deletions

View File

@ -2807,7 +2807,7 @@ added:
Returns whether the stream is readable.
### `stream.Readable.toWeb(streamReadable)`
### `stream.Readable.toWeb(streamReadable[, options])`
<!-- YAML
added: v17.0.0
@ -2816,6 +2816,10 @@ added: v17.0.0
> Stability: 1 - Experimental
* `streamReadable` {stream.Readable}
* `options` {Object}
* `strategy` {Object}
* `highWaterMark` {number}
* `size` {Function}
* Returns: {ReadableStream}
### `stream.Writable.fromWeb(writableStream[, options])`

View File

@ -1405,8 +1405,10 @@ Readable.fromWeb = function(readableStream, options) {
options);
};
Readable.toWeb = function(streamReadable) {
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
Readable.toWeb = function(streamReadable, options) {
return lazyWebStreams().newReadableStreamFromStreamReadable(
streamReadable,
options);
};
Readable.wrap = function(src, options) {

View File

@ -359,10 +359,14 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
}
/**
* @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
* @param {Readable} streamReadable
* @param {{
* strategy : QueuingStrategy
* }} [options]
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamReadable(streamReadable) {
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
// Not using the internal/streams/utils isReadableNodeStream utility
// here because it will return false if streamReadable is a Duplex
// whose readable option is false. For a Duplex that is not readable,
@ -382,14 +386,26 @@ function newReadableStreamFromStreamReadable(streamReadable) {
const objectMode = streamReadable.readableObjectMode;
const highWaterMark = streamReadable.readableHighWaterMark;
// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
const strategy =
objectMode ?
new CountQueuingStrategy({ highWaterMark }) :
{ highWaterMark };
const evaluateStrategyOrFallback = (strategy) => {
// If there is a strategy available, use it
if (strategy)
return strategy;
if (objectMode) {
// When running in objectMode explicitly but no strategy, we just fall
// back to CountQueuingStrategy
return new CountQueuingStrategy({ highWaterMark });
}
// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
return { highWaterMark };
};
const strategy = evaluateStrategyOrFallback(options?.strategy);
let controller;

View File

@ -0,0 +1,75 @@
'use strict';
const common = require('../common');
const { Readable } = require('stream');
const assert = require('assert');
const { strictEqual } = require('assert');
{
// Strategy 2
const streamData = ['a', 'b', 'c', null];
// Fulfill a Readable object
const readable = new Readable({
read: common.mustCall(() => {
process.nextTick(() => {
readable.push(streamData.shift());
});
}, streamData.length),
});
// Use helper to convert it to a Web ReadableStream using ByteLength strategy
const readableStream = Readable.toWeb(readable, {
strategy: new ByteLengthQueuingStrategy({ highWaterMark: 1 }),
});
assert(!readableStream.locked);
readableStream.getReader().read().then(common.mustCall());
}
{
// Strategy 2
const streamData = ['a', 'b', 'c', null];
// Fulfill a Readable object
const readable = new Readable({
read: common.mustCall(() => {
process.nextTick(() => {
readable.push(streamData.shift());
});
}, streamData.length),
});
// Use helper to convert it to a Web ReadableStream using Count strategy
const readableStream = Readable.toWeb(readable, {
strategy: new CountQueuingStrategy({ highWaterMark: 1 }),
});
assert(!readableStream.locked);
readableStream.getReader().read().then(common.mustCall());
}
{
const desireSizeExpected = 2;
const stringStream = new ReadableStream(
{
start(controller) {
// Check if the strategy is being assigned on the init of the ReadableStream
strictEqual(controller.desiredSize, desireSizeExpected);
controller.enqueue('a');
controller.enqueue('b');
controller.close();
},
},
new CountQueuingStrategy({ highWaterMark: desireSizeExpected })
);
const reader = stringStream.getReader();
reader.read().then(common.mustCall());
reader.read().then(common.mustCall());
reader.read().then(({ value, done }) => {
strictEqual(value, undefined);
strictEqual(done, true);
});
}