Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(windowTime): clean up closed window with timeSpanOnly #2240

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 35 additions & 36 deletions src/operator/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ export function windowTime<T>(this: Observable<T>, windowTimeSpan: number,
}

class WindowTimeOperator<T> implements Operator<T, Observable<T>> {

constructor(private windowTimeSpan: number,
private windowCreationInterval: number,
private scheduler: Scheduler) {
Expand All @@ -83,6 +82,23 @@ interface CreationState<T> {
scheduler: Scheduler;
}

interface TimeSpanOnlyState<T> {
window: Subject<T>;
windowTimeSpan: number;
subscriber: WindowTimeSubscriber<T>;
}

interface CloseWindowContext<T> {
action: Action<CreationState<T>>;
subscription: Subscription;
}

interface CloseState<T> {
subscriber: WindowTimeSubscriber<T>;
window: Subject<T>;
context: CloseWindowContext<T>;
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
Expand All @@ -96,20 +112,20 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
private windowCreationInterval: number,
private scheduler: Scheduler) {
super(destination);

const window = this.openWindow();
if (windowCreationInterval !== null && windowCreationInterval >= 0) {
let window = this.openWindow();
const closeState = { subscriber: this, window, context: <any>null };
const closeState: CloseState<T> = { subscriber: this, window, context: <any>null };
const creationState: CreationState<T> = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler };
this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, closeState));
this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, creationState));
} else {
let window = this.openWindow();
const timeSpanOnlyState = { subscriber: this, window, windowTimeSpan };
const timeSpanOnlyState: TimeSpanOnlyState<T> = { subscriber: this, window, windowTimeSpan };
this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState));
}
}

protected _next(value: T) {
protected _next(value: T): void {
const windows = this.windows;
const len = windows.length;
for (let i = 0; i < len; i++) {
Expand All @@ -120,15 +136,15 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
}
}

protected _error(err: any) {
protected _error(err: any): void {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().error(err);
}
this.destination.error(err);
}

protected _complete() {
protected _complete(): void {
const windows = this.windows;
while (windows.length > 0) {
const window = windows.shift();
Expand All @@ -147,52 +163,35 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
return window;
}

closeWindow(window: Subject<T>) {
closeWindow(window: Subject<T>): void {
window.complete();
const windows = this.windows;
windows.splice(windows.indexOf(window), 1);
}
}

interface TimeSpanOnlyState<T> {
window: Subject<any>;
windowTimeSpan: number;
subscriber: WindowTimeSubscriber<T>;
}

function dispatchWindowTimeSpanOnly<T>(this: Action<TimeSpanOnlyState<T>>, state: TimeSpanOnlyState<T>) {
function dispatchWindowTimeSpanOnly<T>(this: Action<TimeSpanOnlyState<T>>, state: TimeSpanOnlyState<T>): void {
const { subscriber, windowTimeSpan, window } = state;
if (window) {
window.complete();
subscriber.closeWindow(window);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kwonoj should've probably saved all of the typing refactors and formatting stuff for a different commit. It's hard for me to see what the "meat" of this PR is.. I'm assuming it's mostly this line here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, correct. previously timspan'ed window closed, but not teared down as same as default window behavior. Now both uses same teardown path. Sorry for creating commit include mixed changes, should I split up and update PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we might want to. This is a commit to merge in a fix to production code... so we might want to make sure the changes are isolated this time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do. I'll rather create as separate PR to have better scoped clarity.

}
state.window = subscriber.openWindow();
this.schedule(state, windowTimeSpan);
}

interface Context<T> {
action: Action<CreationState<T>>;
subscription: Subscription;
}

interface DispatchArg<T> {
subscriber: WindowTimeSubscriber<T>;
window: Subject<T>;
context: Context<T>;
}

function dispatchWindowCreation<T>(this: Action<CreationState<T>>, state: CreationState<T>) {
let { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state;
let window = subscriber.openWindow();
let action = this;
let context: Context<T> = { action, subscription: <any>null };
const timeSpanState: DispatchArg<T> = { subscriber, window, context };
function dispatchWindowCreation<T>(this: Action<CreationState<T>>, state: CreationState<T>): void {
const { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state;
const window = subscriber.openWindow();
const action = this;
const context: CloseWindowContext<T> = { action, subscription: <any>null };
const timeSpanState: CloseState<T> = { subscriber, window, context };
context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, timeSpanState);
action.add(context.subscription);
action.schedule(state, windowCreationInterval);
}

function dispatchWindowClose<T>(arg: DispatchArg<T>) {
const { subscriber, window, context } = arg;
function dispatchWindowClose<T>(state: CloseState<T>): void {
const { subscriber, window, context } = state;
if (context && context.action && context.subscription) {
context.action.remove(context.subscription);
}
Expand Down