From 5f9c6aade4c9febbfb6aa672c2c3e4f948485bc4 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 18 Nov 2020 13:34:50 +0000 Subject: [PATCH 1/2] ensure workload agg doesnt run until next interval when it fails --- .../monitoring/workload_statistics.test.ts | 37 ++++++++++++++++++- .../server/monitoring/workload_statistics.ts | 6 +-- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts index c2e62b6e1898b..1d37cda1a66e3 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { first, take, bufferCount } from 'rxjs/operators'; +import { first, take, bufferCount, map } from 'rxjs/operators'; import { loggingSystemMock } from '../../../../../src/core/server/mocks'; import { WorkloadAggregation, @@ -477,6 +477,41 @@ describe('Workload Statistics Aggregator', () => { }, reject); }); }); + + test('recovery after errors occurrs at the next interval', async () => { + const refreshInterval = 1000; + + const taskStore = taskStoreMock.create({}); + const logger = loggingSystemMock.create().get(); + const workloadAggregator = createWorkloadAggregator( + taskStore, + of(true), + refreshInterval, + 3000, + logger + ); + + return new Promise((resolve, reject) => { + let errorWasThrowAt = 0; + taskStore.aggregate.mockImplementation(async () => { + if (errorWasThrowAt === 0) { + errorWasThrowAt = Date.now(); + throw new Error(`Elasticsearch has gone poof`); + } else if (Date.now() - errorWasThrowAt < refreshInterval) { + reject(new Error(`Elasticsearch is still poof`)); + } + + return setTaskTypeCount(mockAggregatedResult(), 'alerting_telemetry', { + idle: 2, + }); + }); + + workloadAggregator.pipe(take(2), bufferCount(2)).subscribe((results) => { + expect(results.length).toEqual(2); + resolve(); + }, reject); + }); + }); }); describe('estimateRecurringTaskScheduling', () => { diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts index a27b5e2282e32..8002ee44d01ff 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts @@ -5,7 +5,7 @@ */ import { combineLatest, Observable, timer } from 'rxjs'; -import { mergeMap, map, filter, catchError } from 'rxjs/operators'; +import { mergeMap, map, filter, switchMap, catchError } from 'rxjs/operators'; import { Logger } from 'src/core/server'; import { JsonObject } from 'src/plugins/kibana_utils/common'; import { keyBy, mapValues } from 'lodash'; @@ -222,8 +222,8 @@ export function createWorkloadAggregator( }), catchError((ex: Error, caught) => { logger.error(`[WorkloadAggregator]: ${ex}`); - // continue to pull values from the same observable - return caught; + // continue to pull values from the same observable but only on the next refreshInterval + return timer(refreshInterval).pipe(switchMap(() => caught)); }) ); } From 707904c3adc47276f02c33d22fb5a824240b7f10 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 18 Nov 2020 14:17:43 +0000 Subject: [PATCH 2/2] removed unused imports --- .../task_manager/server/monitoring/workload_statistics.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts index 1d37cda1a66e3..3470ee4d76486 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { first, take, bufferCount, map } from 'rxjs/operators'; +import { first, take, bufferCount } from 'rxjs/operators'; import { loggingSystemMock } from '../../../../../src/core/server/mocks'; import { WorkloadAggregation,