Skip to content

Commit

Permalink
fix(bufferToggle): accepts closing selector returns promise
Browse files Browse the repository at this point in the history
relates to ReactiveX#1246
  • Loading branch information
kwonoj committed Mar 27, 2016
1 parent 01075b6 commit c643efc
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 48 deletions.
39 changes: 39 additions & 0 deletions spec/operators/bufferToggle-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as Rx from '../../dist/cjs/Rx';
import {DoneSignature} from '../helpers/test-helper';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Observable = Rx.Observable;
Expand Down Expand Up @@ -341,4 +342,42 @@ describe('Observable.prototype.bufferToggle', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should accept closing selector that returns a resolved promise', (done: DoneSignature) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Observable.timer(100).mapTo(4)
);
const expected = [[1]];

e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); }))
.subscribe((x) => {
expect(x).toEqual(expected.shift()); },
done.fail,
() => {
expect(expected.length).toBe(0);
done();
});
});

it('should accept closing selector that returns a rejected promise', (done: DoneSignature) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Observable.timer(100).mapTo(4)
);

const expected = 42;

e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); }))
.subscribe((x) => {
done.fail();
}, (x) => {
expect(x).toBe(expected);
done();
}, () => {
done.fail();
});
});
});
96 changes: 48 additions & 48 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';

/**
* Buffers the source Observable values starting from an emission from
Expand Down Expand Up @@ -44,18 +46,18 @@ import {errorObject} from '../util/errorObject';
* @owner Observable
*/
export function bufferToggle<T, O>(openings: Observable<O>,
closingSelector: (value: O) => Observable<any>): Observable<T[]> {
closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]> {
return this.lift(new BufferToggleOperator<T, O>(openings, closingSelector));
}

export interface BufferToggleSignature<T> {
<O>(openings: Observable<O>, closingSelector: (value: O) => Observable<any>): Observable<T[]>;
<O>(openings: Observable<O>, closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]>;
}

class BufferToggleOperator<T, O> implements Operator<T, T[]> {

constructor(private openings: Observable<O>,
private closingSelector: (value: O) => Observable<any>) {
private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
}

call(subscriber: Subscriber<T[]>): Subscriber<T> {
Expand All @@ -68,25 +70,25 @@ interface BufferContext<T> {
subscription: Subscription;
}

class BufferToggleSubscriber<T, O> extends Subscriber<T> {
class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
private contexts: Array<BufferContext<T>> = [];

constructor(destination: Subscriber<T[]>,
private openings: Observable<O>,
private closingSelector: (value: O) => Observable<any>) {
private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
super(destination);
this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this)));
}

protected _next(value: T) {
protected _next(value: T): void {
const contexts = this.contexts;
const len = contexts.length;
for (let i = 0; i < len; i++) {
contexts[i].buffer.push(value);
}
}

protected _error(err: any) {
protected _error(err: any): void {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
Expand All @@ -98,7 +100,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
super._error(err);
}

protected _complete() {
protected _complete(): void {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
Expand All @@ -111,27 +113,29 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
super._complete();
}

openBuffer(value: O) {
const closingSelector = this.closingSelector;
const contexts = this.contexts;

let closingNotifier = tryCatch(closingSelector)(value);
if (closingNotifier === errorObject) {
this._error(errorObject.e);
} else {
let context = {
buffer: <T[]>[],
subscription: new Subscription()
};
contexts.push(context);
const subscriber = new BufferToggleClosingsSubscriber<T>(this, context);
const subscription = closingNotifier.subscribe(subscriber);
context.subscription.add(subscription);
this.add(subscription);
openBuffer(value: O): void {
try {
const closingSelector = this.closingSelector;
const closingNotifier = closingSelector.call(this, value);
if (closingNotifier) {
this.trySubscribe(closingNotifier);
}
} catch (err) {
this._error(err);
}
}

closeBuffer(context: BufferContext<T>) {
notifyNext(outerValue: any, innerValue: O,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, O>): void {
this.closeBuffer(outerValue);
}

notifyComplete(innerSub: InnerSubscriber<T, O>): void {
this.closeBuffer((<any> innerSub).context);
}

private closeBuffer(context: BufferContext<T>): void {
const contexts = this.contexts;
if (contexts === null) {
return;
Expand All @@ -142,41 +146,37 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
this.remove(subscription);
subscription.unsubscribe();
}
}

class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> {
constructor(private parent: BufferToggleSubscriber<T, O>) {
super(null);
}
private trySubscribe(closingNotifier: any): void {
const contexts = this.contexts;

protected _next(value: O) {
this.parent.openBuffer(value);
}
const buffer: Array<T> = [];
const subscription = new Subscription();
const context = { buffer, subscription };
contexts.push(context);

protected _error(err: any) {
this.parent.error(err);
}
const innerSubscription = subscribeToResult(this, closingNotifier, <any>context);
(<any> innerSubscription).context = context;

protected _complete() {
// noop
this.add(innerSubscription);
subscription.add(innerSubscription);
}
}

class BufferToggleClosingsSubscriber<T> extends Subscriber<any> {
constructor(private parent: BufferToggleSubscriber<T, any>,
private context: { subscription: any, buffer: T[] }) {
class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> {
constructor(private parent: BufferToggleSubscriber<T, O>) {
super(null);
}

protected _next() {
this.parent.closeBuffer(this.context);
protected _next(value: O) {
this.parent.openBuffer(value);
}

protected _error(err: any) {
this.parent.error(err);
}

protected _complete() {
this.parent.closeBuffer(this.context);
// noop
}
}

0 comments on commit c643efc

Please sign in to comment.