Skip to content

Commit

Permalink
feat(operator): add skipWhile operator
Browse files Browse the repository at this point in the history
  • Loading branch information
jinroh authored and kwonoj committed Nov 12, 2015
1 parent 8bca656 commit a2244e0
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 1 deletion.
181 changes: 181 additions & 0 deletions spec/operators/skipWhile-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.skipWhile()', function () {
it('should skip all elements with a true predicate', function () {
var source = hot('-1-^2--3--4--5--6--|');
var sourceSubs = '^ !';
var expected = '----------------|';

expectObservable(source.skipWhile(function () { return true; })).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should skip all elements with a truthy predicate', function () {
var source = hot('-1-^2--3--4--5--6--|');
var sourceSubs = '^ !';
var expected = '----------------|';

expectObservable(source.skipWhile(function () { return {}; })).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should not skip any element with a false predicate', function () {
var source = hot('-1-^2--3--4--5--6--|');
var sourceSubs = '^ !';
var expected = '-2--3--4--5--6--|';

expectObservable(source.skipWhile(function () { return false; })).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should not skip any elements with a falsy predicate', function () {
var source = hot('-1-^2--3--4--5--6--|');
var sourceSubs = '^ !';
var expected = '-2--3--4--5--6--|';

expectObservable(source.skipWhile(function () { return undefined; })).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should skip all elements until predicate is false', function () {
var source = hot('-1-^2--3--4--5--6--|');
var sourceSubs = '^ !';
var expected = '-------4--5--6--|';

var predicate = function (v) {
return +v < 4;
};

expectObservable(source.skipWhile(predicate)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should skip elements on hot source', function () {
var source = hot('--1--2-^-3--4--5--6--7--8--');
var sourceSubs = '^ ';
var expected = '--------5--6--7--8--';

var predicate = function (v) {
return +v < 5;
};

expectObservable(source.skipWhile(predicate)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should be possible to skip using the element\'s index', function () {
var source = hot('--a--b-^-c--d--e--f--g--h--|');
var sourceSubs = '^ !';
var expected = '--------e--f--g--h--|';

var predicate = function (v, index) {
return index < 2;
};

expectObservable(source.skipWhile(predicate)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should skip using index with source unsubscribes early', function () {
var source = hot('--a--b-^-c--d--e--f--g--h--|');
var sourceSubs = '^ !';
var unsub = '-----------!';
var expected = '-----d--e---';

var predicate = function (v, index) {
return index < 1;
};

expectObservable(source.skipWhile(predicate), unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should skip using value with source throws', function () {
var source = hot('--a--b-^-c--d--e--f--g--h--#');
var sourceSubs = '^ !';
var expected = '-----d--e--f--g--h--#';

var predicate = function (v) {
return v !== 'd';
};

expectObservable(source.skipWhile(predicate)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should invoke predicate while its false and never again', function () {
var source = hot('--a--b-^-c--d--e--f--g--h--|');
var sourceSubs = '^ !';
var expected = '--------e--f--g--h--|';

var invoked = 0;
var predicate = function (v) {
invoked++;
return v !== 'e';
};

expectObservable(
source.skipWhile(predicate).do(null, null, function () {
expect(invoked).toBe(3);
})
).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should handle predicate that throws', function () {
var source = hot('--a--b-^-c--d--e--f--g--h--|');
var sourceSubs = '^ !';
var expected = '--------#';

var predicate = function (v) {
if (v === 'e') {
throw new Error('nom d\'une pipe !');
}

return v !== 'f';
};

expectObservable(source.skipWhile(predicate)).toBe(expected, undefined, new Error('nom d\'une pipe !'));
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should accept a thisArg', function () {
var source = hot('-1-^--2--3--4--5--6--|');
var sourceSubs = '^ !';
var expected = '---------4--5--6--|';

function Skiper() {
this.doSkip = function (v) { return +v < 4; };
}

var skiper = new Skiper();

expectObservable(
source.skipWhile(function (v) { return this.doSkip(v); }, skiper)
).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should handle Observable.empty', function () {
var source = Observable.empty();
var expected = '|';

expectObservable(source.skipWhile(function () { return true; })).toBe(expected);
});

it('should handle Observable.never', function () {
var source = Observable.never();
var expected = '-';

expectObservable(source.skipWhile(function () { return true; })).toBe(expected);
});

it('should handle Observable.throw', function () {
var source = Observable.throw(new Error('oh no!'));
var expected = '#';

expectObservable(source.skipWhile(function () { return true; })).toBe(expected, undefined, new Error('oh no!'));
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface CoreOperators<T> {
single?: (predicate?: (value: T, index: number) => boolean, thisArg?: any) => Observable<T>;
skip?: (count: number) => Observable<T>;
skipUntil?: (notifier: Observable<any>) => Observable<T>;
skipWhile?: (predicate: (x: T, index: number) => boolean, thisArg?: any) => Observable<T>;
startWith?: (x: T) => Observable<T>;
subscribeOn?: (scheduler: Scheduler, delay?: number) => Observable<T>;
switch?: () => Observable<T>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ observableProto.skip = skip;
import {skipUntil} from './operators/skipUntil';
observableProto.skipUntil = skipUntil;

import {skipWhile} from './operators/skipWhile';
observableProto.skipWhile = skipWhile;

import {startWith} from './operators/startWith';
observableProto.startWith = startWith;

Expand Down
5 changes: 4 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ observableProto.skip = skip;
import {skipUntil} from './operators/skipUntil';
observableProto.skipUntil = skipUntil;

import {skipWhile} from './operators/skipWhile';
observableProto.skipWhile = skipWhile;

import {startWith} from './operators/startWith';
observableProto.startWith = startWith;

Expand Down Expand Up @@ -316,4 +319,4 @@ export {
Notification,
EmptyError,
ArgumentOutOfRangeError
};
};
48 changes: 48 additions & 0 deletions src/operators/skipWhile.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {bindCallback} from '../util/bindCallback';

export function skipWhile<T>(predicate: (x: T, index: number) => boolean, thisArg?: any): Observable<T> {
return this.lift(new SkipWhileOperator(predicate, thisArg));
}

class SkipWhileOperator<T, R> implements Operator<T, R> {
private predicate: (x: T, index: number) => boolean;

constructor(predicate: (x: T, index: number) => boolean, thisArg?: any) {
this.predicate = <(x: T, index: number) => boolean>bindCallback(predicate, thisArg, 2);
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new SkipWhileSubscriber(subscriber, this.predicate);
}
}

class SkipWhileSubscriber<T> extends Subscriber<T> {
private skipping: boolean = true;
private index: number = 0;

constructor(destination: Subscriber<T>,
private predicate: (x: T, index: number) => boolean) {
super(destination);
}

_next(value: T): void {
const destination = this.destination;
if (this.skipping === true) {
const index = this.index++;
const result = tryCatch(this.predicate)(value, index);
if (result === errorObject) {
destination.error(result.e);
} else {
this.skipping = Boolean(result);
}
}
if (this.skipping === false) {
destination.next(value);
}
}
}

0 comments on commit a2244e0

Please sign in to comment.