diff --git a/spec/asynciterable-operators/flat-spec.ts b/spec/asynciterable-operators/flat-spec.ts index ad044df1..e2714a0d 100644 --- a/spec/asynciterable-operators/flat-spec.ts +++ b/spec/asynciterable-operators/flat-spec.ts @@ -6,16 +6,16 @@ function compareArrays(fst: Iterable, snd: Iterable) { expect(fst.toString()).toBe(snd.toString()); } -test('AsyncIterable#flat flattens all', async () => { +test('AsyncIterable#flat flattens all layers', async () => { const xs = of(1, of(2, of(3)), 4); const ys = await toArray(xs.pipe(flat())); - compareArrays(ys, [1, 2, 3, 4]); + compareArrays(ys, [1, 4, 2, 3]); }); -test('AsyncIterable#flat flattens all layers', async () => { +test('AsyncIterable#flat flattens all layers with concurrent = 1', async () => { const xs = of(1, of(2, of(3)), 4); - const ys = await toArray(xs.pipe(flat(-1))); + const ys = await toArray(xs.pipe(flat(-1, 1))); compareArrays(ys, [1, 2, 3, 4]); }); @@ -24,5 +24,5 @@ test('AsyncIterable#flat flattens two layers', async () => { const xs = of(1, of(2, of(3)), 4); const ys = await toArray(xs.pipe(flat(2))); - compareArrays(ys, [1, 2, 3, 4]); + compareArrays(ys, [1, 4, 2, 3]); }); diff --git a/spec/asynciterable-operators/flatmap-spec.ts b/spec/asynciterable-operators/flatmap-spec.ts index ca1535a1..dd634471 100644 --- a/spec/asynciterable-operators/flatmap-spec.ts +++ b/spec/asynciterable-operators/flatmap-spec.ts @@ -1,19 +1,12 @@ -import { hasNext, noNext } from '../asynciterablehelpers'; -import { of, range, throwError } from 'ix/asynciterable'; +import { hasNext } from '../asynciterablehelpers'; +import { of, range, throwError, toArray } from 'ix/asynciterable'; import { flatMap } from 'ix/asynciterable/operators'; test('AsyncIterable#flatMap with range', async () => { const xs = of(1, 2, 3); const ys = xs.pipe(flatMap(async (x) => range(0, x))); - const it = ys[Symbol.asyncIterator](); - hasNext(it, 0); - hasNext(it, 0); - hasNext(it, 1); - hasNext(it, 0); - hasNext(it, 1); - hasNext(it, 2); - noNext(it); + expect(await toArray(ys)).toEqual([0, 0, 0, 1, 1, 2]); }); test('AsyncIterable#flatMap selector returns throw', async () => { diff --git a/spec/asynciterable-operators/mergeall-spec.ts b/spec/asynciterable-operators/mergeall-spec.ts index d50c9390..b08b8f6a 100644 --- a/spec/asynciterable-operators/mergeall-spec.ts +++ b/spec/asynciterable-operators/mergeall-spec.ts @@ -4,5 +4,5 @@ import { mergeAll } from 'ix/asynciterable/operators'; test('AsyncIterable#merge mergeAll behavior', async () => { const res = of(of(1, 2, 3), of(4, 5)).pipe(mergeAll()); - expect(await toArray(res)).toEqual([1, 2, 3, 4, 5]); + expect(await toArray(res)).toEqual([1, 2, 4, 3, 5]); }); diff --git a/src/add/asynciterable-operators/flat.ts b/src/add/asynciterable-operators/flat.ts index 24f80fa2..e0b53086 100644 --- a/src/add/asynciterable-operators/flat.ts +++ b/src/add/asynciterable-operators/flat.ts @@ -4,8 +4,12 @@ import { flat } from '../../asynciterable/operators/flat'; /** * @ignore */ -export function flatProto(this: AsyncIterableX, depth: D = -1 as any) { - return flat(depth)(this); +export function flatProto( + this: AsyncIterableX, + depth: D = -1 as any, + concurrent = Infinity +) { + return flat(depth, concurrent)(this); } AsyncIterableX.prototype.flat = flatProto; diff --git a/src/add/asynciterable-operators/flatmap.ts b/src/add/asynciterable-operators/flatmap.ts index 82f1f3bb..e6a3928f 100644 --- a/src/add/asynciterable-operators/flatmap.ts +++ b/src/add/asynciterable-operators/flatmap.ts @@ -8,9 +8,10 @@ import { FlattenConcurrentSelector } from '../../asynciterable/operators/_flatte export function flatMapProto( this: AsyncIterableX, selector: FlattenConcurrentSelector, + concurrent = Infinity, thisArg?: any ) { - return flatMap(selector, thisArg)(this); + return flatMap(selector, concurrent, thisArg)(this); } AsyncIterableX.prototype.flatMap = flatMapProto; diff --git a/src/add/asynciterable-operators/mergeall.ts b/src/add/asynciterable-operators/mergeall.ts index c45bc156..86febf58 100644 --- a/src/add/asynciterable-operators/mergeall.ts +++ b/src/add/asynciterable-operators/mergeall.ts @@ -4,8 +4,11 @@ import { mergeAll } from '../../asynciterable/operators/mergeall'; /** * @ignore */ -export function mergeAllProto(this: AsyncIterableX>): AsyncIterableX { - return mergeAll()(this); +export function mergeAllProto( + this: AsyncIterableX>, + concurrent = Infinity +): AsyncIterableX { + return mergeAll(concurrent)(this); } AsyncIterableX.prototype.mergeAll = mergeAllProto; diff --git a/src/asynciterable/operators/flat.ts b/src/asynciterable/operators/flat.ts index 116f8bed..90ada0ed 100644 --- a/src/asynciterable/operators/flat.ts +++ b/src/asynciterable/operators/flat.ts @@ -25,7 +25,7 @@ type FlattenWithDepth = { * @param {number} [depth=Infinity] The depth to flatten the async-iterable sequence if specified, otherwise infinite. * @returns {MonoTypeOperatorAsyncFunction} An operator that flattens the async-iterable sequence. */ -export function flat(depth: D = -1 as any) { +export function flat(depth: D = -1 as any, concurrent = Infinity) { depth = (depth < 0 ? Infinity : depth) as any; return function flattenOperatorFunction( source: AsyncIterable @@ -35,6 +35,6 @@ export function flat(depth: D = -1 as any) { return depth > 0 ? flat(depth - 1)(item) : item; } return [item]; - })(source) as AsyncIterableX>; + }, concurrent)(source) as AsyncIterableX>; }; } diff --git a/src/asynciterable/operators/flatmap.ts b/src/asynciterable/operators/flatmap.ts index 509495c4..991970c3 100644 --- a/src/asynciterable/operators/flatmap.ts +++ b/src/asynciterable/operators/flatmap.ts @@ -18,9 +18,10 @@ import { OperatorAsyncFunction } from '../../interfaces'; */ export function flatMap( selector: FlattenConcurrentSelector, + concurrent = Infinity, thisArg?: any ): OperatorAsyncFunction { return function flatMapOperatorFunction(source) { - return new FlattenConcurrentAsyncIterable(source, selector, 1, false, thisArg); + return new FlattenConcurrentAsyncIterable(source, selector, concurrent, false, thisArg); }; } diff --git a/src/asynciterable/operators/mergeall.ts b/src/asynciterable/operators/mergeall.ts index 351e2f91..47b87e45 100644 --- a/src/asynciterable/operators/mergeall.ts +++ b/src/asynciterable/operators/mergeall.ts @@ -6,8 +6,8 @@ import { FlattenConcurrentAsyncIterable } from './_flatten'; * @template TSource The type of the elements in the source sequences. * @returns {OperatorAsyncFunction, TSource>} The async-iterable sequence that merges the elements of the inner sequences. */ -export function mergeAll() { +export function mergeAll(concurrent = Infinity) { return function mergeAllOperatorFunction(source: AsyncIterable>) { - return new FlattenConcurrentAsyncIterable(source, (s) => s, 1, false); + return new FlattenConcurrentAsyncIterable(source, (s) => s, concurrent, false); }; }