Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zip n sources #73

Merged
merged 2 commits into from
Oct 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions spec/asynciterable-operators/memoize-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async function* rand() {

test('AsyncIterable#memoize should share effects of random', async t => {
const rnd = memoize(take(rand(), 100));
t.true(await every(zip(rnd, rnd, async (l, r) => l === r), async x => x));
t.true(await every(zip(async ([l, r]) => l === r, rnd, rnd), async x => x));
t.end();
});

Expand All @@ -189,7 +189,7 @@ test('AsyncIterable#memoize with selector', async t => {
memoize(
tap(range(0, 4), { next: async () => { n++; } }),
undefined,
xs => take(zip(xs, xs, async (l, r) => l + r), 4)
xs => take(zip(async ([l, r]) => l + r, xs, xs), 4)
)
);

Expand All @@ -204,7 +204,7 @@ test('AsyncIterable#memoize limited with selector', async t => {
memoize(
tap(range(0, 4), { next: async () => { n++; } }),
2,
xs => take(zip(xs, xs, async (l, r) => l + r), 4)
xs => take(zip(async ([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/publish-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ test('AsyncIterable#publish with selector', async t => {
const res = await toArray(
publish(
tap(range(0, 10), { next: async () => { n++; } }),
xs => take(zip(xs, xs, async (l, r) => l + r), 4)
xs => take(zip(async ([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/share-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ test('AsyncIterable#share with selector', async t => {
const res = await toArray(
share(
tap(range(0, 10), { next: async () => { n++;} }),
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
26 changes: 20 additions & 6 deletions spec/asynciterable-operators/zip-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
test('AsyncIterable#zip equal length', async t => {
const xs = of(1, 2, 3);
const ys = of(4, 5, 6);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 1 * 4);
Expand All @@ -21,7 +21,7 @@ test('AsyncIterable#zip equal length', async t => {
test('AsyncIterable#zip left longer', async t => {
const xs = of(1, 2, 3, 4);
const ys = of(4, 5, 6);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 1 * 4);
Expand All @@ -34,7 +34,7 @@ test('AsyncIterable#zip left longer', async t => {
test('AsyncIterable#zip right longer', async t => {
const xs = of(1, 2, 3);
const ys = of(4, 5, 6, 7);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 1 * 4);
Expand All @@ -44,11 +44,25 @@ test('AsyncIterable#zip right longer', async t => {
t.end();
});

test('AsyncIterable#zip multiple sources', async t => {
const xs = of(1, 2, 3);
const ys = of(4, 5, 6, 7);
const zs = of(8, 9, 10);
const res = zip(([x, y, z]) => x * y * z, xs, ys, zs);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 1 * 4 * 8);
await hasNext(t, it, 2 * 5 * 9);
await hasNext(t, it, 3 * 6 * 10);
await noNext(t, it);
t.end();
});

test('AsyncIterable#zip left throws', async t => {
const err = new Error();
const xs = _throw<number>(err);
const ys = of(4, 5, 6);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
try {
Expand All @@ -63,7 +77,7 @@ test('AsyncIterable#zip right throws', async t => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = _throw<number>(err);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
try {
Expand All @@ -78,7 +92,7 @@ test('AsyncIterable#zip selector throws', async t => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = of(4, 5, 6);
const res = zip(xs, ys, (x, y) => { if (x > 0) { throw err; } return x * y; });
const res = zip(([x, y]) => { if (x > 0) { throw err; } return x * y; }, xs, ys);

const it = res[Symbol.asyncIterator]();
try {
Expand Down
6 changes: 3 additions & 3 deletions spec/iterable-operators/memoize-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ function* rand() {

test('Iterable#memoize should share effects of random', t => {
const rnd = memoize(take(rand(), 100));
t.true(every(zip(rnd, rnd, (l, r) => l === r), x => x));
t.true(every(zip(([l, r]) => l === r, rnd, rnd), x => x));
t.end();
});

Expand All @@ -176,7 +176,7 @@ test('Iterable#memoize with selector', t => {
memoize(
tap(range(0, 4), { next: () => n++ }),
undefined,
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand All @@ -191,7 +191,7 @@ test('Iterable#memoize limited with selector', t => {
memoize(
tap(range(0, 4), { next: () => n++ }),
2,
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
2 changes: 1 addition & 1 deletion spec/iterable-operators/publish-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ test('Iterable#publish with selector', t => {
const res = toArray(
publish(
tap(range(0, 10), { next: () => n++ }),
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
2 changes: 1 addition & 1 deletion spec/iterable-operators/share-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ test('Iterable#share with selector', t => {
const res = toArray(
share(
tap(range(0, 10), { next: () => n++ }),
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
26 changes: 20 additions & 6 deletions spec/iterable-operators/zip-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { hasNext, noNext } from '../iterablehelpers';
test('Iterable#zip equal length', t => {
const xs = [1, 2, 3];
const ys = [4, 5, 6];
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
hasNext(t, it, 1 * 4);
Expand All @@ -20,7 +20,7 @@ test('Iterable#zip equal length', t => {
test('Iterable#zip left longer', t => {
const xs = [1, 2, 3, 4];
const ys = [4, 5, 6];
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
hasNext(t, it, 1 * 4);
Expand All @@ -33,7 +33,7 @@ test('Iterable#zip left longer', t => {
test('Iterable#zip right longer', t => {
const xs = [1, 2, 3];
const ys = [4, 5, 6, 7];
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
hasNext(t, it, 1 * 4);
Expand All @@ -43,10 +43,24 @@ test('Iterable#zip right longer', t => {
t.end();
});

test('Iterable#zip multiple sources', t => {
const xs = [1, 2, 3];
const ys = [4, 5, 6, 7];
const zs = [8, 9, 10];
const res = zip(([x, y, z]) => x * y * z, xs, ys, zs);

const it = res[Symbol.iterator]();
hasNext(t, it, 1 * 4 * 8);
hasNext(t, it, 2 * 5 * 9);
hasNext(t, it, 3 * 6 * 10);
noNext(t, it);
t.end();
});

test('Iterable#zip left throws', t => {
const xs = _throw<number>(new Error());
const ys = [4, 5, 6];
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
t.throws(() => it.next());
Expand All @@ -56,7 +70,7 @@ test('Iterable#zip left throws', t => {
test('Iterable#zip right throws', t => {
const xs = [1, 2, 3];
const ys = _throw<number>(new Error());
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
t.throws(() => it.next());
Expand All @@ -66,7 +80,7 @@ test('Iterable#zip right throws', t => {
test('Iterable#zip selector throws', t => {
const xs = [1, 2, 3];
const ys = [4, 5, 6];
const res = zip(xs, ys, (x, y) => { if (x > 0) { throw new Error(); } return x * y; });
const res = zip(([x, y]) => { if (x > 0) { throw new Error(); } return x * y; }, xs, ys);

const it = res[Symbol.iterator]();
t.throws(() => it.next());
Expand Down
27 changes: 22 additions & 5 deletions src/add/asynciterable-operators/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,28 @@ import { zip } from '../../asynciterable/zip';
/**
* @ignore
*/
export function zipProto<T, TResult>(
this: AsyncIterableX<T>,
second: AsyncIterable<T>,
selector: (fst: T, snd: T) => TResult | Promise<TResult>): AsyncIterableX<TResult> {
return zip(this, second, selector);
/* tslint:disable:max-line-length */
export function zipProto<T, T2>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>): AsyncIterableX<[T, T2]>;
export function zipProto<T, T2, T3>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>): AsyncIterableX<[T, T2, T3]>;
export function zipProto<T, T2, T3, T4>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>): AsyncIterableX<[T, T2, T3, T4]>;
export function zipProto<T, T2, T3, T4, T5>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>): AsyncIterableX<[T, T2, T3, T4, T5]>;
export function zipProto<T, T2, T3, T4, T5, T6>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>, source6: AsyncIterable<T6>): AsyncIterableX<[T, T2, T3, T4, T5, T6]>;

export function zipProto<T, R>(this: AsyncIterableX<T>, project: (values: [T]) => R): AsyncIterableX<R>;
export function zipProto<T, T2, R>(this: AsyncIterableX<T>, project: (values: [T, T2]) => R, source2: AsyncIterable<T2>): AsyncIterableX<R>;
export function zipProto<T, T2, T3, R>(this: AsyncIterableX<T>, project: (values: [T, T2, T3]) => R, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>): AsyncIterableX<R>;
export function zipProto<T, T2, T3, T4, R>(this: AsyncIterableX<T>, project: (values: [T, T2, T3, T4]) => R, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>): AsyncIterableX<R>;
export function zipProto<T, T2, T3, T4, T5, R>(this: AsyncIterableX<T>, project: (values: [T, T2,T3, T4, T5]) => R, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>): AsyncIterableX<R>;
export function zipProto<T, T2, T3, T4, T5, T6, R>(this: AsyncIterableX<T>, project: (values: [T, T2, T3, T4, T5, T6]) => R, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>, source6: AsyncIterable<T6>): AsyncIterableX<R>;

export function zipProto<T>(this: AsyncIterableX<T>, ...sources: AsyncIterable<T>[]): AsyncIterableX<T[]>;
export function zipProto<T, R>(this: AsyncIterableX<T>, project: (values: T[]) => R, ...sources: AsyncIterable<T>[]): AsyncIterableX<R>;
/* tslint:enable:max-line-length */
export function zipProto<T, R>(this: AsyncIterableX<T>, ...args: any[]): AsyncIterableX<R> {
let [arg1, ...sources] = args;
sources = (typeof arg1 === 'function') ?
[this, ...sources] : (arg1 = this) && args;
return zip<T, R>(arg1, ...sources);
}

AsyncIterableX.prototype.zip = zipProto;
Expand Down
27 changes: 22 additions & 5 deletions src/add/iterable-operators/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,28 @@ import { zip } from '../../iterable/zip';
/**
* @ignore
*/
export function zipProto<T, TResult>(
this: IterableX<T>,
second: Iterable<T>,
fn: (fst: T, snd: T) => TResult): IterableX<TResult> {
return zip(this, second, fn);
/* tslint:disable:max-line-length */
export function zipProto<T, T2>(this: IterableX<T>, source2: Iterable<T2>): IterableX<[T, T2]>;
export function zipProto<T, T2, T3>(this: IterableX<T>, source2: Iterable<T2>, source3: Iterable<T3>): IterableX<[T, T2, T3]>;
export function zipProto<T, T2, T3, T4>(this: IterableX<T>, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>): IterableX<[T, T2, T3, T4]>;
export function zipProto<T, T2, T3, T4, T5>(this: IterableX<T>, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>, source5: Iterable<T5>): IterableX<[T, T2, T3, T4, T5]>;
export function zipProto<T, T2, T3, T4, T5, T6>(this: IterableX<T>, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>, source5: Iterable<T5>, source6: Iterable<T6>): IterableX<[T, T2, T3, T4, T5, T6]>;

export function zipProto<T, R>(this: IterableX<T>, project: (values: [T]) => R): IterableX<R>;
export function zipProto<T, T2, R>(this: IterableX<T>, project: (values: [T, T2]) => R, source2: Iterable<T2>): IterableX<R>;
export function zipProto<T, T2, T3, R>(this: IterableX<T>, project: (values: [T, T2, T3]) => R, source2: Iterable<T2>, source3: Iterable<T3>): IterableX<R>;
export function zipProto<T, T2, T3, T4, R>(this: IterableX<T>, project: (values: [T, T2, T3, T4]) => R, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>): IterableX<R>;
export function zipProto<T, T2, T3, T4, T5, R>(this: IterableX<T>, project: (values: [T, T2,T3, T4, T5]) => R, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>, source5: Iterable<T5>): IterableX<R>;
export function zipProto<T, T2, T3, T4, T5, T6, R>(this: IterableX<T>, project: (values: [T, T2, T3, T4, T5, T6]) => R, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>, source5: Iterable<T5>, source6: Iterable<T6>): IterableX<R>;

export function zipProto<T>(this: IterableX<T>, ...sources: Iterable<T>[]): IterableX<T[]>;
export function zipProto<T, R>(this: IterableX<T>, project: (values: T[]) => R, ...sources: Iterable<T>[]): IterableX<R>;
/* tslint:enable:max-line-length */
export function zipProto<T, R>(this: IterableX<T>, ...args: any[]): IterableX<R> {
let [arg1, ...sources] = args;
sources = (typeof arg1 === 'function') ?
[this, ...sources] : (arg1 = this) && args;
return zip<T, R>(arg1, ...sources) as IterableX<R>;
}

IterableX.prototype.zip = zipProto;
Expand Down
70 changes: 45 additions & 25 deletions src/asynciterable/zip.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,58 @@
import { AsyncIterableX } from '../asynciterable';
import { identityAsync } from '../internal/identity';
import { returnAsyncIterator } from '../internal/returniterator';

class ZipIterable<TSource, TResult> extends AsyncIterableX<TResult> {
private _left: AsyncIterable<TSource>;
private _right: AsyncIterable<TSource>;
private _fn: (left: TSource, right: TSource) => TResult | Promise<TResult>;
class ZipAsyncIterable<TSource, TResult> extends AsyncIterableX<TResult> {
private _sources: AsyncIterable<TSource>[];
private _fn: (values: any[]) => TResult | Promise<TResult>;

constructor(
left: AsyncIterable<TSource>,
right: AsyncIterable<TSource>,
fn: (left: TSource, right: TSource) => TResult | Promise<TResult>) {
constructor(sources: AsyncIterable<TSource>[], fn: (values: any[]) => TResult | Promise<TResult>) {
super();
this._left = left;
this._right = right;
this._sources = sources;
this._fn = fn;
}

async *[Symbol.asyncIterator]() {
const it1 = this._left[Symbol.asyncIterator]();
const it2 = this._right[Symbol.asyncIterator]();
while (1) {
const xs = await Promise.all([it1.next(), it2.next()]);
const [next1, next2] = xs;
if (!next1.done && !next2.done) {
yield await this._fn(next1.value, next2.value);
} else {
break;
const fn = this._fn;
const sourcesLength = this._sources.length;
const its = this._sources.map((x) => x[Symbol.asyncIterator]());
do {
const values = new Array(sourcesLength);
for (let i = -1; ++i < sourcesLength;) {
const result = await its[i].next();
if (result.done) {
await Promise.all(its.map(returnAsyncIterator));
return;
}
values[i] = result.value;
}
}
yield await fn(values);
} while (1);
}
}

export function zip<TSource, TResult>(
left: AsyncIterable<TSource>,
right: AsyncIterable<TSource>,
fn: (left: TSource, right: TSource) => TResult | Promise<TResult>): AsyncIterableX<TResult> {
return new ZipIterable<TSource, TResult>(left, right, fn);
/* tslint:disable:max-line-length */
export function zip<T, T2>(source: AsyncIterable<T>, source2: AsyncIterable<T2>): AsyncIterableX<[T, T2]>;
export function zip<T, T2, T3>(source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>): AsyncIterableX<[T, T2, T3]>;
export function zip<T, T2, T3, T4>(source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>): AsyncIterableX<[T, T2, T3, T4]>;
export function zip<T, T2, T3, T4, T5>(source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>): AsyncIterableX<[T, T2, T3, T4, T5]>;
export function zip<T, T2, T3, T4, T5, T6>(source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>, source6: AsyncIterable<T6>): AsyncIterableX<[T, T2, T3, T4, T5, T6]>;

export function zip<T, R>(project: (values: [T]) => R | Promise<R>, source: AsyncIterable<T>): AsyncIterableX<R>;
export function zip<T, T2, R>(project: (values: [T, T2]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>): AsyncIterableX<R>;
export function zip<T, T2, T3, R>(project: (values: [T, T2, T3]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>): AsyncIterableX<R>;
export function zip<T, T2, T3, T4, R>(project: (values: [T, T2, T3, T4]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>): AsyncIterableX<R>;
export function zip<T, T2, T3, T4, T5, R>(project: (values: [T, T2,T3, T4, T5]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>): AsyncIterableX<R>;
export function zip<T, T2, T3, T4, T5, T6, R>(project: (values: [T, T2, T3, T4, T5, T6]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>, source6: AsyncIterable<T6>): AsyncIterableX<R>;

export function zip<T>(...sources: AsyncIterable<T>[]): AsyncIterableX<T[]>;
export function zip<T, R>(project: (values: T[]) => R | Promise<R>, ...sources: AsyncIterable<T>[]): AsyncIterableX<R>;
/* tslint:enable:max-line-length */
export function zip<T, R>(...sources: any[]): AsyncIterableX<R> {
let fn = sources.shift() as (values: any[]) => R | Promise<R>;
if (typeof fn !== 'function') {
sources.push(fn);
fn = identityAsync;
}
return new ZipAsyncIterable<T, R>(sources as AsyncIterable<T>[], fn);
}
Loading