From 3583cd3813b66244d27f743bd499e6555e1ca3ef Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Wed, 13 Jan 2016 13:49:58 -0800 Subject: [PATCH] feat(takeLast): adds takeLast operator. --- .../operators/takelast.js | 18 +++ .../immediate-scheduler/operators/takelast.js | 18 +++ spec/operators/takeLast-spec.js | 125 ++++++++++++++++++ src/CoreOperators.ts | 1 + src/Observable.ts | 1 + src/Rx.DOM.ts | 1 + src/Rx.KitchenSink.ts | 1 + src/Rx.ts | 1 + src/add/operator/takeLast.ts | 10 ++ src/operator/takeLast.ts | 76 +++++++++++ 10 files changed, 252 insertions(+) create mode 100644 perf/micro/current-thread-scheduler/operators/takelast.js create mode 100644 perf/micro/immediate-scheduler/operators/takelast.js create mode 100644 spec/operators/takeLast-spec.js create mode 100644 src/add/operator/takeLast.ts create mode 100644 src/operator/takeLast.ts diff --git a/perf/micro/current-thread-scheduler/operators/takelast.js b/perf/micro/current-thread-scheduler/operators/takelast.js new file mode 100644 index 0000000000..c7417423f2 --- /dev/null +++ b/perf/micro/current-thread-scheduler/operators/takelast.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldTakeLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.currentThread).takeLast(50); + var newTakeLastWithImmediateScheduler = RxNew.Observable.range(0, 500, RxNew.Scheduler.queue).takeLast(50); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old take with immediate scheduler', function () { + oldTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new take with immediate scheduler', function () { + newTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/perf/micro/immediate-scheduler/operators/takelast.js b/perf/micro/immediate-scheduler/operators/takelast.js new file mode 100644 index 0000000000..956a69ce9e --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/takelast.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldTakeLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.immediate).takeLast(50); + var newTakeLastWithImmediateScheduler = RxNew.Observable.range(0, 500).takeLast(50); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old take with immediate scheduler', function () { + oldTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new take with immediate scheduler', function () { + newTakeLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/spec/operators/takeLast-spec.js b/spec/operators/takeLast-spec.js new file mode 100644 index 0000000000..c79b789216 --- /dev/null +++ b/spec/operators/takeLast-spec.js @@ -0,0 +1,125 @@ +/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.takeLast()', function () { + it.asDiagram('takeLast(2)')('should take two values of an observable with many values', function () { + var e1 = cold('--a-----b----c---d--| '); + var e1subs = '^ ! '; + var expected = '--------------------(cd|)'; + + expectObservable(e1.takeLast(2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should work with empty', function () { + var e1 = cold('|'); + var e1subs = '(^!)'; + var expected = '|'; + + expectObservable(e1.takeLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should go on forever on never', function () { + var e1 = cold('-'); + var e1subs = '^'; + var expected = '-'; + + expectObservable(e1.takeLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should be empty on takeLast(0)', function () { + var e1 = hot('--a--^--b----c---d--|'); + var e1subs = []; // Don't subscribe at all + var expected = '|'; + + expectObservable(e1.takeLast(0)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should take one value from an observable with one value', function () { + var e1 = hot('---(a|)'); + var e1subs = '^ ! '; + var expected = '---(a|)'; + + expectObservable(e1.takeLast(1)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should take one value from an observable with many values', function () { + var e1 = hot('--a--^--b----c---d--| '); + var e1subs = '^ ! '; + var expected = '---------------(d|)'; + + expectObservable(e1.takeLast(1)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should error on empty', function () { + var e1 = hot('--a--^----|'); + var e1subs = '^ !'; + var expected = '-----|'; + + expectObservable(e1.takeLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should propagate error from the source observable', function () { + var e1 = hot('---^---#', null, 'too bad'); + var e1subs = '^ !'; + var expected = '----#'; + + expectObservable(e1.takeLast(42)).toBe(expected, null, 'too bad'); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should propagate error from an observable with values', function () { + var e1 = hot('---^--a--b--#'); + var e1subs = '^ !'; + var expected = '---------#'; + + expectObservable(e1.takeLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow unsubscribing explicitly and early', function () { + var e1 = hot('---^--a--b-----c--d--e--|'); + var unsub = ' ! '; + var e1subs = '^ ! '; + var expected = '---------- '; + + expectObservable(e1.takeLast(42), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should work with throw', function () { + var e1 = cold('#'); + var e1subs = '(^!)'; + var expected = '#'; + + expectObservable(e1.takeLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should throw if total is less than zero', function () { + expect(function () { Observable.range(0,10).takeLast(-1); }) + .toThrow(new Rx.ArgumentOutOfRangeError()); + }); + + it('should not break unsubscription chain when unsubscribed explicitly', function () { + var e1 = hot('---^--a--b-----c--d--e--|'); + var unsub = ' ! '; + var e1subs = '^ ! '; + var expected = '---------- '; + + var result = e1 + .mergeMap(function (x) { return Observable.of(x); }) + .takeLast(42) + .mergeMap(function (x) { return Observable.of(x); }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); +}); diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 58f13509f9..5044b5b84c 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -76,6 +76,7 @@ export interface CoreOperators { switchMap?: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; switchMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take?: (count: number) => Observable; + takeLast?: (count: number) => Observable; takeUntil?: (notifier: Observable) => Observable; takeWhile?: (predicate: (value: T, index: number) => boolean) => Observable; throttle?: (durationSelector: (value: T) => Observable | Promise) => Observable; diff --git a/src/Observable.ts b/src/Observable.ts index baf5ce4a67..9cb5336df1 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -261,6 +261,7 @@ export class Observable implements CoreOperators { switchMap: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; switchMapTo: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take: (count: number) => Observable; + takeLast: (count: number) => Observable; takeUntil: (notifier: Observable) => Observable; takeWhile: (predicate: (value: T, index: number) => boolean) => Observable; throttle: (durationSelector: (value: T) => Observable | Promise) => Observable; diff --git a/src/Rx.DOM.ts b/src/Rx.DOM.ts index ed325624ec..8bd91e5fff 100644 --- a/src/Rx.DOM.ts +++ b/src/Rx.DOM.ts @@ -91,6 +91,7 @@ import './add/operator/switch'; import './add/operator/switchMap'; import './add/operator/switchMapTo'; import './add/operator/take'; +import './add/operator/takeLast'; import './add/operator/takeUntil'; import './add/operator/takeWhile'; import './add/operator/throttle'; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 14e3b6eb51..7601b369df 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -118,6 +118,7 @@ import './add/operator/switch'; import './add/operator/switchMap'; import './add/operator/switchMapTo'; import './add/operator/take'; +import './add/operator/takeLast'; import './add/operator/takeUntil'; import './add/operator/takeWhile'; import './add/operator/throttle'; diff --git a/src/Rx.ts b/src/Rx.ts index b424848583..ec86e38563 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -93,6 +93,7 @@ import './add/operator/switch'; import './add/operator/switchMap'; import './add/operator/switchMapTo'; import './add/operator/take'; +import './add/operator/takeLast'; import './add/operator/takeUntil'; import './add/operator/takeWhile'; import './add/operator/throttle'; diff --git a/src/add/operator/takeLast.ts b/src/add/operator/takeLast.ts new file mode 100644 index 0000000000..f3efba6116 --- /dev/null +++ b/src/add/operator/takeLast.ts @@ -0,0 +1,10 @@ +/** + * Everything in this file is generated by the 'tools/generate-operator-patches.ts' script. + * Any manual edits to this file will be lost next time the script is run. + **/ +import {Observable} from '../../Observable'; +import {takeLast} from '../../operator/takeLast'; + +Observable.prototype.takeLast = takeLast; + +export var _void: void; \ No newline at end of file diff --git a/src/operator/takeLast.ts b/src/operator/takeLast.ts new file mode 100644 index 0000000000..fecb239cc4 --- /dev/null +++ b/src/operator/takeLast.ts @@ -0,0 +1,76 @@ +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {ArgumentOutOfRangeError} from '../util/ArgumentOutOfRangeError'; +import {EmptyObservable} from '../observable/empty'; +import {Observable} from '../Observable'; + +export function takeLast(total: number): Observable { + if (total === 0) { + return new EmptyObservable(); + } else { + return this.lift(new TakeLastOperator(total)); + } +} + +class TakeLastOperator implements Operator { + constructor(private total: number) { + if (this.total < 0) { + throw new ArgumentOutOfRangeError; + } + } + + call(subscriber: Subscriber): Subscriber { + return new TakeLastSubscriber(subscriber, this.total); + } +} + +class TakeLastSubscriber extends Subscriber { + private ring: T[]; + private count: number = 0; + private index: number = 0; + + constructor(destination: Subscriber, private total: number) { + super(destination); + this.ring = new Array(total); + } + + protected _next(value: T): void { + + let index = this.index; + const ring = this.ring; + const total = this.total; + const count = this.count; + + if (total > 1) { + if (count < total) { + this.count = count + 1; + this.index = index + 1; + } else if (index === 0) { + this.index = ++index; + } else if (index < total) { + this.index = index + 1; + } else { + this.index = index = 0; + } + } else if (count < total) { + this.count = total; + } + + ring[index] = value; + } + + protected _complete(): void { + + let iter = -1; + const { ring, count, total, destination } = this; + let index = (total === 1 || count < total) ? 0 : this.index - 1; + + while (++iter < count) { + if (iter + index === total) { + index = total - iter; + } + destination.next(ring[iter + index]); + } + destination.complete(); + } +}