Skip to content

Commit

Permalink
fix(delayWhen): correctly handle synchronous duration observable (#2589)
Browse files Browse the repository at this point in the history
- closes #2587
  • Loading branch information
kwonoj authored and benlesh committed May 3, 2017
1 parent 83ebe90 commit 695f280
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
13 changes: 13 additions & 0 deletions spec/operators/delayWhen-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports
import { expect } from 'chai';

declare const { asDiagram };
declare const hot: typeof marbleTestingSignature.hot;
Expand Down Expand Up @@ -215,4 +216,16 @@ describe('Observable.prototype.delayWhen', () => {
expectSubscriptions(selector.subscriptions).toBe([]);
expectSubscriptions(subDelay.subscriptions).toBe(subDelaySub);
});

it('should complete when duration selector returns synchronous observable', () => {
let next: boolean = false;
let complete: boolean = false;

Rx.Observable.of(1)
.delayWhen(() => Rx.Observable.of(2))
.subscribe(() => next = true, null, () => complete = true);

expect(next).to.be.true;
expect(complete).to.be.true;
});
});
11 changes: 7 additions & 4 deletions src/operator/delayWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export function delayWhen<T>(this: Observable<T>, delayDurationSelector: (value:
subscriptionDelay?: Observable<any>): Observable<T> {
if (subscriptionDelay) {
return new SubscriptionDelayObservable(this, subscriptionDelay)
.lift(new DelayWhenOperator(delayDurationSelector));
.lift(new DelayWhenOperator(delayDurationSelector));
}
return this.lift(new DelayWhenOperator(delayDurationSelector));
}
Expand Down Expand Up @@ -112,7 +112,7 @@ class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
this.tryDelay(delayNotifier, value);
}
} catch (err) {
this.destination.error(err);
this.destination.error(err);
}
}

Expand All @@ -138,9 +138,12 @@ class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {

private tryDelay(delayNotifier: Observable<any>, value: T): void {
const notifierSubscription = subscribeToResult(this, delayNotifier, value);
this.add(notifierSubscription);

this.delayNotifierSubscriptions.push(notifierSubscription);
if (notifierSubscription && !notifierSubscription.closed) {
this.add(notifierSubscription);
this.delayNotifierSubscriptions.push(notifierSubscription);
}

this.values.push(value);
}

Expand Down

0 comments on commit 695f280

Please sign in to comment.