From 3bd4c9800f791ba59e80b665f10a104b651765e9 Mon Sep 17 00:00:00 2001 From: Amy Simmons Date: Tue, 25 Apr 2017 17:09:42 +0100 Subject: [PATCH] Upgrade to RxJS 5 This commit upgrades flight-with-observe to to RxJS version 5: https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md --- CONTRIBUTING.md | 2 +- config/webpack.config.publish.js | 2 +- package.json | 2 +- src/index.js | 34 ++++++++++++++++++-------------- src/with-observe.spec.js | 30 ++++++++++++++-------------- 5 files changed, 37 insertions(+), 33 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 9a7ebc8..d9d3cc7 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -89,7 +89,7 @@ project's developers might not want to merge into the project. Please adhere to the coding conventions used throughout a project (indentation, accurate comments, etc.) and any other requirements (such as test coverage). -Adhering to the following this process is the best way to get your work +Adhering to the following process is the best way to get your work included in the project: 1. If you do not have permissions to push to the upstream remote origin, diff --git a/config/webpack.config.publish.js b/config/webpack.config.publish.js index 436fba5..b6a8781 100644 --- a/config/webpack.config.publish.js +++ b/config/webpack.config.publish.js @@ -9,6 +9,6 @@ module.exports = Object.assign(baseConfig, { path: constants.BUILD_DIRECTORY }, externals: [ - 'rx' + 'rxjs' ] }); diff --git a/package.json b/package.json index 0677826..531721a 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,7 @@ "webpack": "^1.12.1" }, "dependencies": { - "rx": ">=2" + "rxjs": "^5.3.0" }, "repository": { "type": "git", diff --git a/src/index.js b/src/index.js index a45b41a..c1a9c8d 100644 --- a/src/index.js +++ b/src/index.js @@ -4,37 +4,41 @@ * patterns. * https://github.com/Reactive-Extensions/RxJS */ -import Rx from 'rx'; +import Rx from 'rxjs'; export default function withObserve() { this.before('initialize', function () { - this.localDisposables = []; + this.localSubscriptions = []; }); /** - * Observe a sequence is disposed of on teardown. + * Observe a sequence is unsubscribed from on teardown. * * Takes the sequence to observe. * Returns the observable sequence. */ this.observe = function (upstream) { return Rx.Observable.create(observer => { - var upstreamDisposable = upstream.subscribe(observer); - // Create our own disposable so we can track teardown. When that happens, - // notify the observer. - var disposable = Rx.Disposable.create(function () { - upstreamDisposable.dispose(); - observer.onCompleted(); + var upstreamSubscription = upstream.subscribe(observer); + // Create our own subscription so we can track teardown. When that happens, + // notify the observer that the observable is completed (no more values), + // and unsubscribe. + var subscription = new Rx.Subscription(() => { + observer.complete(); + upstreamSubscription.unsubscribe(); }); - // Store the disposable so that the mixin can dispose on teardown. - this.localDisposables.push(disposable); - return disposable; - }, upstream); + // Store the subscription so that the mixin can unsubscribe on teardown. + this.localSubscriptions.push(subscription); + // Returning the subscription means that manually unsubscribing from + // the observable returned from this.observe will cause the upstreamSubscription + // to complete and then unsubscribe. + return subscription; + }); }; this.before('teardown', function () { - this.localDisposables.forEach(function (disposable) { - disposable.dispose(); + this.localSubscriptions.forEach(subscription => { + subscription.unsubscribe(); }); }); } diff --git a/src/with-observe.spec.js b/src/with-observe.spec.js index 2d4e93d..6dea0ab 100644 --- a/src/with-observe.spec.js +++ b/src/with-observe.spec.js @@ -1,4 +1,4 @@ -import Rx from 'rx'; +import Rx from 'rxjs'; import { component } from 'flight'; import withObserve from '.'; @@ -20,41 +20,41 @@ describe('withObserve', function () { }); it('should observe changed values', function (done) { - this.component.observe(observable).subscribeOnNext(function (value) { + this.component.observe(observable).subscribe(function (value) { if (value === 2) { done(); } }); - subject.onNext(2); + subject.next(2); }); - it('should dispose of observables on teardown', function () { + it('should unsubscribe from subscriptions on teardown', function () { this.component.observe(observable); var called = 0; - this.component.observe(observable).subscribeOnNext(function (value) { + this.component.observe(observable).subscribe(function (value) { called = called + 1; }); expect(called).toBe(1); - subject.onNext('still subscribed'); + subject.next('still subscribed'); expect(called).toBe(2); // Teardown the component and check handler is not called again. this.component.teardown(); - subject.onNext('not subscribed'); + subject.next('not subscribed'); expect(called).toBe(2); }); it('should end the stream on teardown', function () { - var onNextSpy = jasmine.createSpy('onNextSpy'); - var onErrorSpy = jasmine.createSpy('onErrorSpy'); - var onCompletedSpy = jasmine.createSpy('onCompletedSpy'); - this.component.observe(observable).subscribe(onNextSpy, onErrorSpy, onCompletedSpy); + var nextSpy = jasmine.createSpy('nextSpy'); + var errorSpy = jasmine.createSpy('errorSpy'); + var completedSpy = jasmine.createSpy('completedSpy'); + this.component.observe(observable).subscribe(nextSpy, errorSpy, completedSpy); this.component.teardown(); - subject.onNext(10); - expect(onNextSpy).not.toHaveBeenCalledWith(10); - expect(onErrorSpy).not.toHaveBeenCalled(); - expect(onCompletedSpy).toHaveBeenCalled(); + subject.next(10); + expect(nextSpy).not.toHaveBeenCalledWith(10); + expect(errorSpy).not.toHaveBeenCalled(); + expect(completedSpy).toHaveBeenCalled(); }); });