Skip to content

Commit

Permalink
Add a metric that tracks the number of preemptions issued by a Cluste…
Browse files Browse the repository at this point in the history
…rQueue

* Merge metric for preempted workloads into ReportPreemption
* test helper
  • Loading branch information
vladikkuzn committed Jul 12, 2024
1 parent 5b1ee21 commit 98f2671
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 30 deletions.
5 changes: 3 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,9 @@ func ReportEvictedWorkloads(cqName, reason string) {
EvictedWorkloadsTotal.WithLabelValues(cqName, reason).Inc()
}

func ReportPreemptedWorkloads(preemptorCqName, reason string) {
PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason).Inc()
func ReportPreemption(preemptingCqName, preemptingReason, targetCqName string) {
PreemptedWorkloadsTotal.WithLabelValues(preemptingCqName, preemptingReason).Inc()
ReportEvictedWorkloads(targetCqName, kueue.WorkloadEvictedByPreemption)
}

func ClearQueueSystemMetrics(cqName string) {
Expand Down
20 changes: 13 additions & 7 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,25 @@ func TestReportAndCleanupClusterQueueEvictedNumber(t *testing.T) {
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 2, "cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1", "reason", "Preempted")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1", "reason", "Evicted")
// clear

ClearQueueSystemMetrics("cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1")
}

func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) {
ReportPreemptedWorkloads("cluster_queue1", "InCohortReclamation")
ReportPreemptedWorkloads("cluster_queue1", "InCohortFairSharing")

expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 2, "preemptor_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation")
ReportPreemption("cluster_queue1", "InClusterQueue", "cluster_queue1")
ReportPreemption("cluster_queue1", "InCohortReclamation", "cluster_queue1")
ReportPreemption("cluster_queue1", "InCohortFairSharing", "cluster_queue1")
ReportPreemption("cluster_queue1", "InCohortReclaimWhileBorrowing", "cluster_queue1")

expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 4, "preemptor_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InClusterQueue")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortFairSharing")
// clear
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preemptor_cluster_queue", "cluster_queue1", "reason", "InCohortReclaimWhileBorrowing")

ClearQueueSystemMetrics("cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preemptor_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 0, "cluster_queue", "cluster_queue1")
}
6 changes: 2 additions & 4 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,9 @@ func (p *Preemptor) IssuePreemptions(ctx context.Context, preemptor *workload.In
return
}

log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message)
metrics.ReportPreemptedWorkloads(preemptor.ClusterQueue, target.Reason)

log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj), "reason", target.Reason, "message", message, "targetClusterQueue", klog.KRef("", target.WorkloadInfo.ClusterQueue))
p.recorder.Eventf(target.WorkloadInfo.Obj, corev1.EventTypeNormal, "Preempted", message)
metrics.ReportEvictedWorkloads(target.WorkloadInfo.ClusterQueue, kueue.WorkloadEvictedByPreemption)
metrics.ReportPreemption(preemptor.ClusterQueue, target.Reason, target.WorkloadInfo.ClusterQueue)
} else {
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.WorkloadInfo.Obj))
}
Expand Down
27 changes: 10 additions & 17 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -452,37 +453,29 @@ func ExpectReservingActiveWorkloadsMetric(cq *kueue.ClusterQueue, v int) {

func ExpectAdmittedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) {
metric := metrics.AdmittedWorkloadsTotal.WithLabelValues(cq.Name)
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
g.Expect(err).ToNot(gomega.HaveOccurred())
g.Expect(int(count)).Should(gomega.Equal(v))
}, Timeout, Interval).Should(gomega.Succeed())
expectCounterMetric(metric, v)
}

func ExpectEvictedWorkloadsTotalMetric(cqName, reason string, v int) {
metric := metrics.EvictedWorkloadsTotal.WithLabelValues(cqName, reason)
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
g.Expect(err).ToNot(gomega.HaveOccurred())
g.Expect(int(count)).Should(gomega.Equal(v))
}, Timeout, Interval).Should(gomega.Succeed())
expectCounterMetric(metric, v)
}

func ExpectPreemptedWorkloadsTotalMetric(preemptorCqName, reason string, v int) {
metric := metrics.PreemptedWorkloadsTotal.WithLabelValues(preemptorCqName, reason)
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
g.Expect(err).ToNot(gomega.HaveOccurred())
g.Expect(int(count)).Should(gomega.Equal(v))
}, Timeout, Interval).Should(gomega.Succeed())
expectCounterMetric(metric, v)
}

func ExpectQuotaReservedWorkloadsTotalMetric(cq *kueue.ClusterQueue, v int) {
metric := metrics.QuotaReservedWorkloadsTotal.WithLabelValues(cq.Name)
expectCounterMetric(metric, v)
}

func expectCounterMetric(metric prometheus.Counter, count int) {
gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
count, err := testutil.GetCounterMetricValue(metric)
v, err := testutil.GetCounterMetricValue(metric)
g.Expect(err).ToNot(gomega.HaveOccurred())
g.Expect(int(count)).Should(gomega.Equal(v))
g.Expect(int(v)).Should(gomega.Equal(count))
}, Timeout, Interval).Should(gomega.Succeed())
}

Expand Down

0 comments on commit 98f2671

Please sign in to comment.