diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index f66cf704b5..45fdcb5051 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -148,6 +148,19 @@ The label 'reason' can have the following values: }, []string{"cluster_queue", "reason"}, ) + PreemptedWorkloadsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: constants.KueueName, + Name: "preempted_workloads_total", + Help: `The number of preempted workloads per 'preempting_cluster_queue', +The label 'reason' can have the following values: +- "InClusterQueue" means that the workload was preempted by a workload in the same ClusterQueue. +- "InCohortReclamation" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota. +- "InCohortFairSharing" means that the workload was preempted by a workload in the same cohort due to fair sharing. +- "InCohortReclaimWhileBorrowing" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota while borrowing.`, + }, []string{"preempting_cluster_queue", "reason"}, + ) + // Metrics tied to the cache. 
ReservingActiveWorkloads = prometheus.NewGaugeVec( @@ -263,6 +276,12 @@ func ReportEvictedWorkloads(cqName, reason string) { EvictedWorkloadsTotal.WithLabelValues(cqName, reason).Inc() } +// ReportPreemption increments PreemptedWorkloadsTotal for the preempting ClusterQueue and reason, and reports an eviction by preemption for the target ClusterQueue. +func ReportPreemption(preemptingCqName, preemptingReason, targetCqName string) { + PreemptedWorkloadsTotal.WithLabelValues(preemptingCqName, preemptingReason).Inc() + ReportEvictedWorkloads(targetCqName, kueue.WorkloadEvictedByPreemption) +} + func ClearQueueSystemMetrics(cqName string) { PendingWorkloads.DeleteLabelValues(cqName, PendingStatusActive) PendingWorkloads.DeleteLabelValues(cqName, PendingStatusInadmissible) @@ -272,6 +291,7 @@ func ClearQueueSystemMetrics(cqName string) { admissionWaitTime.DeleteLabelValues(cqName) admissionChecksWaitTime.DeleteLabelValues(cqName) EvictedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"cluster_queue": cqName}) + PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preempting_cluster_queue": cqName}) } func ReportClusterQueueStatus(cqName string, cqStatus ClusterQueueStatus) { @@ -379,6 +399,7 @@ func Register() { quotaReservedWaitTime, AdmittedWorkloadsTotal, EvictedWorkloadsTotal, + PreemptedWorkloadsTotal, admissionWaitTime, admissionChecksWaitTime, ClusterQueueResourceUsage, diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index d5e948d612..fbc5bff4e5 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -154,7 +154,25 @@ func TestReportAndCleanupClusterQueueEvictedNumber(t *testing.T) { expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 2, "cluster_queue", "cluster_queue1") expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1", "reason", "Preempted") expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1", "reason", "Evicted") - // clear + + ClearQueueSystemMetrics("cluster_queue1") + expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1") +} + +func 
TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) { + ReportPreemption("cluster_queue1", "InClusterQueue", "cluster_queue1") + ReportPreemption("cluster_queue1", "InCohortReclamation", "cluster_queue1") + ReportPreemption("cluster_queue1", "InCohortFairSharing", "cluster_queue1") + ReportPreemption("cluster_queue1", "InCohortReclaimWhileBorrowing", "cluster_queue1") + + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 4, "preempting_cluster_queue", "cluster_queue1") + expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InClusterQueue") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortFairSharing") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclaimWhileBorrowing") + ClearQueueSystemMetrics("cluster_queue1") + expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preempting_cluster_queue", "cluster_queue1") expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1") } diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index a3e0761f84..cd0b3ae30b 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -205,9 +205,9 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In return } - log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message) + log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message, "targetClusterQueue", 
klog.KRef("", target.WorkloadInfo.ClusterQueue)) p.recorder.Eventf(target.WorkloadInfo.Obj, corev1.EventTypeNormal, "Preempted", message) - metrics.ReportEvictedWorkloads(target.WorkloadInfo.ClusterQueue, kueue.WorkloadEvictedByPreemption) + metrics.ReportPreemption(preemptor.ClusterQueue, target.Reason, target.WorkloadInfo.ClusterQueue) } else { log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj)) } diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index 7b5a0494db..c093a2889b 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -138,6 +138,7 @@ var _ = ginkgo.Describe("Preemption", func() { util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl1, lowWl2) util.ExpectEvictedWorkloadsTotalMetric(cq.Name, kueue.WorkloadEvictedByPreemption, 2) + util.ExpectPreemptedWorkloadsTotalMetric(cq.Name, preemption.InClusterQueueReason, 2) util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWl2) util.ExpectWorkloadsToBePending(ctx, k8sClient, lowWl1, lowWl2) @@ -195,7 +196,7 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.Context("In a ClusterQueue that is part of a cohort", func() { var ( alphaCQ, betaCQ, gammaCQ *kueue.ClusterQueue - alphaQ, betaQ, gammaQ *kueue.LocalQueue + alphaLQ, betaLQ, gammaLQ *kueue.LocalQueue ) ginkgo.BeforeEach(func() { @@ -208,16 +209,16 @@ var _ = ginkgo.Describe("Preemption", func() { }). Obj() gomega.Expect(k8sClient.Create(ctx, alphaCQ)).To(gomega.Succeed()) - alphaQ = testing.MakeLocalQueue("alpha-q", ns.Name).ClusterQueue(alphaCQ.Name).Obj() - gomega.Expect(k8sClient.Create(ctx, alphaQ)).To(gomega.Succeed()) + alphaLQ = testing.MakeLocalQueue("alpha-q", ns.Name).ClusterQueue(alphaCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, alphaLQ)).To(gomega.Succeed()) betaCQ = testing.MakeClusterQueue("beta-cq"). Cohort("all"). 
ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "2").Obj()). Obj() gomega.Expect(k8sClient.Create(ctx, betaCQ)).To(gomega.Succeed()) - betaQ = testing.MakeLocalQueue("beta-q", ns.Name).ClusterQueue(betaCQ.Name).Obj() - gomega.Expect(k8sClient.Create(ctx, betaQ)).To(gomega.Succeed()) + betaLQ = testing.MakeLocalQueue("beta-q", ns.Name).ClusterQueue(betaCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, betaLQ)).To(gomega.Succeed()) gammaCQ = testing.MakeClusterQueue("gamma-cq"). Cohort("all"). @@ -228,8 +229,8 @@ var _ = ginkgo.Describe("Preemption", func() { }). Obj() gomega.Expect(k8sClient.Create(ctx, gammaCQ)).To(gomega.Succeed()) - gammaQ = testing.MakeLocalQueue("gamma-q", ns.Name).ClusterQueue(gammaCQ.Name).Obj() - gomega.Expect(k8sClient.Create(ctx, gammaQ)).To(gomega.Succeed()) + gammaLQ = testing.MakeLocalQueue("gamma-q", ns.Name).ClusterQueue(gammaCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, gammaLQ)).To(gomega.Succeed()) }) ginkgo.AfterEach(func() { @@ -243,20 +244,20 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.By("Creating workloads in beta-cq that borrow quota") alphaLowWl := testing.MakeWorkload("alpha-low", ns.Name). - Queue(alphaQ.Name). + Queue(alphaLQ.Name). Priority(lowPriority). Request(corev1.ResourceCPU, "1"). Obj() gomega.Expect(k8sClient.Create(ctx, alphaLowWl)).To(gomega.Succeed()) betaMidWl := testing.MakeWorkload("beta-mid", ns.Name). - Queue(betaQ.Name). + Queue(betaLQ.Name). Priority(midPriority). Request(corev1.ResourceCPU, "1"). Obj() gomega.Expect(k8sClient.Create(ctx, betaMidWl)).To(gomega.Succeed()) betaHighWl := testing.MakeWorkload("beta-high", ns.Name). - Queue(betaQ.Name). + Queue(betaLQ.Name). Priority(highPriority). Request(corev1.ResourceCPU, "4"). Obj() @@ -267,7 +268,7 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.By("Creating workload in alpha-cq to preempt workloads in both ClusterQueues") alphaMidWl := testing.MakeWorkload("alpha-mid", ns.Name). 
- Queue(alphaQ.Name). + Queue(alphaLQ.Name). Priority(midPriority). Request(corev1.ResourceCPU, "2"). Obj() @@ -283,6 +284,10 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.By("Verify the Preempted condition", func() { util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InClusterQueueReason, metav1.ConditionTrue, alphaLowWl, alphaMidWl) util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclamationReason, metav1.ConditionTrue, betaMidWl, alphaMidWl) + util.ExpectPreemptedWorkloadsTotalMetric(alphaCQ.Name, preemption.InClusterQueueReason, 1) + util.ExpectPreemptedWorkloadsTotalMetric(alphaCQ.Name, preemption.InCohortReclamationReason, 1) + util.ExpectPreemptedWorkloadsTotalMetric(betaCQ.Name, preemption.InClusterQueueReason, 0) + util.ExpectPreemptedWorkloadsTotalMetric(betaCQ.Name, preemption.InCohortReclamationReason, 0) }) ginkgo.By("Verify the Preempted condition on re-admission, as the preemptor is finished", func() { @@ -316,13 +321,13 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.By("Creating workloads in beta-cq that borrow quota") alphaHighWl1 := testing.MakeWorkload("alpha-high-1", ns.Name). - Queue(alphaQ.Name). + Queue(alphaLQ.Name). Priority(highPriority). Request(corev1.ResourceCPU, "2"). Obj() gomega.Expect(k8sClient.Create(ctx, alphaHighWl1)).To(gomega.Succeed()) betaLowWl := testing.MakeWorkload("beta-low", ns.Name). - Queue(betaQ.Name). + Queue(betaLQ.Name). Priority(lowPriority). Request(corev1.ResourceCPU, "4"). Obj() @@ -333,7 +338,7 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.By("Creating high priority workload in alpha-cq that doesn't fit without borrowing") alphaHighWl2 := testing.MakeWorkload("alpha-high-2", ns.Name). - Queue(alphaQ.Name). + Queue(alphaLQ.Name). Priority(highPriority). Request(corev1.ResourceCPU, "2"). 
Obj() @@ -349,13 +354,13 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.By("Creating workloads in beta-cq that borrow quota") betaMidWl := testing.MakeWorkload("beta-mid", ns.Name). - Queue(betaQ.Name). + Queue(betaLQ.Name). Priority(midPriority). Request(corev1.ResourceCPU, "3"). Obj() gomega.Expect(k8sClient.Create(ctx, betaMidWl)).To(gomega.Succeed()) betaHighWl := testing.MakeWorkload("beta-high", ns.Name). - Queue(betaQ.Name). + Queue(betaLQ.Name). Priority(highPriority). Request(corev1.ResourceCPU, "3"). Obj() @@ -365,13 +370,13 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.By("Creating workload in alpha-cq and gamma-cq that need to preempt") alphaMidWl := testing.MakeWorkload("alpha-mid", ns.Name). - Queue(alphaQ.Name). + Queue(alphaLQ.Name). Priority(midPriority). Request(corev1.ResourceCPU, "2"). Obj() gammaMidWl := testing.MakeWorkload("gamma-mid", ns.Name). - Queue(gammaQ.Name). + Queue(gammaLQ.Name). Priority(midPriority). Request(corev1.ResourceCPU, "2"). Obj() @@ -400,7 +405,7 @@ var _ = ginkgo.Describe("Preemption", func() { var betaWls []*kueue.Workload for i := 0; i < 3; i++ { wl := testing.MakeWorkload(fmt.Sprintf("beta-%d", i), ns.Name). - Queue(betaQ.Name). + Queue(betaLQ.Name). Request(corev1.ResourceCPU, "2"). Obj() gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed()) @@ -411,13 +416,13 @@ var _ = ginkgo.Describe("Preemption", func() { ginkgo.By("Creating preempting pods") alphaWl := testing.MakeWorkload("alpha", ns.Name). - Queue(alphaQ.Name). + Queue(alphaLQ.Name). Request(corev1.ResourceCPU, "2"). Obj() gomega.Expect(k8sClient.Create(ctx, alphaWl)).To(gomega.Succeed()) gammaWl := testing.MakeWorkload("gamma", ns.Name). - Queue(gammaQ.Name). + Queue(gammaLQ.Name). Request(corev1.ResourceCPU, "2"). 
Obj() gomega.Expect(k8sClient.Create(ctx, gammaWl)).To(gomega.Succeed()) @@ -689,6 +694,8 @@ var _ = ginkgo.Describe("Preemption", func() { gomega.Expect(k8sClient.Create(ctx, aStandardVeryHighWl)).To(gomega.Succeed()) util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclaimWhileBorrowingReason, metav1.ConditionTrue, aBestEffortLowWl, aStandardVeryHighWl) + util.ExpectPreemptedWorkloadsTotalMetric(aStandardCQ.Name, preemption.InCohortReclaimWhileBorrowingReason, 1) + util.ExpectPreemptedWorkloadsTotalMetric(aBestEffortCQ.Name, preemption.InCohortReclaimWhileBorrowingReason, 0) ginkgo.By("Finish eviction fo the a-best-effort-low workload") util.FinishEvictionForWorkloads(ctx, k8sClient, aBestEffortLowWl) diff --git a/test/util/util.go b/test/util/util.go index f9c5717f11..dcedcfa032 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -27,6 +27,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" zaplog "go.uber.org/zap" "go.uber.org/zap/zapcore" batchv1 "k8s.io/api/batch/v1" @@ -452,28 +453,31 @@ func ExpectReservingActiveWorkloadsMetric(cq *kueue.ClusterQueue, v int) { func ExpectAdmittedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) { metric := metrics.AdmittedWorkloadsTotal.WithLabelValues(cq.Name) - gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { - count, err := testutil.GetCounterMetricValue(metric) - g.Expect(err).ToNot(gomega.HaveOccurred()) - g.Expect(int(count)).Should(gomega.Equal(v)) - }, Timeout, Interval).Should(gomega.Succeed()) + expectCounterMetric(metric, v) } -func ExpectEvictedWorkloadsTotalMetric(cqName string, reason string, v int) { +func ExpectEvictedWorkloadsTotalMetric(cqName, reason string, v int) { metric := metrics.EvictedWorkloadsTotal.WithLabelValues(cqName, reason) - gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { - count, err := testutil.GetCounterMetricValue(metric) - 
g.Expect(err).ToNot(gomega.HaveOccurred()) - g.Expect(int(count)).Should(gomega.Equal(v)) - }, Timeout, Interval).Should(gomega.Succeed()) + expectCounterMetric(metric, v) +} + +func ExpectPreemptedWorkloadsTotalMetric(preemptorCqName, reason string, v int) { + metric := metrics.PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason) + expectCounterMetric(metric, v) } func ExpectQuotaReservedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) { metric := metrics.QuotaReservedWorkloadsTotal.WithLabelValues(cq.Name) + expectCounterMetric(metric, v) +} + +// expectCounterMetric eventually expects the given counter's value to equal count. +// It uses offset 2 so that a failure is attributed to the caller of the exported Expect*Metric wrapper rather than to this file. +func expectCounterMetric(metric prometheus.Counter, count int) { - gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { - count, err := testutil.GetCounterMetricValue(metric) + gomega.EventuallyWithOffset(2, func(g gomega.Gomega) { + v, err := testutil.GetCounterMetricValue(metric) g.Expect(err).ToNot(gomega.HaveOccurred()) - g.Expect(int(count)).Should(gomega.Equal(v)) + g.Expect(int(v)).Should(gomega.Equal(count)) }, Timeout, Interval).Should(gomega.Succeed()) }