Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(flatMap): default flatMap concurrent parameter to 1 #346

Merged
merged 4 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
- name: Lint files
if: ${{ steps.files_changed.outputs.any_changed == 'true' || steps.files_changed.outputs.any_deleted == 'true' }}
run: |
yarn lint
yarn lint:ci

build-and-test-pull-request:
needs:
Expand Down
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
"build": "gulp build",
"clean": "gulp clean",
"debug": "gulp debug",
"lint": "run-p lint:*",
"lint": "run-p lint:src lint:spec",
"release": "./npm-release.sh",
"doc": "shx rm -rf ./doc && typedoc --options typedoc.js",
"postinstall": "patch-package --patch-dir ./patches",
"commitmsg": "validate-commit-msg",
"test:coverage": "gulp test -t src --coverage",
"lint:src": "eslint --ext .ts --fix \"src/**/*.ts\"",
"lint:spec": "eslint --ext .ts --fix \"spec/**/*.ts\"",
"lint:ci": "eslint src spec",
"lint:src": "eslint --fix src",
"lint:spec": "eslint --fix spec",
"prepublishOnly": "echo \"Error: do 'npm run release' instead of 'npm publish'\" && exit 1"
},
"author": "Matthew Podwysocki <matthewp@microsoft.com>",
Expand Down Expand Up @@ -52,8 +53,8 @@
"devDependencies": {
"@types/glob": "7.1.1",
"@types/jest": "27.4.0",
"@typescript-eslint/eslint-plugin": "5.12.0",
"@typescript-eslint/parser": "5.12.0",
"@typescript-eslint/eslint-plugin": "^5.31.0",
"@typescript-eslint/parser": "^5.31.0",
"abortcontroller-polyfill": "1.4.0",
"async-done": "1.3.2",
"benchmark": "2.1.4",
Expand All @@ -66,8 +67,8 @@
"coveralls": "3.0.9",
"cz-conventional-changelog": "3.1.0",
"del": "5.1.0",
"eslint": "8.9.0",
"eslint-plugin-jest": "26.1.1",
"eslint": "^8.20.0",
"eslint-plugin-jest": "^26.6.0",
"esm": "https://github.com/jsg2021/esm/releases/download/v3.x.x-pr883/esm-3.x.x-pr883.tgz",
"glob": "7.1.6",
"google-closure-compiler": "20220601.0.0",
Expand Down
70 changes: 70 additions & 0 deletions spec/asynciterable-operators/concatmap-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { hasNext, noNext } from '../asynciterablehelpers';
import { of, range, sequenceEqual, throwError } from 'ix/asynciterable';
import { map, tap, concatMap } from 'ix/asynciterable/operators';

test('AsyncIterable#concatMap with range', async () => {
const xs = of(1, 2, 3);
const ys = xs.pipe(concatMap(async (x) => range(0, x)));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 2);
noNext(it);
});

test('AsyncIterable#concatMap order of effects', async () => {
let i = 0;
const res = range(0, 3).pipe(
tap({ next: async () => ++i }),
concatMap((x) => range(0, x + 1)),
map((x) => i + ' - ' + x)
);

expect(
await sequenceEqual(res, of('1 - 0', '2 - 0', '2 - 1', '3 - 0', '3 - 1', '3 - 2'))
).toBeTruthy();
});

test('AsyncIterable#concatMap selector returns throw', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(concatMap(async (x) => (x < 3 ? range(0, x) : throwError(err))));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
});

test('AsyncIterable#concatMap with error throws', async () => {
const err = new Error();
const xs = throwError(err);
const ys = xs.pipe(concatMap((x) => range(0, x)));

const it = ys[Symbol.asyncIterator]();
await expect(it.next()).rejects.toThrow(err);
});

test('AsyncIterable#concatMap selector throws error', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(
concatMap(async (x) => {
if (x < 3) {
return range(0, x);
}
throw err;
})
);

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
});
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/distinctuntilchanged-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import '../asynciterablehelpers';
import { of, sequenceEqual } from 'ix/asynciterable';
import { distinctUntilChanged } from 'ix/asynciterable/operators';

