Skip to content

Commit

Permalink
fix(windowCount): fix windowCount window opening times
Browse files Browse the repository at this point in the history
Fix bugs with windowCount. This commit reverts PR ReactiveX#273, in order to have
windowCount pass comprehensive marble tests. windowCount must open the
first window immediately, not when the first next() event arrives, to
comply with legacy windowWithCount and with RxJS Next window and
windowTime.
  • Loading branch information
Andre Medeiros committed Oct 13, 2015
1 parent 6504ef5 commit b11f0b4
Showing 1 changed file with 21 additions and 32 deletions.
53 changes: 21 additions & 32 deletions src/operators/windowCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,76 +10,65 @@ import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function windowCount<T>(windowSize: number, startWindowEvery: number = 0): Observable<Observable<T>> {
export default function windowCount<T>(windowSize: number,
startWindowEvery: number = 0): Observable<Observable<T>> {
return this.lift(new WindowCountOperator(windowSize, startWindowEvery));
}

class WindowCountOperator<T, R> implements Operator<T, R> {

constructor(private windowSize: number, private startWindowEvery: number) {
constructor(private windowSize: number,
private startWindowEvery: number) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new WindowCountSubscriber(subscriber, this.windowSize, this.startWindowEvery);
}
}

interface WindowObject<T> {
count: number;
notified: boolean;
window: Subject<T>;
}

class WindowCountSubscriber<T> extends Subscriber<T> {
private windows: WindowObject<T>[] = [
{ count: 0, notified : false, window : new Subject<T>() }
];
private windows: Subject<T>[] = [ new Subject<T>() ];
private count: number = 0;

constructor(destination: Subscriber<T>, private windowSize: number, private startWindowEvery: number) {
constructor(destination: Subscriber<T>,
private windowSize: number,
private startWindowEvery: number) {
super(destination);
destination.next(this.windows[0]);
}

_next(value: T) {
const count = (this.count += 1);
const startWindowEvery = (this.startWindowEvery > 0) ? this.startWindowEvery : this.windowSize;
const windowSize = this.windowSize;
const windows = this.windows;
const len = windows.length;

if (count % startWindowEvery === 0) {
let window = new Subject<T>();
windows.push({ count: 0, notified : false, window : window });
}

for (let i = 0; i < len; i++) {
let w = windows[i];
const window = w.window;

if (!w.notified) {
w.notified = true;
this.destination.next(window);
}

window.next(value);
if (windowSize === (w.count += 1)) {
window.complete();
}
windows[i].next(value);
}
const c = this.count - windowSize + 1;
if (c >= 0 && c % startWindowEvery === 0) {
windows.shift().complete();
}
if (++this.count % startWindowEvery === 0) {
let window = new Subject<T>();
windows.push(window);
this.destination.next(window);
}
}

_error(err: any) {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().window.error(err);
windows.shift().error(err);
}
this.destination.error(err);
}

_complete() {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().window.complete();
windows.shift().complete();
}
this.destination.complete();
}
Expand Down

0 comments on commit b11f0b4

Please sign in to comment.