From 91bc308ba7eaab5937143d63d97fb473ad48dca7 Mon Sep 17 00:00:00 2001 From: vladikkuzn Date: Fri, 5 Jul 2024 09:38:31 +0300 Subject: [PATCH] Add a metric that tracks the number of preemptions issued by a ClusterQueue * Metric for preempted workloads --- pkg/metrics/metrics.go | 19 +++++++++++++++++++ pkg/metrics/metrics_test.go | 12 ++++++++++++ pkg/scheduler/preemption/preemption.go | 2 ++ test/integration/scheduler/preemption_test.go | 1 + test/util/util.go | 11 ++++++++++- 5 files changed, 44 insertions(+), 1 deletion(-) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 23c1c3c260..44472ceb57 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -148,6 +148,19 @@ The label 'reason' can have the following values: }, []string{"cluster_queue", "reason"}, ) + PreemptedWorkloadsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: constants.KueueName, + Name: "preempted_workloads_total", + Help: `The number of preempted workloads per 'preemptor_cluster_queue', +The label 'reason' can have the following values: +- "InClusterQueue" means that the workload was preempted by a workload in the same ClusterQueue. +- "InCohortReclamation" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota. +- "InCohortFairSharing" means that the workload was preempted by a workload in the same cohort due to fair sharing. +- "InCohortReclaimWhileBorrowing" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota while borrowing.`, + }, []string{"preemptor_cluster_queue", "reason"}, + ) + // Metrics tied to the cache. ReservingActiveWorkloads = prometheus.NewGaugeVec( @@ -262,6 +275,10 @@ func ReportEvictedWorkloads(cqName, reason string) { EvictedWorkloadsTotal.WithLabelValues(cqName, reason).Inc() } +func ReportPreemptedWorkloads(preemptorCqName, reason string) { + PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason).Inc() +} + func ClearQueueSystemMetrics(cqName string) { PendingWorkloads.DeleteLabelValues(cqName, PendingStatusActive) PendingWorkloads.DeleteLabelValues(cqName, PendingStatusInadmissible) @@ -271,6 +288,7 @@ func ClearQueueSystemMetrics(cqName string) { admissionWaitTime.DeleteLabelValues(cqName) admissionChecksWaitTime.DeleteLabelValues(cqName) EvictedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"cluster_queue": cqName}) + PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preemptor_cluster_queue": cqName}) } func ReportClusterQueueStatus(cqName string, cqStatus ClusterQueueStatus) { @@ -378,6 +396,7 @@ func Register() { quotaReservedWaitTime, AdmittedWorkloadsTotal, EvictedWorkloadsTotal, + PreemptedWorkloadsTotal, admissionWaitTime, admissionChecksWaitTime, ClusterQueueResourceUsage, diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index d5e948d612..2f41760ad3 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -158,3 +158,15 @@ func TestReportAndCleanupClusterQueueEvictedNumber(t *testing.T) { ClearQueueSystemMetrics("cluster_queue1") expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1") } + +func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) { + ReportPreemptedWorkloads("cluster_queue1", "InCohortReclamation") + ReportPreemptedWorkloads("cluster_queue1", "InCohortFairSharing") + + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 2, "preemptor_cluster_queue", "cluster_queue1") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortFairSharing") + // clear + ClearQueueSystemMetrics("cluster_queue1") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preemptor_cluster_queue", "cluster_queue1") +} diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index 91d72b00c5..3003539b52 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -187,6 +187,8 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In } log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Obj), "reason", reason, "message", message) + metrics.ReportPreemptedWorkloads(preemptor.ClusterQueue, reason) + p.recorder.Eventf(target.Obj, corev1.EventTypeNormal, "Preempted", message) metrics.ReportEvictedWorkloads(target.ClusterQueue, kueue.WorkloadEvictedByPreemption) } else { diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index 980b639000..39ff46508e 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -137,6 +137,7 @@ var _ = ginkgo.Describe("Preemption", func() { util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl1, lowWl2) util.ExpectEvictedWorkloadsTotalMetric(cq.Name, kueue.WorkloadEvictedByPreemption, 2) + util.ExpectPreemptedWorkloadsTotalMetric(cq.Name, "InClusterQueue", 2) util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWl2) util.ExpectWorkloadsToBePending(ctx, k8sClient, lowWl1, lowWl2) diff --git a/test/util/util.go b/test/util/util.go index 3e26c8dbf9..f45a449e5c 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -495,7 +495,7 @@ func ExpectAdmittedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) { }, Timeout, Interval).Should(gomega.Succeed()) } -func ExpectEvictedWorkloadsTotalMetric(cqName string, reason string, v int) { +func ExpectEvictedWorkloadsTotalMetric(cqName, reason string, v int) { metric := metrics.EvictedWorkloadsTotal.WithLabelValues(cqName, reason) gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { count, err := testutil.GetCounterMetricValue(metric) @@ -504,6 +504,15 @@ func ExpectEvictedWorkloadsTotalMetric(cqName string, reason string, v int) { }, Timeout, Interval).Should(gomega.Succeed()) } +func ExpectPreemptedWorkloadsTotalMetric(preemptorCqName, reason string, v int) { + metric := metrics.PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason) + gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { + count, err := testutil.GetCounterMetricValue(metric) + g.Expect(err).ToNot(gomega.HaveOccurred()) + g.Expect(int(count)).Should(gomega.Equal(v)) + }, Timeout, Interval).Should(gomega.Succeed()) +} + func ExpectQuotaReservedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) { metric := metrics.QuotaReservedWorkloadsTotal.WithLabelValues(cq.Name) gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {