Skip to content

Commit

Permalink
add job-id annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvegamyhre committed Aug 13, 2024
1 parent ec39730 commit f0bec34
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 2 deletions.
11 changes: 9 additions & 2 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,15 @@ const (
ReplicatedJobReplicas string = "jobset.sigs.k8s.io/replicatedjob-replicas"
// ReplicatedJobNameKey is used to index into a Job's labels and retrieve the name of the parent ReplicatedJob.
ReplicatedJobNameKey string = "jobset.sigs.k8s.io/replicatedjob-name"
JobIndexKey string = "jobset.sigs.k8s.io/job-index"
JobKey string = "jobset.sigs.k8s.io/job-key"
// JobIndexKey is a label/annotation set to the index of the Job replica within its parent replicatedJob.
// For each replicatedJob, this value will range from 0 to replicas-1, where `replicas`
// is equal to jobset.spec.replicatedJobs[*].replicas.
JobIndexKey string = "jobset.sigs.k8s.io/job-index"
// JobIDKey is a label/annotation set to an integer that is unique across the entire JobSet.
// For each JobSet, this value will range from 0 to N-1, where N=total number of jobs in the jobset.
JobIDKey string = "jobset.sigs.k8s.io/job-id"
// JobKey holds the SHA256 hash of the namespaced job name, which can be used to uniquely identify the job.
JobKey string = "jobset.sigs.k8s.io/job-key"
// ExclusiveKey is an annotation that can be set on the JobSet or on a ReplicatedJob template.
// If set at the JobSet level, all child jobs from all ReplicatedJobs will be scheduled using exclusive
// job placement per topology group (defined as the label value).
Expand Down
20 changes: 20 additions & 0 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
labels[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
labels[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
labels[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
labels[jobset.JobIDKey] = calculateJobID(js, rjob, jobIdx)

// Set annotations on the object.
annotations := collections.CloneMap(obj.GetAnnotations())
Expand All @@ -739,6 +740,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
annotations[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
annotations[jobset.JobIDKey] = calculateJobID(js, rjob, jobIdx)

// Apply coordinator annotation/label if a coordinator is defined in the JobSet spec.
if js.Spec.Coordinator != nil {
Expand Down Expand Up @@ -1032,3 +1034,21 @@ func exclusiveConditions(cond1, cond2 metav1.Condition) bool {
// coordinatorEndpoint builds the endpoint string for the JobSet's coordinator in
// the form <jobset name>-<replicatedJob>-<jobIndex>-<podIndex>.<subdomain>.
// js.Spec.Coordinator is dereferenced unconditionally, so callers must only
// invoke this when a coordinator is defined in the JobSet spec.
func coordinatorEndpoint(js *jobset.JobSet) string {
	coordinator := js.Spec.Coordinator
	// Host portion: name of the JobSet plus the coordinator's replicated job, job index and pod index.
	host := fmt.Sprintf("%s-%s-%d-%d", js.Name, coordinator.ReplicatedJob, coordinator.JobIndex, coordinator.PodIndex)
	return fmt.Sprintf("%s.%s", host, GetSubdomain(js))
}

// calculateJobID deterministically assigns a unique integer job ID to a particular
// job in a JobSet. The ID of job `j` in replicatedJobs[i] is calculated as the sum
// of replicatedJobs[k].replicas for k in range 0 to i-1 inclusive, plus `j`.
// This works because the replicatedJobs order is immutable.
//
// The parent is located by comparing parentReplicatedJob.Name against the names
// in js.Spec.ReplicatedJobs; jobIdx is the index of the job within that parent.
//
// Returns the ID formatted as a base-10 string, or an empty string if the parent
// replicated job does not exist in the JobSet spec, although this should never
// happen in practice.
func calculateJobID(js *jobset.JobSet, parentReplicatedJob *jobset.ReplicatedJob, jobIdx int) string {
	currTotalJobs := 0
	// Index into the slice instead of ranging by value so that each
	// ReplicatedJob element is not copied just to read its name and replicas.
	for i := range js.Spec.ReplicatedJobs {
		rjob := &js.Spec.ReplicatedJobs[i]
		if rjob.Name == parentReplicatedJob.Name {
			return strconv.Itoa(currTotalJobs + jobIdx)
		}
		// Not the parent: every job of this replicated job precedes the target.
		currTotalJobs += int(rjob.Replicas)
	}
	return ""
}
61 changes: 61 additions & 0 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1381,3 +1381,64 @@ func TestCreateHeadlessSvcIfNecessary(t *testing.T) {
})
}
}

// TestCalculateJobID checks the job ID string produced by calculateJobID for a
// JobSet with a single replicated job, a JobSet with several replicated jobs,
// and a parent replicated job that is absent from the JobSet spec.
func TestCalculateJobID(t *testing.T) {
	// newJobSet returns a JobSet whose spec holds exactly the given replicated jobs.
	newJobSet := func(rjobs ...jobset.ReplicatedJob) *jobset.JobSet {
		return &jobset.JobSet{
			Spec: jobset.JobSetSpec{ReplicatedJobs: rjobs},
		}
	}

	testCases := []struct {
		desc   string
		js     *jobset.JobSet
		parent *jobset.ReplicatedJob
		idx    int
		want   string
	}{
		{
			desc:   "single replicated job",
			js:     newJobSet(jobset.ReplicatedJob{Name: "rjob", Replicas: 3}),
			parent: &jobset.ReplicatedJob{Name: "rjob"},
			idx:    1,
			want:   "1",
		},
		{
			desc: "multiple replicated jobs",
			js: newJobSet(
				jobset.ReplicatedJob{Name: "rjob1", Replicas: 2},
				jobset.ReplicatedJob{Name: "rjob2", Replicas: 4},
				jobset.ReplicatedJob{Name: "rjob3", Replicas: 1},
			),
			parent: &jobset.ReplicatedJob{Name: "rjob2"},
			idx:    3,
			// 2 jobs from rjob1 precede rjob2, so index 3 maps to ID 5.
			want: "5",
		},
		{
			desc:   "replicated job not found",
			js:     newJobSet(jobset.ReplicatedJob{Name: "rjob1", Replicas: 2}),
			parent: &jobset.ReplicatedJob{Name: "rjob2"},
			idx:    0,
			want:   "",
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.desc, func(t *testing.T) {
			got := calculateJobID(testCase.js, testCase.parent, testCase.idx)
			diff := cmp.Diff(testCase.want, got)
			if diff == "" {
				return
			}
			t.Errorf("unexpected job ID (-want/+got): %s", diff)
		})
	}
}

0 comments on commit f0bec34

Please sign in to comment.