Skip to content

Commit

Permalink
[Expressions] Fix expression loader to emit throttled value upon expr…
Browse files Browse the repository at this point in the history
…ession completion (#133532)
  • Loading branch information
dokmic authored Jun 7, 2022
1 parent f1e4c0c commit c99ad10
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/plugins/expressions/public/loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ describe('ExpressionLoader', () => {
it('throttles partial results', async () => {
testScheduler.run(({ cold, expectObservable }) => {
const expressionLoader = new ExpressionLoader(element, 'var foo', {
variables: { foo: cold('a 5ms b 5ms c 10ms d', { a: 1, b: 2, c: 3, d: 4 }) },
variables: { foo: cold('a 5ms b 5ms c 10ms (d|)', { a: 1, b: 2, c: 3, d: 4 }) },
partial: true,
throttle: 20,
});

expectObservable(expressionLoader.data$).toBe('a 19ms c 19ms d', {
expectObservable(expressionLoader.data$).toBe('a 19ms c 2ms d', {
a: expect.objectContaining({ result: 1 }),
c: expect.objectContaining({ result: 3 }),
d: expect.objectContaining({ result: 4 }),
Expand Down
63 changes: 58 additions & 5 deletions src/plugins/expressions/public/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
* Side Public License, v 1.
*/

import { BehaviorSubject, Observable, Subject, Subscription, asyncScheduler, identity } from 'rxjs';
import { filter, map, delay, shareReplay, throttleTime } from 'rxjs/operators';
import { BehaviorSubject, Observable, Subject, Subscription, identity, timer } from 'rxjs';
import { delay, filter, finalize, map, shareReplay, takeWhile } from 'rxjs/operators';
import { defaults } from 'lodash';
import { SerializableRecord, UnwrapObservable } from '@kbn/utility-types';
import { Adapters } from '@kbn/inspector-plugin/public';
Expand All @@ -20,6 +20,61 @@ import { getExpressionsService } from './services';

type Data = unknown;

/**
* RxJS' `throttle` operator does not emit the last value immediately when the source observable is completed.
* Instead, it waits for the next throttle period to emit that.
* It might cause delays until we get the final value, even though it is already there.
* @see https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/throttle.ts#L121
*/
function throttle<T>(timeout: number) {
return (source: Observable<T>): Observable<T> =>
new Observable((subscriber) => {
let latest: T | undefined;
let hasValue = false;

const emit = () => {
if (hasValue) {
subscriber.next(latest);
hasValue = false;
latest = undefined;
}
};

let throttled: Subscription | undefined;
const timer$ = timer(0, timeout).pipe(
takeWhile(() => hasValue),
finalize(() => {
subscriber.remove(throttled!);
throttled = undefined;
})
);

subscriber.add(
source.subscribe({
next: (value) => {
latest = value;
hasValue = true;

if (!throttled) {
throttled = timer$.subscribe(emit);
subscriber.add(throttled);
}
},
error: (error) => subscriber.error(error),
complete: () => {
emit();
subscriber.complete();
},
})
);

subscriber.add(() => {
hasValue = false;
latest = undefined;
});
});
}

export class ExpressionLoader {
data$: ReturnType<ExecutionContract['getData']>;
update$: ExpressionRenderHandler['update$'];
Expand Down Expand Up @@ -151,9 +206,7 @@ export class ExpressionLoader {
.pipe(
delay(0), // delaying until the next tick since we execute the expression in the constructor
filter(({ partial }) => params.partial || !partial),
params.partial && params.throttle
? throttleTime(params.throttle, asyncScheduler, { leading: true, trailing: true })
: identity
params.partial && params.throttle ? throttle(params.throttle) : identity
)
.subscribe((value) => this.dataSubject.next(value));
};
Expand Down

0 comments on commit c99ad10

Please sign in to comment.