Merge pull request #311 from danielvegamyhre/label

Add node selector strategy for exclusive job placement per topology

k8s-ci-robot authored Sep 28, 2023
2 parents f793962 + 7ba3f58 commit 31b0262
Showing 6 changed files with 223 additions and 3 deletions.
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
@@ -27,6 +27,13 @@ const (
	JobKey string = "jobset.sigs.k8s.io/job-key"
	JobNameKey string = "job-name" // TODO(#26): Migrate to the fully qualified label name.
	ExclusiveKey string = "alpha.jobset.sigs.k8s.io/exclusive-topology"
	// NodeSelectorStrategyKey is an annotation that acts as a flag; its value does not matter.
	// If set, the JobSet controller will automatically inject nodeSelectors for the JobSetNameKey label to
	// ensure exclusive job placement per topology, instead of injecting pod affinities/anti-affinities for this.
	// The user must add the JobSet name node label to the desired topologies separately.
	NodeSelectorStrategyKey string = "alpha.jobset.sigs.k8s.io/node-selector"
	NamespacedJobKey string = "alpha.jobset.sigs.k8s.io/namespaced-job"
	NoScheduleTaintKey string = "alpha.jobset.sigs.k8s.io/no-schedule"
)

type JobSetConditionType string
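For reference, the two annotations above are meant to be used together: `alpha.jobset.sigs.k8s.io/exclusive-topology` names the topology domain, while the mere presence of `alpha.jobset.sigs.k8s.io/node-selector` switches the controller from affinity injection to node selectors. Below is a minimal Go sketch of a JobSet opting in; the name and namespace are hypothetical, and the import path assumes the repo's `v1alpha2` API package.

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

func main() {
	// Hypothetical JobSet opting into exclusive placement per GKE node pool
	// via node selectors rather than pod affinities/anti-affinities.
	js := &jobset.JobSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "my-jobset", // hypothetical name
			Namespace: "default",
			Annotations: map[string]string{
				// Topology domain for exclusive placement.
				jobset.ExclusiveKey: "cloud.google.com/gke-nodepool",
				// Flag annotation: presence enables the node selector
				// strategy; the value itself is ignored.
				jobset.NodeSelectorStrategyKey: "true",
			},
		},
	}
	fmt.Println(js.Annotations)
}
```

Nothing else is required on the JobSet itself; per the comment on `NodeSelectorStrategyKey`, the matching node labels and taints must be applied to the target topologies separately, which is what the script below automates.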
128 changes: 128 additions & 0 deletions hack/label_nodes/label_nodes.py
@@ -0,0 +1,128 @@
#!/usr/bin/python3
'''
This script can be used to add certain labels and taints
to a given list of GKE node pools, in order to dramatically
speed up scheduling of pods for very large JobSets which are
using an exclusive placement strategy of one job per node pool.

usage: label_nodes.py [-h] jobset nodepools

positional arguments:
  jobset      jobset yaml config
  nodepools   comma separated list of nodepool names

It works by generating the list of namespaced job names
that will be created by the JobSet, and creating a 1:1 mapping of
job to node pool. For every given node pool, it looks up the job
mapped to that node pool, and applies a namespaced job name node label
to every node in that node pool, allowing the job nodeSelectors injected
by the JobSet controller to select those nodes to schedule the job pods on.
It also adds a NoSchedule taint to the nodes (tolerated by the job pods),
preventing any external workloads from running on the nodes. Together,
these allow for exclusive placement of one job per GKE node pool.
'''

import argparse

import kubernetes.client
import kubernetes.config
import yaml

NAMESPACED_JOB_KEY = "alpha.jobset.sigs.k8s.io/namespaced-job"
NODE_POOL_KEY = "cloud.google.com/gke-nodepool"
NO_SCHEDULE_TAINT_KEY = "alpha.jobset.sigs.k8s.io/no-schedule"


def main(node_pools: str, jobset_yaml: str) -> None:
    namespaced_jobs = generate_namespaced_jobs(jobset_yaml)

    # Get target set of node pools to label from user input.
    target_node_pools = parse_node_pools(node_pools)

    # Validate we have 1 job per node pool.
    if len(namespaced_jobs) != len(target_node_pools):
        raise ValueError(f"number of node pools ({len(target_node_pools)}) does not match number of child jobs ({len(namespaced_jobs)})")

    # Map each job to a node pool.
    job_mapping = generate_job_mapping(namespaced_jobs, target_node_pools)

    # Load the Kubernetes configuration from the default location.
    kubernetes.config.load_kube_config()

    # Create a new Kubernetes client.
    client = kubernetes.client.CoreV1Api()

    # Get a list of all the nodes in the cluster.
    nodes = client.list_node().items

    # Add the label to each node that is part of a target node pool.
    for node in nodes:
        node_pool = node.metadata.labels.get(NODE_POOL_KEY)  # absent on non-GKE nodes
        if node_pool not in target_node_pools:
            continue

        # Look up the job this node pool is assigned and add it as a label.
        body = {
            "metadata": {
                "labels": {
                    NAMESPACED_JOB_KEY: job_mapping[node_pool],
                },
            },
            "spec": {
                "taints": [
                    {
                        "key": NO_SCHEDULE_TAINT_KEY,
                        "value": "true",
                        "effect": "NoSchedule",
                    }
                ],
            },
        }

        client.patch_node(node.metadata.name, body)
        print(f"Successfully added label {NAMESPACED_JOB_KEY}={job_mapping[node_pool]} to node {node.metadata.name}")
        print(f"Successfully added taint key={NO_SCHEDULE_TAINT_KEY}, value=true, effect=NoSchedule to node {node.metadata.name}")


def parse_node_pools(node_pools: str) -> set:
    '''Parse comma separated list of node pools into a set, and confirm with the user before continuing.'''
    node_pools_set = set(node_pools.split(','))

    print("Adding namespaced job name labels to the following node pools:")
    for np in node_pools_set:
        print(np)

    input('Once confirmed, press Enter to continue, or ctrl+C to exit.')
    return node_pools_set


def generate_namespaced_jobs(jobset_yaml: str) -> list:
    '''Generate list of namespaced jobs from the jobset config.'''
    with open(jobset_yaml, 'r') as f:
        jobset = yaml.safe_load(f.read())

    jobset_name = jobset['metadata']['name']
    namespace = jobset['metadata'].get('namespace', 'default')  # default namespace if unspecified

    jobs = []
    for replicated_job in jobset['spec']['replicatedJobs']:
        replicas = int(replicated_job.get('replicas', '1'))  # replicas defaults to 1 if unspecified
        for job_idx in range(replicas):
            jobs.append(f"{namespace}_{jobset_name}-{replicated_job['name']}-{job_idx}")
    return jobs


def generate_job_mapping(namespaced_jobs: list[str], node_pools: set) -> dict:
    '''Map each job to a node pool, and return this mapping in a dictionary.'''
    mapping = {}
    for np in node_pools:
        mapping[np] = namespaced_jobs.pop()
    return mapping


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("jobset", help="jobset yaml config")
    parser.add_argument("nodepools", help="comma separated list of nodepool names")
    args = parser.parse_args()
    main(args.nodepools, args.jobset)
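The namespaced job names the script produces must line up exactly with the nodeSelector values the controller injects. A small self-contained Go sketch of that shared convention, using a hypothetical JobSet `foo` with one replicatedJob `workers` and `replicas: 2`:

```go
package main

import "fmt"

// namespacedJobName mirrors the controller helper added in this PR:
// '_' joins namespace and job name because '/' is not allowed in
// Kubernetes label values.
func namespacedJobName(ns, jobName string) string {
	return fmt.Sprintf("%s_%s", ns, jobName)
}

func main() {
	// Child job names follow <jobset>-<replicatedJob>-<index>.
	for jobIdx := 0; jobIdx < 2; jobIdx++ {
		job := fmt.Sprintf("foo-workers-%d", jobIdx)
		fmt.Println(namespacedJobName("default", job))
	}
	// Output:
	// default_foo-workers-0
	// default_foo-workers-1
}
```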
2 changes: 2 additions & 0 deletions hack/label_nodes/requirements.txt
@@ -0,0 +1,2 @@
kubernetes
pyyaml
39 changes: 36 additions & 3 deletions pkg/controllers/jobset_controller.go
@@ -594,10 +594,20 @@ func constructJob(js *jobset.JobSet, rjob *jobset.ReplicatedJob, jobIdx int) (*b
		job.Spec.Template.Spec.Subdomain = js.Spec.Network.Subdomain
	}

	// If this job should be exclusive per topology, configure the scheduling constraints accordingly.
	if topologyDomain, ok := js.Annotations[jobset.ExclusiveKey]; ok {
		// If the user has set the nodeSelectorStrategy annotation flag, add the job name label as a
		// nodeSelector, and add a toleration for the no schedule taint.
		// The node label and node taint must be added separately by a user/script.
		if _, exists := js.Annotations[jobset.NodeSelectorStrategyKey]; exists {
			addNodeSelector(job)
			addTaintToleration(job)
		} else {
			// Otherwise, default to using the exclusive pod affinity/anti-affinity strategy.
			setExclusiveAffinities(job, topologyDomain)
		}
	}

	// If Suspend is set, then we assume all jobs will be suspended also.
	jobsetSuspended := js.Spec.Suspend != nil && *js.Spec.Suspend
	job.Spec.Suspend = ptr.To(jobsetSuspended)
@@ -652,6 +662,23 @@ func setExclusiveAffinities(job *batchv1.Job, topologyKey string) {
	})
}

func addNodeSelector(job *batchv1.Job) {
	if job.Spec.Template.Spec.NodeSelector == nil {
		job.Spec.Template.Spec.NodeSelector = make(map[string]string)
	}
	job.Spec.Template.Spec.NodeSelector[jobset.NamespacedJobKey] = namespacedJobName(job.Namespace, job.Name)
}

func addTaintToleration(job *batchv1.Job) {
	job.Spec.Template.Spec.Tolerations = append(job.Spec.Template.Spec.Tolerations,
		corev1.Toleration{
			Key:      jobset.NoScheduleTaintKey,
			Operator: corev1.TolerationOpExists,
			Effect:   corev1.TaintEffectNoSchedule,
		},
	)
}

func shouldCreateJob(jobName string, ownedJobs *childJobs) bool {
	// Check if this job exists already.
	// TODO: maybe we can use a job map here so we can do O(1) lookups
@@ -703,10 +730,16 @@ func GenSubdomain(js *jobset.JobSet) string {
}

// jobHashKey returns the SHA1 hash of the namespaced job name (i.e. <namespace>/<jobName>).
func jobHashKey(ns, jobName string) string {
	return sha1Hash(fmt.Sprintf("%s/%s", ns, jobName))
}

// namespacedJobName returns a human readable namespaced job name. We must use '_' to separate
// the namespace and job name instead of '/', since the '/' character is not allowed in label values.
func namespacedJobName(ns, jobName string) string {
	return fmt.Sprintf("%s_%s", ns, jobName)
}

func jobSetFinished(js *jobset.JobSet) bool {
	for _, c := range js.Status.Conditions {
		if (c.Type == string(jobset.JobSetCompleted) || c.Type == string(jobset.JobSetFailed)) && c.Status == metav1.ConditionTrue {
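The injected nodeSelector and toleration only take effect once each target node actually carries the matching label and taint, which are applied out of band (for example, by the labeling script above). A sketch of that node-side state, with the label value assumed to follow the same naming convention:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// labelAndTaintNode sketches what the labeling script applies to a node:
// the namespaced-job label matched by addNodeSelector, and the NoSchedule
// taint tolerated via addTaintToleration.
func labelAndTaintNode(node *corev1.Node, namespacedJob string) {
	if node.Labels == nil {
		node.Labels = map[string]string{}
	}
	node.Labels["alpha.jobset.sigs.k8s.io/namespaced-job"] = namespacedJob
	node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
		Key:    "alpha.jobset.sigs.k8s.io/no-schedule",
		Value:  "true",
		Effect: corev1.TaintEffectNoSchedule,
	})
}

func main() {
	node := &corev1.Node{}
	labelAndTaintNode(node, "default_my-jobset-workers-0") // hypothetical value
	fmt.Println(node.Labels, node.Spec.Taints)
}
```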
38 changes: 38 additions & 0 deletions pkg/controllers/jobset_controller_test.go
@@ -600,6 +600,44 @@ func TestConstructJobsFromTemplate(t *testing.T) {
				Subdomain(jobSetName).Obj(),
			},
		},
		{
			name: "node selector exclusive placement strategy enabled",
			js: testutils.MakeJobSet(jobSetName, ns).
				SetAnnotations(map[string]string{
					jobset.ExclusiveKey:            "cloud.google.com/gke-nodepool",
					jobset.NodeSelectorStrategyKey: "true",
				}).
				EnableDNSHostnames(true).
				NetworkSubdomain(jobSetName).
				ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
					Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
					Subdomain(jobSetName).
					Replicas(1).
					Obj()).
				Obj(),
			ownedJobs: &childJobs{},
			want: []*batchv1.Job{
				makeJob(&makeJobArgs{
					jobSetName:        jobSetName,
					replicatedJobName: replicatedJobName,
					jobName:           "test-jobset-replicated-job-0",
					ns:                ns,
					replicas:          1,
					jobIdx:            0}).
					Suspend(false).
					Subdomain(jobSetName).
					NodeSelector(map[string]string{
						jobset.NamespacedJobKey: namespacedJobName(ns, "test-jobset-replicated-job-0"),
					}).
					Tolerations([]corev1.Toleration{
						{
							Key:      jobset.NoScheduleTaintKey,
							Operator: corev1.TolerationOpExists,
							Effect:   corev1.TaintEffectNoSchedule,
						},
					}).Obj(),
			},
		},
	}

	for _, tc := range tests {
12 changes: 12 additions & 0 deletions pkg/util/testing/wrappers.go
@@ -278,6 +278,18 @@ func (j *JobWrapper) Ready(ready int32) *JobWrapper {
	return j
}

// Tolerations sets the tolerations.
func (j *JobWrapper) Tolerations(t []corev1.Toleration) *JobWrapper {
	j.Spec.Template.Spec.Tolerations = t
	return j
}

// NodeSelector sets the node selector.
func (j *JobWrapper) NodeSelector(nodeSelector map[string]string) *JobWrapper {
	j.Spec.Template.Spec.NodeSelector = nodeSelector
	return j
}

// Obj returns the wrapped Job.
func (j *JobWrapper) Obj() *batchv1.Job {
	return &j.Job
