-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(bindCallback): only call function once even while scheduled
The previous implementation had issues with only calling the source function one time when scheduled. Since bindCallback shares its result with all subscribers, it is multicast, that means that it would need to maintain a list of subscribers internally. In leiu of that, I'm using AsyncSubject (which might need a better name) closes #881
- Loading branch information
Showing
6 changed files
with
330 additions
and
123 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,82 +1,246 @@ | ||
/* globals describe, it, expect */ | ||
/* globals describe, it, expect, rxTestScheduler */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
describe('Observable.bindCallback', function () { | ||
it('should emit one value from a callback', function (done) { | ||
describe('when not scheduled', function () { | ||
it('should emit one value from a callback', function () { | ||
function callback(datum, cb) { | ||
cb(datum); | ||
} | ||
var boundCallback = Observable.bindCallback(callback); | ||
var results = []; | ||
|
||
boundCallback(42) | ||
.subscribe(function (x) { | ||
results.push(x); | ||
}, null, function () { | ||
results.push('done'); | ||
}); | ||
|
||
expect(results).toEqual([42, 'done']); | ||
}); | ||
|
||
it('should emit one value chosen by a selector', function () { | ||
function callback(datum, cb) { | ||
cb(datum); | ||
} | ||
var boundCallback = Observable.bindCallback(callback, function (datum) { return datum; }); | ||
var results = []; | ||
|
||
boundCallback(42) | ||
.subscribe(function (x) { | ||
results.push(x); | ||
}, null, function () { | ||
results.push('done'); | ||
}); | ||
|
||
expect(results).toEqual([42, 'done']); | ||
}); | ||
|
||
it('should emit an error when the selector throws', function () { | ||
function callback(cb) { | ||
cb(42); | ||
} | ||
var boundCallback = Observable.bindCallback(callback, function (err) { throw new Error('Yikes!'); }); | ||
var results = []; | ||
|
||
boundCallback() | ||
.subscribe(function () { | ||
throw 'should not next'; | ||
}, function (err) { | ||
results.push(err); | ||
}, function () { | ||
throw 'should not complete'; | ||
}); | ||
|
||
expect(results).toEqual([new Error('Yikes!')]); | ||
}); | ||
|
||
it('should not emit, throw or complete if immediately unsubscribed', function (done) { | ||
var nextSpy = jasmine.createSpy('next'); | ||
var throwSpy = jasmine.createSpy('throw'); | ||
var completeSpy = jasmine.createSpy('complete'); | ||
var timeout; | ||
function callback(datum, cb) { | ||
// Need to cb async in order for the unsub to trigger | ||
timeout = setTimeout(function () { | ||
cb(datum); | ||
}); | ||
} | ||
var subscription = Observable.bindCallback(callback)(42) | ||
.subscribe(nextSpy, throwSpy, completeSpy); | ||
subscription.unsubscribe(); | ||
|
||
setTimeout(function () { | ||
expect(nextSpy).not.toHaveBeenCalled(); | ||
expect(throwSpy).not.toHaveBeenCalled(); | ||
expect(completeSpy).not.toHaveBeenCalled(); | ||
|
||
clearTimeout(timeout); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
|
||
describe('when scheduled', function () { | ||
it('should emit one value from a callback', function () { | ||
function callback(datum, cb) { | ||
cb(datum); | ||
} | ||
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); | ||
var results = []; | ||
|
||
boundCallback(42) | ||
.subscribe(function (x) { | ||
results.push(x); | ||
}, null, function () { | ||
results.push('done'); | ||
}); | ||
|
||
rxTestScheduler.flush(); | ||
|
||
expect(results).toEqual([42, 'done']); | ||
}); | ||
|
||
it('should error if callback throws', function () { | ||
function callback(datum, cb) { | ||
throw new Error('haha no callback for you'); | ||
} | ||
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); | ||
var results = []; | ||
|
||
boundCallback(42) | ||
.subscribe(function (x) { | ||
throw 'should not next'; | ||
}, function (err) { | ||
results.push(err); | ||
}, function () { | ||
throw 'should not complete'; | ||
}); | ||
|
||
rxTestScheduler.flush(); | ||
|
||
expect(results).toEqual([new Error('haha no callback for you')]); | ||
}); | ||
|
||
it('should error if selector throws', function () { | ||
function callback(datum, cb) { | ||
cb(datum); | ||
} | ||
function selector() { | ||
throw new Error('what? a selector? I don\'t think so'); | ||
} | ||
var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler); | ||
var results = []; | ||
|
||
boundCallback(42) | ||
.subscribe(function (x) { | ||
throw 'should not next'; | ||
}, function (err) { | ||
results.push(err); | ||
}, function () { | ||
throw 'should not complete'; | ||
}); | ||
|
||
rxTestScheduler.flush(); | ||
|
||
expect(results).toEqual([new Error('what? a selector? I don\'t think so')]); | ||
}); | ||
|
||
it('should use a selector', function () { | ||
function callback(datum, cb) { | ||
cb(datum); | ||
} | ||
function selector(x) { | ||
return x + '!!!'; | ||
} | ||
var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler); | ||
var results = []; | ||
|
||
boundCallback(42) | ||
.subscribe(function (x) { | ||
results.push(x); | ||
}, null, function () { | ||
results.push('done'); | ||
}); | ||
|
||
rxTestScheduler.flush(); | ||
|
||
expect(results).toEqual(['42!!!', 'done']); | ||
}); | ||
}); | ||
|
||
it('should pass multiple inner arguments as an array', function () { | ||
function callback(datum, cb) { | ||
cb(datum); | ||
cb(datum, 1, 2, 3); | ||
} | ||
var boundCallback = Observable.bindCallback(callback); | ||
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); | ||
var results = []; | ||
|
||
boundCallback(42) | ||
.subscribe(function (x) { | ||
expect(x).toBe(42); | ||
}, function () { | ||
done.fail('should not be called'); | ||
}, | ||
done); | ||
results.push(x); | ||
}, null, function () { | ||
results.push('done'); | ||
}); | ||
|
||
rxTestScheduler.flush(); | ||
|
||
expect(results).toEqual([[42, 1, 2, 3], 'done']); | ||
}); | ||
|
||
it('should emit one value chosen by a selector', function (done) { | ||
it('should pass multiple inner arguments to the selector if there is one', function () { | ||
function callback(datum, cb) { | ||
cb(null, datum); | ||
cb(datum, 1, 2, 3); | ||
} | ||
function selector(a, b, c, d) { | ||
expect([a, b, c, d]).toEqual([42, 1, 2, 3]); | ||
return a + b + c + d; | ||
} | ||
var boundCallback = Observable.bindCallback(callback, function (err, datum) { return datum; }); | ||
var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler); | ||
var results = []; | ||
|
||
boundCallback(42) | ||
.subscribe(function (x) { | ||
expect(x).toBe(42); | ||
}, function () { | ||
done.fail('should not be called'); | ||
}, | ||
done); | ||
}); | ||
results.push(x); | ||
}, null, function () { | ||
results.push('done'); | ||
}); | ||
|
||
it('should emit an error when the selector throws', function (done) { | ||
function callback(cb) { | ||
cb(42); | ||
} | ||
var boundCallback = Observable.bindCallback(callback, function (err) { throw new Error('Yikes!'); }); | ||
|
||
boundCallback() | ||
.subscribe(function () { | ||
// Considered a failure if we don't go directly to err handler | ||
done.fail('should not be called'); | ||
}, | ||
function (err) { | ||
expect(err.message).toBe('Yikes!'); | ||
done(); | ||
}, | ||
function () { | ||
// Considered a failure if we don't go directly to err handler | ||
done.fail('should not be called'); | ||
} | ||
); | ||
rxTestScheduler.flush(); | ||
|
||
expect(results).toEqual([48, 'done']); | ||
}); | ||
|
||
it('should not emit, throw or complete if immediately unsubscribed', function (done) { | ||
var nextSpy = jasmine.createSpy('next'); | ||
var throwSpy = jasmine.createSpy('throw'); | ||
var completeSpy = jasmine.createSpy('complete'); | ||
var timeout; | ||
it('should cache value for next subscription and not call callbackFunc again', function () { | ||
var calls = 0; | ||
function callback(datum, cb) { | ||
// Need to cb async in order for the unsub to trigger | ||
timeout = setTimeout(function () { | ||
cb(datum); | ||
}); | ||
calls++; | ||
cb(datum); | ||
} | ||
var subscription = Observable.bindCallback(callback)(42) | ||
.subscribe(nextSpy, throwSpy, completeSpy); | ||
subscription.unsubscribe(); | ||
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); | ||
var results1 = []; | ||
var results2 = []; | ||
|
||
setTimeout(function () { | ||
expect(nextSpy).not.toHaveBeenCalled(); | ||
expect(throwSpy).not.toHaveBeenCalled(); | ||
expect(completeSpy).not.toHaveBeenCalled(); | ||
var source = boundCallback(42); | ||
|
||
clearTimeout(timeout); | ||
done(); | ||
source.subscribe(function (x) { | ||
results1.push(x); | ||
}, null, function () { | ||
results1.push('done'); | ||
}); | ||
|
||
source.subscribe(function (x) { | ||
results2.push(x); | ||
}, null, function () { | ||
results2.push('done'); | ||
}); | ||
|
||
rxTestScheduler.flush(); | ||
|
||
expect(calls).toBe(1); | ||
expect(results1).toEqual([42, 'done']); | ||
expect(results2).toEqual([42, 'done']); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* globals describe, it, expect, rxTestScheduler */ | ||
var Rx = require('../../dist/cjs/Rx'); | ||
var Observable = Rx.Observable; | ||
|
||
/** | ||
* I'm starting this file to collect tests that when put in other files break jasmine for | ||
* no apparent reason. It seems like maybe we should move off of jasmine, but moving >1700 tests | ||
* sounds really gross, so I don't want to do that... | ||
*/ | ||
describe('jasmine is weird', function () { | ||
describe('bindCallback', function () { | ||
// HACK: If you move this test under the bindCallback-spec.js file, it will arbitrarily | ||
// break one bufferWhen-spec.js test. | ||
it('should not even call the callbackFn if immediately unsubscribed', function () { | ||
var calls = 0; | ||
function callback(datum, cb) { | ||
calls++; | ||
cb(datum); | ||
} | ||
var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); | ||
var results1 = []; | ||
|
||
var source = boundCallback(42); | ||
|
||
var subscription = source.subscribe(function (x) { | ||
results1.push(x); | ||
}, null, function () { | ||
results1.push('done'); | ||
}); | ||
|
||
subscription.unsubscribe(); | ||
|
||
rxTestScheduler.flush(); | ||
|
||
expect(calls).toBe(0); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
import {Observable} from '../../Observable'; | ||
import {BoundCallbackObsevable} from '../../observable/bindCallback'; | ||
Observable.bindCallback = BoundCallbackObsevable.create; | ||
import {BoundCallbackObservable} from '../../observable/bindCallback'; | ||
Observable.bindCallback = BoundCallbackObservable.create; |
Oops, something went wrong.