Skip to content

Commit

Permalink
Add a metric that tracks the number of preemptions issued by a Cluste…
Browse files Browse the repository at this point in the history
…rQueue

* Metric for preempted workloads
  • Loading branch information
vladikkuzn committed Jul 8, 2024
1 parent 81ad017 commit 91bc308
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 1 deletion.
19 changes: 19 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,19 @@ The label 'reason' can have the following values:
}, []string{"cluster_queue", "reason"},
)

PreemptedWorkloadsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "preempted_workloads_total",
Help: `The number of preempted workloads per 'preemptor_cluster_queue',
The label 'reason' can have the following values:
- "InClusterQueue" means that the workload was preempted by a workload in the same ClusterQueue.
- "InCohortReclamation" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota.
- "InCohortFairSharing" means that the workload was preempted by a workload in the same cohort due to fair sharing.
- "InCohortReclaimWhileBorrowing" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota while borrowing.`,
}, []string{"preemptor_cluster_queue", "reason"},
)

// Metrics tied to the cache.

ReservingActiveWorkloads = prometheus.NewGaugeVec(
Expand Down Expand Up @@ -262,6 +275,10 @@ func ReportEvictedWorkloads(cqName, reason string) {
EvictedWorkloadsTotal.WithLabelValues(cqName, reason).Inc()
}

func ReportPreemptedWorkloads(preemptorCqName, reason string) {
PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason).Inc()
}

func ClearQueueSystemMetrics(cqName string) {
PendingWorkloads.DeleteLabelValues(cqName, PendingStatusActive)
PendingWorkloads.DeleteLabelValues(cqName, PendingStatusInadmissible)
Expand All @@ -271,6 +288,7 @@ func ClearQueueSystemMetrics(cqName string) {
admissionWaitTime.DeleteLabelValues(cqName)
admissionChecksWaitTime.DeleteLabelValues(cqName)
EvictedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"cluster_queue": cqName})
PreemptedWorkloadsTotal.DeletePartialMatch(prometheus.Labels{"preemptor_cluster_queue": cqName})
}

func ReportClusterQueueStatus(cqName string, cqStatus ClusterQueueStatus) {
Expand Down Expand Up @@ -378,6 +396,7 @@ func Register() {
quotaReservedWaitTime,
AdmittedWorkloadsTotal,
EvictedWorkloadsTotal,
PreemptedWorkloadsTotal,
admissionWaitTime,
admissionChecksWaitTime,
ClusterQueueResourceUsage,
Expand Down
12 changes: 12 additions & 0 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,15 @@ func TestReportAndCleanupClusterQueueEvictedNumber(t *testing.T) {
ClearQueueSystemMetrics("cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1")
}

func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) {
ReportPreemptedWorkloads("cluster_queue1", "InCohortReclamation")
ReportPreemptedWorkloads("cluster_queue1", "InCohortFairSharing")

expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 2, "preemptor_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortFairSharing")
// clear
ClearQueueSystemMetrics("cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preemptor_cluster_queue", "cluster_queue1")
}
2 changes: 2 additions & 0 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In
}

log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Obj), "reason", reason, "message", message)
metrics.ReportPreemptedWorkloads(preemptor.ClusterQueue, reason)

p.recorder.Eventf(target.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.ClusterQueue, kueue.WorkloadEvictedByPreemption)
} else {
Expand Down
1 change: 1 addition & 0 deletions test/integration/scheduler/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ var _ = ginkgo.Describe("Preemption", func() {

util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl1, lowWl2)
util.ExpectEvictedWorkloadsTotalMetric(cq.Name, kueue.WorkloadEvictedByPreemption, 2)
util.ExpectPreemptedWorkloadsTotalMetric(cq.Name, "InClusterQueue", 2)

util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWl2)
util.ExpectWorkloadsToBePending(ctx, k8sClient, lowWl1, lowWl2)
Expand Down
11 changes: 10 additions & 1 deletion test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func ExpectAdmittedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) {
}, Timeout, Interval).Should(gomega.Succeed())
}

func ExpectEvictedWorkloadsTotalMetric(cqName string, reason string, v int) {
func ExpectEvictedWorkloadsTotalMetric(cqName, reason string, v int) {
metric := metrics.EvictedWorkloadsTotal.WithLabelValues(cqName, reason)
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
Expand All @@ -504,6 +504,15 @@ func ExpectEvictedWorkloadsTotalMetric(cqName string, reason string, v int) {
}, Timeout, Interval).Should(gomega.Succeed())
}

func ExpectPreemptedWorkloadsTotalMetric(preemptorCqName, reason string, v int) {
metric := metrics.PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason)
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
g.Expect(err).ToNot(gomega.HaveOccurred())
g.Expect(int(count)).Should(gomega.Equal(v))
}, Timeout, Interval).Should(gomega.Succeed())
}

func ExpectQuotaReservedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) {
metric := metrics.QuotaReservedWorkloadsTotal.WithLabelValues(cq.Name)
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
Expand Down

0 comments on commit 91bc308

Please sign in to comment.