Skip to content

Commit

Permalink
fix(bufferToggle): handle closingSelector completes immediately
Browse files Browse the repository at this point in the history
relates to ReactiveX#1487
  • Loading branch information
kwonoj committed Apr 1, 2016
1 parent d399a6a commit a40c496
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
32 changes: 22 additions & 10 deletions spec/operators/bufferToggle-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import {DoneSignature} from '../helpers/test-helper';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Observable = Rx.Observable;
Expand Down Expand Up @@ -344,7 +343,7 @@ describe('Observable.prototype.bufferToggle', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should accept closing selector that returns a resolved promise', (done: DoneSignature) => {
it('should accept closing selector that returns a resolved promise', (done: MochaDone) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Expand All @@ -354,15 +353,16 @@ describe('Observable.prototype.bufferToggle', () => {

e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); }))
.subscribe((x) => {
expect(x).toEqual(expected.shift()); },
done.fail,
() => {
expect(expected.length).toBe(0);
expect(x).to.deep.equal(expected.shift());
}, () => {
done(new Error('should not be called'));
}, () => {
expect(expected.length).to.be.equal(0);
done();
});
});

it('should accept closing selector that returns a rejected promise', (done: DoneSignature) => {
it('should accept closing selector that returns a rejected promise', (done: MochaDone) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Expand All @@ -373,12 +373,24 @@ describe('Observable.prototype.bufferToggle', () => {

e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); }))
.subscribe((x) => {
done.fail();
done(new Error('should not be called'));
}, (x) => {
expect(x).toBe(expected);
expect(x).to.equal(expected);
done();
}, () => {
done.fail();
done(new Error('should not be called'));
});
});

it('should handle empty closing observable', () => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------|');
const subs = '^ !';
const e2 = cold('--x-----------y--------z---| ');
const expected = '--l-----------m--------n-----------|';

const result = e1.bufferToggle(e2, () => Observable.empty());

expectObservable(result).toBe(expected, {l: [], m: [], n: []});
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});
25 changes: 15 additions & 10 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {

private closeBuffer(context: BufferContext<T>): void {
const contexts = this.contexts;
if (contexts === null) {
return;

if (contexts && context) {
const { buffer, subscription } = context;
this.destination.next(buffer);
contexts.splice(contexts.indexOf(context), 1);
this.remove(subscription);
subscription.unsubscribe();
}
const { buffer, subscription } = context;
this.destination.next(buffer);
contexts.splice(contexts.indexOf(context), 1);
this.remove(subscription);
subscription.unsubscribe();
}

private trySubscribe(closingNotifier: any): void {
Expand All @@ -161,10 +161,15 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
contexts.push(context);

const innerSubscription = subscribeToResult(this, closingNotifier, <any>context);
(<any> innerSubscription).context = context;

this.add(innerSubscription);
subscription.add(innerSubscription);
if (!innerSubscription.isUnsubscribed) {
(<any> innerSubscription).context = context;

this.add(innerSubscription);
subscription.add(innerSubscription);
} else {
this.closeBuffer(context);
}
}
}

Expand Down

0 comments on commit a40c496

Please sign in to comment.