Skip to content

Commit

Permalink
fix(schedulers): fix asap and animationFrame schedulers to execute ac…
Browse files Browse the repository at this point in the history
…ross async boundaries.

The AsapScheduler and AnimationFrameSchedulers were totally busted. My bad. Now they execute their scheduled actions in batches. If actions reschedule while executing a batch, a new frame is requested for the rescheduled action to execute in.

This PR also simplifies the public `Scheduler` and `Action` APIs. Implementation details like the `actions` queue and `active` boolean are now on the concrete implementations, so it's easier for people to implement the Scheduler API. This PR also renames `FutureAction` -> `AsyncAction` to conform to the same naming convention as the rest of the Action types.

Fixes ReactiveX#1814
  • Loading branch information
trxcllnt committed Jul 11, 2016
1 parent 2aa1433 commit b934493
Show file tree
Hide file tree
Showing 31 changed files with 638 additions and 602 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
"prepublish": "shx rm -rf ./typings && typings install && npm run build_all",
"publish_docs": "./publish_docs.sh",
"test_mocha": "mocha --opts spec/support/default.opts spec-js",
"debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js",
"test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html",
"test": "npm-run-all clean_spec build_spec test_mocha clean_spec",
"tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js",
Expand Down
17 changes: 16 additions & 1 deletion spec/Scheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ describe('Scheduler.queue', () => {
expect(call2).to.be.true;
});

it('should schedule things recursively via this.schedule', () => {
let call1 = false;
let call2 = false;
Scheduler.queue.active = false;
Scheduler.queue.schedule(function (state) {
call1 = state.call1;
call2 = state.call2;
if (!call2) {
this.schedule({ call1: true, call2: true });
}
}, 0, { call1: true, call2: false });
expect(call1).to.be.true;
expect(call2).to.be.true;
});

it('should schedule things in the future too', (done: MochaDone) => {
let called = false;
Scheduler.queue.schedule(() => {
Expand Down Expand Up @@ -55,4 +70,4 @@ describe('Scheduler.queue', () => {
});
}, 0);
});
});
});
23 changes: 23 additions & 0 deletions spec/schedulers/AsapScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,29 @@ describe('Scheduler.asap', () => {
}
});

it('should execute recursively scheduled actions in separate asynchronous contexts', (done: MochaDone) => {
let syncExec = true;
let startTime = asap.now();
let execTime1 = startTime;
let execTime2 = startTime;
asap.schedule(function (index) {
if (index === 0) {
execTime1 = startTime + 1;
this.schedule(1);
} else if (index === 1) {
execTime2 = execTime1 + 1;
this.schedule(2);
} else if (index === 2) {
if (syncExec === false && execTime2 > execTime1 && execTime1 > startTime) {
done();
} else {
done(new Error('Execution happened synchronously.'));
}
}
}, 0, 0);
syncExec = false;
});

