Skip to content

Commit

Permalink
Fix/symbol observable (#258)
Browse files Browse the repository at this point in the history
* update batch specs for jest

* add symbol-observable, fix observable typings/issue #245

* add symbol-observable to closure

* fix symbol-observable for real now

* polyfill Symbol.observable and add tests for RxJS compat

* cleanup, clarify Symbol.observable comment

* require symbol/observable from rx because jest
  • Loading branch information
trxcllnt authored Jan 3, 2019
1 parent 9c9bd7e commit 273e697
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 80 deletions.
6 changes: 5 additions & 1 deletion gulp/closure-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ const closureTask = ((cache) => memoizeTask(cache, async function closure(target
gulp.src([
/* external libs first */
`node_modules/tslib/package.json`,
`node_modules/tslib/tslib.es6.js`,
`node_modules/tslib/tslib.es6.js`,
`node_modules/rxjs/package.json`,
`node_modules/rxjs/util/root.js`,
`node_modules/rxjs/symbol/observable.js`,
`${src}/**/*.js` /* <-- then sources globs */
], { base: `./` }),
sourcemaps.init(),
Expand All @@ -86,6 +89,7 @@ const createClosureArgs = (entry_point, output, externs) => ({
// formatting: `PRETTY_PRINT`,
// debug: true,
compilation_level: `ADVANCED`,
process_common_js_modules: true,
allow_method_call_decomposing: true,
package_json_entry_names: `module,jsnext:main,main`,
assume_function_wrapper: true,
Expand Down
2 changes: 1 addition & 1 deletion gulp/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ const gulp = path.join(path.parse(require.resolve(`gulp`)).dir, `bin/gulp.js`);
function spawnGulpCommandInChildProcess(command, target, format) {
const args = [gulp, command, '-t', target, '-m', format];
const opts = {
stdio: [`ignore`, `ignore`, `inherit`],
stdio: [`ignore`, `inherit`, `inherit`],
env: { ...process.env, NODE_NO_WARNINGS: `1` }
};
return asyncDone(() => child_process.spawn(`node`, args, opts));
Expand Down
13 changes: 7 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@
"dependencies": {
"@types/node": "10.12.18",
"is-stream": "1.1.0",
"symbol-observable": "1.2.0",
"tslib": "^1.9.3"
},
"devDependencies": {
"@mattiasbuelens/web-streams-polyfill": "0.2.0",
"@mattiasbuelens/web-streams-polyfill": "0.2.1",
"@types/glob": "7.1.1",
"@types/jest": "23.3.10",
"@types/jest": "23.3.11",
"async-done": "1.3.1",
"benchmark": "2.1.4",
"command-line-args": "4.0.7",
Expand All @@ -78,25 +79,25 @@
"jest-codemods": "0.19.1",
"jest-environment-node-debug": "2.0.0",
"jest-silent-reporter": "0.1.1",
"lerna": "3.7.0",
"lerna": "3.8.4",
"lint-staged": "8.1.0",
"mkdirp": "0.5.1",
"npm-run-all": "4.1.5",
"pre-git": "3.17.1",
"prettier": "1.15.3",
"rimraf": "2.6.2",
"rimraf": "2.6.3",
"rxjs": "5.5.11",
"shx": "0.3.2",
"source-map-loader": "0.2.4",
"terser-webpack-plugin": "1.1.0",
"terser-webpack-plugin": "1.2.1",
"ts-jest": "23.10.5",
"ts-node": "7.0.1",
"tslint": "5.12.0",
"typedoc": "0.13.0",
"typescript": "3.2.2",
"validate-commit-msg": "2.14.0",
"web-stream-tools": "0.0.1",
"webpack": "4.27.1"
"webpack": "4.28.3"
},
"config": {
"commitizen": {
Expand Down
30 changes: 12 additions & 18 deletions spec/asynciterable-operators/batch-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const test = testOperator([Ix.asynciterable.batch]);

const delay = (ms = 0) => new Promise(resolve => setTimeout(resolve, ms));

test('AsyncIterable#batch basic', async (t, [batch]) => {
test('AsyncIterable#batch basic', async ([batch]) => {
const sink = new Ix.AsyncSink<number>();

const it = batch(sink)[Symbol.asyncIterator]();
Expand All @@ -13,29 +13,27 @@ test('AsyncIterable#batch basic', async (t, [batch]) => {
sink.write(2);

await delay();
t.deepEqual(await it.next(), { done: false, value: [1, 2] });
expect(await it.next()).toEqual({ done: false, value: [1, 2] });

setTimeout(() => sink.write(3), 50);

t.deepEqual(await it.next(), { done: false, value: [3] });
expect(await it.next()).toEqual({ done: false, value: [3] });

sink.write(4);
sink.write(5);
sink.end();

await delay();
t.deepEqual(await it.next(), {
expect(await it.next()).toEqual({
done: false,
value: [4, 5]
});
t.deepEqual(await it.next(), {
expect(await it.next()).toEqual({
done: true
});

t.end();
});

test('done while waiting', async (t, [batch]) => {
test('done while waiting', async ([batch]) => {
const sink = new Ix.AsyncSink<number>();

const it = batch(sink)[Symbol.asyncIterator]();
Expand All @@ -44,16 +42,14 @@ test('done while waiting', async (t, [batch]) => {
sink.write(2);

await delay();
t.deepEqual(await it.next(), { done: false, value: [1, 2] });
expect(await it.next()).toEqual({ done: false, value: [1, 2] });

setTimeout(() => sink.end(), 50);

t.deepEqual(await it.next(), { done: true });

t.end();
expect(await it.next()).toEqual({ done: true });
});

test('canceled', async (t, [batch]) => {
test('canceled', async ([batch]) => {
let canceled = false;

async function* generate() {
Expand All @@ -70,10 +66,8 @@ test('canceled', async (t, [batch]) => {
const it = batch(generate())[Symbol.asyncIterator]();

await delay(150);
t.deepEqual(await it.next(), { done: false, value: [0] });

t.deepEqual(await it.return!(), { done: true });
t.true(canceled);
expect(await it.next()).toEqual({ done: false, value: [0] });

t.end();
expect(await it.return!()).toEqual({ done: true });
expect(canceled).toBe(true);
});
105 changes: 105 additions & 0 deletions spec/asynciterable-operators/toobservable-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import * as Ix from '../Ix';
import { Observable as RxJSObservable } from 'rxjs';
import { Observable, PartialObserver } from '../../src/observer';
import { testOperator } from '../asynciterablehelpers';
const test = testOperator([Ix.asynciterable.toObservable]);
const { empty } = Ix.asynciterable;
Expand Down Expand Up @@ -62,3 +64,106 @@ test('AsyncIterable#toObservable error', async ([toObservable]) => {
}
});
});

test('AsyncIterable#toObservable Symbol.observable should return same instance', async ([
toObservable
]) => {
const ys = toObservable(of(1, 2, 3));
expect(ys).toBe(ys[Symbol.observable]());
});

test('AsyncIterable#toObservable accepts partial observers', async ([toObservable]) => {
const expectedValues = [1, 2, 3];
const expectedError = new Error();

let actualValues: number[] = [];
let actualError: any = null;
let completeCalled = false;

const valueObs = toObservable(of(1, 2, 3));
const errorObs = toObservable(_throw<number>(expectedError));

const onNext = (val: number) => actualValues.push(val);
const onError = (error: any) => (actualError = error);
const onComplete = () => (completeCalled = true);

await endOfObservable(valueObs, { next: onNext });
try {
await endOfObservable(errorObs, { error: onError });
} catch (e) {
/**/
}
await endOfObservable(valueObs, { complete: onComplete });

expect(actualValues).toEqual(expectedValues);
expect(actualError).toEqual(expectedError);
expect(completeCalled).toEqual(true);
});

test('AsyncIterable#toObservable accepts observer functions', async ([toObservable]) => {
const expectedValues = [1, 2, 3];
const expectedError = new Error();

let actualValues: number[] = [];
let actualError: any = null;
let completeCalled = false;

const valueObs = toObservable(of(1, 2, 3));
const errorObs = toObservable(_throw<number>(expectedError));

const onNext = (val: number) => actualValues.push(val);
const onError = (error: any) => (actualError = error);
const onComplete = () => (completeCalled = true);

await endOfObservable(valueObs, onNext);
try {
await endOfObservable(errorObs, null, onError);
} catch (e) {
/**/
}
await endOfObservable(valueObs, null, null, onComplete);

expect(actualValues).toEqual(expectedValues);
expect(actualError).toEqual(expectedError);
expect(completeCalled).toEqual(true);
});

test('AsyncIterable#toObservable interop with rxjs', async ([toObservable]) => {
const xs: number[] = [];
const ys = RxJSObservable.from(toObservable(of(1, 2, 3)));
await endOfObservable(ys, x => xs.push(x));
expect(xs).toEqual([1, 2, 3]);
});

function endOfObservable<T>(
observable: Observable<T> | RxJSObservable<T>,
next?: PartialObserver<T> | ((x: T) => void) | null,
error?: ((err: any) => void) | null,
complete?: (() => void) | null
): Promise<void> {
let reject: (x?: any) => void;
let resolve: (x?: any) => void;
// prettier-ignore
const done = new Promise<void>((a, b) => { resolve = a; reject = b; });
const wrap = (promiseFn: (x?: any) => void, originalFn: (...args: any[]) => any) => (
...args: any
) => {
promiseFn(...args);
originalFn(...args);
};
if (next && typeof next === 'object') {
// prettier-ignore
next.error = wrap(e => reject(e),(next.error || (() => { /**/ })).bind(next));
// prettier-ignore
next.complete = wrap(() => resolve(), (next.complete || (() => { /**/ })).bind(next));
} else {
// prettier-ignore
error = wrap(e => reject(e),error || (() => { /**/ }));
// prettier-ignore
complete = wrap(() => resolve(),complete || (() => { /**/ }));
}

observable.subscribe(<any>next, <any>error, <any>complete);

return done;
}
95 changes: 48 additions & 47 deletions src/Ix.dom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,69 +18,70 @@ export { AsyncSink, Iterable, AsyncIterable } from './Ix';

import { OperatorAsyncFunction } from './interfaces';

type PipeInput<T, R> = OperatorAsyncFunction<T, R>;

declare module './asynciterable/asynciterablex' {
interface AsyncIterableX<T> {
pipe(): AsyncIterableX<T>;
pipe<A>(op1: PipeInput<T, A>): AsyncIterableX<A>;
pipe<A, B>(op1: PipeInput<T, A>, op2: PipeInput<A, B>): AsyncIterableX<B>;
pipe<A>(op1: OperatorAsyncFunction<T, A>): AsyncIterableX<A>;
pipe<A, B>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>
): AsyncIterableX<B>;
pipe<A, B, C>(
op1: PipeInput<T, A>,
op2: PipeInput<A, B>,
op3: PipeInput<B, C>
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>
): AsyncIterableX<C>;
pipe<A, B, C, D>(
op1: PipeInput<T, A>,
op2: PipeInput<A, B>,
op3: PipeInput<B, C>,
op4: PipeInput<C, D>
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>
): AsyncIterableX<D>;
pipe<A, B, C, D, E>(
op1: PipeInput<T, A>,
op2: PipeInput<A, B>,
op3: PipeInput<B, C>,
op4: PipeInput<C, D>,
op5: PipeInput<D, E>
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>
): AsyncIterableX<E>;
pipe<A, B, C, D, E, F>(
op1: PipeInput<T, A>,
op2: PipeInput<A, B>,
op3: PipeInput<B, C>,
op4: PipeInput<C, D>,
op5: PipeInput<D, E>,
op6: PipeInput<E, F>
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>,
op6: OperatorAsyncFunction<E, F>
): AsyncIterableX<F>;
pipe<A, B, C, D, E, F, G>(
op1: PipeInput<T, A>,
op2: PipeInput<A, B>,
op3: PipeInput<B, C>,
op4: PipeInput<C, D>,
op5: PipeInput<D, E>,
op6: PipeInput<E, F>,
op7: PipeInput<F, G>
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>,
op6: OperatorAsyncFunction<E, F>,
op7: OperatorAsyncFunction<F, G>
): AsyncIterableX<G>;
pipe<A, B, C, D, E, F, G, H>(
op1: PipeInput<T, A>,
op2: PipeInput<A, B>,
op3: PipeInput<B, C>,
op4: PipeInput<C, D>,
op5: PipeInput<D, E>,
op6: PipeInput<E, F>,
op7: PipeInput<F, G>,
op8: PipeInput<G, H>
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>,
op6: OperatorAsyncFunction<E, F>,
op7: OperatorAsyncFunction<F, G>,
op8: OperatorAsyncFunction<G, H>
): AsyncIterableX<H>;
pipe<A, B, C, D, E, F, G, H, I>(
op1: PipeInput<T, A>,
op2: PipeInput<A, B>,
op3: PipeInput<B, C>,
op4: PipeInput<C, D>,
op5: PipeInput<D, E>,
op6: PipeInput<E, F>,
op7: PipeInput<F, G>,
op8: PipeInput<G, H>,
op9: PipeInput<H, I>
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>,
op6: OperatorAsyncFunction<E, F>,
op7: OperatorAsyncFunction<F, G>,
op8: OperatorAsyncFunction<G, H>,
op9: OperatorAsyncFunction<H, I>
): AsyncIterableX<I>;
pipe<R>(...operations: PipeInput<T, R>[]): AsyncIterableX<R>;
pipe<R>(...operations: OperatorAsyncFunction<T, R>[]): AsyncIterableX<R>;
}
}
Loading

0 comments on commit 273e697

Please sign in to comment.