Add support for 1:1 job per topology assignment #36

Merged
merged 12 commits on Apr 18, 2023
2 changes: 1 addition & 1 deletion Makefile
@@ -109,7 +109,7 @@ endif

.PHONY: install
install: manifests kustomize ## Install CRDs into the K8s cluster specified in ~/.kube/config.
- $(KUSTOMIZE) build config/crd | kubectl apply -f -
+ $(KUSTOMIZE) build config/crd | kubectl create -f -

.PHONY: uninstall
uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion.
14 changes: 14 additions & 0 deletions api/v1alpha1/jobset_types.go
@@ -21,6 +21,7 @@ import (
const (
JobIndexLabel string = "jobset.sigs.k8s.io/job-index"
RestartsLabel string = "jobset.sigs.k8s.io/restart-attempt"
JobNameKey string = "job-name" // TODO(#26): Migrate to the fully qualified label name.
)

type JobSetConditionType string
@@ -89,7 +90,11 @@ type ReplicatedJob struct {
// Job names will be in the format: <jobSet.name>-<spec.replicatedJob.name>-<job-index>
// +kubebuilder:default=1
Replicas int `json:"replicas,omitempty"`
// Exclusive defines that the jobs are 1:1 with the specified topology. This is enforced
// against all jobs, whether or not they are created by JobSet.
Exclusive *Exclusive `json:"exclusive,omitempty"`
}

type Network struct {
// EnableDNSHostnames allows pods to be reached via their hostnames.
// Pods will be reachable using the fully qualified pod hostname, which is in the format:
@@ -98,6 +103,15 @@
EnableDNSHostnames *bool `json:"enableDNSHostnames,omitempty"`
}

type Exclusive struct {
// TopologyKey refers to the topology on which exclusive placement will be
// enforced (e.g., node, rack, zone, etc.)
TopologyKey string `json:"topologyKey,omitempty"`
// A label query over the set of namespaces that exclusiveness applies to. Defaults to the job's namespace.
// An empty selector ({}) matches all namespaces.
NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`
}

type TargetOperator string

// TerminationPolicyTargetAny applies to any job in the JobSet.
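
For orientation, a JobSet manifest exercising the new API might look like the sketch below. Only exclusive, topologyKey, and namespaceSelector come from this PR; the metadata name, the topology key value, and the template field layout are illustrative assumptions, not part of this diff.

apiVersion: batch.x-k8s.io/v1alpha1
kind: JobSet
metadata:
  name: training-set                    # hypothetical name
spec:
  jobs:
  - name: workers
    replicas: 2
    exclusive:
      # Any node label can serve as the topology key; this value is an assumption.
      topologyKey: cloud.google.com/gke-nodepool
      namespaceSelector: {}             # {} matches all namespaces
    template:                           # assumed to be a standard batch/v1 Job template
      spec:
        template:
          spec:
            restartPolicy: Never
            containers:
            - name: worker
              image: registry.k8s.io/pause:3.9   # placeholder image

With replicas: 2, each of the resulting Jobs (training-set-workers-0 and training-set-workers-1, per the naming format above) is pinned to its own topology domain.
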
25 changes: 25 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

59 changes: 59 additions & 0 deletions config/crd/bases/batch.x-k8s.io_jobsets.yaml
@@ -58,6 +58,65 @@ spec:
description: Jobs is the group of jobs that will form the set.
items:
properties:
exclusive:
description: Exclusive defines that the jobs are 1:1 with the
specified topology. This is enforced against all jobs, whether
or not they are created by JobSet.
properties:
namespaceSelector:
description: A label query over the set of namespaces that
exclusiveness applies to. Defaults to the job's namespace.
An empty selector ({}) matches all namespaces.
properties:
matchExpressions:
description: matchExpressions is a list of label selector
requirements. The requirements are ANDed.
items:
description: A label selector requirement is a selector
that contains values, a key, and an operator that
relates the key and values.
properties:
key:
description: key is the label key that the selector
applies to.
type: string
operator:
description: operator represents a key's relationship
to a set of values. Valid operators are In,
NotIn, Exists and DoesNotExist.
type: string
values:
description: values is an array of string values.
If the operator is In or NotIn, the values array
must be non-empty. If the operator is Exists
or DoesNotExist, the values array must be empty.
This array is replaced during a strategic merge
patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
matchLabels:
additionalProperties:
type: string
description: matchLabels is a map of {key,value} pairs.
A single {key,value} in the matchLabels map is equivalent
to an element of matchExpressions, whose key field
is "key", the operator is "In", and the values array
contains only "value". The requirements are ANDed.
type: object
type: object
x-kubernetes-map-type: atomic
topologyKey:
description: TopologyKey refers to the topology on which
exclusive placement will be enforced (e.g., node, rack,
zone, etc.)
type: string
type: object
name:
description: Name is the name of the entry and will be used
as a suffix for the Job name.
56 changes: 54 additions & 2 deletions pkg/controllers/jobset_controller.go
@@ -241,8 +241,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNotExist(ctx context.Context, js *
Spec: corev1.ServiceSpec{
ClusterIP: "None",
Selector: map[string]string{
-			// TODO: Migrate to the fully qualified label name.
-			"job-name": job.Name,
+			jobset.JobNameKey: job.Name,
},
},
}
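
Rendered as YAML, the headless Service built above looks roughly like this; the Service being named after the Job is an assumption based on the Subdomain assignment in constructJob below:

apiVersion: v1
kind: Service
metadata:
  name: training-set-workers-0   # assumed: mirrors the Job name, which pods use as their subdomain
spec:
  clusterIP: None                # headless: DNS resolves straight to pod IPs
  selector:
    job-name: training-set-workers-0
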
@@ -398,10 +397,16 @@ func (r *JobSetReconciler) constructJob(js *jobset.JobSet, rjob *jobset.Replicat
job.Spec.Template.Spec.Subdomain = job.Name
}

// If this job should be exclusive per topology, set the pod affinities/anti-affinities accordingly.
if rjob.Exclusive != nil {
setExclusiveAffinities(job, rjob.Exclusive.TopologyKey, rjob.Exclusive.NamespaceSelector)
}

// Set controller owner reference for garbage collection and reconciliation.
if err := ctrl.SetControllerReference(js, job, r.Scheme); err != nil {
return nil, err
}

return job, nil
}

@@ -436,6 +441,53 @@ func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet) er
})
}

// Appends pod affinity/anti-affinity terms to the job pod template spec,
// ensuring that at most one job runs per topology domain and that all pods
// of a given job land on the same topology domain.
func setExclusiveAffinities(job *batchv1.Job, topologyKey string, nsSelector *metav1.LabelSelector) {
if job.Spec.Template.Spec.Affinity == nil {
job.Spec.Template.Spec.Affinity = &corev1.Affinity{}
}
if job.Spec.Template.Spec.Affinity.PodAffinity == nil {
job.Spec.Template.Spec.Affinity.PodAffinity = &corev1.PodAffinity{}
}
if job.Spec.Template.Spec.Affinity.PodAntiAffinity == nil {
job.Spec.Template.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
}

// Pod affinity ensures the pods of this job land on the same topology domain.
job.Spec.Template.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(job.Spec.Template.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: jobset.JobNameKey,
Operator: metav1.LabelSelectorOpIn,
Values: []string{job.Name},
},
}},
TopologyKey: topologyKey,
NamespaceSelector: nsSelector,
})

// Pod anti-affinity ensures that only this job lands on the topology domain, keeping pods of all other jobs out.
job.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(job.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: jobset.JobNameKey,
Operator: metav1.LabelSelectorOpExists,
},
{
Key: jobset.JobNameKey,
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{job.Name},
},
}},
TopologyKey: topologyKey,
NamespaceSelector: nsSelector,
})
}

func isJobFinished(job *batchv1.Job) (bool, batchv1.JobConditionType) {
for _, c := range job.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
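
To make the generated constraints concrete, here is the affinity block setExclusiveAffinities stamps onto the pod template for a hypothetical Job named training-set-workers-0 with topology key cloud.google.com/gke-nodepool (both names are illustrative):

affinity:
  podAffinity:
    # Co-locate all pods of this Job in one topology domain.
    requiredDuringSchedulingIgnoredDuringExecution:
    - labelSelector:
        matchExpressions:
        - key: job-name
          operator: In
          values: ["training-set-workers-0"]
      topologyKey: cloud.google.com/gke-nodepool
  podAntiAffinity:
    # Repel pods that carry a job-name label different from this Job's.
    requiredDuringSchedulingIgnoredDuringExecution:
    - labelSelector:
        matchExpressions:
        - key: job-name
          operator: Exists
        - key: job-name
          operator: NotIn
          values: ["training-set-workers-0"]
      topologyKey: cloud.google.com/gke-nodepool

The Exists + NotIn pair matches exactly the pods whose job-name label differs from this Job's, which is why exclusivity holds against all jobs, whether or not JobSet created them. A namespaceSelector, when set, is attached to both terms.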