Move backoff logic into own struct and increase test coverage of timeout logic #3031

Closed · wants to merge 2 commits
2 changes: 2 additions & 0 deletions pkg/reconciler/taskrun/controller.go
@@ -53,6 +53,7 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
	podInformer := podinformer.Get(ctx)
	resourceInformer := resourceinformer.Get(ctx)
	timeoutHandler := timeout.NewHandler(ctx.Done(), logger)
+	podCreationBackoff := timeout.NewBackoff(ctx.Done(), logger)
	metrics, err := NewRecorder()
	if err != nil {
		logger.Errorf("Failed to create taskrun metrics recorder %v", err)
@@ -72,6 +73,7 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
		clusterTaskLister: clusterTaskInformer.Lister(),
		resourceLister: resourceInformer.Lister(),
		timeoutHandler: timeoutHandler,
+		podCreationBackoff: podCreationBackoff,
		cloudEventClient: cloudeventclient.Get(ctx),
		metrics: metrics,
		entrypointCache: entrypointCache,
6 changes: 4 additions & 2 deletions pkg/reconciler/taskrun/taskrun.go
@@ -70,6 +70,7 @@ type Reconciler struct {
	tracker tracker.Interface
	entrypointCache podconvert.EntrypointCache
	timeoutHandler *timeout.Handler
+	podCreationBackoff *timeout.Backoff
	metrics *Recorder
	pvcHandler volumeclaim.PvcHandler
}
@@ -121,6 +122,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
		return merr.ErrorOrNil()
	}
	c.timeoutHandler.Release(tr)
+	c.podCreationBackoff.Release(tr)
	pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
	if err == nil {
		err = podconvert.StopSidecars(c.Images.NopImage, c.KubeClientSet, *pod)
@@ -460,9 +462,9 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1beta1.TaskRun) (*v1beta1.T
func (c *Reconciler) handlePodCreationError(ctx context.Context, tr *v1beta1.TaskRun, err error) error {
	var msg string
	if isExceededResourceQuotaError(err) {
-		backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
+		backoff, currentlyBackingOff := c.podCreationBackoff.Get(tr)
		if !currentlyBackingOff {
-			go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
+			go c.podCreationBackoff.SetTimer(tr, time.Until(backoff.NextAttempt))
		}
		msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts)
		tr.Status.SetCondition(&apis.Condition{
5 changes: 2 additions & 3 deletions pkg/reconciler/taskrun/taskrun_test.go
@@ -2068,15 +2068,14 @@ func TestHandlePodCreationError(t *testing.T) {
		clusterTaskLister: testAssets.Informers.ClusterTask.Lister(),
		resourceLister: testAssets.Informers.PipelineResource.Lister(),
		timeoutHandler: timeout.NewHandler(ctx.Done(), testAssets.Logger),
+		// This has not been instantiated with a timeout callback so backoffs will not start
+		podCreationBackoff: timeout.NewBackoff(ctx.Done(), testAssets.Logger),
		cloudEventClient: testAssets.Clients.CloudEvents,
		metrics: nil, // Not used
		entrypointCache: nil, // Not used
		pvcHandler: volumeclaim.NewPVCHandler(testAssets.Clients.Kube, testAssets.Logger),
	}


-	// Prevent backoff timer from starting
-	c.timeoutHandler.SetTaskRunCallbackFunc(nil)

	testcases := []struct {
		description string
		err error
148 changes: 148 additions & 0 deletions pkg/timeout/backoff.go
@@ -0,0 +1,148 @@
/*
Copyright 2020 The Tekton Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package timeout

import (
	"math"
	"math/rand"
	"sync"
	"time"

	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
	"go.uber.org/zap"
)

// Backoff can be used to start timers used to perform exponential backoffs with jitter.
type Backoff struct {
	logger *zap.SugaredLogger

	// attempts is a map from the name of the item being backed off to the Attempts object
	// containing its current state
	attempts map[string]Attempts
	// attemptsMut is used to protect access to attempts to ensure that multiple goroutines
	// don't try to update it simultaneously
	attemptsMut sync.Mutex
	// timeoutCallback is the function to call when a timeout has occurred.
	timeoutCallback func(interface{})
	// timer is used to start timers in separate goroutines
	timer *Timer
	// j is the function that will be called to jitter the backoff intervals.
	j jitterFunc
	// now is the function that will be used to get the current time.
	now nowFunc
}

// Attempts contains state of exponential backoff for a given StatusKey
type Attempts struct {
	// NumAttempts reflects the number of times a given StatusKey has been delayed
	NumAttempts uint
	// NextAttempt is the point in time at which this backoff expires
	NextAttempt time.Time
}

// jitterFunc is a func applied to a computed backoff duration to remove uniformity
// from its results. A jitterFunc receives the number of seconds calculated by a
// backoff algorithm and returns the "jittered" result.
type jitterFunc func(numSeconds int) (jitteredSeconds int)

// nowFunc is a function that is used to get the current time
type nowFunc func() time.Time

// NewBackoff returns an instance of Backoff with the specified stopCh and logger, instantiated
// and ready to track goroutines.
func NewBackoff(
	stopCh <-chan struct{},
	logger *zap.SugaredLogger,
) *Backoff {
	return &Backoff{
		timer: NewTimer(stopCh, logger),
		attempts: make(map[string]Attempts),
		j: rand.Intn,
		now: time.Now,
		logger: logger,
	}
}

// Release will remove keys tracking the specified runObj.
func (b *Backoff) Release(runObj StatusKey) {
	b.attemptsMut.Lock()
	defer b.attemptsMut.Unlock()
	delete(b.attempts, runObj.GetRunKey())
}

// SetTimeoutCallback will set the function to be called when a timeout has occurred.
func (b *Backoff) SetTimeoutCallback(f func(interface{})) {
	b.timeoutCallback = f
}

// Get records the number of times it has seen a TaskRun and calculates an
// appropriate backoff deadline based on that count. Only one backoff per TaskRun
// may be active at any moment. Requests for a new backoff in the face of an
// existing one will be ignored and details of the existing backoff will be returned
// instead. Further, if a calculated backoff time is after the timeout of the TaskRun
// then the time of the timeout will be returned instead.
//
// Returned values are a backoff struct containing a NumAttempts field with the
// number of attempts performed for this TaskRun and a NextAttempt field
// describing the time at which the next attempt should be performed.
// Additionally a boolean is returned indicating whether a backoff for the
// TaskRun is already in progress.
func (b *Backoff) Get(tr *v1beta1.TaskRun) (a Attempts, inProgress bool) {
	b.attemptsMut.Lock()
	defer b.attemptsMut.Unlock()
	a = b.attempts[tr.GetRunKey()]
	if b.now().Before(a.NextAttempt) {
		inProgress = true
		return
	}
	a.NumAttempts++
	a.NextAttempt = b.now().Add(GetExponentialBackoffWithJitter(a.NumAttempts, b.j))
	duration := timeoutFromSpec((tr.Spec.Timeout))
	timeoutDeadline := tr.Status.StartTime.Time.Add(duration)
	if timeoutDeadline.Before(a.NextAttempt) {
		a.NextAttempt = timeoutDeadline
	}
	b.attempts[tr.GetRunKey()] = a
	return
}
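
// Worked example (illustration only, not part of this file): for a TaskRun
// whose key has not been seen before and whose StartTime is set, Get returns
// NumAttempts=1 and inProgress=false, with NextAttempt roughly 1-2s away
// (2^1 seconds, jittered by j). Calling Get again before NextAttempt returns
// the same record with inProgress=true and does not bump the count. If the
// jittered deadline would land after StartTime+Timeout, NextAttempt is clamped
// to that timeout deadline instead.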

// GetExponentialBackoffWithJitter will return a number which is 2 to the power
// of count, but with a jittered value obtained via jf.
func GetExponentialBackoffWithJitter(count uint, jf jitterFunc) time.Duration {
	exp := float64(count)
	if exp > maxBackoffExponent {
		exp = maxBackoffExponent
	}
	seconds := int(math.Exp2(exp))
	jittered := 1 + jf(seconds)
	if jittered > maxBackoffSeconds {
		jittered = maxBackoffSeconds
	}
	return time.Duration(jittered) * time.Second
}
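
// Worked example (illustration only, not part of this file): with count=3 and
// jf=rand.Intn, seconds = 2^3 = 8 and jittered = 1 + rand.Intn(8), i.e. a
// backoff between 1s and 8s. Larger counts are capped by the package constants
// maxBackoffExponent and maxBackoffSeconds (defined elsewhere in this package).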

// SetTimer creates a blocking function to wait for
// 1. Stop signal, 2. completion or 3. a given Duration to elapse.
func (b *Backoff) SetTimer(runObj StatusKey, d time.Duration) {
	if b.timeoutCallback == nil {
		b.logger.Errorf("attempted to set a timer for %q but no callback has been assigned", runObj)

Review comment (Member):

Sounds very odd (at least to me 😉), but commenting out this logger call fixes the race condition. This function invocation within handlePodCreationError and the following SetCondition are somehow causing it.


Review comment (Member):

WARNING: DATA RACE
Read at 0x00c000c3c198 by goroutine 103:
...
go.uber.org/zap.(*SugaredLogger).log()
...

Previous write at 0x00c000c3c198 by goroutine 79:
  knative.dev/pkg/apis/duck/v1beta1.(*Status).SetConditions()
...


Review comment (@ghost, Jul 31, 2020):

Ah, well spotted! It looks like swapping out the %q for a %p in this log statement also removes the error. Using %q results in a string-formatted struct being printed, which probably walks the entire object and accesses all the fields. My hunch is that the %q formatter is not thread-safe in that case? In hindsight this may be why we have the StatusKey interface - the GetRunKey() func of TaskRun returns a thread-safe descriptor.


Review comment (@pritidesai, Member, Jul 31, 2020):

nice catch, yup it does walk the entire object. The next log statement (using GetRunKey() on runObj) looks safe and just prints the address of the taskRun object instead:

"TaskRun/0xc00015af00"

		return
	}
	b.logger.Infof("About to start backoff timer for %s. backing off for %s", runObj.GetRunKey(), d)
	defer b.Release(runObj)
	b.timer.SetTimer(runObj, d, b.timeoutCallback)
}
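
For orientation, here is a minimal usage sketch of the new Backoff type, pieced together from the controller.go wiring and the handlePodCreationError hunk above. The helper name backoffPodCreation and its signature are hypothetical; only NewBackoff, SetTimeoutCallback, Get, SetTimer and GetRunKey come from this PR.

package taskrun // hypothetical placement, mirroring the reconciler usage above

import (
	"time"

	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
	"github.com/tektoncd/pipeline/pkg/timeout"
	"go.uber.org/zap"
)

// backoffPodCreation mirrors handlePodCreationError: b is the shared
// podCreationBackoff constructed once in NewController via
// timeout.NewBackoff(ctx.Done(), logger), with a requeue function registered
// through b.SetTimeoutCallback so that an expired timer re-triggers reconciliation.
func backoffPodCreation(b *timeout.Backoff, logger *zap.SugaredLogger, tr *v1beta1.TaskRun) {
	attempts, inProgress := b.Get(tr)
	if !inProgress {
		// SetTimer blocks until the stop channel closes or the duration elapses,
		// invoking the registered callback and releasing the entry, so it is
		// started in its own goroutine.
		go b.SetTimer(tr, time.Until(attempts.NextAttempt))
	}
	// Logging via GetRunKey() avoids formatting the whole TaskRun struct and so
	// sidesteps the data race discussed in the review thread above.
	logger.Infof("backing off pod creation for %s, attempt #%d", tr.GetRunKey(), attempts.NumAttempts)
}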