Skip to content

Commit

Permalink
Zip n sources (#73)
Browse files Browse the repository at this point in the history
* feat(zip): make zip work on a variable number of source Iterables/AsyncIterables

breaking change: zip selectors now take a single "values" Array argument, instead of varargs

* test(zip): update zip tests for variable sources
  • Loading branch information
trxcllnt authored and mattpodwysocki committed Oct 15, 2017
1 parent 4098a8c commit 0c4d513
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 74 deletions.
6 changes: 3 additions & 3 deletions spec/asynciterable-operators/memoize-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async function* rand() {

test('AsyncIterable#memoize should share effects of random', async t => {
const rnd = memoize(take(rand(), 100));
t.true(await every(zip(rnd, rnd, async (l, r) => l === r), async x => x));
t.true(await every(zip(async ([l, r]) => l === r, rnd, rnd), async x => x));
t.end();
});

Expand All @@ -189,7 +189,7 @@ test('AsyncIterable#memoize with selector', async t => {
memoize(
tap(range(0, 4), { next: async () => { n++; } }),
undefined,
xs => take(zip(xs, xs, async (l, r) => l + r), 4)
xs => take(zip(async ([l, r]) => l + r, xs, xs), 4)
)
);

Expand All @@ -204,7 +204,7 @@ test('AsyncIterable#memoize limited with selector', async t => {
memoize(
tap(range(0, 4), { next: async () => { n++; } }),
2,
xs => take(zip(xs, xs, async (l, r) => l + r), 4)
xs => take(zip(async ([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/publish-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ test('AsyncIterable#publish with selector', async t => {
const res = await toArray(
publish(
tap(range(0, 10), { next: async () => { n++; } }),
xs => take(zip(xs, xs, async (l, r) => l + r), 4)
xs => take(zip(async ([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/share-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ test('AsyncIterable#share with selector', async t => {
const res = await toArray(
share(
tap(range(0, 10), { next: async () => { n++;} }),
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
26 changes: 20 additions & 6 deletions spec/asynciterable-operators/zip-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
test('AsyncIterable#zip equal length', async t => {
const xs = of(1, 2, 3);
const ys = of(4, 5, 6);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 1 * 4);
Expand All @@ -21,7 +21,7 @@ test('AsyncIterable#zip equal length', async t => {
test('AsyncIterable#zip left longer', async t => {
const xs = of(1, 2, 3, 4);
const ys = of(4, 5, 6);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 1 * 4);
Expand All @@ -34,7 +34,7 @@ test('AsyncIterable#zip left longer', async t => {
test('AsyncIterable#zip right longer', async t => {
const xs = of(1, 2, 3);
const ys = of(4, 5, 6, 7);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 1 * 4);
Expand All @@ -44,11 +44,25 @@ test('AsyncIterable#zip right longer', async t => {
t.end();
});

test('AsyncIterable#zip multiple sources', async t => {
const xs = of(1, 2, 3);
const ys = of(4, 5, 6, 7);
const zs = of(8, 9, 10);
const res = zip(([x, y, z]) => x * y * z, xs, ys, zs);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 1 * 4 * 8);
await hasNext(t, it, 2 * 5 * 9);
await hasNext(t, it, 3 * 6 * 10);
await noNext(t, it);
t.end();
});

test('AsyncIterable#zip left throws', async t => {
const err = new Error();
const xs = _throw<number>(err);
const ys = of(4, 5, 6);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
try {
Expand All @@ -63,7 +77,7 @@ test('AsyncIterable#zip right throws', async t => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = _throw<number>(err);
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.asyncIterator]();
try {
Expand All @@ -78,7 +92,7 @@ test('AsyncIterable#zip selector throws', async t => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = of(4, 5, 6);
const res = zip(xs, ys, (x, y) => { if (x > 0) { throw err; } return x * y; });
const res = zip(([x, y]) => { if (x > 0) { throw err; } return x * y; }, xs, ys);

const it = res[Symbol.asyncIterator]();
try {
Expand Down
6 changes: 3 additions & 3 deletions spec/iterable-operators/memoize-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ function* rand() {

test('Iterable#memoize should share effects of random', t => {
const rnd = memoize(take(rand(), 100));
t.true(every(zip(rnd, rnd, (l, r) => l === r), x => x));
t.true(every(zip(([l, r]) => l === r, rnd, rnd), x => x));
t.end();
});

Expand All @@ -176,7 +176,7 @@ test('Iterable#memoize with selector', t => {
memoize(
tap(range(0, 4), { next: () => n++ }),
undefined,
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand All @@ -191,7 +191,7 @@ test('Iterable#memoize limited with selector', t => {
memoize(
tap(range(0, 4), { next: () => n++ }),
2,
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
2 changes: 1 addition & 1 deletion spec/iterable-operators/publish-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ test('Iterable#publish with selector', t => {
const res = toArray(
publish(
tap(range(0, 10), { next: () => n++ }),
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
2 changes: 1 addition & 1 deletion spec/iterable-operators/share-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ test('Iterable#share with selector', t => {
const res = toArray(
share(
tap(range(0, 10), { next: () => n++ }),
xs => take(zip(xs, xs, (l, r) => l + r), 4)
xs => take(zip(([l, r]) => l + r, xs, xs), 4)
)
);

Expand Down
26 changes: 20 additions & 6 deletions spec/iterable-operators/zip-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { hasNext, noNext } from '../iterablehelpers';
test('Iterable#zip equal length', t => {
const xs = [1, 2, 3];
const ys = [4, 5, 6];
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
hasNext(t, it, 1 * 4);
Expand All @@ -20,7 +20,7 @@ test('Iterable#zip equal length', t => {
test('Iterable#zip left longer', t => {
const xs = [1, 2, 3, 4];
const ys = [4, 5, 6];
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
hasNext(t, it, 1 * 4);
Expand All @@ -33,7 +33,7 @@ test('Iterable#zip left longer', t => {
test('Iterable#zip right longer', t => {
const xs = [1, 2, 3];
const ys = [4, 5, 6, 7];
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
hasNext(t, it, 1 * 4);
Expand All @@ -43,10 +43,24 @@ test('Iterable#zip right longer', t => {
t.end();
});

test('Iterable#zip multiple sources', t => {
const xs = [1, 2, 3];
const ys = [4, 5, 6, 7];
const zs = [8, 9, 10];
const res = zip(([x, y, z]) => x * y * z, xs, ys, zs);

const it = res[Symbol.iterator]();
hasNext(t, it, 1 * 4 * 8);
hasNext(t, it, 2 * 5 * 9);
hasNext(t, it, 3 * 6 * 10);
noNext(t, it);
t.end();
});

test('Iterable#zip left throws', t => {
const xs = _throw<number>(new Error());
const ys = [4, 5, 6];
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
t.throws(() => it.next());
Expand All @@ -56,7 +70,7 @@ test('Iterable#zip left throws', t => {
test('Iterable#zip right throws', t => {
const xs = [1, 2, 3];
const ys = _throw<number>(new Error());
const res = zip(xs, ys, (x, y) => x * y);
const res = zip(([x, y]) => x * y, xs, ys);

const it = res[Symbol.iterator]();
t.throws(() => it.next());
Expand All @@ -66,7 +80,7 @@ test('Iterable#zip right throws', t => {
test('Iterable#zip selector throws', t => {
const xs = [1, 2, 3];
const ys = [4, 5, 6];
const res = zip(xs, ys, (x, y) => { if (x > 0) { throw new Error(); } return x * y; });
const res = zip(([x, y]) => { if (x > 0) { throw new Error(); } return x * y; }, xs, ys);

const it = res[Symbol.iterator]();
t.throws(() => it.next());
Expand Down
27 changes: 22 additions & 5 deletions src/add/asynciterable-operators/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,28 @@ import { zip } from '../../asynciterable/zip';
/**
* @ignore
*/
export function zipProto<T, TResult>(
this: AsyncIterableX<T>,
second: AsyncIterable<T>,
selector: (fst: T, snd: T) => TResult | Promise<TResult>): AsyncIterableX<TResult> {
return zip(this, second, selector);
/* tslint:disable:max-line-length */
export function zipProto<T, T2>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>): AsyncIterableX<[T, T2]>;
export function zipProto<T, T2, T3>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>): AsyncIterableX<[T, T2, T3]>;
export function zipProto<T, T2, T3, T4>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>): AsyncIterableX<[T, T2, T3, T4]>;
export function zipProto<T, T2, T3, T4, T5>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>): AsyncIterableX<[T, T2, T3, T4, T5]>;
export function zipProto<T, T2, T3, T4, T5, T6>(this: AsyncIterableX<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>, source6: AsyncIterable<T6>): AsyncIterableX<[T, T2, T3, T4, T5, T6]>;

export function zipProto<T, R>(this: AsyncIterableX<T>, project: (values: [T]) => R): AsyncIterableX<R>;
export function zipProto<T, T2, R>(this: AsyncIterableX<T>, project: (values: [T, T2]) => R, source2: AsyncIterable<T2>): AsyncIterableX<R>;
export function zipProto<T, T2, T3, R>(this: AsyncIterableX<T>, project: (values: [T, T2, T3]) => R, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>): AsyncIterableX<R>;
export function zipProto<T, T2, T3, T4, R>(this: AsyncIterableX<T>, project: (values: [T, T2, T3, T4]) => R, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>): AsyncIterableX<R>;
export function zipProto<T, T2, T3, T4, T5, R>(this: AsyncIterableX<T>, project: (values: [T, T2,T3, T4, T5]) => R, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>): AsyncIterableX<R>;
export function zipProto<T, T2, T3, T4, T5, T6, R>(this: AsyncIterableX<T>, project: (values: [T, T2, T3, T4, T5, T6]) => R, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>, source6: AsyncIterable<T6>): AsyncIterableX<R>;

export function zipProto<T>(this: AsyncIterableX<T>, ...sources: AsyncIterable<T>[]): AsyncIterableX<T[]>;
export function zipProto<T, R>(this: AsyncIterableX<T>, project: (values: T[]) => R, ...sources: AsyncIterable<T>[]): AsyncIterableX<R>;
/* tslint:enable:max-line-length */
export function zipProto<T, R>(this: AsyncIterableX<T>, ...args: any[]): AsyncIterableX<R> {
let [arg1, ...sources] = args;
sources = (typeof arg1 === 'function') ?
[this, ...sources] : (arg1 = this) && args;
return zip<T, R>(arg1, ...sources);
}

AsyncIterableX.prototype.zip = zipProto;
Expand Down
27 changes: 22 additions & 5 deletions src/add/iterable-operators/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,28 @@ import { zip } from '../../iterable/zip';
/**
* @ignore
*/
export function zipProto<T, TResult>(
this: IterableX<T>,
second: Iterable<T>,
fn: (fst: T, snd: T) => TResult): IterableX<TResult> {
return zip(this, second, fn);
/* tslint:disable:max-line-length */
export function zipProto<T, T2>(this: IterableX<T>, source2: Iterable<T2>): IterableX<[T, T2]>;
export function zipProto<T, T2, T3>(this: IterableX<T>, source2: Iterable<T2>, source3: Iterable<T3>): IterableX<[T, T2, T3]>;
export function zipProto<T, T2, T3, T4>(this: IterableX<T>, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>): IterableX<[T, T2, T3, T4]>;
export function zipProto<T, T2, T3, T4, T5>(this: IterableX<T>, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>, source5: Iterable<T5>): IterableX<[T, T2, T3, T4, T5]>;
export function zipProto<T, T2, T3, T4, T5, T6>(this: IterableX<T>, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>, source5: Iterable<T5>, source6: Iterable<T6>): IterableX<[T, T2, T3, T4, T5, T6]>;

export function zipProto<T, R>(this: IterableX<T>, project: (values: [T]) => R): IterableX<R>;
export function zipProto<T, T2, R>(this: IterableX<T>, project: (values: [T, T2]) => R, source2: Iterable<T2>): IterableX<R>;
export function zipProto<T, T2, T3, R>(this: IterableX<T>, project: (values: [T, T2, T3]) => R, source2: Iterable<T2>, source3: Iterable<T3>): IterableX<R>;
export function zipProto<T, T2, T3, T4, R>(this: IterableX<T>, project: (values: [T, T2, T3, T4]) => R, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>): IterableX<R>;
export function zipProto<T, T2, T3, T4, T5, R>(this: IterableX<T>, project: (values: [T, T2,T3, T4, T5]) => R, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>, source5: Iterable<T5>): IterableX<R>;
export function zipProto<T, T2, T3, T4, T5, T6, R>(this: IterableX<T>, project: (values: [T, T2, T3, T4, T5, T6]) => R, source2: Iterable<T2>, source3: Iterable<T3>, source4: Iterable<T4>, source5: Iterable<T5>, source6: Iterable<T6>): IterableX<R>;

export function zipProto<T>(this: IterableX<T>, ...sources: Iterable<T>[]): IterableX<T[]>;
export function zipProto<T, R>(this: IterableX<T>, project: (values: T[]) => R, ...sources: Iterable<T>[]): IterableX<R>;
/* tslint:enable:max-line-length */
export function zipProto<T, R>(this: IterableX<T>, ...args: any[]): IterableX<R> {
let [arg1, ...sources] = args;
sources = (typeof arg1 === 'function') ?
[this, ...sources] : (arg1 = this) && args;
return zip<T, R>(arg1, ...sources) as IterableX<R>;
}

IterableX.prototype.zip = zipProto;
Expand Down
70 changes: 45 additions & 25 deletions src/asynciterable/zip.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,58 @@
import { AsyncIterableX } from '../asynciterable';
import { identityAsync } from '../internal/identity';
import { returnAsyncIterator } from '../internal/returniterator';

class ZipIterable<TSource, TResult> extends AsyncIterableX<TResult> {
private _left: AsyncIterable<TSource>;
private _right: AsyncIterable<TSource>;
private _fn: (left: TSource, right: TSource) => TResult | Promise<TResult>;
class ZipAsyncIterable<TSource, TResult> extends AsyncIterableX<TResult> {
private _sources: AsyncIterable<TSource>[];
private _fn: (values: any[]) => TResult | Promise<TResult>;

constructor(
left: AsyncIterable<TSource>,
right: AsyncIterable<TSource>,
fn: (left: TSource, right: TSource) => TResult | Promise<TResult>) {
constructor(sources: AsyncIterable<TSource>[], fn: (values: any[]) => TResult | Promise<TResult>) {
super();
this._left = left;
this._right = right;
this._sources = sources;
this._fn = fn;
}

async *[Symbol.asyncIterator]() {
const it1 = this._left[Symbol.asyncIterator]();
const it2 = this._right[Symbol.asyncIterator]();
while (1) {
const xs = await Promise.all([it1.next(), it2.next()]);
const [next1, next2] = xs;
if (!next1.done && !next2.done) {
yield await this._fn(next1.value, next2.value);
} else {
break;
const fn = this._fn;
const sourcesLength = this._sources.length;
const its = this._sources.map((x) => x[Symbol.asyncIterator]());
do {
const values = new Array(sourcesLength);
for (let i = -1; ++i < sourcesLength;) {
const result = await its[i].next();
if (result.done) {
await Promise.all(its.map(returnAsyncIterator));
return;
}
values[i] = result.value;
}
}
yield await fn(values);
} while (1);
}
}

export function zip<TSource, TResult>(
left: AsyncIterable<TSource>,
right: AsyncIterable<TSource>,
fn: (left: TSource, right: TSource) => TResult | Promise<TResult>): AsyncIterableX<TResult> {
return new ZipIterable<TSource, TResult>(left, right, fn);
/* tslint:disable:max-line-length */
export function zip<T, T2>(source: AsyncIterable<T>, source2: AsyncIterable<T2>): AsyncIterableX<[T, T2]>;
export function zip<T, T2, T3>(source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>): AsyncIterableX<[T, T2, T3]>;
export function zip<T, T2, T3, T4>(source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>): AsyncIterableX<[T, T2, T3, T4]>;
export function zip<T, T2, T3, T4, T5>(source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>): AsyncIterableX<[T, T2, T3, T4, T5]>;
export function zip<T, T2, T3, T4, T5, T6>(source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>, source6: AsyncIterable<T6>): AsyncIterableX<[T, T2, T3, T4, T5, T6]>;

export function zip<T, R>(project: (values: [T]) => R | Promise<R>, source: AsyncIterable<T>): AsyncIterableX<R>;
export function zip<T, T2, R>(project: (values: [T, T2]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>): AsyncIterableX<R>;
export function zip<T, T2, T3, R>(project: (values: [T, T2, T3]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>): AsyncIterableX<R>;
export function zip<T, T2, T3, T4, R>(project: (values: [T, T2, T3, T4]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>): AsyncIterableX<R>;
export function zip<T, T2, T3, T4, T5, R>(project: (values: [T, T2,T3, T4, T5]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>): AsyncIterableX<R>;
export function zip<T, T2, T3, T4, T5, T6, R>(project: (values: [T, T2, T3, T4, T5, T6]) => R | Promise<R>, source: AsyncIterable<T>, source2: AsyncIterable<T2>, source3: AsyncIterable<T3>, source4: AsyncIterable<T4>, source5: AsyncIterable<T5>, source6: AsyncIterable<T6>): AsyncIterableX<R>;

export function zip<T>(...sources: AsyncIterable<T>[]): AsyncIterableX<T[]>;
export function zip<T, R>(project: (values: T[]) => R | Promise<R>, ...sources: AsyncIterable<T>[]): AsyncIterableX<R>;
/* tslint:enable:max-line-length */
export function zip<T, R>(...sources: any[]): AsyncIterableX<R> {
let fn = sources.shift() as (values: any[]) => R | Promise<R>;
if (typeof fn !== 'function') {
sources.push(fn);
fn = identityAsync;
}
return new ZipAsyncIterable<T, R>(sources as AsyncIterable<T>[], fn);
}
Loading

0 comments on commit 0c4d513

Please sign in to comment.