Skip to content

Commit

Permalink
[Response Ops][Alerting] SLIs Phase 2 - Histogram for task schedule d…
Browse files Browse the repository at this point in the history
…elay (elastic#166122)

Towards elastic/response-ops-team#130

## Summary

This implements the second of 3 SLIs described in
elastic/response-ops-team#130 - the histogram
for tracking task schedule delay in seconds. We bucket up to 30 minutes.

## To Verify

Run Kibana and create some alerting rules. Navigate to
https://localhost:5601/api/task_manager/metrics?reset=false and you
should see the new metric under `task_run.value.overall.delay`

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
ymao1 and kibanamachine authored Sep 18, 2023
1 parent 70a2f4c commit 1574502
Show file tree
Hide file tree
Showing 11 changed files with 923 additions and 115 deletions.
797 changes: 734 additions & 63 deletions x-pack/plugins/task_manager/server/metrics/create_aggregator.test.ts

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions x-pack/plugins/task_manager/server/metrics/metrics_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { set } from '@kbn/safer-lodash-set';
import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle';
import { TaskManagerConfig } from '../config';
import { AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { isTaskPollingCycleEvent, isTaskRunEvent } from '../task_events';
import { isTaskManagerStatEvent, isTaskPollingCycleEvent, isTaskRunEvent } from '../task_events';
import { TaskClaimMetric, TaskClaimMetricsAggregator } from './task_claim_metrics_aggregator';
import { createAggregator } from './create_aggregator';
import { TaskRunMetric, TaskRunMetricsAggregator } from './task_run_metrics_aggregator';
Expand Down Expand Up @@ -54,7 +54,8 @@ export function createMetricsAggregators({
taskPollingLifecycle,
config,
resetMetrics$,
taskEventFilter: (taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent),
taskEventFilter: (taskEvent: TaskLifecycleEvent) =>
isTaskRunEvent(taskEvent) || isTaskManagerStatEvent(taskEvent),
metricsAggregator: new TaskRunMetricsAggregator(),
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,21 @@ describe('SimpleHistogram', () => {

expect(histogram.get(true)).toEqual([]);
});

// Verifies the shape produced by serialize(): two parallel arrays, one entry per bucket.
// With a max of 100 and a bucket size of 10 the bucket labels are 10, 20, ..., 100, and
// each value recorded here is counted in the first bucket whose label is greater than it
// (1 and 2 -> 10; 21 and 23 -> 30; 33 and 34 -> 40; 56 -> 60; 78 -> 80; 99 -> 100).
test('should correctly serialize histogram data', () => {
const histogram = new SimpleHistogram(100, 10);
histogram.record(23);
histogram.record(34);
histogram.record(21);
histogram.record(56);
histogram.record(78);
histogram.record(33);
histogram.record(99);
histogram.record(1);
histogram.record(2);
expect(histogram.serialize()).toEqual({
counts: [2, 0, 2, 2, 0, 1, 0, 1, 0, 1],
values: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
});
});
});
18 changes: 18 additions & 0 deletions x-pack/plugins/task_manager/server/metrics/simple_histogram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* 2.0.
*/

import { JsonObject } from '@kbn/utility-types';
import { last } from 'lodash';

interface Bucket {
Expand All @@ -13,6 +14,11 @@ interface Bucket {
count: number;
}

/**
 * JSON-friendly form of a histogram, expressed as two parallel arrays:
 * `counts[i]` is the count held by the bucket whose value is `values[i]`.
 */
export interface SerializedHistogram extends JsonObject {
  /** The `value` of each bucket, in bucket order. */
  values: number[];
  /** The `count` of each bucket, in the same order as `values`. */
  counts: number[];
}

export class SimpleHistogram {
private maxValue: number;
private bucketSize: number;
Expand Down Expand Up @@ -68,6 +74,18 @@ export class SimpleHistogram {
}));
}

/**
 * Converts the buckets returned by `get(true)` into a pair of parallel arrays.
 *
 * @returns `counts` and `values`, where entry `i` of each array comes from the
 * same bucket.
 */
public serialize(): SerializedHistogram {
  // Fetch the buckets once so both projections see the same snapshot.
  const buckets = this.get(true);

  return {
    counts: buckets.map((bucket) => bucket.count),
    values: buckets.map((bucket) => bucket.value),
  };
}

private initializeBuckets() {
let i = 0;
while (i < this.maxValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { JsonObject } from '@kbn/utility-types';
import { isOk } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { TaskRun } from '../task_events';
import { SimpleHistogram } from './simple_histogram';
import { SerializedHistogram, SimpleHistogram } from './simple_histogram';
import { ITaskMetricsAggregator } from './types';
import { MetricCounterService } from './counter/metric_counter_service';

Expand All @@ -26,10 +26,7 @@ interface TaskClaimCounts extends JsonObject {
}

export type TaskClaimMetric = TaskClaimCounts & {
duration: {
counts: number[];
values: number[];
};
duration: SerializedHistogram;
};

export class TaskClaimMetricsAggregator implements ITaskMetricsAggregator<TaskClaimMetric> {
Expand All @@ -47,7 +44,7 @@ export class TaskClaimMetricsAggregator implements ITaskMetricsAggregator<TaskCl
public collect(): TaskClaimMetric {
return {
...this.counter.collect(),
duration: this.serializeHistogram(),
duration: this.durationHistogram.serialize(),
};
}

Expand All @@ -68,16 +65,4 @@ export class TaskClaimMetricsAggregator implements ITaskMetricsAggregator<TaskCl
this.durationHistogram.record(durationInMs);
}
}

private serializeHistogram() {
const counts: number[] = [];
const values: number[] = [];

for (const { count, value } of this.durationHistogram.get(true)) {
counts.push(count);
values.push(value);
}

return { counts, values };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
import * as uuid from 'uuid';
import { asOk, asErr } from '../lib/result_type';
import { TaskStatus } from '../task';
import { asTaskRunEvent, TaskPersistence } from '../task_events';
import {
asTaskManagerStatEvent,
asTaskRunEvent,
TaskManagerStats,
TaskPersistence,
} from '../task_events';
import { TaskRunResult } from '../task_running';
import { TaskRunMetricsAggregator } from './task_run_metrics_aggregator';

Expand Down Expand Up @@ -69,6 +74,10 @@ export const getTaskRunFailedEvent = (type: string, isExpired: boolean = false)
);
};

/**
 * Test helper: wraps `value` in an ok result and builds a task manager stat
 * event for the given stat `id` (defaults to `'runDelay'`).
 */
export const getTaskManagerStatEvent = (value: number, id: TaskManagerStats = 'runDelay') =>
  asTaskManagerStatEvent(id, asOk(value));

describe('TaskRunMetricsAggregator', () => {
let taskRunMetricsAggregator: TaskRunMetricsAggregator;
beforeEach(() => {
Expand All @@ -77,32 +86,49 @@ describe('TaskRunMetricsAggregator', () => {

test('should correctly initialize', () => {
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 0, not_timed_out: 0, total: 0 },
overall: { success: 0, not_timed_out: 0, total: 0, delay: { counts: [], values: [] } },
});
});

test('should correctly return initialMetrics', () => {
expect(taskRunMetricsAggregator.initialMetric()).toEqual({
overall: { success: 0, not_timed_out: 0, total: 0 },
overall: { success: 0, not_timed_out: 0, total: 0, delay: { counts: [], values: [] } },
by_type: {},
});
});

test('should correctly process task run success event', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry'));
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 2, not_timed_out: 2, total: 2 },
overall: { success: 2, not_timed_out: 2, total: 2, delay: { counts: [], values: [] } },
by_type: {
telemetry: { success: 2, not_timed_out: 2, total: 2 },
},
});
});