it('should execute the rest of the scheduled actions if the first action is canceled', (done: MochaDone) => {
let actionHappened = false;
let firstSubscription = null;
Expand Down
56 changes: 56 additions & 0 deletions spec/schedulers/QueueScheduler-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';

const Scheduler = Rx.Scheduler;
const queue = Scheduler.queue;

/** @test {Scheduler} */
describe('Scheduler.queue', () => {
it('should switch from synchronous to asynchronous at will', (done: MochaDone) => {
let lastExecTime = 0;
let asyncExec = false;
queue.schedule(function (index) {
if (index === 0) {
lastExecTime = queue.now();
this.schedule(1, 100);
} else if (index === 1) {
if (queue.now() - lastExecTime < 100) {
done(new Error('Execution happened synchronously.'));
} else {
asyncExec = true;
lastExecTime = queue.now();
this.schedule(2, 0);
}
} else if (index === 2) {
if (asyncExec === false) {
done(new Error('Execution happened synchronously.'));
} else {
done();
}
}
}, 0, 0);
asyncExec = false;
});
it('should unsubscribe the rest of the scheduled actions if an action throws an error', () => {
const actions = [];
let action2Exec = false;
let action3Exec = false;
let errorValue = undefined;
try {
queue.schedule(() => {
actions.push(
queue.schedule(() => { throw new Error('oops'); }),
queue.schedule(() => { action2Exec = true; }),
queue.schedule(() => { action3Exec = true; })
);
});
} catch (e) {
errorValue = e;
}
expect(actions.every((action) => action.isUnsubscribed)).to.be.true;
expect(action2Exec).to.be.false;
expect(action3Exec).to.be.false;
expect(errorValue).exist;
expect(errorValue.message).to.equal('oops');
});
});
2 changes: 1 addition & 1 deletion spec/support/debug.opts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
--bail
--full-trace
--check-leaks
--globals WebSocket,FormData
--globals WebSocket,FormData,XDomainRequest,ActiveXObject

--recursive
--timeout 100000
89 changes: 0 additions & 89 deletions src/MiscJSDoc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import {Subscriber} from './Subscriber';
import {TeardownLogic} from './Subscription';
import {Observable} from './Observable';
import {Subscription} from './Subscription';
import {Action} from './scheduler/Action';
import './scheduler/MiscJSDoc';
import './observable/dom/MiscJSDoc';

Expand Down Expand Up @@ -130,90 +128,3 @@ export class ObserverDoc<T> {
return void 0;
}
}

/**
* An execution context and a data structure to order tasks and schedule their
* execution. Provides a notion of (potentially virtual) time, through the
* `now()` getter method.
*
* Each unit of work in a Scheduler is called an {@link Action}.
*
* ```ts
* interface Scheduler {
* now(): number;
* schedule(work, delay?, state?): Subscription;
* flush(): void;
* active: boolean;
* actions: Action[];
* scheduledId: number;
* }
* ```
*
* @interface
* @name Scheduler
* @noimport true
*/
export class SchedulerDoc {
/**
* A getter method that returns a number representing the current time
* (at the time this function was called) according to the scheduler's own
* internal clock.
* @return {number} A number that represents the current time. May or may not
* have a relation to wall-clock time. May or may not refer to a time unit
* (e.g. milliseconds).
*/
now(): number {
return 0;
}

/**
* Schedules a function, `work`, for execution. May happen at some point in
* the future, according to the `delay` parameter, if specified. May be passed
* some context object, `state`, which will be passed to the `work` function.
*
* The given arguments will be processed an stored as an Action object in a
* queue of actions.
*
* @param {function(state: ?T): ?Subscription} work A function representing a
* task, or some unit of work to be executed by the Scheduler.
* @param {number} [delay] Time to wait before executing the work, where the
* time unit is implicit and defined by the Scheduler itself.
* @param {T} [state] Some contextual data that the `work` function uses when
* called by the Scheduler.
* @return {Subscription} A subscription in order to be able to unsubscribe
* the scheduled work.
*/
schedule<T>(work: (state?: T) => Subscription | void, delay?: number, state?: T): Subscription {
return void 0;
}

/**
* Prompt the Scheduler to execute all of its queued actions, therefore
* clearing its queue.
* @return {void}
*/
flush(): void {
return void 0;
}

/**
* A flag to indicate whether the Scheduler is currently executing a batch of
* queued actions.
* @type {boolean}
*/
active: boolean = false;

/**
* The queue of scheduled actions as an array.
* @type {Action[]}
*/
actions: Action<any>[] = [];

/**
* An internal ID used to track the latest asynchronous task such as those
* coming from `setTimeout`, `setInterval`, `requestAnimationFrame`, and
* others.
* @type {number}
*/
scheduledId: number = 0;
}
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ import observable from 'symbol-observable';
* asynchronous conversions.
* @property {Scheduler} async Schedules work with `setInterval`. Use this for
* time-based operations.
* @property {Scheduler} animation Schedules work with `requestAnimationFrame`.
* Use this for synchronizing with the platform's painting
*/
let Scheduler = {
asap,
Expand Down
64 changes: 56 additions & 8 deletions src/Scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,59 @@
import {Subscription} from './Subscription';
import {Action} from './scheduler/Action';
import {Subscription} from './Subscription';

/**
* An execution context and a data structure to order tasks and schedule their
* execution. Provides a notion of (potentially virtual) time, through the
* `now()` getter method.
*
* Each unit of work in a Scheduler is called an {@link Action}.
*
* ```ts
* class Scheduler {
* now(): number;
* schedule(work, delay?, state?): Subscription;
* }
* ```
*
* @class Scheduler
*/
export class Scheduler {

public static now: () => number = Date.now ? Date.now : () => +new Date();

constructor(private SchedulerAction: typeof Action,
now: () => number = Scheduler.now) {
this.now = now;
}

/**
* A getter method that returns a number representing the current time
* (at the time this function was called) according to the scheduler's own
* internal clock.
* @return {number} A number that represents the current time. May or may not
* have a relation to wall-clock time. May or may not refer to a time unit
* (e.g. milliseconds).
*/
public now: () => number;

export interface Scheduler {
now(): number;
schedule<T>(work: (state?: T) => Subscription | void, delay?: number, state?: T): Subscription;
flush(): void;
active: boolean;
actions: Action<any>[]; // XXX: use `any` to remove type param `T` from `Scheduler`.
scheduledId: number;
/**
* Schedules a function, `work`, for execution. May happen at some point in
* the future, according to the `delay` parameter, if specified. May be passed
* some context object, `state`, which will be passed to the `work` function.
*
* The given arguments will be processed an stored as an Action object in a
* queue of actions.
*
* @param {function(state: ?T): ?Subscription} work A function representing a
* task, or some unit of work to be executed by the Scheduler.
* @param {number} [delay] Time to wait before executing the work, where the
* time unit is implicit and defined by the Scheduler itself.
* @param {T} [state] Some contextual data that the `work` function uses when
* called by the Scheduler.
* @return {Subscription} A subscription in order to be able to unsubscribe
* the scheduled work.
*/
public schedule<T>(work: (state?: T) => void, delay: number = 0, state?: T): Subscription {
return new this.SchedulerAction<T>(this, work).schedule(state, delay);
}
}
21 changes: 10 additions & 11 deletions src/observable/GenerateObservable.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import {Observable} from '../Observable' ;
import {Scheduler} from '../Scheduler';
import {Action} from '../scheduler/Action';
import {Observable} from '../Observable' ;
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {Action} from '../scheduler/Action';

import {isScheduler} from '../util/isScheduler';

const selfSelector = <T>(value: T) => value;
Expand Down Expand Up @@ -70,16 +69,16 @@ export class GenerateObservable<T, S> extends Observable<T> {
* to send out observer messages.
*
* <img src="./img/generate.png" width="100%">
*
*
* @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
* var res = Rx.Observable.generate(0, x => x < 10, x => x + 1, x => x);
*
*
* @example <caption>Using asap scheduler, produces sequence of 2, 3, 5, then completes.</caption>
* var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, x => x + 1, Rx.Scheduler.asap);
*
* @see {@link from}
* @see {@link create}
*
*
* @param {S} initialState Initial state.
* @param {function (state: S): boolean} condition Condition to terminate generation (upon returning false).
* @param {function (state: S): S} iterate Iteration step function.
Expand All @@ -98,12 +97,12 @@ export class GenerateObservable<T, S> extends Observable<T> {
* producing the sequence's elements, using the specified scheduler
* to send out observer messages.
* The overload uses state as an emitted value.
*
*
* <img src="./img/generate.png" width="100%">
*
* @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
* var res = Rx.Observable.generate(0, x => x < 10, x => x + 1);
*
*
* @example <caption>Using asap scheduler, produces sequence of 1, 2, 4, then completes.</caption>
* var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, Rx.Scheduler.asap);
*
Expand All @@ -127,7 +126,7 @@ export class GenerateObservable<T, S> extends Observable<T> {
* to send out observer messages.
* The overload accepts options object that might contain inital state, iterate,
* condition and scheduler.
*
*
* <img src="./img/generate.png" width="100%">
*
* @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
Expand All @@ -151,7 +150,7 @@ export class GenerateObservable<T, S> extends Observable<T> {
* to send out observer messages.
* The overload accepts options object that might contain inital state, iterate,
* condition, result selector and scheduler.
*
*
* <img src="./img/generate.png" width="100%">
*
* @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
Expand Down Expand Up @@ -293,4 +292,4 @@ export class GenerateObservable<T, S> extends Observable<T> {
}
return (<Action<SchedulerState<T, S>>><any>this).schedule(state);
}
}
}
8 changes: 4 additions & 4 deletions src/operator/bufferTime.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {Action} from '../scheduler/Action';
import {Operator} from '../Operator';
import {async} from '../scheduler/async';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {isScheduler} from '../util/isScheduler';

/**
Expand Down
Loading

0 comments on commit b934493

Please sign in to comment.