Skip to content

Commit

Permalink
perf(mergeMap): extra 1x factor gains from custom tryCatch member fun…
Browse files Browse the repository at this point in the history
…ction
  • Loading branch information
benlesh committed Jan 27, 2016
1 parent 9c1e725 commit c4ce2fb
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions src/operator/mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';

Expand Down Expand Up @@ -39,7 +37,7 @@ export class MergeMapOperator<T, R, R2> implements Operator<T, R> {

export class MergeMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
private hasCompleted: boolean = false;
private buffer: T[] = [];
private buffer: Observable<any>[] = [];
private active: number = 0;
protected index: number = 0;

Expand All @@ -50,23 +48,28 @@ export class MergeMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
super(destination);
}

protected _next(value: T): void {
protected _next(value: any): void {
if (this.active < this.concurrent) {
const index = this.index++;
const ish = tryCatch(this.project)(value, index);
const destination = this.destination;
if (ish === errorObject) {
destination.error(errorObject.e);
} else {
this.active++;
this._innerSub(ish, value, index);
}
this._tryNext(value);
} else {
this.buffer.push(value);
}
}

private _innerSub(ish: Observable<R>, value: T, index: number): void {
protected _tryNext(value: any) {
let result: any;
const index = this.index++;
try {
result = this.project(value, index);
} catch (err) {
this.destination.error(err);
return;
}
this.active++;
this._innerSub(result, value, index);
}

private _innerSub(ish: any, value: T, index: number): void {
this.add(subscribeToResult<T, R>(this, ish, value, index));
}

Expand All @@ -78,17 +81,22 @@ export class MergeMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
const { destination, resultSelector } = this;
if (resultSelector) {
const result = tryCatch(resultSelector)(outerValue, innerValue, outerIndex, innerIndex);
if (result === errorObject) {
destination.error(errorObject.e);
} else {
destination.next(result);
}
if (this.resultSelector) {
this._notifyResultSelector(outerValue, innerValue, outerIndex, innerIndex);
} else {
destination.next(innerValue);
this.destination.next(innerValue);
}
}

_notifyResultSelector(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) {
let result: any;
try {
result = this.resultSelector(outerValue, innerValue, outerIndex, innerIndex);
} catch (err) {
this.destination.error(err);
return;
}
this.destination.next(result);
}

notifyComplete(innerSub: Subscription): void {
Expand Down

0 comments on commit c4ce2fb

Please sign in to comment.