Skip to content

Commit

Permalink
feat: complete streams when component is torn down
Browse files Browse the repository at this point in the history
BREAKING CHANGE: streams are now completed when the component is torn
down.
  • Loading branch information
Tom Ashworth committed Feb 15, 2016
1 parent a6542eb commit a585bbf
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
24 changes: 15 additions & 9 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,33 @@ import Rx from 'rx';

export default function withObserve() {
this.before('initialize', function () {
this.localSubscriptions = [];
this.localDisposables = [];
});

/**
* Observe a sequence which can be disposed of on teardown.
* Observe a sequence is disposed of on teardown.
*
* Takes the sequence to observe.
* Returns the observable sequence.
*/
this.observe = function (upstream) {
return Rx.Observable.create((o) => {
var subscription = upstream.subscribe(o);
// Store the subscription so that the mixin can dispose on teardown.
this.localSubscriptions.push(subscription);
return subscription;
return Rx.Observable.create(observer => {
var upstreamDisposable = upstream.subscribe(observer);
// Create our own disposable so we can track teardown. When that happens,
// notify the observer.
var disposable = Rx.Disposable.create(function () {
upstreamDisposable.dispose();
observer.onCompleted();
});
// Store the disposable so that the mixin can dispose on teardown.
this.localDisposables.push(disposable);
return disposable;
}, upstream);
};

this.before('teardown', function () {
this.localSubscriptions.forEach(function (subscription) {
subscription.dispose();
this.localDisposables.forEach(function (disposable) {
disposable.dispose();
});
});
}
18 changes: 18 additions & 0 deletions src/with-observe.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ describe('withObserve', function () {
this.component = (new Component()).initialize(document.body);
});

afterEach(function () {
try {
this.component.teardown();
} catch (e) {}
});

it('should observe changed values', function (done) {
this.component.observe(observable).subscribeOnNext(function (value) {
if (value === 2) {
Expand All @@ -39,4 +45,16 @@ describe('withObserve', function () {
subject.onNext('not subscribed');
expect(called).toBe(2);
});

it('should end the stream on teardown', function () {
var onNextSpy = jasmine.createSpy('onNextSpy');
var onErrorSpy = jasmine.createSpy('onErrorSpy');
var onCompletedSpy = jasmine.createSpy('onCompletedSpy');
this.component.observe(observable).subscribe(onNextSpy, onErrorSpy, onCompletedSpy);
this.component.teardown();
subject.onNext(10);
expect(onNextSpy).not.toHaveBeenCalledWith(10);
expect(onErrorSpy).not.toHaveBeenCalled();
expect(onCompletedSpy).toHaveBeenCalled();
});
});

0 comments on commit a585bbf

Please sign in to comment.