stream: add drop and take
This adds the `drop` and `take` methods to readable streams allowing users easily drop and take items from the stream. This continues the iterator-helper proposal alignment task. Co-Authored-By: Robert Nagy <ronagy@icloud.com> PR-URL: https://github.com/nodejs/node/pull/41630 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
3d27a04b56
commit
a8afe26fca
@ -2074,6 +2074,50 @@ for await (const result of concatResult) {
|
||||
}
|
||||
```
|
||||
|
||||
### `readable.drop(limit[, options])`
|
||||
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `limit` {number} the number of chunks to drop from the readable.
|
||||
* `options` {Object}
|
||||
* `signal` {AbortSignal} allows destroying the stream if the signal is
|
||||
aborted.
|
||||
* Returns: {Readable} a stream with `limit` chunks dropped.
|
||||
|
||||
This method returns a new stream with the first `limit` chunks dropped.
|
||||
|
||||
```mjs
|
||||
import { Readable } from 'stream';
|
||||
|
||||
await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
|
||||
```
|
||||
|
||||
### `readable.take(limit[, options])`
|
||||
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1 - Experimental
|
||||
|
||||
* `limit` {number} the number of chunks to take from the readable.
|
||||
* `options` {Object}
|
||||
* `signal` {AbortSignal} allows destroying the stream if the signal is
|
||||
aborted.
|
||||
* Returns: {Readable} a stream with `limit` chunks taken.
|
||||
|
||||
This method returns a new stream with the first `limit` chunks.
|
||||
|
||||
```mjs
|
||||
import { Readable } from 'stream';
|
||||
|
||||
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
|
||||
```
|
||||
|
||||
### Duplex and transform streams
|
||||
|
||||
#### Class: `stream.Duplex`
|
||||
|
@ -5,6 +5,7 @@ const { AbortController } = require('internal/abort_controller');
|
||||
const {
|
||||
codes: {
|
||||
ERR_INVALID_ARG_TYPE,
|
||||
ERR_OUT_OF_RANGE,
|
||||
},
|
||||
AbortError,
|
||||
} = require('internal/errors');
|
||||
@ -14,6 +15,8 @@ const { kWeakHandler } = require('internal/event_target');
|
||||
const {
|
||||
ArrayPrototypePush,
|
||||
MathFloor,
|
||||
Number,
|
||||
NumberIsNaN,
|
||||
Promise,
|
||||
PromiseReject,
|
||||
PromisePrototypeCatch,
|
||||
@ -232,10 +235,62 @@ async function* flatMap(fn, options) {
|
||||
}
|
||||
}
|
||||
|
||||
function toIntegerOrInfinity(number) {
|
||||
// We coerce here to align with the spec
|
||||
// https://github.com/tc39/proposal-iterator-helpers/issues/169
|
||||
number = Number(number);
|
||||
if (NumberIsNaN(number)) {
|
||||
return 0;
|
||||
}
|
||||
if (number < 0) {
|
||||
throw new ERR_OUT_OF_RANGE('number', '>= 0', number);
|
||||
}
|
||||
return number;
|
||||
}
|
||||
|
||||
function drop(number, options) {
|
||||
number = toIntegerOrInfinity(number);
|
||||
return async function* drop() {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
for await (const val of this) {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
if (number-- <= 0) {
|
||||
yield val;
|
||||
}
|
||||
}
|
||||
}.call(this);
|
||||
}
|
||||
|
||||
|
||||
function take(number, options) {
|
||||
number = toIntegerOrInfinity(number);
|
||||
return async function* take() {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
for await (const val of this) {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new AbortError();
|
||||
}
|
||||
if (number-- > 0) {
|
||||
yield val;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}.call(this);
|
||||
}
|
||||
|
||||
module.exports.streamReturningOperators = {
|
||||
drop,
|
||||
filter,
|
||||
flatMap,
|
||||
map,
|
||||
take,
|
||||
};
|
||||
|
||||
module.exports.promiseReturningOperators = {
|
||||
|
96
test/parallel/test-stream-drop-take.js
Normal file
96
test/parallel/test-stream-drop-take.js
Normal file
@ -0,0 +1,96 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const {
|
||||
Readable,
|
||||
} = require('stream');
|
||||
const { deepStrictEqual, rejects, throws } = require('assert');
|
||||
|
||||
const { from } = Readable;
|
||||
|
||||
const fromAsync = (...args) => from(...args).map(async (x) => x);
|
||||
|
||||
const naturals = () => from(async function*() {
|
||||
let i = 1;
|
||||
while (true) {
|
||||
yield i++;
|
||||
}
|
||||
}());
|
||||
|
||||
{
|
||||
// Synchronous streams
|
||||
(async () => {
|
||||
deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
|
||||
deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
|
||||
deepStrictEqual(await from([]).drop(2).toArray(), []);
|
||||
deepStrictEqual(await from([]).take(1).toArray(), []);
|
||||
deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
|
||||
deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
|
||||
deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
|
||||
})().then(common.mustCall());
|
||||
// Asynchronous streams
|
||||
(async () => {
|
||||
deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
|
||||
deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
|
||||
deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
|
||||
deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
|
||||
deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
|
||||
deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
|
||||
deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
|
||||
})().then(common.mustCall());
|
||||
// Infinite streams
|
||||
// Asynchronous streams
|
||||
(async () => {
|
||||
deepStrictEqual(await naturals().take(1).toArray(), [1]);
|
||||
deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
|
||||
const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
|
||||
deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
|
||||
deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
|
||||
})().then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
// Coercion
|
||||
(async () => {
|
||||
// The spec made me do this ^^
|
||||
deepStrictEqual(await naturals().take('cat').toArray(), []);
|
||||
deepStrictEqual(await naturals().take('2').toArray(), [1, 2]);
|
||||
deepStrictEqual(await naturals().take(true).toArray(), [1]);
|
||||
})().then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
// Support for AbortSignal
|
||||
const ac = new AbortController();
|
||||
rejects(
|
||||
Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
|
||||
name: 'AbortError',
|
||||
}).then(common.mustCall());
|
||||
rejects(
|
||||
Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
|
||||
name: 'AbortError',
|
||||
}).then(common.mustCall());
|
||||
ac.abort();
|
||||
}
|
||||
|
||||
{
|
||||
// Support for AbortSignal, already aborted
|
||||
const signal = AbortSignal.abort();
|
||||
rejects(
|
||||
Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
|
||||
name: 'AbortError',
|
||||
}).then(common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
// Error cases
|
||||
const invalidArgs = [
|
||||
-1,
|
||||
-Infinity,
|
||||
-40,
|
||||
];
|
||||
|
||||
for (const example of invalidArgs) {
|
||||
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user