From 8e17ccf01c12430bdef3b2feed58f526dfdb7ac0 Mon Sep 17 00:00:00 2001 From: ptaylor Date: Sun, 10 Feb 2019 20:34:32 -0800 Subject: [PATCH] feat(flatMap): add initial AsyncIterable#flatMap concurrent implementation BREAKING CHANGE: flatMap now supports concurrency fix #244 --- spec/asynciterable-operators/flatmap-spec.ts | 2 +- src/asynciterable/operators/flatmap.ts | 122 ++++++++++++------- src/asynciterable/operators/mergeall.ts | 6 +- 3 files changed, 83 insertions(+), 47 deletions(-) diff --git a/spec/asynciterable-operators/flatmap-spec.ts b/spec/asynciterable-operators/flatmap-spec.ts index f83c86c5..ce8dd169 100644 --- a/spec/asynciterable-operators/flatmap-spec.ts +++ b/spec/asynciterable-operators/flatmap-spec.ts @@ -4,7 +4,7 @@ import { hasNext, noNext } from '../asynciterablehelpers'; test('Iterable#flatMap with range', async () => { const xs = of(1, 2, 3); - const ys = flatMap(xs, async x => range(0, x)); + const ys = xs.pipe(flatMap(async x => range(0, x))); const it = ys[Symbol.asyncIterator](); hasNext(it, 0); diff --git a/src/asynciterable/operators/flatmap.ts b/src/asynciterable/operators/flatmap.ts index 1a726437..6f6c3c79 100644 --- a/src/asynciterable/operators/flatmap.ts +++ b/src/asynciterable/operators/flatmap.ts @@ -6,15 +6,17 @@ const NEVER_PROMISE = new Promise(() => { /**/ }); -type InnerResult = { value: IteratorResult; index: number; type: 'inner' }; -type OuterResult = { value: IteratorResult>; index: number; type: 'outer' }; +type InnerResult = { value: T; done: boolean; index: number; type: 'inner' }; +type OuterResult = { value: AsyncIterable; done: boolean; index: number; type: 'outer' }; -function isInnerResult(x: any): x is InnerResult { - return x.type === 'inner'; +function isOuterResult(x: any): x is OuterResult { + return x.type === 'outer'; } function wrapInnerValuePromiseWithIndex(promise: Promise>, index: number) { - return promise.then(value => ({ value, index, type: 'inner' })) as Promise>; + return promise.then(({ value, done }) => ({ value, done, index, type: 'inner' })) as Promise< + InnerResult + >; } function wrapOuterValuePromiseWithIndex( @@ -25,9 +27,10 @@ function wrapOuterValuePromiseWithIndex( return promise.then( async ({ value, done }) => ({ + done, index, type: 'outer', - value: { value: await selector(value), done } + value: !done ? await selector(value) : null } as OuterResult) ); } @@ -50,60 +53,91 @@ export class FlatMapAsyncIterable extends AsyncIterableX[]; - const queuedInners = [] as AsyncIterable[]; - const pending = [] as Promise | InnerResult>[]; + const outer = _source[Symbol.asyncIterator](); + const inners = [] as AsyncIterator[]; + const queued = [] as AsyncIterable[]; + const promises = [] as Promise | InnerResult>[]; - pending[0] = wrapOuterValuePromiseWithIndex(outerSources.next(), 0, _selector); + promises[0] = wrapOuterValuePromiseWithIndex(outer.next(), 0, _selector); - let outerDone = false, - active = 0; - - do { - const res = await Promise.race(pending); - if (isInnerResult(res)) { - const { value: next, index } = res; - if (!next.done) { - const itr = activeInners[index - 1]; - pending[index] = wrapInnerValuePromiseWithIndex(itr.next(), index); - yield next.value; - } else { - const it = queuedInners.shift(); - if (!it) { - --active; - activeInners[index - 1] = null; - pending[index] = >>NEVER_PROMISE; + let active = 0; + let outerDone = false; + try { + do { + const res = await Promise.race(promises); + if (isOuterResult(res)) { + const { done, value } = res as OuterResult; + if ((outerDone = done)) { + promises[0] = >>NEVER_PROMISE; } else { - const itr = (activeInners[index - 1] = it[Symbol.asyncIterator]()); - pending[index] = wrapInnerValuePromiseWithIndex(itr.next(), index); + const itr = (inners[active++] = value[Symbol.asyncIterator]()); + promises[active] = wrapInnerValuePromiseWithIndex(itr.next(), active); + if (active >= _maxConcurrent) { + queued.push(value); + promises[0] = >>NEVER_PROMISE; + } else { + promises[0] = wrapOuterValuePromiseWithIndex(outer.next(), 0, _selector); + } } - } - } else { - const { value: next } = res; - if (next.done) { - outerDone = true; - pending[0] = >>NEVER_PROMISE; } else { - const it = (next.value) as AsyncIterable; - if (active >= _maxConcurrent) { - queuedInners.push(it); + const { done, value, index } = res as InnerResult; + if (!done) { + const itr = inners[index - 1]; + promises[index] = wrapInnerValuePromiseWithIndex(itr.next(), index); + yield value; + } else if (queued.length > 0) { + const it = queued.shift()!; + const itr = (inners[index - 1] = it[Symbol.asyncIterator]()); + promises[index] = wrapInnerValuePromiseWithIndex(itr.next(), index); } else { - const itr = (activeInners[active++] = it[Symbol.asyncIterator]()); - pending[active] = wrapInnerValuePromiseWithIndex(itr.next(), active); - pending[0] = wrapOuterValuePromiseWithIndex(outerSources.next(), 0, _selector); + --active; + inners[index - 1] = null; + promises[index] = >>NEVER_PROMISE; + if (!outerDone) { + promises[0] = wrapOuterValuePromiseWithIndex(outer.next(), 0, _selector); + } } } + } while (!outerDone || active > 0); + } catch (e) { + throw e; + } finally { + try { + await Promise.all( + [outer, ...inners].map(it => it && typeof it.return === 'function' && it.return()) + ); + } catch (e) { + /* ignored */ } - } while (!outerDone && active > 0); + } } } export function flatMap( selector: (value: TSource) => AsyncIterable | Promise>, thisArg?: any +): OperatorAsyncFunction; + +export function flatMap( + selector: (value: TSource) => AsyncIterable | Promise>, + maxConcurrent: number, + thisArg?: any +): OperatorAsyncFunction; + +export function flatMap( + selector: (value: TSource) => AsyncIterable | Promise>, + maxConcurrent?: number, + thisArg?: any ): OperatorAsyncFunction { + if (arguments.length === 2 && typeof maxConcurrent !== 'number') { + thisArg = maxConcurrent; + maxConcurrent = undefined; + } return function flatMapOperatorFunction(source: AsyncIterable): AsyncIterableX { - return new FlatMapAsyncIterable(source, bindCallback(selector, thisArg, 1)); + return new FlatMapAsyncIterable( + source, + bindCallback(selector, thisArg, 1), + maxConcurrent + ); }; } diff --git a/src/asynciterable/operators/mergeall.ts b/src/asynciterable/operators/mergeall.ts index 058510ed..a8e7e292 100644 --- a/src/asynciterable/operators/mergeall.ts +++ b/src/asynciterable/operators/mergeall.ts @@ -2,10 +2,12 @@ import { AsyncIterableX } from '../asynciterablex'; import { flatMap } from './flatmap'; import { OperatorAsyncFunction } from '../../interfaces'; -export function mergeAll(): OperatorAsyncFunction, TSource> { +export function mergeAll( + maxConcurrent?: number +): OperatorAsyncFunction, TSource> { return function mergeAllOperatorFunction( source: AsyncIterable> ): AsyncIterableX { - return flatMap, TSource>(source, source => source); + return flatMap, TSource>(source => source, maxConcurrent)(source); }; }