Skip to content

Commit

Permalink
fix(timeoutWith): fix to avoid unnecessary inner subscription
Browse files Browse the repository at this point in the history
Fix timeoutWith operator to avoid an unnecessary subscription to the inner withObservable Observable
if it is known that the outer is already unsubscribed. Also support
preserving the Subscriber chain, while not retaining the subscription to
the source once it times out.
  • Loading branch information
staltz authored and benlesh committed Dec 8, 2015
1 parent 04e1479 commit 6e63752
Showing 1 changed file with 15 additions and 19 deletions.
34 changes: 15 additions & 19 deletions src/operator/timeoutWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class TimeoutWithOperator<T, R> implements Operator<T, R> {

class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
private timeoutSubscription: Subscription<T> = undefined;
private timedOut: boolean = false;
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
Expand All @@ -43,12 +42,13 @@ class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
return this._hasCompleted;
}

constructor(destination: Subscriber<T>,
constructor(public destination: Subscriber<T>,
private absoluteTimeout: boolean,
private waitFor: number,
private withObservable: Observable<any>,
private scheduler: Scheduler) {
super(destination);
super(null);
destination.add(this);
this.scheduleTimeout();
}

Expand All @@ -69,31 +69,27 @@ class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
}

_next(value: T) {
if (!this.timedOut) {
this.destination.next(value);
if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
this.destination.next(value);
if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
}

_error(err) {
if (!this.timedOut) {
this.destination.error(err);
this._hasCompleted = true;
}
this.destination.error(err);
this._hasCompleted = true;
}

_complete() {
if (!this.timedOut) {
this.destination.complete();
this._hasCompleted = true;
}
this.destination.complete();
this._hasCompleted = true;
}

handleTimeout(): void {
const withObservable = this.withObservable;
this.timedOut = true;
this.add(this.timeoutSubscription = subscribeToResult(this, withObservable));
if (!this.isUnsubscribed) {
const withObservable = this.withObservable;
this.unsubscribe();
this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable));
}
}
}

0 comments on commit 6e63752

Please sign in to comment.