Skip to content

Commit

Permalink
stream: support some and every
Browse files Browse the repository at this point in the history
This continues on the iterator-helpers work by adding `.some` and
`.every` to readable streams.

Co-Authored-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
benjamingr and ronag committed Jan 20, 2022
1 parent 5a407d6 commit f159202
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 1 deletion.
98 changes: 97 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1918,7 +1918,7 @@ import { Resolver } from 'dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an aray using toArray
// the results into an array using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
Expand All @@ -1929,6 +1929,102 @@ const dnsResults = await Readable.from([
}, { concurrency: 2 }).toArray();
```

### `readable.some(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
value for some of the chunks.

This method is similar to `Array.prototype.some` and calls `fn` on each chunk
in the stream until one item returns true (or any truthy value). Once an `fn`
call on a chunk returns a truthy value the stream is destroyed and the promise
is fulfilled with `true`. If none of the `fn` calls on the chunks return a
truthy value the promise is fulfilled with `false`.

```mjs
import { Readable } from 'stream';
import { stat } from 'fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false

// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).some(async (fileName) => {
const stats = await stat(fileName);
return stat.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished
```

### `readable.every(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
value for all of the chunks.

This method is similar to `Array.prototype.every` and calls `fn` on each chunk
in the stream to check if they all return a truthy value for `fn`. Once an `fn`
call on a chunk returns a falsy value the stream is destroyed and the promise
is fulfilled with `false`. If all of the `fn` calls on the chunks return a
truthy value the promise is fulfilled with `true`.

```mjs
import { Readable } from 'stream';
import { stat } from 'fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true

// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
'file1',
'file2',
'file3',
]).every(async (fileName) => {
const stats = await stat(fileName);
return stat.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if all files in the list are bigger than 1MiB
console.log('done'); // Stream has finished
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
37 changes: 37 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');

const {
ArrayPrototypePush,
Expand Down Expand Up @@ -150,6 +151,40 @@ async function * map(fn, options) {
}
}

async function some(fn, options) {
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
// Note that some does short circuit but also closes the iterator if it does
const ac = new AbortController();
if (options?.signal) {
if (options.signal.aborted) {
ac.abort();
}
options.signal.addEventListener('abort', () => ac.abort(), {
[kWeakHandler]: this,
once: true,
});
}
const mapped = this.map(fn, { ...options, signal: ac.signal });
for await (const result of mapped) {
if (result) {
ac.abort();
return true;
}
}
return false;
}

async function every(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
}
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
return !(await some.call(this, async (x) => {
return !(await fn(x));
}, options));
}

async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -196,6 +231,8 @@ module.exports.streamReturningOperators = {
};

module.exports.promiseReturningOperators = {
every,
forEach,
toArray,
some,
};
99 changes: 99 additions & 0 deletions test/parallel/test-stream-some-every.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');

function oneTo5() {
return Readable.from([1, 2, 3, 4, 5]);
}

function oneTo5Async() {
return oneTo5().map(async (x) => {
await Promise.resolve();
return x;
});
}
{
// Some and every work with a synchronous stream and predicate
(async () => {
assert.strictEqual(await oneTo5().some((x) => x > 3), true);
assert.strictEqual(await oneTo5().every((x) => x > 3), false);
assert.strictEqual(await oneTo5().some((x) => x > 6), false);
assert.strictEqual(await oneTo5().every((x) => x < 6), true);
assert.strictEqual(await Readable.from([]).some((x) => true), false);
assert.strictEqual(await Readable.from([]).every((x) => true), true);
})().then(common.mustCall());
}

{
// Some and every work with an asynchronous stream and synchronous predicate
(async () => {
assert.strictEqual(await oneTo5Async().some((x) => x > 3), true);
assert.strictEqual(await oneTo5Async().every((x) => x > 3), false);
assert.strictEqual(await oneTo5Async().some((x) => x > 6), false);
assert.strictEqual(await oneTo5Async().every((x) => x < 6), true);
})().then(common.mustCall());
}

{
// Some and every work on asynchronous streams with an asynchronous predicate
(async () => {
assert.strictEqual(await oneTo5().some(async (x) => x > 3), true);
assert.strictEqual(await oneTo5().every(async (x) => x > 3), false);
assert.strictEqual(await oneTo5().some(async (x) => x > 6), false);
assert.strictEqual(await oneTo5().every(async (x) => x < 6), true);
})().then(common.mustCall());
}

{
// Some and every short circuit
(async () => {
await oneTo5().some(common.mustCall((x) => x > 2, 3));
await oneTo5().every(common.mustCall((x) => x < 3, 3));
// When short circuit isn't possible the whole stream is iterated
await oneTo5().some(common.mustCall((x) => x > 6, 5));
// The stream is destroyed afterwards
const stream = oneTo5();
await stream.some(common.mustCall((x) => x > 2, 3));
assert.strictEqual(stream.destroyed, true);
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
assert.rejects(async () => {
await Readable.from([1, 2, 3]).some(
async (x) => new Promise(() => {}),
{ signal: ac.signal });
}, {
name: 'AbortError',
});
ac.abort();
}
{
// Support for pre-aborted AbortSignal
const ac = new AbortController();
ac.abort();
assert.rejects(async () => {
await Readable.from([1, 2, 3]).some(
async (x) => new Promise(() => {}),
{ signal: ac.signal });
}, {
name: 'AbortError',
});
}
{
// Error cases
assert.rejects(async () => {
await Readable.from([1]).every(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1]).every((x) => x, {
concurrency: 'Foo'
});
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
}

0 comments on commit f159202

Please sign in to comment.