diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 5c50c6909e..45fdcb5051 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -152,13 +152,13 @@ The label 'reason' can have the following values: prometheus.CounterOpts{ Subsystem: constants.KueueName, Name: "preempted_workloads_total", - Help: `The number of preempted workloads per 'preemptor_cluster_queue', + Help: `The number of preempted workloads per 'preempting_cluster_queue', The label 'reason' can have the following values: - "InClusterQueue" means that the workload was preempted by a workload in the same ClusterQueue. - "InCohortReclamation" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota. - "InCohortFairSharing" means that the workload was preempted by a workload in the same cohort due to fair sharing. - "InCohortReclaimWhileBorrowing" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota while borrowing.`, - }, []string{"preemptor_cluster_queue", "reason"}, + }, []string{"preempting_cluster_queue", "reason"}, ) // Metrics tied to the cache. @@ -276,8 +276,9 @@ func ReportEvictedWorkloads(cqName, reason string) { EvictedWorkloadsTotal.WithLabelValues(cqName, reason).Inc() } -func ReportPreemptedWorkloads(preemptorCqName, reason string) { - PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason).Inc() +func ReportPreemption(preemptingCqName, preemptingReason, targetCqName string) { + PreemptedWorkloadsTotal.WithLabelValues(preemptingCqName, preemptingReason).Inc() + ReportEvictedWorkloads(targetCqName, kueue.WorkloadEvictedByPreemption) } func ClearQueueSystemMetrics(cqName string) { @@ -289,7 +290,7 @@ func ClearQueueSystemMetrics(cqName string) { admissionWaitTime.DeleteLabelValues(cqName) admissionChecksWaitTime.DeleteLabelValues(cqName) EvictedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"cluster_queue": cqName}) - PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preemptor_cluster_queue": cqName}) + PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preempting_cluster_queue": cqName}) } func ReportClusterQueueStatus(cqName string, cqStatus ClusterQueueStatus) { diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 2f41760ad3..fbc5bff4e5 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -154,19 +154,25 @@ func TestReportAndCleanupClusterQueueEvictedNumber(t *testing.T) { expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 2, "cluster_queue", "cluster_queue1") expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1", "reason", "Preempted") expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1", "reason", "Evicted") - // clear + ClearQueueSystemMetrics("cluster_queue1") expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1") } func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) { - ReportPreemptedWorkloads("cluster_queue1", "InCohortReclamation") - ReportPreemptedWorkloads("cluster_queue1", "InCohortFairSharing") + ReportPreemption("cluster_queue1", "InClusterQueue", "cluster_queue1") + ReportPreemption("cluster_queue1", "InCohortReclamation", "cluster_queue1") + ReportPreemption("cluster_queue1", "InCohortFairSharing", "cluster_queue1") + ReportPreemption("cluster_queue1", "InCohortReclaimWhileBorrowing", "cluster_queue1") + + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 4, "preempting_cluster_queue", "cluster_queue1") + expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InClusterQueue") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortFairSharing") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclaimWhileBorrowing") - expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 2, "preemptor_cluster_queue", "cluster_queue1") - expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation") - expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortFairSharing") - // clear ClearQueueSystemMetrics("cluster_queue1") - expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preemptor_cluster_queue", "cluster_queue1") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preempting_cluster_queue", "cluster_queue1") + expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1") } diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index 78c392d390..b3c15fec4f 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -205,11 +205,9 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In return } - log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message) - metrics.ReportPreemptedWorkloads(preemptor.ClusterQueue, target.Reason) - + log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message, "targetClusterQueue", klog.KRef("", target.WorkloadInfo.ClusterQueue)) p.recorder.Eventf(target.WorkloadInfo.Obj, corev1.EventTypeNormal, "Preempted", message) - metrics.ReportEvictedWorkloads(target.WorkloadInfo.ClusterQueue, kueue.WorkloadEvictedByPreemption) + metrics.ReportPreemption(preemptor.ClusterQueue, target.Reason, target.WorkloadInfo.ClusterQueue) } else { log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj)) } diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index f04a5c9a7f..5a49d62b1b 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -138,7 +138,7 @@ var _ = ginkgo.Describe("Preemption", func() { util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl1, lowWl2) util.ExpectEvictedWorkloadsTotalMetric(cq.Name, kueue.WorkloadEvictedByPreemption, 2) - util.ExpectPreemptedWorkloadsTotalMetric(cq.Name, "InClusterQueue", 2) + util.ExpectPreemptedWorkloadsTotalMetric(cq.Name, preemption.InClusterQueueReason, 2) util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWl2) util.ExpectWorkloadsToBePending(ctx, k8sClient, lowWl1, lowWl2) @@ -284,6 +284,8 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.By("Verify the Preempted condition", func() { util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InClusterQueueReason, metav1.ConditionTrue, alphaLowWl, alphaMidWl) util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclaimWhileBorrowingReason, metav1.ConditionTrue, betaMidWl, alphaMidWl) + //util.ExpectPreemptedWorkloadsTotalMetric(alphaQ.Name, preemption.InClusterQueueReason, 1) + //util.ExpectPreemptedWorkloadsTotalMetric(betaQ.Name, preemption.InCohortReclaimWhileBorrowingReason, 1) }) ginkgo.By("Verify the Preempted condition on re-admission, as the preemptor is finished", func() { diff --git a/test/util/util.go b/test/util/util.go index d984536fe9..dcedcfa032 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -27,6 +27,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" zaplog "go.uber.org/zap" "go.uber.org/zap/zapcore" batchv1 "k8s.io/api/batch/v1" @@ -452,37 +453,29 @@ func ExpectReservingActiveWorkloadsMetric(cq *kueue.ClusterQueue, v int) { func ExpectAdmittedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) { metric := metrics.AdmittedWorkloadsTotal.WithLabelValues(cq.Name) - gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { - count, err := testutil.GetCounterMetricValue(metric) - g.Expect(err).ToNot(gomega.HaveOccurred()) - g.Expect(int(count)).Should(gomega.Equal(v)) - }, Timeout, Interval).Should(gomega.Succeed()) + expectCounterMetric(metric, v) } func ExpectEvictedWorkloadsTotalMetric(cqName, reason string, v int) { metric := metrics.EvictedWorkloadsTotal.WithLabelValues(cqName, reason) - gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { - count, err := testutil.GetCounterMetricValue(metric) - g.Expect(err).ToNot(gomega.HaveOccurred()) - g.Expect(int(count)).Should(gomega.Equal(v)) - }, Timeout, Interval).Should(gomega.Succeed()) + expectCounterMetric(metric, v) } func ExpectPreemptedWorkloadsTotalMetric(preemptorCqName, reason string, v int) { metric := metrics.PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason) - gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { - count, err := testutil.GetCounterMetricValue(metric) - g.Expect(err).ToNot(gomega.HaveOccurred()) - g.Expect(int(count)).Should(gomega.Equal(v)) - }, Timeout, Interval).Should(gomega.Succeed()) + expectCounterMetric(metric, v) } func ExpectQuotaReservedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) { metric := metrics.QuotaReservedWorkloadsTotal.WithLabelValues(cq.Name) + expectCounterMetric(metric, v) +} + +func expectCounterMetric(metric prometheus.Counter, count int) { gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { - count, err := testutil.GetCounterMetricValue(metric) + v, err := testutil.GetCounterMetricValue(metric) g.Expect(err).ToNot(gomega.HaveOccurred()) - g.Expect(int(count)).Should(gomega.Equal(v)) + g.Expect(int(v)).Should(gomega.Equal(count)) }, Timeout, Interval).Should(gomega.Succeed()) }