Merge pull request #112948 from mimowo/112873-fix-job-finished-metric
Fix the job finished metric issue due to the final job status update occasionally failing
k8s-ci-robot committed Oct 14, 2022
2 parents 0207f7a + b64e5b2 commit 9bedff1
Showing 2 changed files with 162 additions and 11 deletions.
38 changes: 27 additions & 11 deletions pkg/controller/job/job_controller.go
@@ -699,10 +699,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
return false, nil
}

completionMode := string(batch.NonIndexedCompletion)
if isIndexedJob(&job) {
completionMode = string(batch.IndexedCompletion)
}
completionMode := getCompletionMode(&job)
action := metrics.JobSyncActionReconciling

defer func() {
@@ -906,11 +903,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
job.Status.CompletedIndexes = succeededIndexes.String()
}
job.Status.UncountedTerminatedPods = nil
jm.enactJobFinished(&job, finishedCondition)
jobFinished := jm.enactJobFinished(&job, finishedCondition)

if _, err := jm.updateStatusHandler(ctx, &job); err != nil {
return forget, err
}
if jobFinished {
jm.recordJobFinished(&job, finishedCondition)
}

if jobHasNewFailure && !IsJobFinished(&job) {
// returning an error will re-enqueue Job after the backoff period
@@ -1105,13 +1105,17 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
return err
}
if jm.enactJobFinished(job, finishedCond) {
jobFinished := jm.enactJobFinished(job, finishedCond)
if jobFinished {
needsFlush = true
}
if needsFlush {
if _, err := jm.updateStatusHandler(ctx, job); err != nil {
return fmt.Errorf("removing uncounted pods from status: %w", err)
}
if jobFinished {
jm.recordJobFinished(job, finishedCond)
}
recordJobPodFinished(job, oldCounters)
}
return nil
@@ -1244,16 +1248,20 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo
return false
}
}
completionMode := string(batch.NonIndexedCompletion)
if isIndexedJob(job) {
completionMode = string(*job.Spec.CompletionMode)
}
job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message)
if finishedCond.Type == batch.JobComplete {
job.Status.CompletionTime = &finishedCond.LastTransitionTime
}
return true
}

// recordJobFinished records events and the job_finished_total metric for a finished job.
func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {
completionMode := getCompletionMode(job)
if finishedCond.Type == batch.JobComplete {
if job.Spec.Completions != nil && job.Status.Succeeded > *job.Spec.Completions {
jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
}
job.Status.CompletionTime = &finishedCond.LastTransitionTime
jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed")
metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
} else {
@@ -1613,6 +1621,14 @@ func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Str
return result
}

// getCompletionMode returns string representation of the completion mode. Used as a label value for metrics.
func getCompletionMode(job *batch.Job) string {
if isIndexedJob(job) {
return string(batch.IndexedCompletion)
}
return string(batch.NonIndexedCompletion)
}

func trackingUncountedPods(job *batch.Job) bool {
return feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) && hasJobTrackingAnnotation(job)
}
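Note on the controller change above: enactJobFinished now only sets the finished condition and the completion time, while the new recordJobFinished emits the events and the job_finished_total metric, and it is called only after updateStatusHandler succeeds. The following is a minimal, self-contained sketch of that ordering, not controller code; persistStatus, syncOnce, and the plain integer counter are made-up stand-ins. Because the counter is bumped only after the status update persists, an update that fails and is retried no longer inflates the metric.

package main

import (
	"errors"
	"fmt"
)

// jobFinishedTotal is a plain stand-in for the job_finished_total counter.
var jobFinishedTotal int

// persistStatus simulates the final status update, failing on the first attempt.
func persistStatus(attempt int) error {
	if attempt == 0 {
		return errors.New("conflict: the object has been modified")
	}
	return nil
}

// syncOnce mirrors the fixed ordering: enact the finished condition, persist the
// status, and only then record the metric.
func syncOnce(attempt int) error {
	jobFinished := true // stands in for enactJobFinished(...) returning true
	if err := persistStatus(attempt); err != nil {
		return err // the sync is retried; the metric has not been incremented yet
	}
	if jobFinished {
		jobFinishedTotal++ // stands in for recordJobFinished(...)
	}
	return nil
}

func main() {
	for attempt := 0; ; attempt++ {
		if err := syncOnce(attempt); err == nil {
			break
		}
	}
	// Prints 1 even though the first status update failed and the sync was retried.
	fmt.Println("job_finished_total =", jobFinishedTotal)
}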
135 changes: 135 additions & 0 deletions test/integration/job/job_test.go
@@ -47,19 +47,154 @@ import (
"k8s.io/client-go/restmapper"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/klog/v2"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
jobcontroller "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer"
)

const waitInterval = time.Second

type metricLabelsWithValue struct {
Labels []string
Value int
}

func TestMetrics(t *testing.T) {
nonIndexedCompletion := batchv1.NonIndexedCompletion
indexedCompletion := batchv1.IndexedCompletion
wFinalizers := true
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()

// setup the job controller
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()

testCases := map[string]struct {
job *batchv1.Job
wantJobFinishedNumMetricDelta metricLabelsWithValue
wantJobPodsFinishedMetricDelta metricLabelsWithValue
}{
"non-indexed job": {
job: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(2),
Parallelism: pointer.Int32(2),
CompletionMode: &nonIndexedCompletion,
},
},
wantJobFinishedNumMetricDelta: metricLabelsWithValue{
Labels: []string{"NonIndexed", "succeeded"},
Value: 1,
},
wantJobPodsFinishedMetricDelta: metricLabelsWithValue{
Labels: []string{"NonIndexed", "succeeded"},
Value: 2,
},
},
"indexed job": {
job: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(2),
Parallelism: pointer.Int32(2),
CompletionMode: &indexedCompletion,
},
},
wantJobFinishedNumMetricDelta: metricLabelsWithValue{
Labels: []string{"Indexed", "succeeded"},
Value: 1,
},
wantJobPodsFinishedMetricDelta: metricLabelsWithValue{
Labels: []string{"Indexed", "succeeded"},
Value: 2,
},
},
}
job_index := 0 // job index to avoid collisions between job names created by different test cases
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {

// record the metric values before creating the job
jobFinishedNumBefore, err := getCounterMetricValueForLabels(metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta.Labels)
if err != nil {
t.Fatalf("Failed to collect the JobFinishedNum metric before creating the job: %q", err)
}
jobPodsFinishedBefore, err := getCounterMetricValueForLabels(metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta.Labels)
if err != nil {
t.Fatalf("Failed to collect the JobPodsFinished metric before creating the job: %q", err)
}

// create a single job and wait for its completion
job := tc.job.DeepCopy()
job.Name = fmt.Sprintf("job-%v", job_index)
job_index++
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, job)
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: int(*jobObj.Spec.Parallelism),
Ready: pointer.Int32(0),
}, wFinalizers)
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, int(*jobObj.Spec.Parallelism)); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)

// verify metric values after the job is finished
validateMetricValueDeltas(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta, jobFinishedNumBefore)
validateMetricValueDeltas(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta, jobPodsFinishedBefore)
})
}
}

func validateMetricValueDeltas(t *testing.T, counterVer *basemetrics.CounterVec, wantMetricDelta metricLabelsWithValue, metricValuesBefore metricLabelsWithValue) {
t.Helper()
var cmpErr error
err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
cmpErr = nil
metricValuesAfter, err := getCounterMetricValueForLabels(counterVer, wantMetricDelta.Labels)
if err != nil {
return true, fmt.Errorf("Failed to collect the %q metric after the job is finished: %q", counterVer.Name, err)
}
wantDelta := wantMetricDelta.Value
gotDelta := metricValuesAfter.Value - metricValuesBefore.Value
if wantDelta != gotDelta {
cmpErr = fmt.Errorf("Unexpected metric delta for %q metric with labels %q. want: %v, got: %v", counterVer.Name, wantMetricDelta.Labels, wantDelta, gotDelta)
return false, nil
}
return true, nil
})
if err != nil {
t.Errorf("Failed waiting for expected metric delta: %q", err)
}
if cmpErr != nil {
t.Error(cmpErr)
}
}

func getCounterMetricValueForLabels(counterVec *basemetrics.CounterVec, labels []string) (metricLabelsWithValue, error) {
var result metricLabelsWithValue = metricLabelsWithValue{Labels: labels}
value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(labels...))
if err != nil {
return result, err
}
result.Value = int(value)
return result, nil
}
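
The helpers above assert on metric deltas rather than absolute counter values, so test cases that ran earlier in the same process do not interfere. Below is a hedged, self-contained sketch of that pattern using the same component-base testutil call; the demo_finished_total counter and its labels are made up for illustration and are not Kubernetes metrics.

package main

import (
	"fmt"

	basemetrics "k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
	"k8s.io/component-base/metrics/testutil"
)

// demoFinished is a stand-in counter vector, not a real Kubernetes metric.
var demoFinished = basemetrics.NewCounterVec(
	&basemetrics.CounterOpts{Name: "demo_finished_total", Help: "Finished demo jobs."},
	[]string{"completion_mode", "result"},
)

func main() {
	// Component-base counters are inert until registered.
	legacyregistry.MustRegister(demoFinished)

	labels := []string{"NonIndexed", "succeeded"}

	// Snapshot the counter before doing the work that should bump it.
	before, err := testutil.GetCounterMetricValue(demoFinished.WithLabelValues(labels...))
	if err != nil {
		panic(err)
	}

	// The "work" that is expected to be counted exactly once.
	demoFinished.WithLabelValues(labels...).Inc()

	after, err := testutil.GetCounterMetricValue(demoFinished.WithLabelValues(labels...))
	if err != nil {
		panic(err)
	}

	// Assert on the difference, not the absolute value.
	fmt.Printf("delta for %v = %v\n", labels, after-before) // 1
}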

// TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies that the job is properly marked as Failed
// in a scenario when the job controller crashes between removing pod finalizers and marking the job as Failed (based on
// the pod failure policy). After the finalizer for the failed pod is removed we remove the failed pod. This step is
