Skip to content

Commit

Permalink
feat(VirtualTimeScheduler): add VirtualTimeScheduler
Browse files Browse the repository at this point in the history
- Refactors Action to be an interface
- Adds VirtualTimeScheduler
- Refactors Scheduler interface to include actions list

related to #151
closes #269
  • Loading branch information
benlesh committed Sep 6, 2015
1 parent 2c1a9dc commit 96f9386
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 59 deletions.
46 changes: 46 additions & 0 deletions spec/schedulers/VirtualTimeScheduler-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var VirtualTimeScheduler = Rx.VirtualTimeScheduler;

describe('VirtualTimeScheduler', function() {
it('should exist', function () {
expect(typeof VirtualTimeScheduler).toBe('function');
});

it('should schedule things in order when flushed if each this is scheduled synchrously', function () {
var v = new VirtualTimeScheduler();
var invoked = [];
var invoke = function (state) {
invoked.push(state);
};
v.schedule(invoke, 0, 1);
v.schedule(invoke, 0, 2);
v.schedule(invoke, 0, 3);
v.schedule(invoke, 0, 4);
v.schedule(invoke, 0, 5);

v.flush();

expect(invoked).toEqual([1, 2, 3, 4, 5]);
});



it('should schedule things in order when flushed if each this is scheduled at random', function () {
var v = new VirtualTimeScheduler();
var invoked = [];
var invoke = function (state) {
invoked.push(state);
};
v.schedule(invoke, 0, 1);
v.schedule(invoke, 100, 2);
v.schedule(invoke, 0, 3);
v.schedule(invoke, 500, 4);
v.schedule(invoke, 0, 5);
v.schedule(invoke, 100, 6);

v.flush();

expect(invoked).toEqual([1, 3, 5, 2, 6, 4]);
});
});
4 changes: 3 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Subject from './Subject';
import ImmediateScheduler from './schedulers/ImmediateScheduler';
import NextTickScheduler from './schedulers/NextTickScheduler';
import VirtualTimeScheduler from './schedulers/VirtualTimeScheduler';
import immediate from './schedulers/immediate';
import nextTick from './schedulers/nextTick';
import Observable from './Observable';
Expand Down Expand Up @@ -227,5 +228,6 @@ export {
ReplaySubject,
BehaviorSubject,
ConnectableObservable,
Notification 
Notification,
VirtualTimeScheduler
};
9 changes: 9 additions & 0 deletions src/Scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import Subscription from './Subscription';
import Action from './schedulers/Action';

interface Scheduler {
now(): number;

schedule<T>(work: (state?: any) => Subscription<T>|void, delay?: number, state?: any): Subscription<T>;

flush(): void;

actions: Action[];

scheduled: boolean;

active: boolean;
}

