Skip to content

Commit

Permalink
feat(flatMap): add initial AsyncIterable#flatMap concurrent implement…
Browse files Browse the repository at this point in the history
…ation

BREAKING CHANGE: flatMap now supports concurrency

fix #244
  • Loading branch information
trxcllnt committed Feb 11, 2019
1 parent f8d516b commit 8e17ccf
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 47 deletions.
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/flatmap-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';

test('Iterable#flatMap with range', async () => {
const xs = of(1, 2, 3);
const ys = flatMap(xs, async x => range(0, x));
const ys = xs.pipe(flatMap(async x => range(0, x)));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
Expand Down
122 changes: 78 additions & 44 deletions src/asynciterable/operators/flatmap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ const NEVER_PROMISE = new Promise(() => {
/**/
});

type InnerResult<T> = { value: IteratorResult<T>; index: number; type: 'inner' };
type OuterResult<T> = { value: IteratorResult<AsyncIterable<T>>; index: number; type: 'outer' };
type InnerResult<T> = { value: T; done: boolean; index: number; type: 'inner' };
type OuterResult<T> = { value: AsyncIterable<T>; done: boolean; index: number; type: 'outer' };

function isInnerResult<T>(x: any): x is InnerResult<T> {
return x.type === 'inner';
function isOuterResult<T>(x: any): x is OuterResult<T> {
return x.type === 'outer';
}

function wrapInnerValuePromiseWithIndex<T>(promise: Promise<IteratorResult<T>>, index: number) {
return promise.then(value => ({ value, index, type: 'inner' })) as Promise<InnerResult<T>>;
return promise.then(({ value, done }) => ({ value, done, index, type: 'inner' })) as Promise<
InnerResult<T>
>;
}

function wrapOuterValuePromiseWithIndex<T, R>(
Expand All @@ -25,9 +27,10 @@ function wrapOuterValuePromiseWithIndex<T, R>(
return promise.then(
async ({ value, done }) =>
({
done,
index,
type: 'outer',
value: { value: await selector(value), done }
value: !done ? await selector(value) : null
} as OuterResult<R>)
);
}
Expand All @@ -50,60 +53,91 @@ export class FlatMapAsyncIterable<TSource, TResult> extends AsyncIterableX<TResu

async *[Symbol.asyncIterator]() {
const { _maxConcurrent, _source, _selector } = this;
const outerSources = _source[Symbol.asyncIterator]();
const activeInners = [] as AsyncIterator<TResult>[];
const queuedInners = [] as AsyncIterable<TResult>[];
const pending = [] as Promise<OuterResult<TResult> | InnerResult<TResult>>[];
const outer = _source[Symbol.asyncIterator]();
const inners = [] as AsyncIterator<TResult>[];
const queued = [] as AsyncIterable<TResult>[];
const promises = [] as Promise<OuterResult<TResult> | InnerResult<TResult>>[];

pending[0] = wrapOuterValuePromiseWithIndex(outerSources.next(), 0, _selector);
promises[0] = wrapOuterValuePromiseWithIndex(outer.next(), 0, _selector);

let outerDone = false,
active = 0;

do {
const res = await Promise.race(pending);
if (isInnerResult(res)) {
const { value: next, index } = res;
if (!next.done) {
const itr = activeInners[index - 1];
pending[index] = wrapInnerValuePromiseWithIndex(itr.next(), index);
yield next.value;
} else {
const it = queuedInners.shift();
if (!it) {
--active;
activeInners[index - 1] = <any>null;
pending[index] = <Promise<InnerResult<TResult>>>NEVER_PROMISE;
let active = 0;
let outerDone = false;
try {
do {
const res = await Promise.race(promises);
if (isOuterResult<TResult>(res)) {
const { done, value } = res as OuterResult<TResult>;
if ((outerDone = done)) {
promises[0] = <Promise<OuterResult<TResult>>>NEVER_PROMISE;
} else {
const itr = (activeInners[index - 1] = it[Symbol.asyncIterator]());
pending[index] = wrapInnerValuePromiseWithIndex(itr.next(), index);
const itr = (inners[active++] = value[Symbol.asyncIterator]());
promises[active] = wrapInnerValuePromiseWithIndex(itr.next(), active);
if (active >= _maxConcurrent) {
queued.push(value);
promises[0] = <Promise<OuterResult<TResult>>>NEVER_PROMISE;
} else {
promises[0] = wrapOuterValuePromiseWithIndex(outer.next(), 0, _selector);
}
}
}
} else {
const { value: next } = res;
if (next.done) {
outerDone = true;
pending[0] = <Promise<OuterResult<TResult>>>NEVER_PROMISE;
} else {
const it = (<unknown>next.value) as AsyncIterable<TResult>;
if (active >= _maxConcurrent) {
queuedInners.push(it);
const { done, value, index } = res as InnerResult<TResult>;
if (!done) {
const itr = inners[index - 1];
promises[index] = wrapInnerValuePromiseWithIndex(itr.next(), index);
yield value;
} else if (queued.length > 0) {
const it = queued.shift()!;
const itr = (inners[index - 1] = it[Symbol.asyncIterator]());
promises[index] = wrapInnerValuePromiseWithIndex(itr.next(), index);
} else {
const itr = (activeInners[active++] = it[Symbol.asyncIterator]());
pending[active] = wrapInnerValuePromiseWithIndex(itr.next(), active);
pending[0] = wrapOuterValuePromiseWithIndex(outerSources.next(), 0, _selector);
--active;
inners[index - 1] = <any>null;
promises[index] = <Promise<InnerResult<TResult>>>NEVER_PROMISE;
if (!outerDone) {
promises[0] = wrapOuterValuePromiseWithIndex(outer.next(), 0, _selector);
}
}
}
} while (!outerDone || active > 0);
} catch (e) {
throw e;
} finally {
try {
await Promise.all<any>(
[outer, ...inners].map(it => it && typeof it.return === 'function' && it.return())
);
} catch (e) {
/* ignored */
}
} while (!outerDone && active > 0);
}
}
}

export function flatMap<TSource, TResult>(
selector: (value: TSource) => AsyncIterable<TResult> | Promise<AsyncIterable<TResult>>,
thisArg?: any
): OperatorAsyncFunction<TSource, TResult>;

export function flatMap<TSource, TResult>(
selector: (value: TSource) => AsyncIterable<TResult> | Promise<AsyncIterable<TResult>>,
maxConcurrent: number,
thisArg?: any
): OperatorAsyncFunction<TSource, TResult>;

export function flatMap<TSource, TResult>(
selector: (value: TSource) => AsyncIterable<TResult> | Promise<AsyncIterable<TResult>>,
maxConcurrent?: number,
thisArg?: any
): OperatorAsyncFunction<TSource, TResult> {
if (arguments.length === 2 && typeof maxConcurrent !== 'number') {
thisArg = maxConcurrent;
maxConcurrent = undefined;
}
return function flatMapOperatorFunction(source: AsyncIterable<TSource>): AsyncIterableX<TResult> {
return new FlatMapAsyncIterable<TSource, TResult>(source, bindCallback(selector, thisArg, 1));
return new FlatMapAsyncIterable<TSource, TResult>(
source,
bindCallback(selector, thisArg, 1),
maxConcurrent
);
};
}
6 changes: 4 additions & 2 deletions src/asynciterable/operators/mergeall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import { AsyncIterableX } from '../asynciterablex';
import { flatMap } from './flatmap';
import { OperatorAsyncFunction } from '../../interfaces';

export function mergeAll<TSource>(): OperatorAsyncFunction<AsyncIterable<TSource>, TSource> {
export function mergeAll<TSource>(
maxConcurrent?: number
): OperatorAsyncFunction<AsyncIterable<TSource>, TSource> {
return function mergeAllOperatorFunction(
source: AsyncIterable<AsyncIterable<TSource>>
): AsyncIterableX<TSource> {
return flatMap<AsyncIterable<TSource>, TSource>(source, source => source);
return flatMap<AsyncIterable<TSource>, TSource>(source => source, maxConcurrent)(source);
};
}

0 comments on commit 8e17ccf

Please sign in to comment.