Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add map method to Readable: #40815

Closed
wants to merge 19 commits into from
44 changes: 44 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1737,6 +1737,50 @@ async function showBoth() {
showBoth();
```

### `readable.map(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to map over every item in the stream.
Linkgoron marked this conversation as resolved.
Show resolved Hide resolved
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream mapped with the function `fn`.

This method allows mapping over the stream. The `fn` function will be called
for every item in the stream. If the `fn` function returns a promise - that
promise will be `await`ed before being passed to the result stream.

ronag marked this conversation as resolved.
Show resolved Hide resolved
```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous mapper.
for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(item); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
console.log(result); // Logs the DNS result of resolver.resolve4.
}
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
152 changes: 152 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
'use strict';

const { AbortController } = require('internal/abort_controller');
const {
codes: {
ERR_INVALID_ARG_TYPE,
},
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');

const {
MathFloor,
Promise,
PromiseReject,
PromisePrototypeCatch,
Symbol,
} = primordials;

const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');

async function * map(fn, options) {
ronag marked this conversation as resolved.
Show resolved Hide resolved
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this);
}

if (options != null && typeof options !== 'object') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateObject(options, 'options', { nullable: true });

throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}

let concurrency = 1;
if (options?.concurrency != null) {
concurrency = MathFloor(options.concurrency);
}

validateInteger(concurrency, 'concurrency', 1);

const ac = new AbortController();
const stream = this;
const queue = [];
const signal = ac.signal;
const signalOpt = { signal };

const abort = () => ac.abort();
options?.signal?.addEventListener('abort', abort);

let next;
let resume;
let done = false;

function onDone() {
done = true;
}

async function pump() {
try {
for await (let val of stream) {
ronag marked this conversation as resolved.
Show resolved Hide resolved
if (done) {
return;
}

if (signal.aborted) {
throw new AbortError();
}

try {
val = fn(val, signalOpt);
} catch (err) {
val = PromiseReject(err);
}

if (val === kEmpty) {
continue;
}

if (typeof val?.catch === 'function') {
val.catch(onDone);
}

queue.push(val);
if (next) {
next();
next = null;
}

if (!done && queue.length && queue.length >= concurrency) {
await new Promise((resolve) => {
resume = resolve;
});
}
}
queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeCatch(val, onDone);
queue.push(val);
} finally {
done = true;
if (next) {
next();
next = null;
}
options?.signal?.removeEventListener('abort', abort);
}
}

pump();

try {
while (true) {
Linkgoron marked this conversation as resolved.
Show resolved Hide resolved
while (queue.length > 0) {
const val = await queue[0];

if (val === kEof) {
return;
}

if (signal.aborted) {
throw new AbortError();
}

if (val !== kEmpty) {
yield val;
}

queue.shift();
if (resume) {
resume();
resume = null;
}
}

await new Promise((resolve) => {
next = resolve;
});
}
} finally {
ac.abort();

done = true;
if (resume) {
resume();
resume = null;
}
}
}

module.exports = {
map,
};
9 changes: 9 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@

const {
ObjectDefineProperty,
ObjectKeys,
ReflectApply,
} = primordials;

const {
promisify: { custom: customPromisify },
} = require('internal/util');

const operators = require('internal/streams/operators');
const compose = require('internal/streams/compose');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
Expand All @@ -42,6 +45,12 @@ const Stream = module.exports = require('internal/streams/legacy').Stream;
Stream.isDisturbed = utils.isDisturbed;
Stream.isErrored = utils.isErrored;
Stream.Readable = require('internal/streams/readable');
for (const key of ObjectKeys(operators)) {
ronag marked this conversation as resolved.
Show resolved Hide resolved
const op = operators[key];
Stream.Readable.prototype[key] = function(...args) {
return Stream.Readable.from(ReflectApply(op, this, args));
};
}
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ const expectedModules = new Set([
'NativeModule internal/streams/end-of-stream',
'NativeModule internal/streams/from',
'NativeModule internal/streams/legacy',
'NativeModule internal/streams/operators',
'NativeModule internal/streams/passthrough',
'NativeModule internal/streams/pipeline',
'NativeModule internal/streams/readable',
Expand Down
108 changes: 108 additions & 0 deletions test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
'use strict';
benjamingr marked this conversation as resolved.
Show resolved Hide resolved

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');

{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Map works on synchronous streams with an asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
await Promise.resolve();
return x + x;
});
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Map works on asynchronous streams with a asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
return x + x;
}).map((x) => x + x);
const result = [4, 8, 12, 16, 20];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
}, { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());

setImmediate(() => {
ac.abort();
assert.strictEqual(calls, 2);
});
}

{
// Concurrency result order
const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
await setTimeout(10 - item, { signal });
return item;
}, { concurrency: 2 });

(async () => {
const expected = [1, 2];
for await (const item of stream) {
assert.strictEqual(item, expected.shift());
}
})().then(common.mustCall());
}

{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).map(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).map((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).map((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
// Test result is a Readable
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
assert.strictEqual(stream.readable, true);
}
6 changes: 3 additions & 3 deletions tools/doc/type-parser.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ const customTypesMap = {

'Stream': 'stream.html#stream',
'stream.Duplex': 'stream.html#class-streamduplex',
'stream.Readable': 'stream.html#class-streamreadable',
'stream.Transform': 'stream.html#class-streamtransform',
'stream.Writable': 'stream.html#class-streamwritable',
'Duplex': 'stream.html#class-streamduplex',
'stream.Readable': 'stream.html#class-streamreadable',
'Readable': 'stream.html#class-streamreadable',
'stream.Transform': 'stream.html#class-streamtransform',
'Transform': 'stream.html#class-streamtransform',
'stream.Writable': 'stream.html#class-streamwritable',
'Writable': 'stream.html#class-streamwritable',

'Immediate': 'timers.html#class-immediate',
Expand Down