Skip to content

Commit

Permalink
feat(withLatestFrom): default array output, handle other types
Browse files Browse the repository at this point in the history
- now handles promises, iteratables, lowercase-o observables and Observables
- updates marble tests to be more comprehensive
- fixes issue where values were emitted before all observables responded
- makes project function optional and will output arrays
- updates documentation
  • Loading branch information
benlesh committed Sep 23, 2015
1 parent 4d37812 commit cb393dc
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 52 deletions.
128 changes: 109 additions & 19 deletions spec/operators/withLatestFrom-spec.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,117 @@
/* globals describe, it, expect */
/* globals describe, it, expect, expectObservable, hot, cold, lowerCaseO */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Promise = require('promise');

describe('Observable.prototype.withLatestFrom()', function () {
it('should merge the emitted value with the latest values of the other observables', function (done) {
var a = Observable.of('a');
var b = Observable.of('b', 'c');

Observable.of('d').delay(100)
.withLatestFrom(a, b, function (x, a, b) { return [x, a, b]; })
.subscribe(function (x) {
expect(x).toEqual(['d', 'a', 'c']);
}, null, done);
describe('Observable.prototype.withLatestFrom()', function () {
it('should merge the value with the latest values from the other observables into arrays', function () {
var e1 = hot('--a--^---b---c---d--|');
var e2 = hot('--e--^-f---g---h----|');
var e3 = hot('--i--^-j---k---l----|');
var expected = '----x---y---z--|';
var values = {
x: ['b', 'f', 'j'],
y: ['c', 'g', 'k'],
z: ['d', 'h', 'l']
};
expectObservable(e1.withLatestFrom(e2, e3)).toBe(expected, values);
});

it('should emit nothing if the other observables never emit', function (done) {
var a = Observable.of('a');
var b = Observable.never();

Observable.of('d').delay(100)
.withLatestFrom(a, b, function (x, a, b) { return [x, a, b]; })
.subscribe(function (x) {
expect('this was called').toBe(false);
it('should merge the value with the latest values from the other observables into arrays and a project argument', function () {
var e1 = hot('--a--^---b---c---d--|');
var e2 = hot('--e--^-f---g---h----|');
var e3 = hot('--i--^-j---k---l----|');
var expected = '----x---y---z--|';
var values = {
x: 'bfj',
y: 'cgk',
z: 'dhl'
};
var project = function(a, b, c) { return a + b + c };
expectObservable(e1.withLatestFrom(e2, e3, project)).toBe(expected, values);
});

it('should handle empty', function (){
var e1 = Observable.empty();
var e2 = hot('--e--^-f---g---h----|');
var e3 = hot('--i--^-j---k---l----|');
var expected = '|'; // empty
expectObservable(e1.withLatestFrom(e2, e3)).toBe(expected);
});

it('should handle never', function (){
var e1 = Observable.never();
var e2 = hot('--e--^-f---g---h----|');
var e3 = hot('--i--^-j---k---l----|');
var expected = '--------------------'; // never
expectObservable(e1.withLatestFrom(e2, e3)).toBe(expected);
});

it('should handle throw', function (){
var e1 = Observable.throw(new Error('sad'));
var e2 = hot('--e--^-f---g---h----|');
var e3 = hot('--i--^-j---k---l----|');
var expected = '#'; // throw
expectObservable(e1.withLatestFrom(e2, e3)).toBe(expected, null, new Error('sad'));
});

it('should handle error', function (){
var e1 = hot('--a--^---b---#', undefined, new Error('boo-hoo'));
var e2 = hot('--e--^-f---g---h----|');
var e3 = hot('--i--^-j---k---l----|');
var expected = '----x---#'; // throw
var values = {
x: ['b','f','j']
};
expectObservable(e1.withLatestFrom(e2, e3)).toBe(expected, values, new Error('boo-hoo'));
});

it('should handle error with project argument', function (){
var e1 = hot('--a--^---b---#', undefined, new Error('boo-hoo'));
var e2 = hot('--e--^-f---g---h----|');
var e3 = hot('--i--^-j---k---l----|');
var expected = '----x---#'; // throw
var values = {
x: 'bfj'
};
var project = function(a, b, c) { return a + b + c; };
expectObservable(e1.withLatestFrom(e2, e3, project)).toBe(expected, values, new Error('boo-hoo'));
});

it('should handle merging with empty', function (){
var e1 = hot('--a--^---b---c---d--|');
var e2 = Observable.empty();
var e3 = hot('--i--^-j---k---l----|');
var expected = '---------------|';
expectObservable(e1.withLatestFrom(e2, e3)).toBe(expected);
});

it('should handle merging with never', function (){
var e1 = hot('--a--^---b---c---d--|');
var e2 = Observable.never();
var e3 = hot('--i--^-j---k---l----|');
var expected = '---------------|';
expectObservable(e1.withLatestFrom(e2, e3)).toBe(expected);
});

it('should handle promises', function (done){
Observable.of(1).delay(1).withLatestFrom(Promise.resolve(2), Promise.resolve(3))
.subscribe(function(x) {
expect(x).toEqual([1,2,3]);
}, null, done);
});

it('should handle arrays', function() {
Observable.of(1).delay(1).withLatestFrom([2,3,4], [4,5,6])
.subscribe(function(x) {
expect(x).toEqual([1,4,6]);
});
});

it('should handle lowercase-o observables', function (){
Observable.of(1).delay(1).withLatestFrom(lowerCaseO(2, 3, 4), lowerCaseO(4, 5, 6))
.subscribe(function(x) {
expect(x).toEqual([1,4,6]);
});
});
});
93 changes: 60 additions & 33 deletions src/operators/withLatestFrom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,95 @@ import Observable from '../Observable';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import OuterSubscriber from '../OuterSubscriber';
import subscribeToResult from '../util/subscribeToResult';

/**
* @param {Observable} observables the observables to get the latest values from.
* @param {Function} [project] optional projection function for merging values together. Receives all values in order
* of observables passed. (e.g. `a.withLatestFrom(b, c, (a1, b1, c1) => a1 + b1 + c1)`). If this is not passed, arrays
* will be returned.
* @description merges each value from an observable with the latest values from the other passed observables.
* All observables must emit at least one value before the resulting observable will emit
*
* #### example
* ```
* A.withLatestFrom(B, C)
*
* A: ----a-----------------b---------------c-----------|
* B: ---d----------------e--------------f---------|
* C: --x----------------y-------------z-------------|
* result: ---([a,d,x])---------([b,e,y])--------([c,f,z])---|
* ```
*/
export default function withLatestFrom<R>(...args: (Observable<any>|((...values: any[]) => Observable<R>))[]): Observable<R> {
const project = <((...values: any[]) => Observable<R>)>args.pop();
let project;
if(typeof args[args.length - 1] === 'function') {
project = args.pop();
}
const observables = <Observable<any>[]>args;
return this.lift(new WithLatestFromOperator(observables, project));
}

class WithLatestFromOperator<T, R> implements Operator<T, R> {
constructor(private observables: Observable<any>[], private project: (...values: any[]) => Observable<R>) {
constructor(private observables: Observable<any>[], private project?: (...values: any[]) => Observable<R>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new WithLatestFromSubscriber<T, R>(subscriber, this.observables, this.project);
}
}

class WithLatestFromSubscriber<T, R> extends Subscriber<T> {
class WithLatestFromSubscriber<T, R> extends OuterSubscriber<T, R> {
private values: any[];
private toSet: number;
private toRespond: number[] = [];

constructor(destination: Subscriber<T>, private observables: Observable<any>[], private project: (...values: any[]) => Observable<R>) {
constructor(destination: Subscriber<T>, private observables: Observable<any>[], private project?: (...values: any[]) => Observable<R>) {
super(destination);
const len = observables.length;
this.values = new Array(len);
this.toSet = len;

for (let i = 0; i < len; i++) {
this.toRespond.push(i);
}

for (let i = 0; i < len; i++) {
this.add(observables[i]._subscribe(new WithLatestInnerSubscriber(this, i)))
let observable = observables[i];
this.add(subscribeToResult<T, R>(this, observable, <any>observable, i));
}
}

notifyNext(value, observable, index, observableIndex) {
this.values[observableIndex] = value;
const toRespond = this.toRespond;
if(toRespond.length > 0) {
const found = toRespond.indexOf(observableIndex);
if(found !== -1) {
toRespond.splice(found, 1);
}
}
}

notifyValue(index, value) {
this.values[index] = value;
this.toSet--;
notifyComplete() {
// noop
}

_next(value: T) {
if (this.toSet === 0) {
if (this.toRespond.length === 0) {
const values = this.values;
let result = tryCatch(this.project)([value, ...values]);
if (result === errorObject) {
this.destination.error(result.e);
const destination = this.destination;
const project = this.project;
const args = [value, ...values];
if(project) {
let result = tryCatch(this.project).apply(this, args);
if (result === errorObject) {
destination.error(result.e);
} else {
destination.next(result);
}
} else {
this.destination.next(result);
destination.next(args);
}
}
}
}

class WithLatestInnerSubscriber<T, R> extends Subscriber<T> {
constructor(private parent: WithLatestFromSubscriber<T, R>, private valueIndex: number) {
super(null)
}

_next(value: T) {
this.parent.notifyValue(this.valueIndex, value);
}

_error(err: any) {
this.parent.error(err);
}

_complete() {
// noop
}
}

0 comments on commit cb393dc

Please sign in to comment.