From 44a4ee1f0d16037f8edeb02efa33ca7af520a10b Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 28 Aug 2015 14:31:26 -0700 Subject: [PATCH] feat(observable): add Observable.all (forkJoin) --- spec/observables/forkJoin-spec.js | 14 +++++++ src/Observable.ts | 3 +- src/Rx.ts | 2 + src/observables/ForkJoinObservable.ts | 56 +++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 spec/observables/forkJoin-spec.js create mode 100644 src/observables/ForkJoinObservable.ts diff --git a/spec/observables/forkJoin-spec.js b/spec/observables/forkJoin-spec.js new file mode 100644 index 0000000000..dfb4a8a5c6 --- /dev/null +++ b/spec/observables/forkJoin-spec.js @@ -0,0 +1,14 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.forkJoin', function () { + it('should join the last values of the provided observables into an array', function(done) { + Observable.forkJoin(Observable.of(1, 2, 3, 'a'), + Observable.of('b'), + Observable.of(1, 2, 3, 4, 'c')) + .subscribe(function (x) { + expect(x).toEqual(['a', 'b', 'c']); + }, null, done); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 3512ffa9c4..cc56b1c40d 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -89,7 +89,8 @@ export default class Observable { static fromPromise: (promise: Promise) => Observable; static timer: (delay: number) => Observable; static interval: (interval: number) => Observable; - + static forkJoin: (...observables: Observable[]) => Observable; + static concat: (...observables: any[]) => Observable; concat: (...observables: any[]) => Observable; concatAll: () => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index dac6fbc786..89e454cf06 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -22,6 +22,7 @@ import ScalarObservable from './observables/ScalarObservable'; import TimerObservable from './observables/TimerObservable'; import FromEventPatternObservable from './observables/FromEventPatternObservable'; import FromEventObservable from './observables/FromEventObservable'; +import ForkJoinObservable from './observables/ForkJoinObservable'; Observable.defer = DeferObservable.create; Observable.from = IteratorObservable.create; @@ -30,6 +31,7 @@ Observable.fromPromise = PromiseObservable.create; Observable.of = ArrayObservable.of; Observable.range = RangeObservable.create; Observable.fromEventPattern = FromEventPatternObservable.create; +Observable.forkJoin = ForkJoinObservable.create; Observable.just = ScalarObservable.create; Observable.return = ScalarObservable.create; diff --git a/src/observables/ForkJoinObservable.ts b/src/observables/ForkJoinObservable.ts new file mode 100644 index 0000000000..f307ecd838 --- /dev/null +++ b/src/observables/ForkJoinObservable.ts @@ -0,0 +1,56 @@ +import Observable from '../Observable'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; + +export default class ForkJoinObservable extends Observable { + constructor(private observables: Observable[]) { + super(); + } + + static create(...observables: Observable[]): Observable { + return new ForkJoinObservable(observables); + } + + _subscribe(subscriber: Observer) { + const observables = this.observables; + const len = observables.length; + let context = { complete: 0, total: len, values: emptyArray(len) }; + for (let i = 0; i < len; i++) { + observables[i].subscribe(new AllSubscriber(subscriber, this, i, context)) + } + } +} + +class AllSubscriber extends Subscriber { + private _value: T; + + constructor(destination: Observer, private parent: ForkJoinObservable, private index: number, + private context: { complete: number, total: number, values: any[] }) { + super(destination); + } + + _next(value: T) { + this._value = value; + } + + _complete() { + const context = this.context; + context.values[this.index] = this._value; + if (context.values.every(hasValue)) { + this.destination.next(context.values); + this.destination.complete(); + } + } +} + +function hasValue(x) { + return x !== null; +} + +function emptyArray(len: number): any[] { + var arr = []; + for (let i = 0; i < len; i++) { + arr.push(null); + } + return arr; +} \ No newline at end of file