Skip to content

Commit

Permalink
Job controller implementation of backoff limit per index (kubernetes#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Jul 18, 2023
1 parent f55f278 commit a15c276
Show file tree
Hide file tree
Showing 9 changed files with 2,345 additions and 73 deletions.
26 changes: 23 additions & 3 deletions pkg/controller/job/backoff_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -213,20 +214,39 @@ func getFinishTimeFromDeletionTimestamp(p *v1.Pod) *time.Time {
}

func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration {
if backoff.failuresAfterLastSuccess == 0 {
return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, backoff.failuresAfterLastSuccess, backoff.lastFailureTime)
}

// getRemainingTimePerIndex computes how long a given index still has to wait
// before its replacement pod may be created.
//
// lastFailedPod is the most recently failed pod of the index; it supplies both
// the failure count (read via getIndexAbsoluteFailureCount, which per the
// original documentation is backed by the `job-index-failure-count`
// annotation) and the time of the last failure (via getFinishedTime).
// A nil lastFailedPod means the index has not failed yet, so there is nothing
// to wait for and 0 is returned.
func getRemainingTimePerIndex(logger klog.Logger, clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, lastFailedPod *v1.Pod) time.Duration {
	if lastFailedPod == nil {
		return 0
	}

	// One is added on top of the count read from the pod; presumably the
	// recorded value covers only failures preceding lastFailedPod itself.
	consecutiveFailures := 1 + getIndexAbsoluteFailureCount(logger, lastFailedPod)
	failedAt := getFinishedTime(lastFailedPod)

	return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, consecutiveFailures, &failedAt)
}

func getRemainingTimeForFailuresCount(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, failuresCount int32, lastFailureTime *time.Time) time.Duration {
if failuresCount == 0 {
return 0
}

backoffDuration := defaultBackoff
for i := 1; i < int(backoff.failuresAfterLastSuccess); i++ {
for i := 1; i < int(failuresCount); i++ {
backoffDuration = backoffDuration * 2
if backoffDuration >= maxBackoff {
backoffDuration = maxBackoff
break
}
}

timeElapsedSinceLastFailure := clock.Since(*backoff.lastFailureTime)
timeElapsedSinceLastFailure := clock.Since(*lastFailureTime)

if backoffDuration < timeElapsedSinceLastFailure {
return 0
Expand Down
44 changes: 44 additions & 0 deletions pkg/controller/job/backoff_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2/ktesting"
clocktesting "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
)
Expand Down Expand Up @@ -466,3 +467,46 @@ func TestGetRemainingBackoffTime(t *testing.T) {
})
}
}

// TestGetRemainingBackoffTimePerIndex checks getRemainingTimePerIndex against
// a table of last-failed pods. Each case fixes the fake clock at currentTime
// (truncated to whole seconds) and compares the returned duration, in seconds,
// with wantDuration.
func TestGetRemainingBackoffTimePerIndex(t *testing.T) {
	// Shared reference instant used both as the pod failure time and as "now".
	failedAt := metav1.NewTime(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC))

	type testCase struct {
		currentTime    time.Time
		maxBackoff     time.Duration
		defaultBackoff time.Duration
		lastFailedPod  *v1.Pod
		wantDuration   time.Duration
	}

	testCases := map[string]testCase{
		"no failures": {
			lastFailedPod:  nil,
			defaultBackoff: 5 * time.Second,
			maxBackoff:     700 * time.Second,
			wantDuration:   0 * time.Second,
		},
		"two prev failures; current time and failure time are same": {
			lastFailedPod:  buildPod().phase(v1.PodFailed).indexFailureCount("2").customDeletionTimestamp(failedAt.Time).Pod,
			currentTime:    failedAt.Time,
			defaultBackoff: 5 * time.Second,
			maxBackoff:     700 * time.Second,
			wantDuration:   20 * time.Second,
		},
		"one prev failure counted and one ignored; current time and failure time are same": {
			lastFailedPod:  buildPod().phase(v1.PodFailed).indexFailureCount("1").indexIgnoredFailureCount("1").customDeletionTimestamp(failedAt.Time).Pod,
			currentTime:    failedAt.Time,
			defaultBackoff: 5 * time.Second,
			maxBackoff:     700 * time.Second,
			wantDuration:   20 * time.Second,
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			logger, _ := ktesting.NewTestContext(t)
			now := tc.currentTime.Truncate(time.Second)
			got := getRemainingTimePerIndex(logger, clocktesting.NewFakeClock(now), tc.defaultBackoff, tc.maxBackoff, tc.lastFailedPod)
			if got.Seconds() == tc.wantDuration.Seconds() {
				return
			}
			t.Errorf("Expected value of duration %v; got %v", tc.wantDuration, got)
		})
	}
}
Loading

0 comments on commit a15c276

Please sign in to comment.