Skip to content

Commit

Permalink
Merge pull request #1727 from trxcllnt/refCount-race-fix
Browse files Browse the repository at this point in the history
fix(ConnectableObservable): fix ConnectableObservable connection handling issue
  • Loading branch information
benlesh committed May 25, 2016
2 parents a98bec7 + 41ce80c commit ceb9990
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 5 deletions.
44 changes: 43 additions & 1 deletion spec/operators/publishReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,48 @@ describe('Observable.prototype.publishReplay', () => {
done();
});

it('should emit replayed values and resubscribe to the source when ' +
'reconnected without source completion', () => {
const results1 = [];
const results2 = [];
let subscriptions = 0;

const source = new Observable((observer: Rx.Observer<number>) => {
subscriptions++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
// observer.complete();
});

const connectable = source.publishReplay(2);
const subscription1 = connectable.subscribe((x: number) => {
results1.push(x);
});

expect(results1).to.deep.equal([]);
expect(results2).to.deep.equal([]);

connectable.connect().unsubscribe();
subscription1.unsubscribe();

expect(results1).to.deep.equal([1, 2, 3, 4]);
expect(results2).to.deep.equal([]);
expect(subscriptions).to.equal(1);

const subscription2 = connectable.subscribe((x: number) => {
results2.push(x);
});

connectable.connect().unsubscribe();
subscription2.unsubscribe();

expect(results1).to.deep.equal([1, 2, 3, 4]);
expect(results2).to.deep.equal([3, 4, 1, 2, 3, 4]);
expect(subscriptions).to.equal(2);
});

it('should emit replayed values plus completed when subscribed after completed', (done: MochaDone) => {
const results1 = [];
const results2 = [];
Expand Down Expand Up @@ -358,4 +400,4 @@ describe('Observable.prototype.publishReplay', () => {

published.connect();
});
});
});
39 changes: 35 additions & 4 deletions src/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ export class ConnectableObservable<T> extends Observable<T> {
connect(): Subscription {
let connection = this._connection;
if (!connection) {
connection = this.source.subscribe(new ConnectableSubscriber(this.getSubject(), this));
connection = this._connection = new Subscription();
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
if (connection.isUnsubscribed) {
this._connection = null;
connection = Subscription.EMPTY;
Expand Down Expand Up @@ -66,9 +68,13 @@ class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
const { connectable } = this;
if (connectable) {
this.connectable = null;
const connection = (<any> connectable)._connection;
(<any> connectable)._refCount = 0;
(<any> connectable)._subject = null;
(<any> connectable)._connection = null;
if (connection) {
connection.unsubscribe();
}
}
}
}
Expand Down Expand Up @@ -122,10 +128,35 @@ class RefCountSubscriber<T> extends Subscriber<T> {
return;
}

///
// Compare the local RefCountSubscriber's connection Subscription to the
// connection Subscription on the shared ConnectableObservable. In cases
// where the ConnectableObservable source synchronously emits values, and
// the RefCountSubscriber's dowstream Observers synchronously unsubscribe,
// execution continues to here before the RefCountOperator has a chance to
// supply the RefCountSubscriber with the shared connection Subscription.
// For example:
// ```
// Observable.range(0, 10)
// .publish()
// .refCount()
// .take(5)
// .subscribe();
// ```
// In order to account for this case, RefCountSubscriber should only dispose
// the ConnectableObservable's shared connection Subscription if the
// connection Subscription exists, *and* either:
// a. RefCountSubscriber doesn't have a reference to the shared connection
// Subscription yet, or,
// b. RefCountSubscriber's connection Subscription reference is identical
// to the shared connection Subscription
///
const { connection } = this;
if (connection) {
this.connection = null;
connection.unsubscribe();
const sharedConnection = (<any> connectable)._connection;
this.connection = null;

if (sharedConnection && (!connection || sharedConnection === connection)) {
sharedConnection.unsubscribe();
}
}
}

0 comments on commit ceb9990

Please sign in to comment.