Skip to content

Commit

Permalink
ensure workload agg doesnt run until next interval when it fails
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Nov 18, 2020
1 parent 484437f commit 5f9c6aa
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { first, take, bufferCount } from 'rxjs/operators';
import { first, take, bufferCount, map } from 'rxjs/operators';
import { loggingSystemMock } from '../../../../../src/core/server/mocks';
import {
WorkloadAggregation,
Expand Down Expand Up @@ -477,6 +477,41 @@ describe('Workload Statistics Aggregator', () => {
}, reject);
});
});

test('recovery after errors occurrs at the next interval', async () => {
const refreshInterval = 1000;

const taskStore = taskStoreMock.create({});
const logger = loggingSystemMock.create().get();
const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
refreshInterval,
3000,
logger
);

return new Promise((resolve, reject) => {
let errorWasThrowAt = 0;
taskStore.aggregate.mockImplementation(async () => {
if (errorWasThrowAt === 0) {
errorWasThrowAt = Date.now();
throw new Error(`Elasticsearch has gone poof`);
} else if (Date.now() - errorWasThrowAt < refreshInterval) {
reject(new Error(`Elasticsearch is still poof`));
}

return setTaskTypeCount(mockAggregatedResult(), 'alerting_telemetry', {
idle: 2,
});
});

workloadAggregator.pipe(take(2), bufferCount(2)).subscribe((results) => {
expect(results.length).toEqual(2);
resolve();
}, reject);
});
});
});

describe('estimateRecurringTaskScheduling', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import { combineLatest, Observable, timer } from 'rxjs';
import { mergeMap, map, filter, catchError } from 'rxjs/operators';
import { mergeMap, map, filter, switchMap, catchError } from 'rxjs/operators';
import { Logger } from 'src/core/server';
import { JsonObject } from 'src/plugins/kibana_utils/common';
import { keyBy, mapValues } from 'lodash';
Expand Down Expand Up @@ -222,8 +222,8 @@ export function createWorkloadAggregator(
}),
catchError((ex: Error, caught) => {
logger.error(`[WorkloadAggregator]: ${ex}`);
// continue to pull values from the same observable
return caught;
// continue to pull values from the same observable but only on the next refreshInterval
return timer(refreshInterval).pipe(switchMap(() => caught));
})
);
}
Expand Down

0 comments on commit 5f9c6aa

Please sign in to comment.