Skip to content

Commit

Permalink
feat(merge): fix indexOf issues with merge
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Oct 5, 2017
1 parent b81a007 commit 2a542e5
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/asynciterable/merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import { AsyncIterableX } from '../asynciterable';
// tslint:disable-next-line:no-empty
const NEVER_PROMISE = new Promise(() => { });

type MergeResult<T> = { value: T, index: number };

function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number) {
return promise.then((value) => ({ value, index })) as Promise<MergeResult<T>>;
}

class MergeAsyncIterable<T> extends AsyncIterableX<T> {
private _source: AsyncIterable<T>[];

Expand All @@ -14,24 +20,23 @@ class MergeAsyncIterable<T> extends AsyncIterableX<T> {
async *[Symbol.asyncIterator](): AsyncIterator<T> {
const length = this._source.length;
const iterators = new Array<AsyncIterator<T>>(length);
const nexts = new Array<Promise<IteratorResult<T>>>(length);
const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);
let active = length;
for (let i = 0; i < length; i++) {
const iterator = this._source[i][Symbol.asyncIterator]();
iterators[i] = iterator;
nexts[i] = iterator.next();
nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
}

while (active > 0) {
const next = Promise.race(nexts);
const index = nexts.indexOf(next);
const next$ = await next;
const { value: next$, index } = await next;
if (next$.done) {
nexts[index] = <Promise<IteratorResult<T>>>(NEVER_PROMISE);
nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>(NEVER_PROMISE);
active--;
} else {
const iterator$ = iterators[index];
nexts[index] = iterator$.next();
nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
yield next$.value;
}
}
Expand Down

0 comments on commit 2a542e5

Please sign in to comment.