From 71e3dd19a32654ac77356482212e937831882289 Mon Sep 17 00:00:00 2001 From: Pierre Guilleminot Date: Thu, 5 Nov 2015 16:30:35 +0100 Subject: [PATCH] feat(operator): add switchFirst and switchMapFirst --- .../operators/switchFirst.js | 28 ++ .../operators/switchMapFirst.js | 24 ++ .../operators/switchFirst.js | 27 ++ .../operators/switchMapFirst.js | 24 ++ spec/operators/switchFirst-spec.js | 163 +++++++++++ spec/operators/switchMapFirst-spec.js | 267 ++++++++++++++++++ src/Rx.KitchenSink.ts | 9 + src/operators/switchFirst.ts | 55 ++++ src/operators/switchMapFirst.ts | 86 ++++++ 9 files changed, 683 insertions(+) create mode 100644 perf/micro/current-thread-scheduler/operators/switchFirst.js create mode 100644 perf/micro/current-thread-scheduler/operators/switchMapFirst.js create mode 100644 perf/micro/immediate-scheduler/operators/switchFirst.js create mode 100644 perf/micro/immediate-scheduler/operators/switchMapFirst.js create mode 100644 spec/operators/switchFirst-spec.js create mode 100644 spec/operators/switchMapFirst-spec.js create mode 100644 src/operators/switchFirst.ts create mode 100644 src/operators/switchMapFirst.ts diff --git a/perf/micro/current-thread-scheduler/operators/switchFirst.js b/perf/micro/current-thread-scheduler/operators/switchFirst.js new file mode 100644 index 0000000000..2386fe28af --- /dev/null +++ b/perf/micro/current-thread-scheduler/operators/switchFirst.js @@ -0,0 +1,28 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +var source = Array.apply(null, { length: 25 }); + +module.exports = function (suite) { + var oldMergeAllWithCurrentThreadScheduler = RxOld.Observable.fromArray( + source.map(function () { return RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread); }), + RxOld.Scheduler.currentThread + ) + .switchFirst(); + var newMergeAllWithCurrentThreadScheduler = RxNew.Observable.fromArray( + source.map(function () { return RxNew.Observable.range(0, 25, RxNew.Scheduler.immediate); }), + RxNew.Scheduler.immediate + ) + .switchFirst(); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old switchFirst with current thread scheduler', function () { + oldMergeAllWithCurrentThreadScheduler.subscribe(_next, _error, _complete); + }) + .add('new switchFirst with current thread scheduler', function () { + newMergeAllWithCurrentThreadScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/perf/micro/current-thread-scheduler/operators/switchMapFirst.js b/perf/micro/current-thread-scheduler/operators/switchMapFirst.js new file mode 100644 index 0000000000..891781a99b --- /dev/null +++ b/perf/micro/current-thread-scheduler/operators/switchMapFirst.js @@ -0,0 +1,24 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldMergeMapWithCurrentThreadScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread) + .flatMapFirst(function (x) { + return RxOld.Observable.range(x, 25, RxOld.Scheduler.currentThread); + }); + var newMergeMapWithCurrentThreadScheduler = RxNew.Observable.range(0, 25, RxNew.Scheduler.immediate) + .switchMapFirst(function (x) { + return RxNew.Observable.range(x, 25, RxNew.Scheduler.immediate); + }); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old switchMapFirst with current thread scheduler', function () { + oldMergeMapWithCurrentThreadScheduler.subscribe(_next, _error, _complete); + }) + .add('new switchMapFirst with current thread scheduler', function () { + newMergeMapWithCurrentThreadScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/perf/micro/immediate-scheduler/operators/switchFirst.js b/perf/micro/immediate-scheduler/operators/switchFirst.js new file mode 100644 index 0000000000..e36d3b677a --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/switchFirst.js @@ -0,0 +1,27 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +var source = Array.apply(null, { length: 25 }); + +module.exports = function (suite) { + var oldMergeAllWithImmediateScheduler = RxOld.Observable.fromArray( + source.map(function () { return RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate); }), + RxOld.Scheduler.immediate + ) + .switchFirst(); + var newMergeAllWithImmediateScheduler = RxNew.Observable.fromArray( + source.map(function () { return RxNew.Observable.range(0, 25); }) + ) + .switchFirst(); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old switchFirst with immediate scheduler', function () { + oldMergeAllWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new switchFirst with immediate scheduler', function () { + newMergeAllWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/perf/micro/immediate-scheduler/operators/switchMapFirst.js b/perf/micro/immediate-scheduler/operators/switchMapFirst.js new file mode 100644 index 0000000000..5bdf27a27e --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/switchMapFirst.js @@ -0,0 +1,24 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldMergeMapWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate) + .flatMapFirst(function (x) { + return RxOld.Observable.range(x, 25, RxOld.Scheduler.immediate); + }); + var newMergeMapWithImmediateScheduler = RxNew.Observable.range(0, 25) + .switchMapFirst(function (x) { + return RxNew.Observable.range(x, 25); + }); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old switchMapFirst with immediate scheduler', function () { + oldMergeMapWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new switchMapFirst with immediate scheduler', function () { + newMergeMapWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/spec/operators/switchFirst-spec.js b/spec/operators/switchFirst-spec.js new file mode 100644 index 0000000000..0daef885ae --- /dev/null +++ b/spec/operators/switchFirst-spec.js @@ -0,0 +1,163 @@ +/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */ +var Rx = require('../../dist/cjs/Rx'); +var Promise = require('promise'); + +var Observable = Rx.Observable; +var immediateScheduler = Rx.Scheduler.immediate; + +describe('Observable.prototype.switchFirst()', function () { + it('should switch to first immediately-scheduled inner Observable', function () { + var e1 = cold( '(ab|)'); + var e1subs = '(^!)'; + var e2 = cold( '(cd|)'); + var e2subs = []; + var expected = '(ab|)'; + expectObservable(Observable.of(e1, e2).switchFirst()).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should handle throw', function () { + var e1 = Observable.throw('damn'); + var expected = '#'; + expectObservable(e1.switchFirst()).toBe(expected, null, 'damn'); + }); + + it('should handle empty', function () { + var e1 = Observable.empty(); + var expected = '|'; + expectObservable(e1.switchFirst()).toBe(expected); + }); + + it('should handle never', function () { + var e1 = Observable.never(); + var expected = '-'; + expectObservable(e1.switchFirst()).toBe(expected); + }); + + it('should handle a hot observable of observables', function () { + var x = cold( '--a---b---c--| '); + var xsubs = ' ^ ! '; + var y = cold( '---d--e---f---| '); + var ysubs = []; + var z = cold( '---g--h---i---|'); + var zsubs = ' ^ !'; + var e1 = hot( '------x-------y-----z-------------|', { x: x, y: y, z: z }); + var expected = '--------a---b---c------g--h---i---|'; + expectObservable(e1.switchFirst()).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(z.subscriptions).toBe(zsubs); + }); + + it('should handle a hot observable of observables, outer is unsubscribed early', function () { + var x = cold( '--a---b---c--| '); + var xsubs = ' ^ ! '; + var y = cold( '---d--e---f---|'); + var ysubs = []; + var e1 = hot( '------x-------y------| ', { x: x, y: y }); + var unsub = ' ! '; + var expected = '--------a---b--- '; + expectObservable(e1.switchFirst(), unsub).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should handle a hot observable of observables, inner never completes', function () { + var x = cold( '--a---b--| '); + var xsubs = ' ^ ! '; + var y = cold( '-d---e- '); + var ysubs = []; + var z = cold( '---f--g---h--'); + var zsubs = ' ^ '; + var e1 = hot( '---x---y------z----------| ', { x: x, y: y, z: z }); + var expected = '-----a---b-------f--g---h--'; + expectObservable(e1.switchFirst()).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(z.subscriptions).toBe(zsubs); + }); + + it('should handle a synchronous switch and stay on the first inner observable', function () { + var x = cold( '--a---b---c--| '); + var xsubs = ' ^ ! '; + var y = cold( '---d--e---f---| '); + var ysubs = []; + var e1 = hot( '------(xy)------------|', { x: x, y: y }); + var expected = '--------a---b---c-----|'; + expectObservable(e1.switchFirst()).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should handle a hot observable of observables, one inner throws', function () { + var x = cold( '--a---# '); + var xsubs = ' ^ ! '; + var y = cold( '---d--e---f---|'); + var ysubs = []; + var e1 = hot( '------x-------y------| ', { x: x, y: y }); + var expected = '--------a---# '; + expectObservable(e1.switchFirst()).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should handle a hot observable of observables, outer throws', function () { + var x = cold( '--a---b---c--| '); + var xsubs = ' ^ ! '; + var y = cold( '---d--e---f---|'); + var ysubs = []; + var e1 = hot( '------x-------y-------# ', { x: x, y: y }); + var expected = '--------a---b---c-----# '; + expectObservable(e1.switchFirst()).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should handle an empty hot observable', function () { + var e1 = hot( '------|'); + var expected = '------|'; + expectObservable(e1.switchFirst()).toBe(expected); + }); + + it('should handle a never hot observable', function () { + var e1 = hot('-'); + var expected = '-'; + expectObservable(e1.switchFirst()).toBe(expected); + }); + + it('should complete not before the outer completes', function () { + var x = cold( '--a---b---c--| '); + var xsubs = ' ^ ! '; + var e1 = hot( '------x---------------|', { x: x }); + var expected = '--------a---b---c-----|'; + expectObservable(e1.switchFirst()).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + }); + + it('should handle an observable of promises', function (done) { + var expected = [1]; + + Observable.of(Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)) + .switchFirst() + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, function () { + expect(expected.length).toBe(0); + done(); + }); + }); + + it('should handle an observable of promises, where one rejects', function (done) { + Observable.of(Promise.reject(2), Promise.resolve(1)) + .switchFirst() + .subscribe(function (x) { + expect(false).toBe(true); + }, function (err) { + expect(err).toBe(2); + done(); + }, function () { + expect(false).toBe(true); + }); + }); +}); diff --git a/spec/operators/switchMapFirst-spec.js b/spec/operators/switchMapFirst-spec.js new file mode 100644 index 0000000000..bef38f1b9b --- /dev/null +++ b/spec/operators/switchMapFirst-spec.js @@ -0,0 +1,267 @@ +/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */ +var Rx = require('../../dist/cjs/Rx'); +var Promise = require('promise'); + +var Observable = Rx.Observable; +var immediateScheduler = Rx.Scheduler.immediate; + +describe('Observable.prototype.switchMapFirst()', function () { + it('should handle outer throw', function () { + var x = cold('--a--b--c--|'); + var xsubs = []; + var e1 = Observable.throw('damn'); + var expected = '#'; + expectObservable(e1.switchMapFirst(function () { return x; })).toBe(expected, null, 'damn'); + expectSubscriptions(x.subscriptions).toBe(xsubs); + }); + + it('should handle outer empty', function () { + var x = cold('--a--b--c--|'); + var xsubs = []; + var e1 = Observable.empty(); + var expected = '|'; + expectObservable(e1.switchMapFirst(function () { return x; })).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + }); + + it('should handle outer never', function () { + var x = cold('--a--b--c--|'); + var xsubs = []; + var e1 = Observable.never(); + var expected = '-'; + expectObservable(e1.switchMapFirst(function () { return x; })).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + }); + + it('should switch with a selector function', function () { + var x = cold( '--a--b--c--| '); + var xsubs = ' ^ ! '; + var y = cold( '--d--e--f--| '); + var ysubs = []; + var z = cold( '--g--h--i--| '); + var zsubs = ' ^ ! '; + var e1 = hot('---x---------y-----------------z-------------|'); + var expected = '-----a--b--c---------------------g--h--i-----|'; + + var observableLookup = { x: x, y: y, z: z }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected); + + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(z.subscriptions).toBe(zsubs); + }); + + it('should switch inner cold observables, outer is unsubscribed early', function () { + var x = cold( '--a--b--c--| '); + var xsubs = ' ^ ! '; + var y = cold( '--d--e--f--| '); + var ysubs = []; + var z = cold( '--g--h--i--| '); + var zsubs = ' ^ ! '; + var e1 = hot('---x---------y-----------------z-------------|'); + var unsub = ' ! '; + var expected = '-----a--b--c---------------------g'; + + var observableLookup = { x: x, y: y, z: z }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + }), unsub).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(z.subscriptions).toBe(zsubs); + }); + + it('should switch inner cold observables, inner never completes', function () { + var x = cold( '--a--b--c--| '); + var xsubs = ' ^ ! '; + var y = cold( '--d--e--f--| '); + var ysubs = []; + var z = cold( '--g--h--i-----'); + var zsubs = ' ^ '; + var e1 = hot('---x---------y-----------------z---------| '); + var expected = '-----a--b--c---------------------g--h--i-----'; + + var observableLookup = { x: x, y: y, z: z }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(z.subscriptions).toBe(zsubs); + }); + + it('should handle a synchronous switch an stay on the first inner observable', function () { + var x = cold( '--a--b--c--d--e--| '); + var xsubs = ' ^ ! '; + var y = cold( '---f---g---h---i--| '); + var ysubs = []; + var e1 = hot('---------(xy)----------------|'); + var expected = '-----------a--b--c--d--e-----|'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should switch inner cold observables, one inner throws', function () { + var x = cold( '--a--b--c--d--#--e--|'); + var xsubs = ' ^ ! '; + var y = cold( '---f---g---h---i--'); + var ysubs = []; + var e1 = hot('---------x---------y---------|'); + var expected = '-----------a--b--c--d--#'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should switch inner hot observables', function () { + var x = hot('-----a--b--c--d--e--|'); + var xsubs = ' ^ !'; + var y = hot('--p-o-o-p-------f---g---h---i--|'); + var ysubs = []; + var z = hot('---z-o-o-m-------------j---k---l---m--|'); + var zsubs = ' ^ !'; + var e1 = hot('---------x----y-----z--------|'); + var expected = '-----------c--d--e-----j---k---l---m--|'; + + var observableLookup = { x: x, y: y, z: z }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(z.subscriptions).toBe(zsubs); + }); + + it('should switch inner empty and empty', function () { + var x = cold('|'); + var y = cold('|'); + var xsubs = ' (^!)'; + var ysubs = ' (^!)'; + var e1 = hot('---------x---------y---------|'); + var expected = '-----------------------------|'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should switch inner empty and never', function () { + var x = cold('|'); + var y = cold('-'); + var xsubs = ' (^!) '; + var ysubs = ' ^ '; + var e1 = hot('---------x---------y---------|'); + var expected = '----------------------------------'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should never switch inner never', function () { + var x = cold('-'); + var y = cold('#', null, 'sad'); + var xsubs = ' ^ '; + var ysubs = []; + var e1 = hot('---------x---------y----------|'); + var expected = '-------------------------------'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected, undefined, 'sad'); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should switch inner empty and throw', function () { + var x = cold('|'); + var y = cold('#', null, 'sad'); + var xsubs = ' (^!) '; + var ysubs = ' (^!) '; + var e1 = hot('---------x---------y---------|'); + var expected = '-------------------#'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected, undefined, 'sad'); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should handle outer error', function () { + var x = cold( '--a--b--c--d--e--|'); + var xsubs = ' ^ !'; + var e1 = hot('---------x---------#', undefined, new Error('boo-hoo')); + var expected = '-----------a--b--c-#'; + + var observableLookup = { x: x }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + })).toBe(expected, undefined, new Error('boo-hoo')); + expectSubscriptions(x.subscriptions).toBe(xsubs); + }); + + it('should switch with resultSelector goodness', function () { + var x = cold( '--a--b--c--d--e-|'); + var xsubs = ' ^ !'; + var y = cold( '---f---g---h---i--|'); + var ysubs = []; + var z = cold( '---k---l---m---n--|'); + var zsubs = ' ^ !'; + var e1 = hot('--x---------y------z-|'); + var expected = '----a--b--c--d--e-----k---l---m---n--|'; + + var observableLookup = { x: x, y: y, z: z }; + + var expectedValues = { + a: ['x', 'a', 0, 0], + b: ['x', 'b', 0, 1], + c: ['x', 'c', 0, 2], + d: ['x', 'd', 0, 3], + e: ['x', 'e', 0, 4], + k: ['z', 'k', 1, 0], + l: ['z', 'l', 1, 1], + m: ['z', 'm', 1, 2], + n: ['z', 'n', 1, 3], + }; + + expectObservable(e1.switchMapFirst(function (value) { + return observableLookup[value]; + }, function (innerValue, outerValue, innerIndex, outerIndex) { + return [innerValue, outerValue, innerIndex, outerIndex]; + })).toBe(expected, expectedValues); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(z.subscriptions).toBe(zsubs); + }); +}); diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 34c0abca08..064480827b 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -12,6 +12,9 @@ interface KitchenSinkOperators extends CoreOperators { min?: (comparer?: (x: R, y: T) => R) => Observable; timeInterval?: (scheduler?: IScheduler) => Observable; mergeScan?: (project: (acc: R, x: T) => Observable, seed: R) => Observable; + switchFirst?: () => Observable; + switchMapFirst?: (project: ((x: T, ix: number) => Observable), + projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; } // operators @@ -270,6 +273,12 @@ observableProto.switchMap = switchMap; import {switchMapTo} from './operators/switchMapTo'; observableProto.switchMapTo = switchMapTo; +import {switchFirst} from './operators/switchFirst'; +observableProto.switchFirst = switchFirst; + +import {switchMapFirst} from './operators/switchMapFirst'; +observableProto.switchMapFirst = switchMapFirst; + import {take} from './operators/take'; observableProto.take = take; diff --git a/src/operators/switchFirst.ts b/src/operators/switchFirst.ts new file mode 100644 index 0000000000..798e139c2e --- /dev/null +++ b/src/operators/switchFirst.ts @@ -0,0 +1,55 @@ +import {Operator} from '../Operator'; +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {subscribeToResult} from '../util/subscribeToResult'; + +export function switchFirst(): Observable { + return this.lift(new SwitchFirstOperator()); +} + +class SwitchFirstOperator implements Operator { + call(subscriber: Subscriber): Subscriber { + return new SwitchFirstSubscriber(subscriber); + } +} + +class SwitchFirstSubscriber extends OuterSubscriber { + private hasSubscription: boolean = false; + private hasCompleted: boolean = false; + + constructor(destination: Subscriber) { + super(destination); + } + + _next(value: T): void { + if (!this.hasSubscription) { + this.hasSubscription = true; + this.add(subscribeToResult(this, value)); + } + } + + _complete(): void { + this.hasCompleted = true; + if (!this.hasSubscription) { + this.destination.complete(); + } + } + + notifyNext(outerValue: T, innerValue: any): void { + this.destination.next(innerValue); + } + + notifyError(err: any): void { + this.destination.error(err); + } + + notifyComplete(innerSub: Subscription): void { + this.remove(innerSub); + this.hasSubscription = false; + if (this.hasCompleted) { + this.destination.complete(); + } + } +} diff --git a/src/operators/switchMapFirst.ts b/src/operators/switchMapFirst.ts new file mode 100644 index 0000000000..0080a44992 --- /dev/null +++ b/src/operators/switchMapFirst.ts @@ -0,0 +1,86 @@ +import {Operator} from '../Operator'; +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {tryCatch} from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {subscribeToResult} from '../util/subscribeToResult'; + +export function switchMapFirst(project: (value: T, index: number) => Observable, + resultSelector?: (outerValue: T, + innerValue: R, + outerIndex: number, + innerIndex: number) => R2): Observable { + return this.lift(new SwitchMapFirstOperator(project, resultSelector)); +} + +class SwitchMapFirstOperator implements Operator { + constructor(private project: (value: T, index: number) => Observable, + private resultSelector?: (outerValue: T, + innerValue: R, + outerIndex: number, + innerIndex: number) => R2) { + } + + call(subscriber: Subscriber): Subscriber { + return new SwitchMapFirstSubscriber(subscriber, this.project, this.resultSelector); + } +} + +class SwitchMapFirstSubscriber extends OuterSubscriber { + private hasSubscription: boolean = false; + private hasCompleted: boolean = false; + private index: number = 0; + + constructor(destination: Subscriber, + private project: (value: T, index: number) => Observable, + private resultSelector?: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => R2) { + super(destination); + } + + _next(value: T): void { + if (!this.hasSubscription) { + const index = this.index++; + const destination = this.destination; + let result = tryCatch(this.project)(value, index); + if (result === errorObject) { + destination.error(result.e); + } else { + this.hasSubscription = true; + this.add(subscribeToResult(this, result, value, index)); + } + } + } + + _complete(): void { + this.hasCompleted = true; + if (!this.hasSubscription) { + this.destination.complete(); + } + } + + notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + const { resultSelector, destination } = this; + if (resultSelector) { + const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex); + if (result === errorObject) { + destination.error(errorObject.e); + } else { + destination.next(result); + } + } else { + destination.next(innerValue); + } + } + + notifyError(err: any): void { + this.destination.error(err); + } + + notifyComplete(): void { + this.hasSubscription = false; + if (this.hasCompleted) { + this.destination.complete(); + } + } +}