Skip to content

Commit

Permalink
feat(exhaustMap): simplify interface
Browse files Browse the repository at this point in the history
- Removes resultSelector argument
- updates tests

BREAKING CHANGE: `resultSelector` no longer supported, to get this functionality use: `source.pipe(exhaustMap(x => of(x + x).pipe(map(y => x + y))))`
  • Loading branch information
benlesh committed Mar 2, 2018
1 parent 959fb6a commit 42589d0
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 117 deletions.
52 changes: 0 additions & 52 deletions spec/operators/exhaustMap-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,6 @@ describe('Observable.prototype.exhaustMap', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raise error if selector throws', () => {
const x = cold( '--a--b--c--| ');
const xsubs = ' ^ ! ';
const e1 = hot('---x---------y----z----|');
const e1subs = '^ ! ';
const expected = '-----# ';

const result = e1.exhaustMap((value) => x,
() => {
throw 'error';
});

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should switch with a selector function', () => {
const x = cold( '--a--b--c--| ');
const xsubs = ' ^ ! ';
Expand Down Expand Up @@ -336,39 +319,4 @@ describe('Observable.prototype.exhaustMap', () => {
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should switch with resultSelector goodness', () => {
const x = cold( '--a--b--c--d--e-| ');
const xsubs = ' ^ ! ';
const y = cold( '---f---g---h---i--| ');
const ysubs: string[] = [];
const z = cold( '---k---l---m---n--|');
const zsubs = ' ^ !';
const e1 = hot('--x---------y------z-| ');
const e1subs = '^ !';
const expected = '----a--b--c--d--e-----k---l---m---n--|';

const observableLookup = { x: x, y: y, z: z };

const expectedValues = {
a: ['x', 'a', 0, 0],
b: ['x', 'b', 0, 1],
c: ['x', 'c', 0, 2],
d: ['x', 'd', 0, 3],
e: ['x', 'e', 0, 4],
k: ['z', 'k', 1, 0],
l: ['z', 'l', 1, 1],
m: ['z', 'm', 1, 2],
n: ['z', 'n', 1, 3],
};

const result = e1.exhaustMap((value) => observableLookup[value],
(innerValue, outerValue, innerIndex, outerIndex) => [innerValue, outerValue, innerIndex, outerIndex]);

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
expectSubscriptions(z.subscriptions).toBe(zsubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
63 changes: 15 additions & 48 deletions src/internal/operators/exhaustMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { ObservableInput, OperatorFunction } from '../types';

/* tslint:disable:max-line-length */
export function exhaustMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
export function exhaustMap<T, I, R>(project: (value: T, index: number) => ObservableInput<I>, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */

/**
* Projects each source value to an Observable which is merged in the output
* Observable only if the previous projected Observable has completed.
Expand Down Expand Up @@ -43,34 +38,23 @@ export function exhaustMap<T, I, R>(project: (value: T, index: number) => Observ
* @param {function(value: T, ?index: number): ObservableInput} project A function
* that, when applied to an item emitted by the source Observable, returns an
* Observable.
* @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector]
* A function to produce the value on the output Observable based on the values
* and the indices of the source (outer) emission and the inner Observable
* emission. The arguments passed to this function are:
* - `outerValue`: the value that came from the source
* - `innerValue`: the value that came from the projected Observable
* - `outerIndex`: the "index" of the value that came from the source
* - `innerIndex`: the "index" of the value from the projected Observable
* @return {Observable} An Observable containing projected Observables
* of each item of the source, ignoring projected Observables that start before
* their preceding Observable has completed.
* @method exhaustMap
* @owner Observable
*/
export function exhaustMap<T, I, R>(
project: (value: T, index: number) => ObservableInput<I>,
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R
): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift(new SwitchFirstMapOperator(project, resultSelector));
}
export function exhaustMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> {
return (source: Observable<T>) =>
source.lift(new SwitchFirstMapOperator(project));
}

class SwitchFirstMapOperator<T, I, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => ObservableInput<I>,
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) {
class SwitchFirstMapOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => ObservableInput<R>) {
}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new SwitchFirstMapSubscriber(subscriber, this.project, this.resultSelector));
return source.subscribe(new SwitchFirstMapSubscriber(subscriber, this.project));
}
}