// A single runDelay stat of 3.343 should show up in the delay histogram; the expected
// output has it counted once in the first bucket, labelled 10. No by_type entry is
// expected because no task run event was processed.
test('should correctly process task manager runDelay stat', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskManagerStatEvent(3.343));
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 0, not_timed_out: 0, total: 0, delay: { counts: [1], values: [10] } },
});
});

// Stat events with an id other than 'runDelay' (here 'pollingDelay') must leave the
// delay histogram empty.
test('should ignore task manager stats that are not runDelays', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(
getTaskManagerStatEvent(3.343, 'pollingDelay')
);
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 0, not_timed_out: 0, total: 0, delay: { counts: [], values: [] } },
});
});

test('should correctly process task run success event where task run has timed out', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry', true));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry', true));
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 2, not_timed_out: 0, total: 2 },
overall: { success: 2, not_timed_out: 0, total: 2, delay: { counts: [], values: [] } },
by_type: {
telemetry: { success: 2, not_timed_out: 0, total: 2 },
},
Expand All @@ -113,7 +139,7 @@ describe('TaskRunMetricsAggregator', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry'));
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 0, not_timed_out: 2, total: 2 },
overall: { success: 0, not_timed_out: 2, total: 2, delay: { counts: [], values: [] } },
by_type: {
telemetry: { success: 0, not_timed_out: 2, total: 2 },
},
Expand All @@ -124,7 +150,7 @@ describe('TaskRunMetricsAggregator', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry', true));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry', true));
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 0, not_timed_out: 0, total: 2 },
overall: { success: 0, not_timed_out: 0, total: 2, delay: { counts: [], values: [] } },
by_type: {
telemetry: { success: 0, not_timed_out: 0, total: 2 },
},
Expand All @@ -137,7 +163,7 @@ describe('TaskRunMetricsAggregator', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report', true));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunFailedEvent('telemetry'));
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 3, not_timed_out: 3, total: 4 },
overall: { success: 3, not_timed_out: 3, total: 4, delay: { counts: [], values: [] } },
by_type: {
report: { success: 2, not_timed_out: 1, total: 2 },
telemetry: { success: 1, not_timed_out: 2, total: 2 },
Expand Down Expand Up @@ -167,7 +193,7 @@ describe('TaskRunMetricsAggregator', () => {
getTaskRunSuccessEvent('alerting:.index-threshold', true)
);
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 11, not_timed_out: 12, total: 14 },
overall: { success: 11, not_timed_out: 12, total: 14, delay: { counts: [], values: [] } },
by_type: {
actions: { success: 3, not_timed_out: 3, total: 3 },
'actions:__email': { success: 1, not_timed_out: 1, total: 1 },
Expand All @@ -182,6 +208,11 @@ describe('TaskRunMetricsAggregator', () => {
});

test('should correctly reset counter', () => {
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskManagerStatEvent(3.343));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskManagerStatEvent(25.45));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskManagerStatEvent(6.4478));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskManagerStatEvent(9.241));

taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('telemetry'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report'));
taskRunMetricsAggregator.processTaskLifecycleEvent(getTaskRunSuccessEvent('report'));
Expand All @@ -203,7 +234,12 @@ describe('TaskRunMetricsAggregator', () => {
getTaskRunSuccessEvent('alerting:.index-threshold')
);
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 11, not_timed_out: 12, total: 14 },
overall: {
success: 11,
not_timed_out: 12,
total: 14,
delay: { counts: [3, 0, 1], values: [10, 20, 30] },
},
by_type: {
actions: { success: 3, not_timed_out: 3, total: 3 },
'actions:__email': { success: 1, not_timed_out: 1, total: 1 },
Expand All @@ -218,7 +254,7 @@ describe('TaskRunMetricsAggregator', () => {

taskRunMetricsAggregator.reset();
expect(taskRunMetricsAggregator.collect()).toEqual({
overall: { success: 0, not_timed_out: 0, total: 0 },
overall: { success: 0, not_timed_out: 0, total: 0, delay: { counts: [], values: [] } },
by_type: {
actions: { success: 0, not_timed_out: 0, total: 0 },
'actions:__email': { success: 0, not_timed_out: 0, total: 0 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,26 @@
*/

import { JsonObject } from '@kbn/utility-types';
import { isOk, unwrap } from '../lib/result_type';
import { merge } from 'lodash';
import { isOk, Ok, unwrap } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { ErroredTask, RanTask, TaskRun } from '../task_events';
import {
ErroredTask,
RanTask,
TaskRun,
isTaskManagerStatEvent,
isTaskRunEvent,
TaskManagerStat,
} from '../task_events';
import { MetricCounterService } from './counter/metric_counter_service';
import { SerializedHistogram, SimpleHistogram } from './simple_histogram';
import { ITaskMetricsAggregator } from './types';

const taskTypeGrouping = new Set<string>(['alerting:', 'actions:']);

// Maximum value and bucket size, both in seconds, passed to the SimpleHistogram that
// tracks task run delay.
const HDR_HISTOGRAM_MAX = 1800; // 30 minutes
const HDR_HISTOGRAM_BUCKET_SIZE = 10; // 10 seconds

enum TaskRunKeys {
SUCCESS = 'success',
NOT_TIMED_OUT = 'not_timed_out',
Expand All @@ -31,33 +43,53 @@ interface TaskRunCounts extends JsonObject {
[TaskRunKeys.TOTAL]: number;
}

export interface TaskRunMetric extends JsonObject {
export interface TaskRunMetrics extends JsonObject {
[TaskRunMetricKeys.OVERALL]: TaskRunCounts;
[TaskRunMetricKeys.BY_TYPE]: {
[key: string]: TaskRunCounts;
};
}

/**
 * Metric shape produced by the task run aggregator: the `overall` and `by_type`
 * entries of TaskRunMetrics, with a serialized delay histogram added under
 * `overall.delay`.
 */
export interface TaskRunMetric extends JsonObject {
overall: TaskRunMetrics['overall'] & {
delay: SerializedHistogram;
};
by_type: TaskRunMetrics['by_type'];
}

export class TaskRunMetricsAggregator implements ITaskMetricsAggregator<TaskRunMetric> {
private counter: MetricCounterService<TaskRunMetric> = new MetricCounterService(
Object.values(TaskRunKeys),
TaskRunMetricKeys.OVERALL
);
private delayHistogram = new SimpleHistogram(HDR_HISTOGRAM_MAX, HDR_HISTOGRAM_BUCKET_SIZE);

public initialMetric(): TaskRunMetric {
return this.counter.initialMetrics();
return merge(this.counter.initialMetrics(), {
by_type: {},
overall: { delay: { counts: [], values: [] } },
});
}

public collect(): TaskRunMetric {
return this.counter.collect();
return merge(this.counter.collect(), { overall: { delay: this.delayHistogram.serialize() } });
}

/** Resets both the counter service and the delay histogram. */
public reset() {
this.counter.reset();
this.delayHistogram.reset();
}

public processTaskLifecycleEvent(taskEvent: TaskLifecycleEvent) {
const { task, isExpired }: RanTask | ErroredTask = unwrap((taskEvent as TaskRun).event);
if (isTaskRunEvent(taskEvent)) {
this.processTaskRunEvent(taskEvent);
} else if (isTaskManagerStatEvent(taskEvent)) {
this.processTaskManagerStatEvent(taskEvent);
}
}

private processTaskRunEvent(taskEvent: TaskRun) {
const { task, isExpired }: RanTask | ErroredTask = unwrap(taskEvent.event);
const success = isOk((taskEvent as TaskRun).event);
const taskType = task.taskType.replaceAll('.', '__');
const taskTypeGroup = this.getTaskTypeGroup(taskType);
Expand All @@ -76,6 +108,13 @@ export class TaskRunMetricsAggregator implements ITaskMetricsAggregator<TaskRunM
}
}

/**
 * Records the value of a `runDelay` stat event in the delay histogram, rounded
 * to the nearest integer. Stat events with any other id are ignored.
 */
private processTaskManagerStatEvent(taskEvent: TaskManagerStat) {
  if (taskEvent.id !== 'runDelay') {
    return;
  }

  const { value } = taskEvent.event as Ok<number>;
  this.delayHistogram.record(Math.round(value));
}

private incrementCounters(key: TaskRunKeys, taskType: string, group?: string) {
this.counter.increment(key, TaskRunMetricKeys.OVERALL);
this.counter.increment(key, `${TaskRunMetricKeys.BY_TYPE}.${taskType}`);
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/server/task_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ export type TaskManagerStats =
| 'claimDuration'
| 'queuedEphemeralTasks'
| 'ephemeralTaskDelay'
| 'workerUtilization';
| 'workerUtilization'
| 'runDelay';
export type TaskManagerStat = TaskEvent<number, never, TaskManagerStats>;

export type OkResultOf<EventType> = EventType extends TaskEvent<infer OkResult, infer ErrorResult>
Expand Down
Loading

0 comments on commit 1574502

Please sign in to comment.