stream: use ByteLengthQueuingStrategy
when not in objectMode
Fixes: https://github.com/nodejs/node/issues/46347 PR-URL: https://github.com/nodejs/node/pull/48847 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
06619aa058
commit
261e88e269
@ -31,6 +31,7 @@ const {
|
|||||||
|
|
||||||
const {
|
const {
|
||||||
CountQueuingStrategy,
|
CountQueuingStrategy,
|
||||||
|
ByteLengthQueuingStrategy,
|
||||||
} = require('internal/webstreams/queuingstrategies');
|
} = require('internal/webstreams/queuingstrategies');
|
||||||
|
|
||||||
const {
|
const {
|
||||||
@ -452,11 +453,7 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
|
|||||||
return new CountQueuingStrategy({ highWaterMark });
|
return new CountQueuingStrategy({ highWaterMark });
|
||||||
}
|
}
|
||||||
|
|
||||||
// When not running in objectMode explicitly, we just fall
|
return new ByteLengthQueuingStrategy({ highWaterMark });
|
||||||
// back to a minimal strategy that just specifies the highWaterMark
|
|
||||||
// and no size algorithm. Using a ByteLengthQueuingStrategy here
|
|
||||||
// is unnecessary.
|
|
||||||
return { highWaterMark };
|
|
||||||
};
|
};
|
||||||
|
|
||||||
const strategy = evaluateStrategyOrFallback(options?.strategy);
|
const strategy = evaluateStrategyOrFallback(options?.strategy);
|
||||||
|
62
test/parallel/test-stream-readable-to-web.js
Normal file
62
test/parallel/test-stream-readable-to-web.js
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
'use strict';
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto) { common.skip('missing crypto'); }
|
||||||
|
|
||||||
|
const { Readable } = require('stream');
|
||||||
|
const process = require('process');
|
||||||
|
const { randomBytes } = require('crypto');
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
|
// Based on: https://github.com/nodejs/node/issues/46347#issuecomment-1413886707
|
||||||
|
// edit: make it cross-platform as /dev/urandom is not available on Windows
|
||||||
|
{
|
||||||
|
let currentMemoryUsage = process.memoryUsage().arrayBuffers;
|
||||||
|
|
||||||
|
// We initialize a stream, but not start consuming it
|
||||||
|
const randomNodeStream = new Readable({
|
||||||
|
read(size) {
|
||||||
|
randomBytes(size, (err, buffer) => {
|
||||||
|
if (err) {
|
||||||
|
// If an error occurs, emit an 'error' event
|
||||||
|
this.emit('error', err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push the random bytes to the stream
|
||||||
|
this.push(buffer);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// after 2 seconds, it'll get converted to web stream
|
||||||
|
let randomWebStream;
|
||||||
|
|
||||||
|
// We check memory usage every second
|
||||||
|
// since it's a stream, it shouldn't be higher than the chunk size
|
||||||
|
const reportMemoryUsage = () => {
|
||||||
|
const { arrayBuffers } = process.memoryUsage();
|
||||||
|
currentMemoryUsage = arrayBuffers;
|
||||||
|
|
||||||
|
assert(currentMemoryUsage <= 256 * 1024 * 1024);
|
||||||
|
};
|
||||||
|
setInterval(reportMemoryUsage, 1000);
|
||||||
|
|
||||||
|
// after 1 second we use Readable.toWeb
|
||||||
|
// memory usage should stay pretty much the same since it's still a stream
|
||||||
|
setTimeout(() => {
|
||||||
|
randomWebStream = Readable.toWeb(randomNodeStream);
|
||||||
|
}, 1000);
|
||||||
|
|
||||||
|
// after 2 seconds we start consuming the stream
|
||||||
|
// memory usage will grow, but the old chunks should be garbage-collected pretty quickly
|
||||||
|
setTimeout(async () => {
|
||||||
|
// eslint-disable-next-line no-unused-vars
|
||||||
|
for await (const _ of randomWebStream) {
|
||||||
|
// Do nothing, just let the stream flow
|
||||||
|
}
|
||||||
|
}, 2000);
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
// Test considered passed if we don't crash
|
||||||
|
process.exit(0);
|
||||||
|
}, 5000);
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user