From 908ae5627ff5e097dd9041e42f9b288e47ead3d7 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Tue, 13 Oct 2015 17:25:50 +0300 Subject: [PATCH] fix(windowCount): fix windowCount window opening times Fix bugs with windowCount. This commit reverts PR #273, in order to have windowCount pass comprehensive marble tests. windowCount must open the first window immediately, not when the first next() event arrives, to comply with legacy windowWithCount and with RxJS Next window and windowTime. --- src/operators/windowCount.ts | 53 ++++++++++++++---------------------- 1 file changed, 21 insertions(+), 32 deletions(-) diff --git a/src/operators/windowCount.ts b/src/operators/windowCount.ts index bd8576683e..ef5d6986b3 100644 --- a/src/operators/windowCount.ts +++ b/src/operators/windowCount.ts @@ -10,13 +10,15 @@ import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import bindCallback from '../util/bindCallback'; -export default function windowCount(windowSize: number, startWindowEvery: number = 0): Observable> { +export default function windowCount(windowSize: number, + startWindowEvery: number = 0): Observable> { return this.lift(new WindowCountOperator(windowSize, startWindowEvery)); } class WindowCountOperator implements Operator { - constructor(private windowSize: number, private startWindowEvery: number) { + constructor(private windowSize: number, + private startWindowEvery: number) { } call(subscriber: Subscriber): Subscriber { @@ -24,54 +26,41 @@ class WindowCountOperator implements Operator { } } -interface WindowObject { - count: number; - notified: boolean; - window: Subject; -} - class WindowCountSubscriber extends Subscriber { - private windows: WindowObject[] = [ - { count: 0, notified : false, window : new Subject() } - ]; + private windows: Subject[] = [ new Subject() ]; private count: number = 0; - constructor(destination: Subscriber, private windowSize: number, private startWindowEvery: number) { + constructor(destination: Subscriber, + private windowSize: number, + private startWindowEvery: number) { super(destination); + destination.next(this.windows[0]); } _next(value: T) { - const count = (this.count += 1); const startWindowEvery = (this.startWindowEvery > 0) ? this.startWindowEvery : this.windowSize; const windowSize = this.windowSize; const windows = this.windows; const len = windows.length; - if (count % startWindowEvery === 0) { - let window = new Subject(); - windows.push({ count: 0, notified : false, window : window }); - } - for (let i = 0; i < len; i++) { - let w = windows[i]; - const window = w.window; - - if (!w.notified) { - w.notified = true; - this.destination.next(window); - } - - window.next(value); - if (windowSize === (w.count += 1)) { - window.complete(); - } + windows[i].next(value); + } + const c = this.count - windowSize + 1; + if (c >= 0 && c % startWindowEvery === 0) { + windows.shift().complete(); + } + if (++this.count % startWindowEvery === 0) { + let window = new Subject(); + windows.push(window); + this.destination.next(window); } } _error(err: any) { const windows = this.windows; while (windows.length > 0) { - windows.shift().window.error(err); + windows.shift().error(err); } this.destination.error(err); } @@ -79,7 +68,7 @@ class WindowCountSubscriber extends Subscriber { _complete() { const windows = this.windows; while (windows.length > 0) { - windows.shift().window.complete(); + windows.shift().complete(); } this.destination.complete(); }