From 2611ebf8412f5314bb369a4661abefde9eb34217 Mon Sep 17 00:00:00 2001
From: Christie Wilson
Date: Tue, 4 Aug 2020 14:15:36 -0400
Subject: [PATCH] =?UTF-8?q?Actually=20stop=20trying=20to=20time=20out=20fi?=
 =?UTF-8?q?nished=20Runs=20=E2=8F=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In 10b6427 I got really enthusiastic about making sure even our reads were
threadsafe, so I thought I would be clever and, instead of accessing
attributes of a PipelineRun or TaskRun in a goroutine, use a value that
wouldn't change - specifically the address. But the address will change
between reconcile loops, because the reconcile logic creates a new instance
of the Run object every time! 🤦‍♀️

Fortunately this doesn't cause any serious problems, it just makes things
slightly less efficient: for every Run you start, a goroutine will remain
open until the timeout occurs, and when it fires, the Run will be reconciled
an extra time, even if it has already completed. (In fact, keeping that
behavior and dropping the "done" map entirely might be a reasonable option!)

With this change, we return to using the namespace + name as the key in the
map that tracks the done channels; we pass these by value so that reads are
threadsafe.

Instead of fixing this separately for the TaskRun and PipelineRun functions,
I've collapsed them and the callbacks into one. Each handler instantiates its
own timeout handler, so there is no reason for the timeout handler to have
special knowledge of one vs. the other.

Fixes #3047

_Test_

I tried several different approaches to add a test case that would reveal
the underlying problem, but I now feel it's more hassle than it's worth.
Approaches:

1. Instantiate the controller in the reconciler tests with a custom timeout
   handler that has been overridden to use a custom logger, so we can check
   for the log indicating the timeout handler completed
2. Similar to (1), but instead of checking logs, pass in a custom done
   channel and wait for it to close

Both 1 + 2 require changing the way that NewController works, i.e. the way
we always instantiate controllers. I tried working around this by taking the
same approach as `TestHandlePodCreationError` and instantiating my own
Reconciler, but it a) wasn't instantiated properly no matter what I tried
(trying to use it caused panics) and b) had a confusingly different
interface, exposing ReconcileKind instead of Reconcile.

I tried some other approaches but these went nowhere either; I don't think
it's worth adding a test to cover this, but if folks feel strongly I don't
mind opening an issue to keep exploring it. This bug is very specific to the
implementation, so I'm not sure how valuable a test that covers it would be.
If we do pursue it, we might want to do it at the level of an end-to-end
test that actually checks the logs from a real running controller.
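For illustration only (not part of this patch): a minimal sketch of the keying
problem described above. The `run` struct and the "default"/"my-run" names are
hypothetical; only types.NamespacedName is the real k8s type used by the change.
A pointer-based key differs on every reconcile because each reconcile builds a
fresh Run object, while a namespace/name key passed by value stays stable:

    package main

    import (
    	"fmt"

    	"k8s.io/apimachinery/pkg/types"
    )

    // run stands in for a TaskRun/PipelineRun; each reconcile loop builds a fresh instance.
    type run struct{ namespace, name string }

    func main() {
    	first := &run{namespace: "default", name: "my-run"}
    	second := &run{namespace: "default", name: "my-run"}

    	// Address-based key: different on every reconcile, so the "done" entry
    	// stored by one reconcile is never found (or closed) by the next one.
    	fmt.Println(fmt.Sprintf("%p", first) == fmt.Sprintf("%p", second)) // false

    	// Namespace/name key passed by value: stable across reconciles and
    	// threadsafe to read.
    	k1 := types.NamespacedName{Namespace: first.namespace, Name: first.name}
    	k2 := types.NamespacedName{Namespace: second.namespace, Name: second.name}
    	fmt.Println(k1 == k2, k1.String()) // true default/my-run
    }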
---
 .../pipeline/v1beta1/pipelinerun_types.go     |  11 +-
 .../v1beta1/pipelinerun_types_test.go         |  13 +-
 pkg/apis/pipeline/v1beta1/taskrun_types.go    |   8 +-
 .../pipeline/v1beta1/taskrun_types_test.go    |  14 +-
 pkg/reconciler/pipelinerun/controller.go      |   4 +-
 pkg/reconciler/pipelinerun/pipelinerun.go     |   6 +-
 pkg/reconciler/taskrun/controller.go          |   4 +-
 pkg/reconciler/taskrun/taskrun.go             |  10 +-
 pkg/reconciler/taskrun/taskrun_test.go        |   2 +-
 pkg/timeout/handler.go                        | 151 +++++++-----------
 pkg/timeout/handler_test.go                   |  42 ++---
 test/timeout_test.go                          |   2 +-
 12 files changed, 114 insertions(+), 153 deletions(-)

diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go
index f8928f80c05..fb26337595c 100644
--- a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go
+++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go
@@ -17,7 +17,6 @@ limitations under the License.
 package v1beta1

 import (
-	"fmt"
 	"time"

 	"github.com/tektoncd/pipeline/pkg/apis/config"
@@ -25,6 +24,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	"knative.dev/pkg/apis"
 	duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
 )
@@ -98,10 +98,9 @@ func (pr *PipelineRun) IsCancelled() bool {
 	return pr.Spec.Status == PipelineRunSpecStatusCancelled
 }

-// GetRunKey return the pipelinerun key for timeout handler map
-func (pr *PipelineRun) GetRunKey() string {
-	// The address of the pointer is a threadsafe unique identifier for the pipelinerun
-	return fmt.Sprintf("%s/%p", pipeline.PipelineRunControllerName, pr)
+// GetNamespacedName returns a k8s namespaced name that identifies this PipelineRun
+func (pr *PipelineRun) GetNamespacedName() types.NamespacedName {
+	return types.NamespacedName{Namespace: pr.Namespace, Name: pr.Name}
 }

 // IsTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout
@@ -231,7 +230,7 @@ const (
 	PipelineRunReasonCancelled PipelineRunReason = "Cancelled"
 	// PipelineRunReasonTimedOut is the reason set when the PipelineRun has timed out
 	PipelineRunReasonTimedOut PipelineRunReason = "PipelineRunTimeout"
-	// ReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
+	// PipelineRunReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
 	// pipeline will stop once all running tasks complete their work
 	PipelineRunReasonStopping PipelineRunReason = "PipelineRunStopping"
 )
diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
index 8388361bd22..4f953748a55 100644
--- a/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
+++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
@@ -17,12 +17,10 @@ limitations under the License.
 package v1beta1_test

 import (
-	"fmt"
 	"testing"
 	"time"

 	"github.com/google/go-cmp/cmp"
-	tb "github.com/tektoncd/pipeline/internal/builder/v1beta1"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
 	"github.com/tektoncd/pipeline/test/diff"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -167,11 +165,12 @@ func TestPipelineRunHasVolumeClaimTemplate(t *testing.T) {
 	}
 }

-func TestPipelineRunKey(t *testing.T) {
-	pr := tb.PipelineRun("prunname")
-	expectedKey := fmt.Sprintf("PipelineRun/%p", pr)
-	if pr.GetRunKey() != expectedKey {
-		t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, pr.GetRunKey())
+func TestGetNamespacedName(t *testing.T) {
+	pr := &v1beta1.PipelineRun{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "prunname"}}
+	n := pr.GetNamespacedName()
+	expected := "foo/prunname"
+	if n.String() != expected {
+		t.Fatalf("Expected name to be %s but got %s", expected, n.String())
 	}
 }
diff --git a/pkg/apis/pipeline/v1beta1/taskrun_types.go b/pkg/apis/pipeline/v1beta1/taskrun_types.go
index a314760bcef..e91f70aceec 100644
--- a/pkg/apis/pipeline/v1beta1/taskrun_types.go
+++ b/pkg/apis/pipeline/v1beta1/taskrun_types.go
@@ -25,6 +25,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	"knative.dev/pkg/apis"
 	duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
 )
@@ -411,10 +412,9 @@ func (tr *TaskRun) GetTimeout() time.Duration {
 	return tr.Spec.Timeout.Duration
 }

-// GetRunKey return the taskrun key for timeout handler map
-func (tr *TaskRun) GetRunKey() string {
-	// The address of the pointer is a threadsafe unique identifier for the taskrun
-	return fmt.Sprintf("%s/%p", pipeline.TaskRunControllerName, tr)
+// GetNamespacedName returns a k8s namespaced name that identifies this TaskRun
+func (tr *TaskRun) GetNamespacedName() types.NamespacedName {
+	return types.NamespacedName{Namespace: tr.Namespace, Name: tr.Name}
 }

 // IsPartOfPipeline return true if TaskRun is a part of a Pipeline.
diff --git a/pkg/apis/pipeline/v1beta1/taskrun_types_test.go b/pkg/apis/pipeline/v1beta1/taskrun_types_test.go
index 7b8195b9328..1982a21b59e 100644
--- a/pkg/apis/pipeline/v1beta1/taskrun_types_test.go
+++ b/pkg/apis/pipeline/v1beta1/taskrun_types_test.go
@@ -17,7 +17,6 @@ limitations under the License.
 package v1beta1_test

 import (
-	"fmt"
 	"testing"
 	"time"

@@ -172,14 +171,11 @@ func TestTaskRunHasVolumeClaimTemplate(t *testing.T) {
 	}
 }

 func TestTaskRunKey(t *testing.T) {
-	tr := &v1beta1.TaskRun{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "taskrunname",
-		},
-	}
-	expectedKey := fmt.Sprintf("TaskRun/%p", tr)
-	if tr.GetRunKey() != expectedKey {
-		t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, tr.GetRunKey())
+	tr := &v1beta1.TaskRun{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "trunname"}}
+	n := tr.GetNamespacedName()
+	expected := "foo/trunname"
+	if n.String() != expected {
+		t.Fatalf("Expected name to be %s but got %s", expected, n.String())
 	}
 }
diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go
index af9b2b44bed..3f6f695169c 100644
--- a/pkg/reconciler/pipelinerun/controller.go
+++ b/pkg/reconciler/pipelinerun/controller.go
@@ -86,8 +86,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
 		}
 	})

-	timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue)
-	timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)
+	timeoutHandler.SetCallbackFunc(impl.EnqueueKey)
+	timeoutHandler.CheckTimeouts(ctx, namespace, kubeclientset, pipelineclientset)

 	logger.Info("Setting up event handlers")
 	pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go
index ac1b5772730..06e35c89df6 100644
--- a/pkg/reconciler/pipelinerun/pipelinerun.go
+++ b/pkg/reconciler/pipelinerun/pipelinerun.go
@@ -140,11 +140,11 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
 		pr.Status.InitializeConditions()
 		// In case node time was not synchronized, when controller has been scheduled to other nodes.
 		if pr.Status.StartTime.Sub(pr.CreationTimestamp.Time) < 0 {
-			logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetRunKey(), pr.CreationTimestamp, pr.Status.StartTime)
+			logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetNamespacedName().String(), pr.CreationTimestamp, pr.Status.StartTime)
 			pr.Status.StartTime = &pr.CreationTimestamp
 		}
 		// start goroutine to track pipelinerun timeout only startTime is not set
-		go c.timeoutHandler.WaitPipelineRun(pr, pr.Status.StartTime)
+		go c.timeoutHandler.Wait(pr.GetNamespacedName(), *pr.Status.StartTime, *pr.Spec.Timeout)
 		// Emit events. During the first reconcile the status of the PipelineRun may change twice
 		// from not Started to Started and then to Running, so we need to sent the event here
 		// and at the end of 'Reconcile' again.
@@ -171,7 +171,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
 			logger.Errorf("Failed to delete StatefulSet for PipelineRun %s: %v", pr.Name, err)
 			return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
 		}
-		c.timeoutHandler.Release(pr)
+		c.timeoutHandler.Release(pr.GetNamespacedName())
 		if err := c.updateTaskRunsStatusDirectly(pr); err != nil {
 			logger.Errorf("Failed to update TaskRun status for PipelineRun %s: %v", pr.Name, err)
 			return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go
index 1e728061dfb..162053b52d7 100644
--- a/pkg/reconciler/taskrun/controller.go
+++ b/pkg/reconciler/taskrun/controller.go
@@ -87,8 +87,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
 		}
 	})

-	timeoutHandler.SetTaskRunCallbackFunc(impl.Enqueue)
-	timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)
+	timeoutHandler.SetCallbackFunc(impl.EnqueueKey)
+	timeoutHandler.CheckTimeouts(ctx, namespace, kubeclientset, pipelineclientset)

 	logger.Info("Setting up event handlers")
 	taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go
index 5b919cc09b5..df9421cc689 100644
--- a/pkg/reconciler/taskrun/taskrun.go
+++ b/pkg/reconciler/taskrun/taskrun.go
@@ -93,7 +93,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
 		tr.Status.InitializeConditions()
 		// In case node time was not synchronized, when controller has been scheduled to other nodes.
 		if tr.Status.StartTime.Sub(tr.CreationTimestamp.Time) < 0 {
-			logger.Warnf("TaskRun %s createTimestamp %s is after the taskRun started %s", tr.GetRunKey(), tr.CreationTimestamp, tr.Status.StartTime)
+			logger.Warnf("TaskRun %s createTimestamp %s is after the taskRun started %s", tr.GetNamespacedName().String(), tr.CreationTimestamp, tr.Status.StartTime)
 			tr.Status.StartTime = &tr.CreationTimestamp
 		}
 		// Emit events. During the first reconcile the status of the TaskRun may change twice
@@ -120,7 +120,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
 		// send cloud events. So we stop here an return errors encountered this far.
 		return merr.ErrorOrNil()
 	}
-	c.timeoutHandler.Release(tr)
+	c.timeoutHandler.Release(tr.GetNamespacedName())
 	pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
 	if err == nil {
 		err = podconvert.StopSidecars(c.Images.NopImage, c.KubeClientSet, *pod)
@@ -379,7 +379,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun,
 			logger.Error("Failed to create task run pod for task %q: %v", tr.Name, newErr)
 			return newErr
 		}
-		go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime)
+		go c.timeoutHandler.Wait(tr.GetNamespacedName(), *tr.Status.StartTime, *tr.Spec.Timeout)
 	}
 	if err := c.tracker.Track(tr.GetBuildPodRef(), tr); err != nil {
 		logger.Errorf("Failed to create tracker for build pod %q for taskrun %q: %v", tr.Name, tr.Name, err)
@@ -460,9 +460,9 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1beta1.TaskRun) (*v1beta1.T
 func (c *Reconciler) handlePodCreationError(ctx context.Context, tr *v1beta1.TaskRun, err error) error {
 	var msg string
 	if isExceededResourceQuotaError(err) {
-		backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
+		backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr.GetNamespacedName(), *tr.Status.StartTime, *tr.Spec.Timeout)
 		if !currentlyBackingOff {
-			go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
+			go c.timeoutHandler.SetTimer(tr.GetNamespacedName(), time.Until(backoff.NextAttempt))
 		}
 		msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts)
 		tr.Status.SetCondition(&apis.Condition{
diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go
index 964bb449b23..149392ec374 100644
--- a/pkg/reconciler/taskrun/taskrun_test.go
+++ b/pkg/reconciler/taskrun/taskrun_test.go
@@ -2075,7 +2075,7 @@ func TestHandlePodCreationError(t *testing.T) {
 	}

 	// Prevent backoff timer from starting
-	c.timeoutHandler.SetTaskRunCallbackFunc(nil)
+	c.timeoutHandler.SetCallbackFunc(nil)

 	testcases := []struct {
 		description string
diff --git a/pkg/timeout/handler.go b/pkg/timeout/handler.go
index 0cf74446939..6c8c0179d0c 100644
--- a/pkg/timeout/handler.go
+++ b/pkg/timeout/handler.go
@@ -17,17 +17,18 @@ limitations under the License.
 package timeout

 import (
+	"context"
 	"math"
 	"math/rand"
 	"sync"
 	"time"

-	"github.com/tektoncd/pipeline/pkg/apis/config"
-	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
 	clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
+	"github.com/tektoncd/pipeline/pkg/contexts"
 	"go.uber.org/zap"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 )
@@ -36,7 +37,6 @@ const (
 )

 var (
-	defaultFunc = func(i interface{}) {}
 	maxBackoffExponent = math.Ceil(math.Log2(maxBackoffSeconds))
 )
@@ -63,12 +63,9 @@ type jitterFunc func(numSeconds int) (jitteredSeconds int)
 type Handler struct {
 	logger *zap.SugaredLogger

-	// taskRunCallbackFunc is the function to call when a TaskRun has timed out.
-	// This is usually set to the function that enqueues the taskRun for reconciling.
-	taskRunCallbackFunc func(interface{})
-	// pipelineRunCallbackFunc is the function to call when a TaskRun has timed out
-	// This is usually set to the function that enqueues the taskRun for reconciling.
-	pipelineRunCallbackFunc func(interface{})
+	// callbackFunc is the function to call when a run has timed out.
+	// This is usually set to the function that enqueues the namespaced name for reconciling.
+	callbackFunc func(types.NamespacedName)

 	// stopCh is used to signal to all goroutines that they should stop, e.g. because
 	// the reconciler is stopping
 	stopCh <-chan struct{}
@@ -96,71 +93,64 @@ func NewHandler(
 	}
 }

-// SetTaskRunCallbackFunc sets the callback function when timeout occurs for taskrun objects
-func (t *Handler) SetTaskRunCallbackFunc(f func(interface{})) {
-	t.taskRunCallbackFunc = f
+// SetCallbackFunc sets the callback function when timeout occurs
+func (t *Handler) SetCallbackFunc(f func(types.NamespacedName)) {
+	t.callbackFunc = f
 }

-// SetPipelineRunCallbackFunc sets the callback function when timeout occurs for pipelinerun objects
-func (t *Handler) SetPipelineRunCallbackFunc(f func(interface{})) {
-	t.pipelineRunCallbackFunc = f
-}
-
-// Release deletes channels and data that are specific to a StatusKey object.
-func (t *Handler) Release(runObj StatusKey) {
-	key := runObj.GetRunKey()
+// Release deletes channels and data that are specific to a namespacedName
+func (t *Handler) Release(n types.NamespacedName) {
 	t.doneMut.Lock()
 	defer t.doneMut.Unlock()

 	t.backoffsMut.Lock()
 	defer t.backoffsMut.Unlock()

-	if done, ok := t.done[key]; ok {
-		delete(t.done, key)
+	if done, ok := t.done[n.String()]; ok {
+		delete(t.done, n.String())
 		close(done)
 	}
-	delete(t.backoffs, key)
+	delete(t.backoffs, n.String())
 }

-func (t *Handler) getOrCreateDoneChan(runObj StatusKey) chan bool {
-	key := runObj.GetRunKey()
+func (t *Handler) getOrCreateDoneChan(n types.NamespacedName) chan bool {
 	t.doneMut.Lock()
 	defer t.doneMut.Unlock()
 	var done chan bool
 	var ok bool
-	if done, ok = t.done[key]; !ok {
+	if done, ok = t.done[n.String()]; !ok {
 		done = make(chan bool)
 	}
-	t.done[key] = done
+	t.done[n.String()] = done
 	return done
 }

-// GetBackoff records the number of times it has seen a TaskRun and calculates an
-// appropriate backoff deadline based on that count. Only one backoff per TaskRun
+// GetBackoff records the number of times it has seen n and calculates an
+// appropriate backoff deadline based on that count. Only one backoff per n
 // may be active at any moment. Requests for a new backoff in the face of an
 // existing one will be ignored and details of the existing backoff will be returned
-// instead. Further, if a calculated backoff time is after the timeout of the TaskRun
+// instead. Further, if a calculated backoff time is after the timeout of the runKey
 // then the time of the timeout will be returned instead.
 //
 // Returned values are a backoff struct containing a NumAttempts field with the
-// number of attempts performed for this TaskRun and a NextAttempt field
+// number of attempts performed for this n and a NextAttempt field
 // describing the time at which the next attempt should be performed.
-// Additionally a boolean is returned indicating whether a backoff for the
-// TaskRun is already in progress.
+// Additionally a boolean is returned indicating whether a backoff for n
+// is already in progress.
+func (t *Handler) GetBackoff(n types.NamespacedName, startTime metav1.Time, timeout metav1.Duration) (Backoff, bool) {
 	t.backoffsMut.Lock()
 	defer t.backoffsMut.Unlock()
-	b := t.backoffs[tr.GetRunKey()]
+	b := t.backoffs[n.String()]
 	if time.Now().Before(b.NextAttempt) {
 		return b, true
 	}
 	b.NumAttempts++
 	b.NextAttempt = time.Now().Add(backoffDuration(b.NumAttempts, rand.Intn))
-	timeoutDeadline := tr.Status.StartTime.Time.Add(tr.Spec.Timeout.Duration)
+	timeoutDeadline := startTime.Time.Add(timeout.Duration)
 	if timeoutDeadline.Before(b.NextAttempt) {
 		b.NextAttempt = timeoutDeadline
 	}
-	t.backoffs[tr.GetRunKey()] = b
+	t.backoffs[n.String()] = b
 	return b, false
 }
@@ -179,7 +169,7 @@ func backoffDuration(count uint, jf jitterFunc) time.Duration {

 // checkPipelineRunTimeouts function creates goroutines to wait for pipelinerun to
 // finish/timeout in a given namespace
-func (t *Handler) checkPipelineRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
+func (t *Handler) checkPipelineRunTimeouts(ctx context.Context, namespace string, pipelineclientset clientset.Interface) {
 	pipelineRuns, err := pipelineclientset.TektonV1beta1().PipelineRuns(namespace).List(metav1.ListOptions{})
 	if err != nil {
 		t.logger.Errorf("Can't get pipelinerun list in namespace %s: %s", namespace, err)
@@ -187,18 +177,19 @@ func (t *Handler) checkPipelineRunTimeouts(namespace string, pipelineclientset c
 	}
 	for _, pipelineRun := range pipelineRuns.Items {
 		pipelineRun := pipelineRun
+		pipelineRun.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))
 		if pipelineRun.IsDone() || pipelineRun.IsCancelled() {
 			continue
 		}
 		if pipelineRun.HasStarted() {
-			go t.WaitPipelineRun(&pipelineRun, pipelineRun.Status.StartTime)
+			go t.Wait(pipelineRun.GetNamespacedName(), *pipelineRun.Status.StartTime, *pipelineRun.Spec.Timeout)
 		}
 	}
 }

 // CheckTimeouts function iterates through a given namespace or all namespaces
 // (if empty string) and calls corresponding taskrun/pipelinerun timeout functions
-func (t *Handler) CheckTimeouts(namespace string, kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) {
+func (t *Handler) CheckTimeouts(ctx context.Context, namespace string, kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) {
 	// scoped namespace
 	namespaceNames := []string{namespace}
 	// all namespaces
@@ -215,14 +206,14 @@ func (t *Handler) CheckTimeouts(namespace string, kubeclientset kubernetes.Inter
 	}

 	for _, namespace := range namespaceNames {
-		t.checkTaskRunTimeouts(namespace, pipelineclientset)
-		t.checkPipelineRunTimeouts(namespace, pipelineclientset)
+		t.checkTaskRunTimeouts(ctx, namespace, pipelineclientset)
+		t.checkPipelineRunTimeouts(ctx, namespace, pipelineclientset)
 	}
 }

 // checkTaskRunTimeouts function creates goroutines to wait for pipelinerun to
 // finish/timeout in a given namespace
-func (t *Handler) checkTaskRunTimeouts(namespace string, pipelineclientset clientset.Interface) {
+func (t *Handler) checkTaskRunTimeouts(ctx context.Context, namespace string, pipelineclientset clientset.Interface) {
 	taskruns, err := pipelineclientset.TektonV1beta1().TaskRuns(namespace).List(metav1.ListOptions{})
 	if err != nil {
 		t.logger.Errorf("Can't get taskrun list in namespace %s: %s", namespace, err)
@@ -230,83 +221,57 @@ func (t *Handler) checkTaskRunTimeouts(namespace string, pipelineclientset clien
 	}
 	for _, taskrun := range taskruns.Items {
 		taskrun := taskrun
+		taskrun.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))
 		if taskrun.IsDone() || taskrun.IsCancelled() {
 			continue
 		}
 		if taskrun.HasStarted() {
-			go t.WaitTaskRun(&taskrun, taskrun.Status.StartTime)
+			go t.Wait(taskrun.GetNamespacedName(), *taskrun.Status.StartTime, *taskrun.Spec.Timeout)
 		}
 	}
 }

-// WaitTaskRun function creates a blocking function for taskrun to wait for
-// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out, which is
-// determined by checking if the tr's timeout has occurred since the startTime
-func (t *Handler) WaitTaskRun(tr *v1beta1.TaskRun, startTime *metav1.Time) {
-	var timeout time.Duration
-	if tr.Spec.Timeout == nil {
-		timeout = config.DefaultTimeoutMinutes * time.Minute
-	} else {
-		timeout = tr.Spec.Timeout.Duration
-	}
-	t.waitRun(tr, timeout, startTime, t.taskRunCallbackFunc)
-}
-
-// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
-// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out which is
-// determined by checking if the tr's timeout has occurred since the startTime
-func (t *Handler) WaitPipelineRun(pr *v1beta1.PipelineRun, startTime *metav1.Time) {
-	var timeout time.Duration
-	if pr.Spec.Timeout == nil {
-		timeout = config.DefaultTimeoutMinutes * time.Minute
-	} else {
-		timeout = pr.Spec.Timeout.Duration
-	}
-	t.waitRun(pr, timeout, startTime, t.pipelineRunCallbackFunc)
-}
-
-func (t *Handler) waitRun(runObj StatusKey, timeout time.Duration, startTime *metav1.Time, callback func(interface{})) {
-	if startTime == nil {
-		t.logger.Errorf("startTime must be specified in order for a timeout to be calculated accurately for %s", runObj.GetRunKey())
+// Wait creates a blocking function for n to wait for
+// 1. Stop signal, 2. Completion 3. Or duration to elapse since startTime
+// time out, which is determined by checking if the timeout has occurred since the startTime
+func (t *Handler) Wait(n types.NamespacedName, startTime metav1.Time, timeout metav1.Duration) {
+	if t.callbackFunc == nil {
+		t.logger.Errorf("somehow the timeout handler was not initialized with a callback function")
 		return
 	}
-	if callback == nil {
-		callback = defaultFunc
-	}
 	runtime := time.Since(startTime.Time)
-	t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", runObj.GetRunKey(), startTime.Time, timeout, runtime)
-	defer t.Release(runObj)
-	t.setTimer(runObj, timeout-runtime, callback)
+	t.logger.Infof("About to start timeout timer for %s. started at %s, timeout is %s, running for %s", n.String(), startTime.Time, timeout, runtime)
+	defer t.Release(n)
+	t.setTimer(n, timeout.Duration-runtime, t.callbackFunc)
 }

-// SetTaskRunTimer creates a blocking function for taskrun to wait for
+// SetTimer creates a blocking function for n to wait for
 // 1. Stop signal, 2. TaskRun to complete or 3. a given Duration to elapse.
 //
 // Since the timer's duration is a parameter rather than being tied to
-// the lifetime of the TaskRun no resources are released after the timer
-// fires. It is the caller's responsibility to Release() the TaskRun when
+// the lifetime of the run object no resources are released after the timer
+// fires. It is the caller's responsibility to Release() the run when
 // work with it has completed.
-func (t *Handler) SetTaskRunTimer(tr *v1beta1.TaskRun, d time.Duration) {
-	callback := t.taskRunCallbackFunc
-	if callback == nil {
-		t.logger.Errorf("attempted to set a timer for %q but no task run callback has been assigned", tr.Name)
+func (t *Handler) SetTimer(n types.NamespacedName, d time.Duration) {
+	if t.callbackFunc == nil {
+		t.logger.Errorf("somehow the timeout handler was not initialized with a callback function")
 		return
 	}
-	t.setTimer(tr, d, callback)
+	t.setTimer(n, d, t.callbackFunc)
 }

-func (t *Handler) setTimer(runObj StatusKey, timeout time.Duration, callback func(interface{})) {
-	done := t.getOrCreateDoneChan(runObj)
+func (t *Handler) setTimer(n types.NamespacedName, timeout time.Duration, callback func(types.NamespacedName)) {
+	done := t.getOrCreateDoneChan(n)
 	started := time.Now()
 	select {
 	case <-t.stopCh:
-		t.logger.Infof("stopping timer for %q", runObj.GetRunKey())
+		t.logger.Infof("stopping timer for %q", n.String())
 		return
 	case <-done:
-		t.logger.Infof("%q finished, stopping timer", runObj.GetRunKey())
+		t.logger.Infof("%q finished, stopping timer", n.String())
 		return
 	case <-time.After(timeout):
-		t.logger.Infof("timer for %q has activated after %s", runObj.GetRunKey(), time.Since(started).String())
-		callback(runObj)
+		t.logger.Infof("timer for %q has activated after %s", n.String(), time.Since(started).String())
+		callback(n)
 	}
 }
diff --git a/pkg/timeout/handler_test.go b/pkg/timeout/handler_test.go
index d3ae120e08a..e0260c00960 100644
--- a/pkg/timeout/handler_test.go
+++ b/pkg/timeout/handler_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package timeout

 import (
+	"context"
 	"fmt"
 	"sync"
 	"testing"
@@ -31,6 +32,7 @@ import (
 	"go.uber.org/zap/zaptest/observer"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"knative.dev/pkg/apis"
 )
@@ -104,13 +106,12 @@ func TestTaskRunCheckTimeouts(t *testing.T) {
 	th := NewHandler(stopCh, zap.New(observer).Sugar())
 	gotCallback := sync.Map{}
-	f := func(tr interface{}) {
-		trNew := tr.(*v1beta1.TaskRun)
-		gotCallback.Store(trNew.Name, struct{}{})
+	f := func(n types.NamespacedName) {
+		gotCallback.Store(n.Name, struct{}{})
 	}

-	th.SetTaskRunCallbackFunc(f)
-	th.CheckTimeouts(allNs, c.Kube, c.Pipeline)
+	th.SetCallbackFunc(f)
+	th.CheckTimeouts(context.Background(), testNs, c.Kube, c.Pipeline)

 	for _, tc := range []struct {
 		name string
@@ -194,13 +195,15 @@ func TestTaskRunSingleNamespaceCheckTimeouts(t *testing.T) {
 	th := NewHandler(stopCh, zap.New(observer).Sugar())
 	gotCallback := sync.Map{}
-	f := func(tr interface{}) {
-		trNew := tr.(*v1beta1.TaskRun)
-		gotCallback.Store(trNew.Name, struct{}{})
+	f := func(n types.NamespacedName) {
+		gotCallback.Store(n.Name, struct{}{})
 	}

-	th.SetTaskRunCallbackFunc(f)
-	th.CheckTimeouts(testNs, c.Kube, c.Pipeline)
+	th.SetCallbackFunc(f)
+	// Note that since f843899a11d5d09b29fac750f72f4a7e4882f615 CheckTimeouts is always called
+	// with a namespace so there is no reason to maintain all namespaces functionality;
+	// however in #2905 we should remove CheckTimeouts completely
+	th.CheckTimeouts(context.Background(), "", c.Kube, c.Pipeline)

 	for _, tc := range []struct {
 		name string
@@ -308,13 +311,12 @@ func TestPipelinRunCheckTimeouts(t *testing.T) {
 	th := NewHandler(stopCh, zap.New(observer).Sugar())
 	gotCallback := sync.Map{}
-	f := func(pr interface{}) {
-		prNew := pr.(*v1beta1.PipelineRun)
-		gotCallback.Store(prNew.Name, struct{}{})
+	f := func(n types.NamespacedName) {
+		gotCallback.Store(n.Name, struct{}{})
 	}

-	th.SetPipelineRunCallbackFunc(f)
-	th.CheckTimeouts(allNs, c.Kube, c.Pipeline)
+	th.SetCallbackFunc(f)
+	th.CheckTimeouts(context.Background(), allNs, c.Kube, c.Pipeline)

 	for _, tc := range []struct {
 		name string
 		pr   *v1beta1.PipelineRun
@@ -393,7 +395,7 @@ func TestWithNoFunc(t *testing.T) {
 			t.Fatal("Expected CheckTimeouts function not to panic")
 		}
 	}()
-	testHandler.CheckTimeouts(allNs, c.Kube, c.Pipeline)
+	testHandler.CheckTimeouts(context.Background(), allNs, c.Kube, c.Pipeline)

 }
@@ -402,7 +404,7 @@ func TestWithNoFunc(t *testing.T) {
 func TestSetTaskRunTimer(t *testing.T) {
 	taskRun := tb.TaskRun("test-taskrun-arbitrary-timer", tb.TaskRunNamespace(testNs), tb.TaskRunSpec(
 		tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")),
-		tb.TaskRunTimeout(2*time.Second),
+		tb.TaskRunTimeout(50*time.Millisecond),
 	), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{
 		Type:   apis.ConditionSucceeded,
 		Status: corev1.ConditionUnknown}),
@@ -415,11 +417,11 @@ func TestSetTaskRunTimer(t *testing.T) {
 	timerDuration := 50 * time.Millisecond
 	timerFailDeadline := 100 * time.Millisecond
 	doneCh := make(chan struct{})
-	callback := func(_ interface{}) {
+	f := func(_ types.NamespacedName) {
 		close(doneCh)
 	}
-	testHandler.SetTaskRunCallbackFunc(callback)
-	go testHandler.SetTaskRunTimer(taskRun, timerDuration)
+	testHandler.SetCallbackFunc(f)
+	go testHandler.SetTimer(taskRun.GetNamespacedName(), timerDuration)
 	select {
 	case <-doneCh:
 		// The task run timer executed before the failure deadline
diff --git a/test/timeout_test.go b/test/timeout_test.go
index 266f2700931..7cbef3804ca 100644
--- a/test/timeout_test.go
+++ b/test/timeout_test.go
@@ -115,7 +115,7 @@ func TestPipelineRunTimeout(t *testing.T) {
 		t.Errorf("Error waiting for PipelineRun %s to finish: %s", pipelineRun.Name, err)
 	}

-	t.Logf("Waiting for TaskRuns from PipelineRun %s in namespace %s to be cancelled", pipelineRun.Name, namespace)
+	t.Logf("Waiting for TaskRuns from PipelineRun %s in namespace %s to time out and be cancelled", pipelineRun.Name, namespace)
 	var wg sync.WaitGroup
 	for _, taskrunItem := range taskrunList.Items {
 		wg.Add(1)