diff --git a/.gitignore b/.gitignore index 84dc48e..7ad9e67 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,9 @@ node_modules -coverage -.nyc_output -package-lock.json +build dist +.docs +.coverage +node_modules +package-lock.json +yarn.lock +.vscode diff --git a/README.md b/README.md index 4391173..0e2f5c4 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,18 @@ # it-pipe [![codecov](https://img.shields.io/codecov/c/github/alanshaw/it-pipe.svg?style=flat-square)](https://codecov.io/gh/alanshaw/it-pipe) -[![CI](https://img.shields.io/github/workflow/status/alanshaw/it-pipe/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/alanshaw/it-pipe/actions/workflows/js-test-and-release.yml) +[![CI](https://img.shields.io/github/actions/workflow/status/alanshaw/it-pipe/js-test-and-release.yml?branch=master\&style=flat-square)](https://github.com/alanshaw/it-pipe/actions/workflows/js-test-and-release.yml?query=branch%3Amaster) > Utility to "pipe" async iterables together ## Table of contents - [Install](#install) + - [Browser ` +``` + Based on this definition of streaming iterables . Almost identical to the [`pipeline`](https://github.com/bustle/streaming-iterables#pipeline) function from the [`streaming-iterables`](https://www.npmjs.com/package/streaming-iterables) module except that it supports duplex streams *and* will automatically wrap a "source" as the first param in a function. @@ -65,9 +74,9 @@ Note: - `firstFn` may be a `Function` or an `Iterable` - `firstFn` or any of `fns` may be a [duplex object](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it) (an object with a `sink` and `source`). -## Contribute +## API Docs -Feel free to dive in! [Open an issue](https://github.com/alanshaw/it-pipe/issues/new) or submit PRs. +- ## License diff --git a/package.json b/package.json index be45c41..5a55c05 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "types": "./dist/src/index.d.ts", "files": [ "src", - "dist/src", + "dist", "!dist/test", "!**/*.tsbuildinfo" ], @@ -144,18 +144,20 @@ "test:firefox-webworker": "aegir test -t webworker -- --browser firefox", "test:node": "aegir test -t node --cov", "test:electron-main": "aegir test -t electron-main", - "release": "aegir release" + "release": "aegir release", + "docs": "aegir docs" }, "dependencies": { - "it-merge": "^2.0.0", - "it-pushable": "^3.1.0", - "it-stream-types": "^1.0.3" + "it-merge": "^3.0.0", + "it-pushable": "^3.1.0" }, "devDependencies": { - "aegir": "^37.4.8", + "aegir": "^38.1.8", "delay": "^5.0.0", - "it-all": "^2.0.0", - "it-drain": "^2.0.0", + "it-all": "^3.0.0", + "it-drain": "^3.0.0", + "it-map": "^3.0.0", + "it-stream-types": "^1.0.3", "p-defer": "^4.0.0", "streaming-iterables": "^7.0.4" } diff --git a/src/index.ts b/src/index.ts index 0154855..219f7f4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,8 +1,7 @@ import { pushable } from 'it-pushable' import merge from 'it-merge' -import type * as it from 'it-stream-types' -export const rawPipe = (...fns: any) => { +export const rawPipe = (...fns: any): any => { let res while (fns.length > 0) { res = fns.shift()(res) @@ -10,24 +9,28 @@ export const rawPipe = (...fns: any) => { return res } -export const isIterable = (obj: any): obj is it.Source => { - return obj != null && ( - typeof obj[Symbol.asyncIterator] === 'function' || - typeof obj[Symbol.iterator] === 'function' || - typeof obj.next === 'function' // Probably, right? - ) +const isAsyncIterable = (obj: any): obj is AsyncIterable => { + return obj?.[Symbol.asyncIterator] != null } -export const isDuplex = > (obj: any): obj is it.Duplex => { - return obj != null && typeof obj.sink === 'function' && isIterable(obj.source) +const isIterable = (obj: any): obj is Iterable => { + return obj?.[Symbol.iterator] != null } -const duplexPipelineFn = (duplex: it.Duplex) => { - return (source: any): it.Source => { +const isDuplex = (obj: any): obj is Duplex => { + if (obj == null) { + return false + } + + return obj.sink != null && obj.source != null +} + +const duplexPipelineFn = (duplex: Duplex) => { + return (source: any) => { const p = duplex.sink(source) - if (p.then != null) { - const stream = pushable({ + if (p?.then != null) { + const stream = pushable({ objectMode: true }) p.then(() => { @@ -36,9 +39,21 @@ const duplexPipelineFn = (duplex: it.Duplex) => { stream.end(err) }) - const sourceWrap = async function * () { - yield * duplex.source - stream.end() + let sourceWrap: () => Iterable | AsyncIterable + const source = duplex.source + + if (isAsyncIterable(source)) { + sourceWrap = async function * () { + yield * source + stream.end() + } + } else if (isIterable(source)) { + sourceWrap = function * () { + yield * source + stream.end() + } + } else { + throw new Error('Unknown duplex source type - must be Iterable or AsyncIterable') } return merge(stream, sourceWrap()) @@ -48,102 +63,284 @@ const duplexPipelineFn = (duplex: it.Duplex) => { } } -export type Source = it.Source | (() => it.Source) | it.Duplex -export type Transform = it.Transform | it.Duplex -export type Sink = it.Sink | it.Duplex - -export function pipe ( - first: Source -): it.Source - -export function pipe ( - first: Source, - second: Sink -): B - -export function pipe ( - first: Source, - second: Transform, - third: Sink -): C - -export function pipe ( - first: Source, - second: Transform, - third: Transform, - fourth: Sink -): D - -export function pipe ( - first: Source, - second: Transform, - third: Transform, - fourth: Transform, - fifth: Sink -): E - -export function pipe ( - first: Source, - second: Transform, - third: Transform, - fourth: Transform, - fifth: Transform, - sixth: Sink -): F - -export function pipe ( - first: Source, - second: Transform, - third: Transform, - fourth: Transform, - fifth: Transform, - sixth: Transform, - seventh: Sink -): G - -export function pipe ( - first: Source, - second: Transform, - third: Transform, - fourth: Transform, - fifth: Transform, - sixth: Transform, - seventh: Transform, - eighth: Sink -): H - -export function pipe ( - first: Source, - second: Transform, - third: Transform, - fourth: Transform, - fifth: Transform, - sixth: Transform, - seventh: Transform, - eighth: Transform, - ninth: Sink -): I - -export function pipe ( - first: Source, - second: Transform, - third: Transform, - fourth: Transform, - fifth: Transform, - sixth: Transform, - seventh: Transform, - eighth: Transform, - ninth: Transform, - tenth: Sink -): J +export interface Duplex { + source: TSource + sink: (source: TSink) => RSink +} + +export interface FnSource { (source: A): Iterable } +export interface FnAsyncSource { (source: A): AsyncIterable } + +export interface FnSink { (source: A): B } +export interface FnAsyncSink { (source: A): Promise } + +type PipeSource = + Iterable | + AsyncIterable | + FnSource | + FnAsyncSource | + Duplex + +type PipeTransform | AsyncIterable = any> = + FnSource | + FnAsyncSource | + Duplex + +type PipeSink = + FnSink | + FnAsyncSink | + Duplex + +type PipeOutput = + A extends FnSink ? ReturnType : + A extends FnAsyncSink ? ReturnType : + A extends Duplex ? ReturnType : + never + +// single item pipe output includes pipe source types +type SingleItemPipeOutput = + A extends Iterable ? A : + A extends AsyncIterable ? A : + A extends FnSource ? ReturnType : + A extends FnAsyncSource ? ReturnType : + A extends Duplex ? A['source'] : + PipeOutput + +type PipeFnInput = + A extends Iterable ? A : + A extends AsyncIterable ? A : + A extends FnSource ? ReturnType : + A extends FnAsyncSource ? ReturnType : + A extends Duplex ? A['source'] : + never + +// one item, just a pass-through +export function pipe< + A extends PipeSource +> ( + source: A +): SingleItemPipeOutput + +// two items, source to sink +export function pipe< + A extends PipeSource, + B extends PipeSink> +> ( + source: A, + sink: B +): PipeOutput + +// three items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeSink> +> ( + source: A, + transform1: B, + sink: C +): PipeOutput + +// many items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeTransform>, + D extends PipeSink> +> ( + source: A, + transform1: B, + transform2: C, + sink: D +): PipeOutput + +// lots of items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeTransform>, + D extends PipeTransform>, + E extends PipeSink> +> ( + source: A, + transform1: B, + transform2: C, + transform3: D, + sink: E +): PipeOutput + +// lots of items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeTransform>, + D extends PipeTransform>, + E extends PipeTransform>, + F extends PipeSink> +> ( + source: A, + transform1: B, + transform2: C, + transform3: D, + transform4: E, + sink: F +): PipeOutput + +// lots of items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeTransform>, + D extends PipeTransform>, + E extends PipeTransform>, + F extends PipeTransform>, + G extends PipeSink> +> ( + source: A, + transform1: B, + transform2: C, + transform3: D, + transform4: E, + transform5: F, + sink: G +): PipeOutput + +// lots of items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeTransform>, + D extends PipeTransform>, + E extends PipeTransform>, + F extends PipeTransform>, + G extends PipeTransform>, + H extends PipeSink> +> ( + source: A, + transform1: B, + transform2: C, + transform3: D, + transform4: E, + transform5: F, + transform6: G, + sink: H +): PipeOutput + +// lots of items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeTransform>, + D extends PipeTransform>, + E extends PipeTransform>, + F extends PipeTransform>, + G extends PipeTransform>, + H extends PipeTransform>, + I extends PipeSink> +> ( + source: A, + transform1: B, + transform2: C, + transform3: D, + transform4: E, + transform5: F, + transform6: G, + transform7: H, + sink: I +): PipeOutput + +// lots of items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeTransform>, + D extends PipeTransform>, + E extends PipeTransform>, + F extends PipeTransform>, + G extends PipeTransform>, + H extends PipeTransform>, + I extends PipeTransform>, + J extends PipeSink> +> ( + source: A, + transform1: B, + transform2: C, + transform3: D, + transform4: E, + transform5: F, + transform6: G, + transform7: H, + transform8: I, + sink: J +): PipeOutput + +// lots of items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeTransform>, + D extends PipeTransform>, + E extends PipeTransform>, + F extends PipeTransform>, + G extends PipeTransform>, + H extends PipeTransform>, + I extends PipeTransform>, + J extends PipeTransform>, + K extends PipeSink> +> ( + source: A, + transform1: B, + transform2: C, + transform3: D, + transform4: E, + transform5: F, + transform6: G, + transform7: H, + transform8: I, + transform9: J, + sink: K +): PipeOutput + +// lots of items, source to sink with transform(s) in between +export function pipe< + A extends PipeSource, + B extends PipeTransform>, + C extends PipeTransform>, + D extends PipeTransform>, + E extends PipeTransform>, + F extends PipeTransform>, + G extends PipeTransform>, + H extends PipeTransform>, + I extends PipeTransform>, + J extends PipeTransform>, + K extends PipeTransform>, + L extends PipeSink> +> ( + source: A, + transform1: B, + transform2: C, + transform3: D, + transform4: E, + transform5: F, + transform6: G, + transform7: H, + transform8: I, + transform9: J, + transform10: K, + sink: L +): PipeOutput export function pipe (first: any, ...rest: any[]): any { + if (first == null) { + throw new Error('Empty pipeline') + } + // Duplex at start: wrap in function and return duplex source if (isDuplex(first)) { const duplex = first first = () => duplex.source // Iterable at start: wrap in function - } else if (isIterable(first)) { + } else if (isIterable(first) || isAsyncIterable(first)) { const source = first first = () => source } diff --git a/test/index.spec.ts b/test/index.spec.ts index fe8c3c7..945a36a 100644 --- a/test/index.spec.ts +++ b/test/index.spec.ts @@ -2,70 +2,360 @@ import { expect } from 'aegir/chai' import { pipe } from '../src/index.js' import all from 'it-all' import drain from 'it-drain' +import map from 'it-map' import { filter, collect, consume } from 'streaming-iterables' import delay from 'delay' import defer from 'p-defer' -import type { Duplex, Source } from 'it-stream-types' +import type { Source } from 'it-stream-types' +import type { Duplex } from '../src/index.js' -const oneTwoThree = () => [1, 2, 3] +const oneTwoThree = (): number[] => [1, 2, 3] + +const asyncOneTwoThree = async function * (): AsyncGenerator { + yield * oneTwoThree() +} + +type IsAny = unknown extends T ? T extends {} ? T : never : never +type NotAny = T extends IsAny ? never : T + +/** + * Utility function to assert that tsc has not derived the type of + * the passed argument as 'any' + */ +function assertNotAny (x: NotAny): void { + +} + +/** + * Utility function to assert that tsc has not derived the type of + * the passed argument as 'never' + */ +function assertNotNever (x: never): void { + +} describe('it-pipe', () => { - it('should pipe source', async () => { - const result = await pipe(oneTwoThree) - expect(result).to.deep.equal([1, 2, 3]) + describe('one item pipeline', () => { + it('should pipe iterable source', () => { + const result = pipe(oneTwoThree()) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result[Symbol.iterator]).to.be.ok() + expect(result).to.deep.equal([1, 2, 3]) + }) + + it('should pipe function source', () => { + const result = pipe(oneTwoThree) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result[Symbol.iterator]).to.be.ok() + expect(result).to.deep.equal([1, 2, 3]) + }) + + it('should pipe async iterable source', async () => { + const result = pipe(asyncOneTwoThree()) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result[Symbol.asyncIterator]).to.be.ok() + await expect(all(result)).to.eventually.deep.equal([1, 2, 3]) + }) + + it('should pipe async function source', async () => { + const result = pipe(asyncOneTwoThree) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result[Symbol.asyncIterator]).to.be.ok() + await expect(all(result)).to.eventually.deep.equal([1, 2, 3]) + }) + + it('should allow single duplex', async () => { + const duplex: Duplex = { + source: oneTwoThree(), + sink: (source) => { + return all(source).map(n => n.toString()) + } + } + + const result = pipe( + duplex + ) + + // @ts-expect-error - result should not be assignable to never + assertNotNever(result) + assertNotAny(result) + + expect(result[Symbol.iterator]).to.be.ok() + expect(result).to.deep.equal(oneTwoThree()) + }) + + it('should allow single async duplex', async () => { + const duplex: Duplex, AsyncGenerator, AsyncGenerator> = { + source: asyncOneTwoThree(), + sink: (source) => { + return map(source, n => n.toString()) + } + } + + const result = pipe( + duplex + ) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result[Symbol.asyncIterator]).to.be.ok() + await expect(all(result)).to.eventually.deep.equal(oneTwoThree()) + }) }) - it('should pipe source -> sink', async () => { - const result = await pipe(oneTwoThree, all) - expect(result).to.deep.equal([1, 2, 3]) + describe('two item pipeline', () => { + it('should pipe iterable source -> sink function', () => { + const result = pipe( + oneTwoThree(), + (source) => all(source) + ) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result).to.deep.equal([1, 2, 3]) + }) + + it('should pipe iterable function source -> sink function', () => { + const result = pipe( + oneTwoThree, + (source) => all(source) + ) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result).to.deep.equal([1, 2, 3]) + }) + + it('should pipe async iterable source -> sink function', async () => { + const result = await pipe( + asyncOneTwoThree(), + async (source) => await all(source) + ) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result).to.deep.equal([1, 2, 3]) + }) + + it('should pipe async iterable function source -> sink function', async () => { + const result = await pipe( + asyncOneTwoThree, + async (source) => await all(source) + ) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result).to.deep.equal([1, 2, 3]) + }) }) - it('should pipe source -> transform -> sink', async () => { + describe('three item pipeline', () => { + it('should pipe iterable source -> transform -> sink function', () => { + const result = pipe( + oneTwoThree(), + function * (source) { + for (const n of source) { + yield n.toString() + } + }, + (source) => all(source) + ) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result).to.deep.equal(['1', '2', '3']) + }) + + it('should pipe async iterable source -> async transform -> sink function', async () => { + const result = await pipe( + asyncOneTwoThree(), + async function * (source) { + for await (const n of source) { + yield n.toString() + } + }, + async (source) => await all(source) + ) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result).to.deep.equal(['1', '2', '3']) + }) + + it('should pipe iterable source -> duplex -> sink function', () => { + const duplex: Duplex = { + sink: source => { duplex.source = source }, + source: [] + } + + const result = pipe( + oneTwoThree, + duplex, + (source) => all(source) + ) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result[Symbol.iterator]).to.be.ok() + expect(result).to.deep.equal([1, 2, 3]) + }) + + it('should pipe async iterable source -> duplex -> sink function', async () => { + const duplex: Duplex, AsyncGenerator, void> = { + sink: source => { duplex.source = source }, + source: (async function * () {}()) + } + + const result = await pipe( + asyncOneTwoThree(), + duplex, + async (source) => await all(source) + ) + + // @ts-expect-error - result should not be assignable to never - if it is + // then we've broken the types and this comment will cause tsc to fail + assertNotNever(result) + assertNotAny(result) + + expect(result).to.deep.equal([1, 2, 3]) + }) + }) + + it('should pipe source -> transform -> sink', () => { + const result = pipe( + oneTwoThree, + function * transform (source) { + for (const val of source) { + yield val * 2 + } + }, + (source) => all(source) + ) + + expect(result[Symbol.iterator]).to.be.ok() + expect(result).to.deep.equal([2, 4, 6]) + }) + + it('should pipe source -> async transform -> sink', async () => { const result = await pipe( oneTwoThree, - function transform (source) { - return (async function * () { // A generator is async iterable - for await (const val of source) yield val * 2 - })() + async function * transform (source) { + for await (const val of source) { + yield val * 2 + } }, - all + async (source) => await all(source) ) + expect(result[Symbol.iterator]).to.be.ok() expect(result).to.deep.equal([2, 4, 6]) }) - it('should allow iterable first param', async () => { - const result = await pipe(oneTwoThree(), all) + it('should allow iterable first param', () => { + const result = pipe( + oneTwoThree(), + (source) => all(source) + ) + + expect(result[Symbol.iterator]).to.be.ok() expect(result).to.deep.equal([1, 2, 3]) }) + it('should allow async iterable first param', async () => { + const result = pipe( + (async function * () { + yield * oneTwoThree() + }()), + async (source) => await all(source) + ) + + expect(result.then).to.be.ok() + await expect(result).to.eventually.deep.equal([1, 2, 3]) + }) + it('should allow duplex at start', async () => { - const duplex: Duplex> = { + const duplex = { sink: all, source: oneTwoThree() } - const result = await pipe(duplex, all) + const result = pipe( + duplex, + (source) => all(source) + ) + + expect(result[Symbol.iterator]).to.be.ok() expect(result).to.deep.equal([1, 2, 3]) }) - it('should allow duplex at end', async () => { - const duplex: Duplex> = { + it('should allow async duplex at start', async () => { + const duplex: Duplex, number[]> = { sink: all, - source: oneTwoThree() + source: asyncOneTwoThree() } - const result = await pipe(oneTwoThree, duplex) - expect(result).to.deep.equal([1, 2, 3]) + const result = pipe( + duplex, + async (source) => await all(source) + ) + + expect(result.then).to.be.ok() + await expect(result).to.eventually.deep.equal([1, 2, 3]) }) - it('should allow duplex in the middle', async () => { - const duplex: Duplex> = { - sink: async source => { duplex.source = source }, - source: [] + it('should allow duplex at end', async () => { + const duplex = { + sink: (source: number[]) => all(source), + source: oneTwoThree() } - const result = await pipe(oneTwoThree, duplex, all) + const result = pipe(oneTwoThree, duplex) + + expect(result[Symbol.iterator]).to.be.ok() expect(result).to.deep.equal([1, 2, 3]) }) @@ -84,7 +374,7 @@ describe('it-pipe', () => { it('should allow it-drain', async () => { const input = [0, 1, 2, 3] - const result = await pipe( + const result = await pipe( // eslint-disable-line @typescript-eslint/no-confusing-void-expression input, (source) => source, filter((val: number) => val > 1), @@ -100,7 +390,7 @@ describe('it-pipe', () => { const result = await pipe( input, filter((val: number) => val > 1), - collect + async (source) => await collect(source) ) expect(result).to.deep.equal([2, 3]) @@ -109,11 +399,10 @@ describe('it-pipe', () => { it('should allow streaming iterables consume', async () => { const input = [0, 1, 2, 3] - const result = await pipe( + const result = await pipe( // eslint-disable-line @typescript-eslint/no-confusing-void-expression input, filter((val: number) => val > 1), - // @ts-expect-error https://github.com/reconbot/streaming-iterables/issues/232 - (source) => consume(source) + async (source) => { await consume(source) } ) expect(result).to.be.undefined() @@ -125,7 +414,7 @@ describe('it-pipe', () => { await expect( pipe( oneTwoThree, { - source: (async function * () { + source: (async function * (): AsyncGenerator { await delay(1000) yield 5 }()), @@ -134,7 +423,7 @@ describe('it-pipe', () => { throw err } }, - async (source) => await drain(source) + async (source) => { await drain(source) } ) ).to.eventually.be.rejected.with.property('message', err.message) }) @@ -181,7 +470,7 @@ describe('it-pipe', () => { await delay(1) yield * data }()) - const output: Duplex = { + const output: Duplex, Promise> = { source: ['hello', 'world'], sink: async (source) => { collected.resolve(await all(source))