Skip to content

Commit

Permalink
fix(TestScheduler): stop sorting actual results
Browse files Browse the repository at this point in the history
- fixes an issue where actual results were being sorted when they should not have been
- ensures rescheduled Actions will be added to subscription and torn down when appropriate
- uses state to convey notification information to action work function rather than closure

closes #422
  • Loading branch information
benlesh committed Oct 7, 2015
1 parent 8daddde commit 51db0b8
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 8 deletions.
17 changes: 17 additions & 0 deletions spec/operators/merge-map-spec.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
/* globals expectObservable, cold, hot, describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Promise = require('promise');

describe('Observable.prototype.mergeMap()', function () {
it('should mergeMap many regular interval inners', function () {
var a = cold('----a---a---a---(a|)' );
var b = cold( '----b---b---(b|)' );
var c = cold( '----c---c---c---c---(c|)');
var d = cold( '----(d|)' );
var e1 = hot('a---b-----------c-------d-------|' );
var expected = '----a---(ab)(ab)(ab)c---c---(cd)c---(c|)';

var observableLookup = { a: a, b: b, c: c, d: d };
var source = e1.mergeMap(function (value) {
return observableLookup[value];
});

expectObservable(source).toBe(expected);
});

it('should map values to constant resolved promises and merge', function (done) {
var source = Rx.Observable.from([4,3,2,1]);
var project = function (value) {
Expand Down
13 changes: 11 additions & 2 deletions src/schedulers/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export default class VirtualTimeScheduler implements Scheduler {
class VirtualAction<T> extends Subscription<T> implements Action {
state: any;
delay: number;
calls = 0;

constructor(public scheduler: VirtualTimeScheduler,
public work: (x?: any) => Subscription<T> | void,
Expand All @@ -79,8 +80,16 @@ class VirtualAction<T> extends Subscription<T> implements Action {
return this;
}
const scheduler = this.scheduler;
let action = scheduler.frame === this.delay ? this :
new VirtualAction(scheduler, this.work, scheduler.index += 1);
let action;
if (this.calls++ === 0) {
// the action is not being rescheduled.
action = this;
} else {
// the action is being rescheduled, and we can't mutate the one in the actions list
// in the scheduler, so we'll create a new one.
action = new VirtualAction(scheduler, this.work, scheduler.index += 1);
this.add(action);
}
action.state = state;
action.delay = scheduler.frame + delay;
scheduler.addAction(action);
Expand Down
9 changes: 4 additions & 5 deletions src/testing/ColdObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ export default class ColdObservable<T> extends Observable<T> implements Subscrip
scheduleMessages(subscriber) {
const messagesLength = this.messages.length;
for (let i = 0; i < messagesLength; i++) {
const message = this.messages[i];
let message = this.messages[i];
subscriber.add(
this.scheduler.schedule(
() => { message.notification.observe(subscriber); },
message.frame
)
this.scheduler.schedule(({message, subscriber}) => { message.notification.observe(subscriber); },
message.frame,
{message, subscriber})
);
}
}
Expand Down
1 change: 0 additions & 1 deletion src/testing/TestScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ export class TestScheduler extends VirtualTimeScheduler {
const readyFlushTests = this.flushTests.filter(test => test.ready);
while (readyFlushTests.length > 0) {
let test = readyFlushTests.shift();
test.actual.sort((a, b) => a.frame === b.frame ? 0 : (a.frame > b.frame ? 1 : -1));
this.assertDeepEqual(test.actual, test.expected);
}
}
Expand Down

0 comments on commit 51db0b8

Please sign in to comment.