Skip to content

Commit

Permalink
feat(observable): add to/from Observable
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Jul 20, 2017
1 parent 2809866 commit 825b3d9
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 0 deletions.
67 changes: 67 additions & 0 deletions spec/asynciterable-operators/toobservable-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import * as Ix from '../Ix';
import * as test from 'tape';
const { empty } = Ix.asynciterable;
const { of } = Ix.asynciterable;
const { _throw } = Ix.asynciterable;
const { toObservable } = Ix.asynciterable;

test('AsyncIterable#toObservable empty', async t => {
const xs = empty<number>();
const ys = toObservable(xs);
let fail = false;

ys.subscribe({
next: (value: number) => {
fail = true;
},
error: (err: any) => {
fail = true;
},
complete: () => {
t.false(fail);
t.end();
}
});
});

test('AsyncIterable#toObservable non-empty', async t => {
const results: number[] = [];
const xs = of(1, 2, 3);
const ys = toObservable(xs);
let fail = false;

ys.subscribe({
next: (value: number) => {
results.push(value);
},
error: (err: any) => {
fail = true;
},
complete: () => {
t.deepEqual(results, [1, 2, 3]);
t.false(fail);
t.end();
}
});
});

test('AsyncIterable#toObservable error', async t => {
const error = new Error();
const xs = _throw<number>(error);
const ys = toObservable(xs);
let fail = false;

ys.subscribe({
next: (value: number) => {
fail = true;
},
error: (err: any) => {
t.same(err, error);
t.false(fail);
t.end();
},
complete: () => {
fail = true;
}
});
});
68 changes: 68 additions & 0 deletions spec/asynciterable/fromobservable-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import * as Ix from '../Ix';
import * as test from 'tape';
const { fromObservable } = Ix.asynciterable;
import { hasNext, noNext } from '../asynciterablehelpers';

interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

interface Subscription {
unsubscribe: () => void;
}

interface Observable<T> {
subscribe: (observer: Observer<T>) => Subscription;
}

class EmptySubscription implements Subscription {
unsubscribe() {
// tslint:disable-next-line:no-empty
}
}

class TestObservable<T> implements Observable<T> {
private _subscribe: (observer: Observer<T>) => Subscription;

constructor(subscribe: (observer: Observer<T>) => Subscription) {
this._subscribe = subscribe;
}

subscribe(observer: Observer<T>) {
return this._subscribe(observer);
}
}

test('AsyncIterable#fromObservable with completion', async t => {
const xs = new TestObservable<number>(obs => {
obs.next(42);
obs.complete();
return new EmptySubscription();
});
const ys = fromObservable(xs);

const it = ys[Symbol.asyncIterator]();
await hasNext(t, it, 42);
await noNext(t, it);
t.end();
});

test('AsyncIterable#fromObservable with error', async t => {
const err = new Error();
const xs = new TestObservable<number>(obs => {
obs.error(err);
return new EmptySubscription();
});
const ys = fromObservable(xs);

const it = ys[Symbol.asyncIterator]();
try {
await it.next();
} catch (e) {
t.same(err, e);
}

t.end();
});
18 changes: 18 additions & 0 deletions src/add/asynciterable-operators/toobservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { AsyncIterableX } from '../../asynciterable';
import { toObservable } from '../../asynciterable/toobservable';
import { Observable } from '../../observer';

/**
* @ignore
*/
export function toObservableProto<TSource>(this: AsyncIterableX<TSource>): Observable<TSource> {
return toObservable(this);
}

AsyncIterableX.prototype.toObservable = toObservableProto;

declare module '../../asynciterable' {
interface AsyncIterableX<T> {
toObservable: typeof toObservableProto;
}
}
10 changes: 10 additions & 0 deletions src/add/asynciterable/fromobservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { AsyncIterableX } from '../../asynciterable';
import { fromObservable as fromObservableStatic } from '../../asynciterable/fromobservable';

AsyncIterableX.fromObservable = fromObservableStatic;

declare module '../../asynciterable' {
namespace AsyncIterableX {
export let fromObservable: typeof fromObservableStatic;
}
}
2 changes: 2 additions & 0 deletions src/asynciterable/__modules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export { forEach } from './foreach';
export { from } from './from';
export { fromEvent } from './fromevent';
export { fromEventPattern } from './fromeventpattern';
export { fromObservable } from './fromobservable';
export { generate } from './generate';
export { generateTime } from './generatetime';
export { groupBy } from './groupby';
Expand Down Expand Up @@ -86,6 +87,7 @@ export { throttle } from './throttle';
export { _throw } from './throw';
export { toArray } from './toarray';
export { toMap } from './tomap';
export { toObservable } from './toobservable';
export { toSet } from './toset';
export { union } from './union';
export { _while } from './while';
Expand Down
1 change: 1 addition & 0 deletions src/asynciterable/toobservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class AsyncIterableObservable<TSource> implements Observable<TSource> {
}
});
};
f();

return subscription;
}
Expand Down

0 comments on commit 825b3d9

Please sign in to comment.