stream: add filter method to readable

This continues the work in https://github.com/nodejs/node/pull/40815 to
make streams compatible with upcoming ECMAScript language features. It
adds an experimental `filter` api to streams and tests/docs for it.

See https://github.com/tc39/proposal-iterator-helpers/

Co-Authored-By: Robert Nagy <ronagy@icloud.com>

PR-URL: https://github.com/nodejs/node/pull/41354
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Benjamin Gruenbaum 2021-12-30 12:14:03 +02:00
parent 231ec0adf2
commit 55c5120b07
3 changed files with 172 additions and 0 deletions

View File

@ -1781,6 +1781,55 @@ for await (const result of dnsResults) {
}
```
### `readable.filter(fn[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to filter items from the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed, allowing the
`fn` call to be aborted early.
* `options` {Object}
* `concurrency` {number} the maximum number of concurrent invocations of `fn`
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream filtered with the predicate `fn`.
This method allows filtering the stream. For each item in the stream, the `fn`
function will be called, and if it returns a truthy value, the item will be
passed to the result stream. If the `fn` function returns a promise, that
promise will be `await`ed.
```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';
// With a synchronous predicate.
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(item); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
// `filter()` returns a stream, not a promise, so there is nothing to await
// here; the stream is consumed by the `for await` loop below.
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).filter(async (domain) => {
  // With `{ ttl: true }`, `resolve4()` resolves to an array of
  // `{ address, ttl }` records; use the TTL of the first record.
  const [{ ttl }] = await resolver.resolve4(domain, { ttl: true });
  return ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
  // Logs domains with more than 60 seconds on the resolved dns record.
  console.log(result);
}
```
### Duplex and transform streams
#### Class: `stream.Duplex`

View File

@ -147,6 +147,20 @@ async function * map(fn, options) {
}
}
/**
 * Lazily filters the chunks of this stream with the predicate `fn`.
 *
 * Built on top of `this.map()`: every chunk is mapped either to itself (when
 * the awaited predicate result is truthy) or to the `kEmpty` sentinel.
 * NOTE(review): `map()` is presumably what drops `kEmpty` results from its
 * output - confirm against its implementation.
 *
 * Because this is an async generator, the argument check below only runs
 * once the returned iterator is first advanced.
 *
 * @param {Function|AsyncFunction} fn Predicate invoked as
 *   `fn(value, options)`; its result is awaited.
 * @param {object} [options] Forwarded unchanged to `this.map()`.
 * @yields {any} Whatever `this.map()` yields for the wrapped predicate.
 * @throws {ERR_INVALID_ARG_TYPE} If `fn` is not a function.
 */
async function* filter(fn, options) {
  if (typeof fn !== 'function') {
    // Report the offending argument itself (not the stream) as the received
    // value so the error message describes what the caller actually passed.
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn);
  }

  // Adapter handed to map(): keep the value when the predicate accepts it,
  // otherwise return the kEmpty sentinel.
  async function filterFn(value, fnOptions) {
    return (await fn(value, fnOptions)) ? value : kEmpty;
  }

  yield* this.map(filterFn, options);
}
// Public surface of this module: the stream operator functions `map` and
// `filter`.
module.exports = {
map,
filter
};

View File

@ -0,0 +1,109 @@
'use strict';
const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');
{
  // Filter works on synchronous streams with a synchronous predicate
  const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3);
  (async () => {
    // Collect the whole output so that a stream emitting too few (or no)
    // items fails the test instead of passing vacuously.
    const actual = [];
    for await (const item of stream) {
      actual.push(item);
    }
    assert.deepStrictEqual(actual, [1, 2]);
  })().then(common.mustCall());
}
{
  // Filter works on synchronous streams with an asynchronous predicate
  const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
    await Promise.resolve();
    return x > 3;
  });
  (async () => {
    // Collect the whole output so that a stream emitting too few (or no)
    // items fails the test instead of passing vacuously.
    const actual = [];
    for await (const item of stream) {
      actual.push(item);
    }
    assert.deepStrictEqual(actual, [4, 5]);
  })().then(common.mustCall());
}
{
  // Filter works on the output of an asynchronous map
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    await Promise.resolve();
    return x + x;
  }).filter((x) => x > 5);
  (async () => {
    // Collect the whole output so that a stream emitting too few (or no)
    // items fails the test instead of passing vacuously.
    const actual = [];
    for await (const item of stream) {
      actual.push(item);
    }
    assert.deepStrictEqual(actual, [6, 8, 10]);
  })().then(common.mustCall());
}
{
  // Concurrency + AbortSignal
  const ac = new AbortController();
  let calls = 0;
  const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => {
    calls++;
    // timers/promises setTimeout takes (delay, value, options): the signal
    // must be passed in the third argument to actually cancel the timer.
    await setTimeout(100, undefined, { signal });
  }, { signal: ac.signal, concurrency: 2 });
  // pump
  assert.rejects(async () => {
    for await (const item of stream) {
      // nope
      console.log(item);
    }
  }, {
    name: 'AbortError',
  }).then(common.mustCall());
  setImmediate(() => {
    ac.abort();
    // The predicate is expected to have been started exactly twice
    // (concurrency: 2) by the time the stream is aborted.
    assert.strictEqual(calls, 2);
  });
}
{
  // Concurrency result order
  const stream = Readable.from([1, 2]).filter(async (item, { signal }) => {
    // Later items finish sooner; the output must still follow input order.
    // timers/promises setTimeout takes (delay, value, options): the signal
    // belongs in the third argument.
    await setTimeout(10 - item, undefined, { signal });
    return true;
  }, { concurrency: 2 });
  (async () => {
    // Collect the whole output so that missing items fail the test instead
    // of passing vacuously.
    const actual = [];
    for await (const item of stream) {
      actual.push(item);
    }
    assert.deepStrictEqual(actual, [1, 2]);
  })().then(common.mustCall());
}
{
  // Error cases: invalid arguments surface as rejections while iterating.
  // Consumes a stream to completion, discarding every item.
  const drain = async (source) => {
    // eslint-disable-next-line no-unused-vars
    for await (const _ of source);
  };

  // Predicate is not a function.
  assert.rejects(async () => {
    await drain(Readable.from([1]).filter(1));
  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());

  // Concurrency is not a valid number.
  assert.rejects(async () => {
    await drain(Readable.from([1]).filter((x) => x, {
      concurrency: 'Foo'
    }));
  }, /ERR_OUT_OF_RANGE/).then(common.mustCall());

  // Options is not an object.
  assert.rejects(async () => {
    await drain(Readable.from([1]).filter((x) => x, 1));
  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
  // The value returned by filter() reports itself as readable
  const filtered = Readable.from([1, 2, 3, 4, 5]).filter(() => true);
  assert.strictEqual(filtered.readable, true);
}