Skip to content

Commit

Permalink
feat(expand): now handles promises, iterables and lowercase-o observa…
Browse files Browse the repository at this point in the history
…bles

- also minor refactoring to subscribeToResult function
  • Loading branch information
benlesh committed Sep 23, 2015
1 parent 24fdd34 commit c5239e9
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 20 deletions.
62 changes: 62 additions & 0 deletions spec/operators/expand-spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* globals describe, it, expect, expectObservable, hot, cold */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Promise = require('promise');

describe('Observable.prototype.expand()', function () {
it('should map and recursively flatten', function() {
Expand Down Expand Up @@ -54,4 +55,65 @@ describe('Observable.prototype.expand()', function () {
return Observable.of(x + x); // scalar
})).toBe(expected, values);
});

it('should recursively flatten promises', function(done) {
var expected = [1, 2, 4, 8, 16];
Observable.of(1)
.expand(function(x) {
if(x === 16) {
return Observable.empty();
}
return Promise.resolve(x + x);
})
.subscribe(function(x) {
expect(x).toBe(expected.shift());
}, null, function(){
expect(expected.length).toBe(0);
done();
});
});

it('should recursively flatten Arrays', function(done) {
var expected = [1, 2, 4, 8, 16];
Observable.of(1)
.expand(function(x) {
if(x === 16) {
return Observable.empty();
}
return [x + x];
})
.subscribe(function(x) {
expect(x).toBe(expected.shift());
}, null, function(){
expect(expected.length).toBe(0);
done();
});
});

it('should recursively flatten lowercase-o observables', function(done) {
var expected = [1, 2, 4, 8, 16];

Observable.of(1)
.expand(function(x) {
if(x === 16) {
return Observable.empty();
}

var ish = {
subscribe: function(observer){
observer.next(x + x);
observer.complete();
}
};

ish[Symbol.observable] = function(){ return this; };
return ish;
})
.subscribe(function(x) {
expect(x).toBe(expected.shift());
}, null, function(){
expect(expected.length).toBe(0);
done();
});
});
});
2 changes: 1 addition & 1 deletion spec/operators/switch-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var Promise = require('promise');
var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;

fdescribe('Observable.prototype.switch()', function(){
describe('Observable.prototype.switch()', function(){
it("should switch to each immediately-scheduled inner Observable", function (done) {
var a = Observable.of(1, 2, 3, immediateScheduler);
var b = Observable.of(4, 5, 6, immediateScheduler);
Expand Down
24 changes: 8 additions & 16 deletions src/operators/expand-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import ScalarObservable from '../observables/ScalarObservable';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import OuterSubscriber from '../OuterSubscriber';
import subscribeToResult from '../util/subscribeToResult';

export class ExpandOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => Observable<any>,
Expand All @@ -20,11 +22,11 @@ export class ExpandOperator<T, R> implements Operator<T, R> {
}
}

export class ExpandSubscriber<T, R> extends Subscriber<T> {
export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
private index: number = 0;
private active: number = 0;
private hasCompleted: boolean = false;
private buffer: T[];
private buffer: any[];

constructor(destination: Observer<T>, private project: (value: T, index: number) => Observable<R>,
private concurrent: number = Number.POSITIVE_INFINITY) {
Expand All @@ -34,7 +36,7 @@ export class ExpandSubscriber<T, R> extends Subscriber<T> {
}
}

_next(value: T) {
_next(value: any) {
const index = this.index++;
this.destination.next(value);
if(this.active < this.concurrent) {
Expand All @@ -46,7 +48,7 @@ export class ExpandSubscriber<T, R> extends Subscriber<T> {
this._next(result.value);
} else {
this.active++;
this.add(result.subscribe(new ExpandInnerSubscriber(this.destination, this)));
this.add(subscribeToResult<T, R>(this, result, value, index));
}
}
} else {
Expand All @@ -72,18 +74,8 @@ export class ExpandSubscriber<T, R> extends Subscriber<T> {
this.destination.complete();
}
}
}

export class ExpandInnerSubscriber<T, R> extends Subscriber<T> {
constructor(destination: Observer<T>, private parent: ExpandSubscriber<T, R>) {
super(destination);
}

_next(value) {
this.parent._next(value);
}

_complete() {
this.parent.notifyComplete(this);
notifyNext(innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) {
this._next(innerValue);
}
}
2 changes: 1 addition & 1 deletion src/operators/mergeMap-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class MergeMapSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}

_innerSub(ish: any, value: T, index: number) {
this.add(subscribeToResult<T,R,R2>(this, ish, value, index));
this.add(subscribeToResult<T, R>(this, ish, value, index));
}

_complete() {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/mergeMapTo-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class MergeMapToSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}

_innerSub(ish: any, destination: Observer<R>, resultSelector: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, value: T, index: number) {
this.add(subscribeToResult<T,R,R2>(this, ish, value, index));
this.add(subscribeToResult<T, R>(this, ish, value, index));
}

_complete() {
Expand Down
2 changes: 1 addition & 1 deletion src/util/subscribeToResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import OuterSubscriber from '../OuterSubscriber';

const isArray = Array.isArray;

export default function subscribeToResult<T, R, R2>(outerSubscriber: OuterSubscriber<T, R>,
export default function subscribeToResult<T, R>(outerSubscriber: OuterSubscriber<T, R>,
result: any, outerValue?: T, outerIndex?: number): Subscription<T> {
let destination: Subscriber<R> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex);

Expand Down

0 comments on commit c5239e9

Please sign in to comment.