diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index 3cb54a8db..174291f69 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -42,6 +42,7 @@ const ( // job placement per topology group (defined as the label value). // If set at the ReplicatedJob level, all child jobs from the target ReplicatedJobs will be scheduled // using exclusive job placement per topology group. + // Exclusive placement is enforced within a priority level. ExclusiveKey string = "alpha.jobset.sigs.k8s.io/exclusive-topology" // NodeSelectorStrategyKey is an annotation that acts as a flag, the value does not matter. // If set, the JobSet controller will automatically inject nodeSelectors for the JobSetNameKey label to diff --git a/go.mod b/go.mod index 3f74e4ee6..afd0810ae 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module sigs.k8s.io/jobset -go 1.22.0 +go 1.22.8 require ( github.com/google/go-cmp v0.6.0 diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 094a63a67..e5c6498c3 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -28,6 +28,10 @@ const ( // the JobSet is currently on. RestartsKey = "jobset.sigs.k8s.io/restart-attempt" + // PriorityKey is a label key to record the pod priority. This is needed to enfroce exclusive placement + // only among jobs within the same priority. + PriorityKey = "jobset.sigs.k8s.io/priority" + // MaxParallelism defines the maximum number of parallel Job creations/deltions that // the JobSet controller can perform. MaxParallelism = 50 diff --git a/pkg/webhooks/pod_mutating_webhook.go b/pkg/webhooks/pod_mutating_webhook.go index f6c73f4af..3fdfd4802 100644 --- a/pkg/webhooks/pod_mutating_webhook.go +++ b/pkg/webhooks/pod_mutating_webhook.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" + "sigs.k8s.io/jobset/pkg/constants" ) // +kubebuilder:webhook:path=/mutate--v1-pod,mutating=true,failurePolicy=fail,groups="",resources=pods,verbs=create,versions=v1,name=mpod.kb.io,sideEffects=None,admissionReviewVersions=v1 @@ -82,6 +83,9 @@ func (p *podWebhook) Default(ctx context.Context, obj runtime.Object) error { // scheduled on. func (p *podWebhook) patchPod(ctx context.Context, pod *corev1.Pod) error { log := ctrl.LoggerFrom(ctx) + if pod.Spec.Priority != nil { + pod.Labels[constants.PriorityKey] = fmt.Sprint(*pod.Spec.Priority) + } if pod.Annotations[batchv1.JobCompletionIndexAnnotation] == "0" { log.V(3).Info(fmt.Sprintf("pod webhook: setting exclusive affinities for pod: %s", pod.Name)) setExclusiveAffinities(pod) @@ -128,6 +132,11 @@ func setExclusiveAffinities(pod *corev1.Pod) { Operator: metav1.LabelSelectorOpNotIn, Values: []string{pod.Labels[jobset.JobKey]}, }, + { + Key: constants.PriorityKey, + Operator: metav1.LabelSelectorOpIn, + Values: []string{pod.Labels[constants.PriorityKey]}, + }, }}, TopologyKey: pod.Annotations[jobset.ExclusiveKey], NamespaceSelector: &metav1.LabelSelector{},