test('Iterable#distinctUntilChanged no selector', async () => {
test('AsyncIterable#distinctUntilChanged no selector', async () => {
const res = of(1, 2, 2, 3, 3, 3, 2, 2, 1).pipe(distinctUntilChanged());
expect(await sequenceEqual(res, of(1, 2, 3, 2, 1))).toBeTruthy();
});

test('Iterable#distinctUntilChanged with selector', async () => {
test('AsyncIterable#distinctUntilChanged with selector', async () => {
const res = of(1, 1, 2, 3, 4, 5, 5, 6, 7).pipe(
distinctUntilChanged({ keySelector: (x) => Math.floor(x / 2) })
);
Expand Down
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/dowhile-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { defer, of, toArray } from 'ix/asynciterable';
import { doWhile, tap } from 'ix/asynciterable/operators';
import { sequenceEqual } from 'ix/iterable';

test('Iterable#doWhile some', async () => {
test('AsyncIterable#doWhile some', async () => {
let x = 5;

const res = await toArray(
Expand All @@ -16,7 +16,7 @@ test('Iterable#doWhile some', async () => {
expect(sequenceEqual(res, [5, 4, 3, 2, 1])).toBeTruthy();
});

test('Iterable#doWhile one', async () => {
test('AsyncIterable#doWhile one', async () => {
let x = 0;
const res = await toArray(
defer(() => of(x)).pipe(
Expand Down
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/except-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
import { of } from 'ix/asynciterable';
import { except } from 'ix/asynciterable/operators';

test('Iterable#except with default comparer', async () => {
test('AsyncIterable#except with default comparer', async () => {
const xs = of(1, 2, 3);
const ys = of(3, 5, 1, 4);
const res = xs.pipe(except(ys));
Expand All @@ -12,7 +12,7 @@ test('Iterable#except with default comparer', async () => {
await noNext(it);
});

test('Iterable#except with custom comparer', async () => {
test('AsyncIterable#except with custom comparer', async () => {
const comparer = (x: number, y: number) => Math.abs(x) === Math.abs(y);
const xs = of(1, 2, -3);
const ys = of(3, 5, -1, 4);
Expand Down
8 changes: 4 additions & 4 deletions spec/asynciterable-operators/flat-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ test('AsyncIterable#flat flattens all', async () => {
const xs = of(1, of(2, of(3)), 4);
const ys = await toArray(xs.pipe(flat()));

compareArrays(ys, [1, 4, 2, 3]);
compareArrays(ys, [1, 2, 3, 4]);
});

test('AsyncIterable#flat flattens all with concurrent = 1', async () => {
test('AsyncIterable#flat flattens all layers', async () => {
const xs = of(1, of(2, of(3)), 4);
const ys = await toArray(xs.pipe(flat(-1, 1)));
const ys = await toArray(xs.pipe(flat(-1)));

compareArrays(ys, [1, 2, 3, 4]);
});
Expand All @@ -24,5 +24,5 @@ test('AsyncIterable#flat flattens two layers', async () => {
const xs = of(1, of(2, of(3)), 4);
const ys = await toArray(xs.pipe(flat(2)));

compareArrays(ys, [1, 4, 2, 3]);
compareArrays(ys, [1, 2, 3, 4]);
});
8 changes: 4 additions & 4 deletions spec/asynciterable-operators/flatmap-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
import { of, range, throwError } from 'ix/asynciterable';
import { flatMap } from 'ix/asynciterable/operators';

test('Iterable#flatMap with range', async () => {
test('AsyncIterable#flatMap with range', async () => {
const xs = of(1, 2, 3);
const ys = xs.pipe(flatMap(async (x) => range(0, x)));

Expand All @@ -16,7 +16,7 @@ test('Iterable#flatMap with range', async () => {
noNext(it);
});

test('Iterable#flatMap selector returns throw', async () => {
test('AsyncIterable#flatMap selector returns throw', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(flatMap(async (x) => (x < 3 ? range(0, x) : throwError(err))));
Expand All @@ -28,7 +28,7 @@ test('Iterable#flatMap selector returns throw', async () => {
await expect(it.next()).rejects.toThrow(err);
});

test('Iterable#flatMap with error throws', async () => {
test('AsyncIterable#flatMap with error throws', async () => {
const err = new Error();
const xs = throwError(err);
const ys = xs.pipe(flatMap((x) => range(0, x)));
Expand All @@ -37,7 +37,7 @@ test('Iterable#flatMap with error throws', async () => {
await expect(it.next()).rejects.toThrow(err);
});

test('Iterable#flatMap selector throws error', async () => {
test('AsyncIterable#flatMap selector throws error', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(
Expand Down
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/ignoreelements.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import '../asynciterablehelpers';
import { range } from 'ix/asynciterable';
import { ignoreElements, take, tap } from 'ix/asynciterable/operators';

test('Iterable#ignoreElements has side effects', async () => {
test('AsyncIterable#ignoreElements has side effects', async () => {
let n = 0;
await range(0, 10)
.pipe(
Expand Down
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/intersect-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
import { of } from 'ix/asynciterable';
import { intersect } from 'ix/asynciterable/operators';

test('Iterable#intersect with default comparer', async () => {
test('AsyncIterable#intersect with default comparer', async () => {
const xs = of(1, 2, 3);
const ys = of(3, 5, 1, 4);
const res = xs.pipe(intersect(ys));
Expand All @@ -13,7 +13,7 @@ test('Iterable#intersect with default comparer', async () => {
await noNext(it);
});

test('Iterable#intersect with custom comparer', async () => {
test('AsyncIterable#intersect with custom comparer', async () => {
const comparer = (x: number, y: number) => Math.abs(x) === Math.abs(y);
const xs = of(1, 2, -3);
const ys = of(3, 5, -1, 4);
Expand Down
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/isempty-spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import '../asynciterablehelpers';
import { of, empty, isEmpty } from 'ix/asynciterable';

test('Iterable#isEmpty empty', async () => {
test('AsyncIterable#isEmpty empty', async () => {
expect(await isEmpty(empty())).toBeTruthy();
});

test('Iterable#isEmpty not-empty', async () => {
test('AsyncIterable#isEmpty not-empty', async () => {
expect(await isEmpty(of(1))).toBeFalsy();
});
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/mergeall-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import { mergeAll } from 'ix/asynciterable/operators';

test('AsyncIterable#merge mergeAll behavior', async () => {
const res = of(of(1, 2, 3), of(4, 5)).pipe(mergeAll());
expect(await toArray(res)).toEqual([1, 2, 4, 3, 5]);
expect(await toArray(res)).toEqual([1, 2, 3, 4, 5]);
});
6 changes: 3 additions & 3 deletions spec/asynciterable-operators/pluck-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
import { of } from 'ix/asynciterable';
import { pluck } from 'ix/asynciterable/operators';

test('Iterable#pluck simple prop', async () => {
test('AsyncIterable#pluck simple prop', async () => {
const xs = of({ prop: 1 }, { prop: 2 }, { prop: 3 }, { prop: 4 }, { prop: 5 });
const ys = xs.pipe(pluck('prop'));

Expand All @@ -15,7 +15,7 @@ test('Iterable#pluck simple prop', async () => {
await noNext(it);
});

test('Iterable#pluck nested prop', async () => {
test('AsyncIterable#pluck nested prop', async () => {
const xs = of(
{ a: { b: { c: 1 } } },
{ a: { b: { c: 2 } } },
Expand All @@ -34,7 +34,7 @@ test('Iterable#pluck nested prop', async () => {
await noNext(it);
});

test('Iterable#pluck edge cases', async () => {
test('AsyncIterable#pluck edge cases', async () => {
const xs = of<any>(
{ a: { b: { c: 1 } } },
{ a: { b: 2 } },
Expand Down
46 changes: 46 additions & 0 deletions spec/asynciterable-operators/switchall-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import '../asynciterablehelpers';
import { of, toArray } from 'ix/asynciterable';
import { switchAll, delayEach, tap } from 'ix/asynciterable/operators';

describe(`AsyncIterable#switchAll`, () => {
test('switches inner sequences', async () => {
const innerValues = new Array<string>();

const ys = of('0', '1', '2', '3').pipe(
delayEach(200),
tap((x) => innerValues.push(x))
);
const xs = of(ys, ys, ys).pipe(delayEach(500));
const source = xs.pipe(switchAll());
const expected = [
'0',
'1', // xs=0
'0',
'1', // xs=1
'0',
'1',
'2',
'3', // xs=2
];

expect(await toArray(source)).toEqual(expected);

expect(innerValues).toEqual(expected);
});

test(`supports projecting to Arrays`, async () => {
const xs = of([0, 1, 2], [0, 1, 2], [0, 1, 2]).pipe(delayEach(100));
const source = xs.pipe(switchAll());
expect(await toArray(source)).toEqual([0, 1, 2, 0, 1, 2, 0, 1, 2]);
});

test(`supports projecting to Promise<Array>`, async () => {
const xs = of(
Promise.resolve([0, 1, 2]),
Promise.resolve([0, 1, 2]),
Promise.resolve([0, 1, 2])
).pipe(delayEach(100));
const source = xs.pipe(switchAll());
expect(await toArray(source)).toEqual([0, 1, 2, 0, 1, 2, 0, 1, 2]);
});
});
1 change: 1 addition & 0 deletions src/Ix.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import './add/asynciterable-operators/orderby';
import './add/asynciterable-operators/pairwise';
import './add/asynciterable-operators/pluck';
import './add/asynciterable-operators/publish';
import './add/asynciterable-operators/switchall';
import './add/asynciterable-operators/switchmap';
import './add/asynciterable-operators/reduceright';
import './add/asynciterable-operators/reduce';
Expand Down
17 changes: 17 additions & 0 deletions src/add/asynciterable-operators/switchall.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { switchAll } from '../../asynciterable/operators/switchall';

/**
* @ignore
*/
export function switchAllProto<T>(this: AsyncIterableX<AsyncIterable<T>>) {
return switchAll()(this);
}

AsyncIterableX.prototype.switchAll = switchAllProto;

declare module '../../asynciterable/asynciterablex' {
interface AsyncIterableX<T> {
switchAll: typeof switchAllProto;
}
}
2 changes: 1 addition & 1 deletion src/asynciterable/operators/_flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class FlattenConcurrentAsyncIterable<TSource, TResult> extends AsyncItera
private _thisArg?: any
) {
super();
this._concurrent = this._switchMode ? 1 : Math.max(_concurrent, 0);
this._concurrent = this._switchMode ? 1 : Math.max(_concurrent, 1);
}
async *[Symbol.asyncIterator](outerSignal?: AbortSignal) {
throwIfAborted(outerSignal);
Expand Down
Loading