stream: add highWaterMark for the map operator
This is done so we don't wait for the first items to finish before
starting new ones.

Fixes: https://github.com/nodejs/node/issues/46132
Co-authored-by: Robert Nagy <ronagy@icloud.com>
PR-URL: https://github.com/nodejs/node/pull/49249
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
This commit is contained in:
parent
da197d1890
commit
b0f423390f
@ -2012,6 +2012,10 @@ showBoth();
|
||||
added:
|
||||
- v17.4.0
|
||||
- v16.14.0
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/49249
|
||||
description: added `highWaterMark` in options.
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
@ -2025,6 +2029,8 @@ added:
|
||||
* `options` {Object}
|
||||
* `concurrency` {number} the maximum number of concurrent invocations of `fn`
|
||||
on the stream at once. **Default:** `1`.
|
||||
* `highWaterMark` {number} how many items to buffer while waiting for user
|
||||
consumption of the mapped items. **Default:** `concurrency - 1`.
|
||||
* `signal` {AbortSignal} allows destroying the stream if the signal is
|
||||
aborted.
|
||||
* Returns: {Readable} a stream mapped with the function `fn`.
|
||||
@ -2059,6 +2065,10 @@ for await (const result of dnsResults) {
|
||||
added:
|
||||
- v17.4.0
|
||||
- v16.14.0
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/49249
|
||||
description: added `highWaterMark` in options.
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
@ -2071,6 +2081,8 @@ added:
|
||||
* `options` {Object}
|
||||
* `concurrency` {number} the maximum number of concurrent invocations of `fn`
|
||||
on the stream at once. **Default:** `1`.
|
||||
* `highWaterMark` {number} how many items to buffer while waiting for user
|
||||
consumption of the filtered items. **Default:** `concurrency * 2 - 1`.
|
||||
* `signal` {AbortSignal} allows destroying the stream if the signal is
|
||||
aborted.
|
||||
* Returns: {Readable} a stream filtered with the predicate `fn`.
|
||||
|
@ -32,6 +32,7 @@ const {
|
||||
NumberIsNaN,
|
||||
Promise,
|
||||
PromiseReject,
|
||||
PromiseResolve,
|
||||
PromisePrototypeThen,
|
||||
Symbol,
|
||||
} = primordials;
|
||||
@ -81,7 +82,15 @@ function map(fn, options) {
|
||||
concurrency = MathFloor(options.concurrency);
|
||||
}
|
||||
|
||||
validateInteger(concurrency, 'concurrency', 1);
|
||||
let highWaterMark = concurrency - 1;
|
||||
if (options?.highWaterMark != null) {
|
||||
highWaterMark = MathFloor(options.highWaterMark);
|
||||
}
|
||||
|
||||
validateInteger(concurrency, 'options.concurrency', 1);
|
||||
validateInteger(highWaterMark, 'options.highWaterMark', 0);
|
||||
|
||||
highWaterMark += concurrency;
|
||||
|
||||
return async function* map() {
|
||||
const signal = AbortSignal.any([options?.signal].filter(Boolean));
|
||||
@ -92,9 +101,28 @@ function map(fn, options) {
|
||||
let next;
|
||||
let resume;
|
||||
let done = false;
|
||||
let cnt = 0;
|
||||
|
||||
function onDone() {
|
||||
function onCatch() {
|
||||
done = true;
|
||||
afterItemProcessed();
|
||||
}
|
||||
|
||||
function afterItemProcessed() {
|
||||
cnt -= 1;
|
||||
maybeResume();
|
||||
}
|
||||
|
||||
function maybeResume() {
|
||||
if (
|
||||
resume &&
|
||||
!done &&
|
||||
cnt < concurrency &&
|
||||
queue.length < highWaterMark
|
||||
) {
|
||||
resume();
|
||||
resume = null;
|
||||
}
|
||||
}
|
||||
|
||||
async function pump() {
|
||||
@ -110,17 +138,19 @@ function map(fn, options) {
|
||||
|
||||
try {
|
||||
val = fn(val, signalOpt);
|
||||
|
||||
if (val === kEmpty) {
|
||||
continue;
|
||||
}
|
||||
|
||||
val = PromiseResolve(val);
|
||||
} catch (err) {
|
||||
val = PromiseReject(err);
|
||||
}
|
||||
|
||||
if (val === kEmpty) {
|
||||
continue;
|
||||
}
|
||||
cnt += 1;
|
||||
|
||||
if (typeof val?.catch === 'function') {
|
||||
val.catch(onDone);
|
||||
}
|
||||
PromisePrototypeThen(val, afterItemProcessed, onCatch);
|
||||
|
||||
queue.push(val);
|
||||
if (next) {
|
||||
@ -128,7 +158,7 @@ function map(fn, options) {
|
||||
next = null;
|
||||
}
|
||||
|
||||
if (!done && queue.length && queue.length >= concurrency) {
|
||||
if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
|
||||
await new Promise((resolve) => {
|
||||
resume = resolve;
|
||||
});
|
||||
@ -137,7 +167,7 @@ function map(fn, options) {
|
||||
queue.push(kEof);
|
||||
} catch (err) {
|
||||
const val = PromiseReject(err);
|
||||
PromisePrototypeThen(val, undefined, onDone);
|
||||
PromisePrototypeThen(val, afterItemProcessed, onCatch);
|
||||
queue.push(val);
|
||||
} finally {
|
||||
done = true;
|
||||
@ -168,10 +198,7 @@ function map(fn, options) {
|
||||
}
|
||||
|
||||
queue.shift();
|
||||
if (resume) {
|
||||
resume();
|
||||
resume = null;
|
||||
}
|
||||
maybeResume();
|
||||
}
|
||||
|
||||
await new Promise((resolve) => {
|
||||
|
@ -96,7 +96,7 @@ const { once } = require('events');
|
||||
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
|
||||
calls++;
|
||||
await once(signal, 'abort');
|
||||
}, { signal: ac.signal, concurrency: 2 });
|
||||
}, { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
|
||||
// pump
|
||||
assert.rejects(async () => {
|
||||
await forEachPromise;
|
||||
|
@ -8,6 +8,25 @@ const assert = require('assert');
|
||||
const { once } = require('events');
|
||||
const { setTimeout } = require('timers/promises');
|
||||
|
||||
/**
 * Builds `n` `[promise, release]` pairs whose promises can only resolve in
 * index order.
 *
 * Calling `release` for index 0 resolves promise 0 directly. Calling
 * `release` for index `i > 0` resolves promise `i` only once promise `i - 1`
 * has resolved, so a promise stays pending until its own `release` and the
 * `release` of every lower index have been called.
 *
 * @param {number} n Number of pairs to create.
 * @returns {Array<[Promise, Function]>} The pairs, ordered by index.
 */
function createDependentPromises(n) {
  const promiseAndResolveArray = [];

  for (let i = 0; i < n; i++) {
    let res;
    // The executor runs synchronously, so `res` is assigned before the push
    // below.
    const promise = new Promise((resolve) => {
      if (i === 0) {
        res = resolve;
        return;
      }
      // Chain onto the previous promise so this one cannot resolve first.
      res = () => promiseAndResolveArray[i - 1][0].then(resolve);
    });

    promiseAndResolveArray.push([promise, res]);
  }

  return promiseAndResolveArray;
}
|
||||
|
||||
{
|
||||
// Map works on synchronous streams with a synchronous mapper
|
||||
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
|
||||
@ -143,7 +162,7 @@ const { setTimeout } = require('timers/promises');
|
||||
const stream = range.map(common.mustCall(async (_, { signal }) => {
|
||||
await once(signal, 'abort');
|
||||
throw signal.reason;
|
||||
}, 2), { signal: ac.signal, concurrency: 2 });
|
||||
}, 2), { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
|
||||
// pump
|
||||
assert.rejects(async () => {
|
||||
for await (const item of stream) {
|
||||
@ -173,12 +192,164 @@ const { setTimeout } = require('timers/promises');
|
||||
})().then(common.mustCall());
|
||||
}
|
||||
|
||||
|
||||
{
  // highWaterMark with small concurrency
  //
  // Item 2 cannot finish until items 0 and 1 have finished (see
  // createDependentPromises), so the stream has to start the mapper for
  // item 1 while item 2 is still pending at the head of the queue.
  const finishOrder = [];

  const promises = createDependentPromises(4);

  const raw = Readable.from([2, 0, 1, 3]);
  const stream = raw.map(async (item) => {
    const [promise, resolve] = promises[item];
    resolve();

    await promise;
    finishOrder.push(item);
    return item;
  }, { concurrency: 2 });

  (async () => {
    await stream.toArray();

    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]);
  })().then(common.mustCall(), common.mustNotCall());
}
|
||||
|
||||
{
  // highWaterMark with a lot of items and large concurrency
  const finishOrder = [];

  const promises = createDependentPromises(20);

  const input = [10, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 11, 12, 13, 18, 15, 16, 17, 14, 19];
  const raw = Readable.from(input);
  // Expected progression. Each row lists the items whose mapper call is
  // pending; `next` is the item that is able to finish next.
  // Should be
  // 10, 1, 0, 3, 4, 2 | next: 0
  // 10, 1, 3, 4, 2, 5 | next: 1
  // 10, 3, 4, 2, 5, 7 | next: 2
  // 10, 3, 4, 5, 7, 8 | next: 3
  // 10, 4, 5, 7, 8, 9 | next: 4
  // 10, 5, 7, 8, 9, 6 | next: 5
  // 10, 7, 8, 9, 6, 11 | next: 6
  // 10, 7, 8, 9, 11, 12 | next: 7
  // 10, 8, 9, 11, 12, 13 | next: 8
  // 10, 9, 11, 12, 13, 18 | next: 9
  // 10, 11, 12, 13, 18, 15 | next: 10
  // 11, 12, 13, 18, 15, 16 | next: 11
  // 12, 13, 18, 15, 16, 17 | next: 12
  // 13, 18, 15, 16, 17, 14 | next: 13
  // 18, 15, 16, 17, 14, 19 | next: 14
  // 18, 15, 16, 17, 19 | next: 15
  // 18, 16, 17, 19 | next: 16
  // 18, 17, 19 | next: 17
  // 18, 19 | next: 18
  // 19 | next: 19
  //

  const stream = raw.map(async (item) => {
    const [promise, resolve] = promises[item];
    resolve();

    await promise;
    finishOrder.push(item);
    return item;
  }, { concurrency: 6 });

  (async () => {
    const outputOrder = await stream.toArray();

    // Output keeps the input order even though the items finish in
    // ascending numeric order.
    assert.deepStrictEqual(outputOrder, input);
    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
  })().then(common.mustCall(), common.mustNotCall());
}
|
||||
|
||||
{
  // Custom highWaterMark with a lot of items and large concurrency
  const finishOrder = [];

  const promises = createDependentPromises(20);

  const input = [11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19];
  const raw = Readable.from(input);
  // Expected progression. Each row lists the items whose mapper call is
  // pending, the item that is able to finish next, and the finished items
  // that are buffered behind the still-pending head of the queue.
  // Should be
  // 11, 1, 0, 3, 4 | next: 0, buffer: []
  // 11, 1, 3, 4, 2 | next: 1, buffer: [0]
  // 11, 3, 4, 2, 5 | next: 2, buffer: [0, 1]
  // 11, 3, 4, 5, 7 | next: 3, buffer: [0, 1, 2]
  // 11, 4, 5, 7, 8 | next: 4, buffer: [0, 1, 2, 3]
  // 11, 5, 7, 8, 9 | next: 5, buffer: [0, 1, 2, 3, 4]
  // 11, 7, 8, 9, 6 | next: 6, buffer: [0, 1, 2, 3, 4, 5]
  // 11, 7, 8, 9, 10 | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full
  // 11, 8, 9, 10, 12 | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6]
  // 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6]
  // 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6]
  // 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6]
  // 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it
  // 13, 18, 15, 16, 17 | next: 13, buffer: []
  // 18, 15, 16, 17, 14 | next: 14, buffer: []
  // 18, 15, 16, 17, 19 | next: 15, buffer: [14]
  // 18, 16, 17, 19 | next: 16, buffer: [14, 15]
  // 18, 17, 19 | next: 17, buffer: [14, 15, 16]
  // 18, 19 | next: 18, buffer: [14, 15, 16, 17]
  // 19 | next: 19, buffer: [] -- all items flushed
  //

  const stream = raw.map(async (item) => {
    const [promise, resolve] = promises[item];
    resolve();

    await promise;
    finishOrder.push(item);
    return item;
  }, { concurrency: 5, highWaterMark: 7 });

  (async () => {
    const outputOrder = await stream.toArray();

    // Output keeps the input order even though the items finish in
    // ascending numeric order.
    assert.deepStrictEqual(outputOrder, input);
    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
  })().then(common.mustCall(), common.mustNotCall());
}
|
||||
|
||||
{
  // When there is a delay between the first and the next item, the stream
  // should not wait for the queue to fill before yielding to the user.
  const promises = createDependentPromises(3);

  const raw = Readable.from([0, 1, 2]);

  const stream = raw
    .map(async (item) => {
      // Only item 0 completes on its own; items 1 and 2 stay pending until
      // the downstream mapper below releases the promises.
      if (item !== 0) {
        await promises[item][0];
      }

      return item;
    }, { concurrency: 2 })
    .map((item) => {
      // Runs once an item has been yielded by the first map; releasing
      // every promise here lets the remaining upstream items finish.
      // eslint-disable-next-line no-unused-vars
      for (const [_, resolve] of promises) {
        resolve();
      }

      return item;
    });

  (async () => {
    await stream.toArray();
  })().then(common.mustCall(), common.mustNotCall());
}
|
||||
|
||||
{
  // Error cases
  assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
  assert.throws(() => Readable.from([1]).map((x) => x, {
    concurrency: 'Foo'
  }), /ERR_OUT_OF_RANGE/);
  assert.throws(() => Readable.from([1]).map((x) => x, {
    concurrency: -1
  }), /ERR_OUT_OF_RANGE/);
  // `highWaterMark` goes through the same floor-then-validateInteger path as
  // `concurrency`, with a minimum of 0.
  assert.throws(() => Readable.from([1]).map((x) => x, {
    highWaterMark: 'Foo'
  }), /ERR_OUT_OF_RANGE/);
  assert.throws(() => Readable.from([1]).map((x) => x, {
    highWaterMark: -1
  }), /ERR_OUT_OF_RANGE/);
  assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
  assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user