export default Scheduler;
2 changes: 1 addition & 1 deletion src/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ function dispatchBufferTimeSpanOnly(state) {
function dispatchBufferCreation(state) {
let { bufferTimeSpan, subscriber, scheduler } = state;
let buffer = subscriber.openBuffer();
var action = <Action<any>>this;
var action = <Action>this;
action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer }));
action.schedule(state);
}
Expand Down
2 changes: 1 addition & 1 deletion src/operators/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ function dispatchWindowTimeSpanOnly(state) {
function dispatchWindowCreation(state) {
let { windowTimeSpan, subscriber, scheduler } = state;
let window = subscriber.openWindow();
let action = <Action<any>>this;
let action = <Action>this;
let context = { action, subscription: null };
action.add(context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, { subscriber, window, context }));
action.schedule(state);
Expand Down
51 changes: 7 additions & 44 deletions src/schedulers/Action.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,11 @@
import Subscription from '../Subscription';
import ImmediateScheduler from './ImmediateScheduler';

export default class Action<T> extends Subscription<T> {

interface Action extends Subscription<any> {
work: (state?: any) => void|Subscription<any>
state: any;
delay?: number;
schedule(state: any);
execute(): void;
}

constructor(public scheduler: ImmediateScheduler,
public work: (x?: any) => Subscription<T> | void) {
super();
}

schedule(state?: any): Action<T> {
if (this.isUnsubscribed) {
return this;
}

this.state = state;
const scheduler = this.scheduler;
scheduler.actions.push(this);
scheduler.flush();
return this;
}

execute() {
if (this.isUnsubscribed) {
throw new Error("How did did we execute a canceled Action?");
}
this.work(this.state);
}

unsubscribe() {

const scheduler = this.scheduler;
const actions = scheduler.actions;
const index = actions.indexOf(this);

this.work = void 0;
this.state = void 0;
this.scheduler = void 0;

if (index !== -1) {
actions.splice(index, 1);
}

super.unsubscribe();
}
}
export default Action;
7 changes: 4 additions & 3 deletions src/schedulers/FutureAction.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import Subscription from '../Subscription';
import ImmediateScheduler from './ImmediateScheduler';
import Action from './Action';
import ImmediateAction from './ImmediateAction';

export default class FutureAction<T> extends Action<T> {
export default class FutureAction<T> extends ImmediateAction<T> {

id: number;
id: any;

constructor(public scheduler: ImmediateScheduler,
public work: (x?: any) => Subscription<T> | void,
public delay: number) {
super(scheduler, work);
}

schedule(state?:any): Action<T> {
schedule(state?:any): Action {
if (this.isUnsubscribed) {
return this;
}
Expand Down
49 changes: 49 additions & 0 deletions src/schedulers/ImmediateAction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import Subscription from '../Subscription';
import Scheduler from '../Scheduler';
import Action from './Action';

export default class ImmediateAction<T> extends Subscription<T> implements Action {

state: any;

constructor(public scheduler: Scheduler,
public work: (x?: any) => Subscription<T> | void) {
super();
}

schedule(state?: any): Action {
if (this.isUnsubscribed) {
return this;
}

this.state = state;
const scheduler = this.scheduler;
scheduler.actions.push(this);
scheduler.flush();
return this;
}

execute() {
if (this.isUnsubscribed) {
throw new Error("How did did we execute a canceled Action?");
}
this.work(this.state);
}

unsubscribe() {

const scheduler = this.scheduler;
const actions = scheduler.actions;
const index = actions.indexOf(this);

this.work = void 0;
this.state = void 0;
this.scheduler = void 0;

if (index !== -1) {
actions.splice(index, 1);
}

super.unsubscribe();
}
}
11 changes: 6 additions & 5 deletions src/schedulers/ImmediateScheduler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Immediate } from '../util/Immediate';
import Scheduler from '../Scheduler';
import Action from './Action';
import ImmediateAction from './ImmediateAction';
import Subscription from '../Subscription';
import FutureAction from './FutureAction';
import Action from './Action';

export default class ImmediateScheduler implements Scheduler {
actions: Action<any>[] = [];
actions: ImmediateAction<any>[] = [];
active: boolean = false;
scheduled: boolean = false;

Expand All @@ -31,11 +32,11 @@ export default class ImmediateScheduler implements Scheduler {
this.scheduleLater(work, delay, state);
}

scheduleNow<T>(work: (x?: any) => Subscription<T> | void, state?: any): Action<T> {
return new Action(this, work).schedule(state);
scheduleNow<T>(work: (x?: any) => Subscription<T> | void, state?: any): Action {
return new ImmediateAction(this, work).schedule(state);
}

scheduleLater<T>(work: (x?: any) => Subscription<T> | void, delay: number, state?: any): Action<T> {
scheduleLater<T>(work: (x?: any) => Subscription<T> | void, delay: number, state?: any): Action {
return new FutureAction(this, work, delay).schedule(state);
}
}
5 changes: 3 additions & 2 deletions src/schedulers/NextTickAction.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import {Immediate} from '../util/Immediate';
import Subscription from '../Subscription';
import ImmediateAction from './ImmediateAction';
import Action from './Action';

export default class NextTickAction<T> extends Action<T> {
export default class NextTickAction<T> extends ImmediateAction<T> {

id: number;

schedule(state?: any): Action<T> {
schedule(state?: any): Action {
if (this.isUnsubscribed) {
return this;
}
Expand Down
5 changes: 3 additions & 2 deletions src/schedulers/NextTickScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ import ImmediateScheduler from './ImmediateScheduler';
import Subscription from '../Subscription';
import Action from './Action';
import NextTickAction from './NextTickAction';
import ImmediateAction from './ImmediateAction';

export default class NextTickScheduler extends ImmediateScheduler {
scheduleNow<T>(work: (x?: any) => Subscription<T>, state?: any): Action<T> {
scheduleNow<T>(work: (x?: any) => Subscription<T>, state?: any): Action {
return (this.scheduled ?
new Action(this, work) :
new ImmediateAction(this, work) :
new NextTickAction(this, work)).schedule(state);
}
}
82 changes: 82 additions & 0 deletions src/schedulers/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import Scheduler from '../Scheduler';
import Subscription from '../Subscription';
import Action from './Action';

export default class VirtualTimeScheduler implements Scheduler {
actions: Action[] = [];
active: boolean = false;
scheduled: boolean = false;
index: number = 0;
sorted: boolean = false;

now() {
return 0;
}

sortActions() {
if (!this.sorted) {
(<VirtualAction<any>[]>this.actions).sort((a, b) => {
return a.delay === b.delay ? (a.index > b.index ? 1 : -1) : (a.delay > b.delay ? 1 : -1);
});
this.sorted = true;
}
}

flush() {
this.sortActions();
this.actions.forEach(action => {
action.execute();
});
this.actions.length = 0;
}

schedule<T>(work: (x?: any) => Subscription<T> | void, delay: number = 0, state?: any): Subscription<T> {
this.sorted = false;
return new VirtualAction(this, work, delay, this.index++).schedule(state);
}
}

class VirtualAction<T> extends Subscription<T> implements Action {
state: any;

constructor(public scheduler: VirtualTimeScheduler,
public work: (x?: any) => Subscription<T> | void,
public delay: number,
public index: number) {
super();
}

schedule(state?: any): VirtualAction<T> {
if (this.isUnsubscribed) {
return this;
}

this.state = state;
const scheduler = this.scheduler;
scheduler.actions.push(this);
return this;
}

execute() {
if (this.isUnsubscribed) {
throw new Error("How did did we execute a canceled Action?");
}
this.work(this.state);
}

unsubscribe() {
const scheduler = this.scheduler;
const actions = scheduler.actions;
const index = actions.indexOf(this);

this.work = void 0;
this.state = void 0;
this.scheduler = void 0;

if (index !== -1) {
actions.splice(index, 1);
}

super.unsubscribe();
}
}

0 comments on commit 96f9386

Please sign in to comment.