Skip to content

Commit

Permalink
feat(race): add race operator
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwoo authored and kwonoj committed Jan 11, 2016
1 parent 9d5f3c2 commit ee3b593
Show file tree
Hide file tree
Showing 11 changed files with 466 additions and 0 deletions.
2 changes: 2 additions & 0 deletions doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [merge](function/index.html#static-function-merge)
- [never](function/index.html#static-function-never)
- [of](function/index.html#static-function-of)
- [race](function/index.html#static-function-race)
- [range](function/index.html#static-function-range)
- [throw](function/index.html#static-function-throw)
- [timer](function/index.html#static-function-timer)
Expand Down Expand Up @@ -62,6 +63,7 @@
- [publish](function/index.html#static-function-publish)
- [publishBehavior](function/index.html#static-function-publishBehavior)
- [publishReplay](function/index.html#static-function-publishReplay)
- [race](function/index.html#static-function-race)
- [reduce](function/index.html#static-function-reduce)
- [repeat](function/index.html#static-function-repeat)
- [retry](function/index.html#static-function-retry)
Expand Down
193 changes: 193 additions & 0 deletions spec/observables/race-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/* globals expect, it, describe, hot, cold, expectObservable, expectSubscriptions */

var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.race(...observables)', function () {
it('should race a single observable', function () {
var e1 = cold('---a-----b-----c----|');
var e1subs = '^ !';
var expected = '---a-----b-----c----|';

var result = Observable.race(e1);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should race cold and cold', function () {
var e1 = cold('---a-----b-----c----|');
var e1subs = '^ !';
var e2 = cold('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----b-----c----|';

var result = Observable.race(e1, e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race hot and hot', function () {
var e1 = hot('---a-----b-----c----|');
var e1subs = '^ !';
var e2 = hot('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----b-----c----|';

var result = Observable.race(e1, e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race hot and cold', function () {
var e1 = cold('---a-----b-----c----|');
var e1subs = '^ !';
var e2 = hot('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----b-----c----|';

var result = Observable.race(e1, e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race 2nd and 1st', function () {
var e1 = cold('------x-----y-----z----|');
var e1subs = '^ !';
var e2 = cold('---a-----b-----c----|');
var e2subs = '^ !';
var expected = '---a-----b-----c----|';

var result = Observable.race(e1, e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race emit and complete', function () {
var e1 = cold('-----|');
var e1subs = '^ !';
var e2 = hot('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '-----|';

var result = Observable.race(e1, e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should allow unsubscribing early and explicitly', function () {
var e1 = cold('---a-----b-----c----|');
var e1subs = '^ !';
var e2 = hot('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----b---';
var unsub = ' !';

var result = Observable.race(e1, e2);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should not break unsubscription chains when unsubscribed explicitly', function () {
var e1 = hot('--a--^--b--c---d-| ');
var e1subs = '^ ! ';
var e2 = hot('---e-^---f--g---h-|');
var e2subs = '^ ! ';
var expected = '---b--c--- ';
var unsub = ' ! ';

var result = Observable.race(
e1.mergeMap(function (x) { return Observable.of(x); }),
e2.mergeMap(function (x) { return Observable.of(x); })
).mergeMap(function (x) { return Observable.of(x); });

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should never emit when given non emitting sources', function () {
var e1 = cold('---|');
var e2 = cold('---|');
var e1subs = '^ !';
var expected = '---|';

var source = Observable.race(e1, e2);

expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should throw when error occurs mid stream', function () {
var e1 = cold('---a-----#');
var e1subs = '^ !';
var e2 = cold('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----#';

var result = Observable.race(e1, e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should throw when error occurs before a winner is found', function () {
var e1 = cold('---#');
var e1subs = '^ !';
var e2 = cold('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---#';

var result = Observable.race(e1, e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('handle empty', function () {
var e1 = cold('|');
var e1subs = '(^!)';
var expected = '|';

var source = Observable.race(e1);

expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('handle never', function () {
var e1 = cold('-');
var e1subs = '^';
var expected = '-';

var source = Observable.race(e1);

expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('handle throw', function () {
var e1 = cold('#');
var e1subs = '(^!)';
var expected = '#';

var source = Observable.race(e1);

expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
149 changes: 149 additions & 0 deletions spec/operators/race-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/* globals expect, it, describe, hot, cold, expectObservable, expectSubscriptions */

var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('...race(observables)', function () {
it('should race cold and cold', function () {
var e1 = cold('---a-----b-----c----|');
var e1subs = '^ !';
var e2 = cold('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----b-----c----|';

var result = e1.race(e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race hot and hot', function () {
var e1 = hot('---a-----b-----c----|');
var e1subs = '^ !';
var e2 = hot('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----b-----c----|';

var result = e1.race(e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race hot and cold', function () {
var e1 = cold('---a-----b-----c----|');
var e1subs = '^ !';
var e2 = hot('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----b-----c----|';

var result = e1.race(e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race 2nd and 1st', function () {
var e1 = cold('------x-----y-----z----|');
var e1subs = '^ !';
var e2 = cold('---a-----b-----c----|');
var e2subs = '^ !';
var expected = '---a-----b-----c----|';

var result = e1.race(e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race emit and complete', function () {
var e1 = cold('-----|');
var e1subs = '^ !';
var e2 = hot('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '-----|';

var result = e1.race(e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should allow unsubscribing early and explicitly', function () {
var e1 = cold('---a-----b-----c----|');
var e1subs = '^ !';
var e2 = hot('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----b---';
var unsub = ' !';

var result = e1.race(e2);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should not break unsubscription chains when unsubscribed explicitly', function () {
var e1 = hot('--a--^--b--c---d-| ');
var e1subs = '^ ! ';
var e2 = hot('---e-^---f--g---h-|');
var e2subs = '^ ! ';
var expected = '---b--c--- ';
var unsub = ' ! ';

var result = e1
.mergeMap(function (x) { return Observable.of(x); })
.race(e2)
.mergeMap(function (x) { return Observable.of(x); });

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should never emit when given non emitting sources', function () {
var e1 = cold('---|');
var e2 = cold('---|');
var e1subs = '^ !';
var expected = '---|';

var source = e1.race(e2);

expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should throw when error occurs mid stream', function () {
var e1 = cold('---a-----#');
var e1subs = '^ !';
var e2 = cold('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---a-----#';

var result = e1.race(e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should throw when error occurs before a winner is found', function () {
var e1 = cold('---#');
var e1subs = '^ !';
var e2 = cold('------x-----y-----z----|');
var e2subs = '^ !';
var expected = '---#';

var result = e1.race(e2);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
3 changes: 3 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {FromEventPatternObservable} from './observable/fromEventPattern';
import {PromiseObservable} from './observable/fromPromise';
import {IntervalObservable} from './observable/interval';
import {TimerObservable} from './observable/timer';
import {race as raceStatic} from './operator/race-static';
import {RangeObservable} from './observable/range';
import {InfiniteObservable} from './observable/never';
import {ErrorObservable} from './observable/throw';
Expand Down Expand Up @@ -175,6 +176,7 @@ export class Observable<T> implements CoreOperators<T> {
static merge: typeof mergeStatic;
static never: typeof InfiniteObservable.create;
static of: typeof ArrayObservable.of;
static race: typeof raceStatic;
static range: typeof RangeObservable.create;
static throw: typeof ErrorObservable.create;
static timer: typeof TimerObservable.create;
Expand Down Expand Up @@ -238,6 +240,7 @@ export class Observable<T> implements CoreOperators<T> {
publishBehavior: (value: any) => ConnectableObservable<T>;
publishReplay: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable<T>;
publishLast: () => ConnectableObservable<T>;
race: (...observables: Array<Observable<any>>) => Observable<any>;
reduce: <R>(project: (acc: R, x: T) => R, seed?: R) => Observable<R>;
repeat: (count?: number) => Observable<T>;
retry: (count?: number) => Observable<T>;
Expand Down
2 changes: 2 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface KitchenSinkOperators<T> extends CoreOperators<T> {
import './add/operator/combineLatest-static';
import './add/operator/concat-static';
import './add/operator/merge-static';
import './add/operator/race-static';
import './add/observable/bindCallback';
import './add/observable/defer';
import './add/observable/empty';
Expand Down Expand Up @@ -96,6 +97,7 @@ import './add/operator/publish';
import './add/operator/publishBehavior';
import './add/operator/publishReplay';
import './add/operator/publishLast';
import './add/operator/race';
import './add/operator/reduce';
import './add/operator/repeat';
import './add/operator/retry';
Expand Down
Loading

0 comments on commit ee3b593

Please sign in to comment.