diff --git a/modules/nuclide-commons/observable.js b/modules/nuclide-commons/observable.js index 3cbf2350..9c52fb9c 100644 --- a/modules/nuclide-commons/observable.js +++ b/modules/nuclide-commons/observable.js @@ -28,6 +28,7 @@ import UniversalDisposable from './UniversalDisposable'; import invariant from 'assert'; import {Observable, ReplaySubject} from 'rxjs'; import {setDifference} from './collection'; +import debounce from './debounce'; /** * Splits a stream of strings on newlines. @@ -358,6 +359,33 @@ export function completingSwitchMap( }); } +/** + * RxJS's debounceTime is actually fairly inefficient: + * on each event, it always clears its interval and [creates a new one][1]. + * Until this is fixed, this uses our debounce implementation which + * reuses a timeout and just sets a timestamp when possible. + * + * This may seem like a micro-optimization but we often use debounces + * for very hot events, like keypresses. Exceeding the frame budget can easily lead + * to increased key latency! + * + * [1]: https://github.com/ReactiveX/rxjs/blob/master/src/operators/debounceTime.ts#L106 + */ +export function fastDebounce( + delay: number, +): (Observable) => Observable { + return (observable: Observable) => + Observable.create(observer => { + const debouncedNext = debounce((x: T) => observer.next(x), delay); + const subscription = observable.subscribe( + debouncedNext, + observer.error.bind(observer), + observer.complete.bind(observer), + ); + return new UniversalDisposable(subscription, debouncedNext); + }); +} + export const microtask = Observable.create(observer => { process.nextTick(() => { observer.next(); diff --git a/modules/nuclide-commons/spec/observable-spec.js b/modules/nuclide-commons/spec/observable-spec.js index 4eb11cee..c54d9294 100644 --- a/modules/nuclide-commons/spec/observable-spec.js +++ b/modules/nuclide-commons/spec/observable-spec.js @@ -12,8 +12,11 @@ import { bufferUntil, - diffSets, cacheWhileSubscribed, + completingSwitchMap, + concatLatest, + diffSets, + fastDebounce, macrotask, microtask, nextAnimationFrame, @@ -22,8 +25,6 @@ import { takeWhileInclusive, throttle, toggle, - concatLatest, - completingSwitchMap, } from '../observable'; import {Disposable} from 'event-kit'; import {Observable, Subject} from 'rxjs'; @@ -517,6 +518,58 @@ describe('nuclide-commons/observable', () => { }); }); + describe('fastDebounce', () => { + it('debounces events', () => { + waitsForPromise(async () => { + let nextSpy: jasmine$Spy; + const originalCreate = Observable.create.bind(Observable); + // Spy on the created observer's next to ensure that we always cancel + // the last debounced timer on unsubscribe. + spyOn(Observable, 'create').andCallFake(callback => { + return originalCreate(observer => { + nextSpy = spyOn(observer, 'next').andCallThrough(); + return callback(observer); + }); + }); + + const subject = new Subject(); + const promise = subject + .let(fastDebounce(10)) + .toArray() + .toPromise(); + + subject.next(1); + subject.next(2); + advanceClock(20); + + subject.next(3); + advanceClock(5); + + subject.next(4); + advanceClock(15); + + subject.next(5); + subject.complete(); + advanceClock(20); + + expect(await promise).toEqual([2, 4]); + expect(nextSpy.callCount).toBe(2); + }); + }); + + it('passes errors through immediately', () => { + let caught = false; + Observable.throw(1) + .let(fastDebounce(10)) + .subscribe({ + error() { + caught = true; + }, + }); + expect(caught).toBe(true); + }); + }); + describe('microtask', () => { it('is cancelable', () => { waitsForPromise(async () => {