Expand All @@ -79,14 +63,13 @@ class SwitchFirstMapOperator<T, I, R> implements Operator<T, R> {
* @ignore
* @extends {Ignored}
*/
class SwitchFirstMapSubscriber<T, I, R> extends OuterSubscriber<T, I> {
private hasSubscription: boolean = false;
private hasCompleted: boolean = false;
private index: number = 0;
class SwitchFirstMapSubscriber<T, R> extends OuterSubscriber<T, R> {
private hasSubscription = false;
private hasCompleted = false;
private index = 0;

constructor(destination: Subscriber<R>,
private project: (value: T, index: number) => ObservableInput<I>,
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) {
private project: (value: T, index: number) => ObservableInput<R>) {
super(destination);
}

Expand Down Expand Up @@ -115,26 +98,10 @@ class SwitchFirstMapSubscriber<T, I, R> extends OuterSubscriber<T, I> {
}
}

notifyNext(outerValue: T, innerValue: I,
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, I>): void {
const { resultSelector, destination } = this;
if (resultSelector) {
this.trySelectResult(outerValue, innerValue, outerIndex, innerIndex);
} else {
destination.next(innerValue);
}
}

private trySelectResult(outerValue: T, innerValue: I,
outerIndex: number, innerIndex: number): void {
const { resultSelector, destination } = this;
try {
const result = resultSelector(outerValue, innerValue, outerIndex, innerIndex);
destination.next(result);
} catch (err) {
destination.error(err);
}
innerSub: InnerSubscriber<T, R>): void {
this.destination.next(innerValue);
}

notifyError(err: any): void {
Expand Down
22 changes: 5 additions & 17 deletions src/internal/patching/operator/exhaustMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@
import { Observable } from '../../Observable';
import { ObservableInput } from '../../types';
import { exhaustMap as higherOrder } from '../../operators/exhaustMap';

/* tslint:disable:max-line-length */
export function exhaustMap<T, R>(this: Observable<T>, project: (value: T, index: number) => ObservableInput<R>): Observable<R>;
export function exhaustMap<T, I, R>(this: Observable<T>, project: (value: T, index: number) => ObservableInput<I>, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable<R>;
/* tslint:enable:max-line-length */

/**
* Projects each source value to an Observable which is merged in the output
* Observable only if the previous projected Observable has completed.
Expand Down Expand Up @@ -39,21 +33,15 @@ export function exhaustMap<T, I, R>(this: Observable<T>, project: (value: T, ind
* @param {function(value: T, ?index: number): ObservableInput} project A function
* that, when applied to an item emitted by the source Observable, returns an
* Observable.
* @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector]
* A function to produce the value on the output Observable based on the values
* and the indices of the source (outer) emission and the inner Observable
* emission. The arguments passed to this function are:
* - `outerValue`: the value that came from the source
* - `innerValue`: the value that came from the projected Observable
* - `outerIndex`: the "index" of the value that came from the source
* - `innerIndex`: the "index" of the value from the projected Observable
* @return {Observable} An Observable containing projected Observables
* of each item of the source, ignoring projected Observables that start before
* their preceding Observable has completed.
* @method exhaustMap
* @owner Observable
*/
export function exhaustMap<T, I, R>(this: Observable<T>, project: (value: T, index: number) => ObservableInput<I>,
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable<R> {
return higherOrder(project, resultSelector)(this);
export function exhaustMap<T, R>(
this: Observable<T>,
project: (value: T, index: number) => ObservableInput<R>
): Observable<R> {
return higherOrder(project)(this);
}

0 comments on commit 42589d0

Please sign in to comment.