Skip to content

Commit

Permalink
Add a metric that tracks the number of preemptions issued by a Cluste…
Browse files Browse the repository at this point in the history
…rQueue

* Merge metric for preempted workloads into ReportPreemption
* test helper
* Expect all preemption reasons
  • Loading branch information
vladikkuzn committed Jul 16, 2024
1 parent 5b1ee21 commit aaf0922
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 56 deletions.
11 changes: 6 additions & 5 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ The label 'reason' can have the following values:
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "preempted_workloads_total",
Help: `The number of preempted workloads per 'preemptor_cluster_queue',
Help: `The number of preempted workloads per 'preempting_cluster_queue',
The label 'reason' can have the following values:
- "InClusterQueue" means that the workload was preempted by a workload in the same ClusterQueue.
- "InCohortReclamation" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota.
- "InCohortFairSharing" means that the workload was preempted by a workload in the same cohort due to fair sharing.
- "InCohortReclaimWhileBorrowing" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota while borrowing.`,
}, []string{"preemptor_cluster_queue", "reason"},
}, []string{"preempting_cluster_queue", "reason"},
)

// Metrics tied to the cache.
Expand Down Expand Up @@ -276,8 +276,9 @@ func ReportEvictedWorkloads(cqName, reason string) {
EvictedWorkloadsTotal.WithLabelValues(cqName, reason).Inc()
}

func ReportPreemptedWorkloads(preemptorCqName, reason string) {
PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason).Inc()
func ReportPreemption(preemptingCqName, preemptingReason, targetCqName string) {
PreemptedWorkloadsTotal.WithLabelValues(preemptingCqName, preemptingReason).Inc()
ReportEvictedWorkloads(targetCqName, kueue.WorkloadEvictedByPreemption)
}

func ClearQueueSystemMetrics(cqName string) {
Expand All @@ -289,7 +290,7 @@ func ClearQueueSystemMetrics(cqName string) {
admissionWaitTime.DeleteLabelValues(cqName)
admissionChecksWaitTime.DeleteLabelValues(cqName)
EvictedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"cluster_queue": cqName})
PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preemptor_cluster_queue": cqName})
PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preempting_cluster_queue": cqName})
}

func ReportClusterQueueStatus(cqName string, cqStatus ClusterQueueStatus) {
Expand Down
22 changes: 14 additions & 8 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,25 @@ func TestReportAndCleanupClusterQueueEvictedNumber(t *testing.T) {
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 2, "cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1", "reason", "Preempted")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1", "reason", "Evicted")
// clear

ClearQueueSystemMetrics("cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1")
}

func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) {
ReportPreemptedWorkloads("cluster_queue1", "InCohortReclamation")
ReportPreemptedWorkloads("cluster_queue1", "InCohortFairSharing")
ReportPreemption("cluster_queue1", "InClusterQueue", "cluster_queue1")
ReportPreemption("cluster_queue1", "InCohortReclamation", "cluster_queue1")
ReportPreemption("cluster_queue1", "InCohortFairSharing", "cluster_queue1")
ReportPreemption("cluster_queue1", "InCohortReclaimWhileBorrowing", "cluster_queue1")

expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 4, "preempting_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InClusterQueue")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortFairSharing")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclaimWhileBorrowing")

expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 2, "preemptor_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortFairSharing")
// clear
ClearQueueSystemMetrics("cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preemptor_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preempting_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1")
}
6 changes: 2 additions & 4 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,9 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In
return
}

log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message)
metrics.ReportPreemptedWorkloads(preemptor.ClusterQueue, target.Reason)

log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message, "targetClusterQueue", klog.KRef("", target.WorkloadInfo.ClusterQueue))
p.recorder.Eventf(target.WorkloadInfo.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.WorkloadInfo.ClusterQueue, kueue.WorkloadEvictedByPreemption)
metrics.ReportPreemption(preemptor.ClusterQueue, target.Reason, target.WorkloadInfo.ClusterQueue)
} else {
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj))
}
Expand Down
48 changes: 26 additions & 22 deletions test/integration/scheduler/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var _ = ginkgo.Describe("Preemption", func() {

util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl1, lowWl2)
util.ExpectEvictedWorkloadsTotalMetric(cq.Name, kueue.WorkloadEvictedByPreemption, 2)
util.ExpectPreemptedWorkloadsTotalMetric(cq.Name, "InClusterQueue", 2)
util.ExpectPreemptedWorkloadsTotalMetric(cq.Name, preemption.InClusterQueueReason, 2)

util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWl2)
util.ExpectWorkloadsToBePending(ctx, k8sClient, lowWl1, lowWl2)
Expand Down Expand Up @@ -196,7 +196,7 @@ var _ = ginkgo.Describe("Preemption", func() {
ginkgo.Context("In a ClusterQueue that is part of a cohort", func() {
var (
alphaCQ, betaCQ, gammaCQ *kueue.ClusterQueue
alphaQ, betaQ, gammaQ *kueue.LocalQueue
alphaLQ, betaLQ, gammaLQ *kueue.LocalQueue
)

ginkgo.BeforeEach(func() {
Expand All @@ -209,16 +209,16 @@ var _ = ginkgo.Describe("Preemption", func() {
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, alphaCQ)).To(gomega.Succeed())
alphaQ = testing.MakeLocalQueue("alpha-q", ns.Name).ClusterQueue(alphaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, alphaQ)).To(gomega.Succeed())
alphaLQ = testing.MakeLocalQueue("alpha-q", ns.Name).ClusterQueue(alphaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, alphaLQ)).To(gomega.Succeed())

betaCQ = testing.MakeClusterQueue("beta-cq").
Cohort("all").
ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "2").Obj()).
Obj()
gomega.Expect(k8sClient.Create(ctx, betaCQ)).To(gomega.Succeed())
betaQ = testing.MakeLocalQueue("beta-q", ns.Name).ClusterQueue(betaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, betaQ)).To(gomega.Succeed())
betaLQ = testing.MakeLocalQueue("beta-q", ns.Name).ClusterQueue(betaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, betaLQ)).To(gomega.Succeed())

gammaCQ = testing.MakeClusterQueue("gamma-cq").
Cohort("all").
Expand All @@ -229,8 +229,8 @@ var _ = ginkgo.Describe("Preemption", func() {
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, gammaCQ)).To(gomega.Succeed())
gammaQ = testing.MakeLocalQueue("gamma-q", ns.Name).ClusterQueue(gammaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, gammaQ)).To(gomega.Succeed())
gammaLQ = testing.MakeLocalQueue("gamma-q", ns.Name).ClusterQueue(gammaCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, gammaLQ)).To(gomega.Succeed())
})

ginkgo.AfterEach(func() {
Expand All @@ -244,20 +244,20 @@ var _ = ginkgo.Describe("Preemption", func() {
ginkgo.By("Creating workloads in beta-cq that borrow quota")

alphaLowWl := testing.MakeWorkload("alpha-low", ns.Name).
Queue(alphaQ.Name).
Queue(alphaLQ.Name).
Priority(lowPriority).
Request(corev1.ResourceCPU, "1").
Obj()
gomega.Expect(k8sClient.Create(ctx, alphaLowWl)).To(gomega.Succeed())

betaMidWl := testing.MakeWorkload("beta-mid", ns.Name).
Queue(betaQ.Name).
Queue(betaLQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "1").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaMidWl)).To(gomega.Succeed())
betaHighWl := testing.MakeWorkload("beta-high", ns.Name).
Queue(betaQ.Name).
Queue(betaLQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "4").
Obj()
Expand All @@ -268,7 +268,7 @@ var _ = ginkgo.Describe("Preemption", func() {

ginkgo.By("Creating workload in alpha-cq to preempt workloads in both ClusterQueues")
alphaMidWl := testing.MakeWorkload("alpha-mid", ns.Name).
Queue(alphaQ.Name).
Queue(alphaLQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "2").
Obj()
Expand All @@ -284,6 +284,10 @@ var _ = ginkgo.Describe("Preemption", func() {
ginkgo.By("Verify the Preempted condition", func() {
util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InClusterQueueReason, metav1.ConditionTrue, alphaLowWl, alphaMidWl)
util.ExpectPreemptedCondition(ctx, k8sClient, preemption.InCohortReclaimWhileBorrowingReason, metav1.ConditionTrue, betaMidWl, alphaMidWl)
util.ExpectPreemptedWorkloadsTotalMetric(alphaCQ.Name, preemption.InClusterQueueReason, 1)
util.ExpectPreemptedWorkloadsTotalMetric(alphaCQ.Name, preemption.InCohortReclaimWhileBorrowingReason, 1)
util.ExpectPreemptedWorkloadsTotalMetric(betaCQ.Name, preemption.InClusterQueueReason, 0)
util.ExpectPreemptedWorkloadsTotalMetric(betaCQ.Name, preemption.InCohortReclaimWhileBorrowingReason, 0)
})

ginkgo.By("Verify the Preempted condition on re-admission, as the preemptor is finished", func() {
Expand Down Expand Up @@ -317,13 +321,13 @@ var _ = ginkgo.Describe("Preemption", func() {
ginkgo.By("Creating workloads in beta-cq that borrow quota")

alphaHighWl1 := testing.MakeWorkload("alpha-high-1", ns.Name).
Queue(alphaQ.Name).
Queue(alphaLQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, alphaHighWl1)).To(gomega.Succeed())
betaLowWl := testing.MakeWorkload("beta-low", ns.Name).
Queue(betaQ.Name).
Queue(betaLQ.Name).
Priority(lowPriority).
Request(corev1.ResourceCPU, "4").
Obj()
Expand All @@ -334,7 +338,7 @@ var _ = ginkgo.Describe("Preemption", func() {

ginkgo.By("Creating high priority workload in alpha-cq that doesn't fit without borrowing")
alphaHighWl2 := testing.MakeWorkload("alpha-high-2", ns.Name).
Queue(alphaQ.Name).
Queue(alphaLQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "2").
Obj()
Expand All @@ -350,13 +354,13 @@ var _ = ginkgo.Describe("Preemption", func() {
ginkgo.By("Creating workloads in beta-cq that borrow quota")

betaMidWl := testing.MakeWorkload("beta-mid", ns.Name).
Queue(betaQ.Name).
Queue(betaLQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "3").
Obj()
gomega.Expect(k8sClient.Create(ctx, betaMidWl)).To(gomega.Succeed())
betaHighWl := testing.MakeWorkload("beta-high", ns.Name).
Queue(betaQ.Name).
Queue(betaLQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "3").
Obj()
Expand All @@ -366,13 +370,13 @@ var _ = ginkgo.Describe("Preemption", func() {

ginkgo.By("Creating workload in alpha-cq and gamma-cq that need to preempt")
alphaMidWl := testing.MakeWorkload("alpha-mid", ns.Name).
Queue(alphaQ.Name).
Queue(alphaLQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "2").
Obj()

gammaMidWl := testing.MakeWorkload("gamma-mid", ns.Name).
Queue(gammaQ.Name).
Queue(gammaLQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "2").
Obj()
Expand Down Expand Up @@ -401,7 +405,7 @@ var _ = ginkgo.Describe("Preemption", func() {
var betaWls []*kueue.Workload
for i := 0; i < 3; i++ {
wl := testing.MakeWorkload(fmt.Sprintf("beta-%d", i), ns.Name).
Queue(betaQ.Name).
Queue(betaLQ.Name).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
Expand All @@ -412,13 +416,13 @@ var _ = ginkgo.Describe("Preemption", func() {
ginkgo.By("Creating preempting pods")

alphaWl := testing.MakeWorkload("alpha", ns.Name).
Queue(alphaQ.Name).
Queue(alphaLQ.Name).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, alphaWl)).To(gomega.Succeed())

gammaWl := testing.MakeWorkload("gamma", ns.Name).
Queue(gammaQ.Name).
Queue(gammaLQ.Name).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, gammaWl)).To(gomega.Succeed())
Expand Down
27 changes: 10 additions & 17 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -452,37 +453,29 @@ func ExpectReservingActiveWorkloadsMetric(cq *kueue.ClusterQueue, v int) {

func ExpectAdmittedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) {
metric := metrics.AdmittedWorkloadsTotal.WithLabelValues(cq.Name)
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
g.Expect(err).ToNot(gomega.HaveOccurred())
g.Expect(int(count)).Should(gomega.Equal(v))
}, Timeout, Interval).Should(gomega.Succeed())
expectCounterMetric(metric, v)
}

func ExpectEvictedWorkloadsTotalMetric(cqName, reason string, v int) {
metric := metrics.EvictedWorkloadsTotal.WithLabelValues(cqName, reason)
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
g.Expect(err).ToNot(gomega.HaveOccurred())
g.Expect(int(count)).Should(gomega.Equal(v))
}, Timeout, Interval).Should(gomega.Succeed())
expectCounterMetric(metric, v)
}

func ExpectPreemptedWorkloadsTotalMetric(preemptorCqName, reason string, v int) {
metric := metrics.PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason)
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
g.Expect(err).ToNot(gomega.HaveOccurred())
g.Expect(int(count)).Should(gomega.Equal(v))
}, Timeout, Interval).Should(gomega.Succeed())
expectCounterMetric(metric, v)
}

func ExpectQuotaReservedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) {
metric := metrics.QuotaReservedWorkloadsTotal.WithLabelValues(cq.Name)
expectCounterMetric(metric, v)
}

func expectCounterMetric(metric prometheus.Counter, count int) {
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
v, err := testutil.GetCounterMetricValue(metric)
g.Expect(err).ToNot(gomega.HaveOccurred())
g.Expect(int(count)).Should(gomega.Equal(v))
g.Expect(int(v)).Should(gomega.Equal(count))
}, Timeout, Interval).Should(gomega.Succeed())
}

Expand Down

0 comments on commit aaf0922

Please sign in to comment.