Actually stop trying to time out finished Runs ⏰

In 10b6427 I got really enthusiastic about making sure even our
reads were threadsafe, so I thought I would be clever and,
instead of accessing attributes of a PipelineRun or TaskRun in
a goroutine, use a value that wouldn't change - specifically the address.

But the address will change between reconcile loops, because the
reconcile logic will create a new instance of the Run object every time!
🤦‍♀️
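
To illustrate the problem, here is a made-up, minimal sketch (not the Tekton code; the `run` struct and `addressKey` helper are invented for the example): two reconcile loops work on two different in-memory copies of the same Run, so a key built from the pointer address never matches across loops.

```go
package main

import "fmt"

// run is a stand-in for a PipelineRun/TaskRun; the real reconciler decodes a
// fresh copy of the object from the informer cache on every reconcile loop.
type run struct {
	Namespace, Name string
}

// addressKey mimics the old address-based key: the pointer address is unique,
// but it only identifies this particular in-memory copy of the Run.
func addressKey(r *run) string {
	return fmt.Sprintf("%p", r)
}

func main() {
	// Two reconcile loops see two different instances of the "same" Run,
	// so the address-based keys never match and the done map never hits.
	first := &run{Namespace: "default", Name: "my-run"}
	second := &run{Namespace: "default", Name: "my-run"}
	fmt.Println(addressKey(first) == addressKey(second)) // false
}
```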

Fortunately this doesn't cause any serious problems; it just makes
things slightly less efficient: for every Run you start, a goroutine
will remain open until the timeout occurs, and when it fires, the Run will
be reconciled an extra time, even if it has already completed. (In fact,
keeping this behavior and dropping the "done" map entirely might be a
reasonable option!)

With this change, we now return to using the namespace + name as a key
in the map that tracks the done channels; we pass these by value so that
reads will be threadsafe.
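
The shape of the fix, as a minimal sketch (the `doneMap` type and its methods are invented for illustration and simplified from the real timeout handler): the map is keyed by `types.NamespacedName`, a plain value that is stable across reconcile loops, so releasing a finished Run finds the same entry that was registered when it started.

```go
package timeoutsketch

import (
	"sync"

	"k8s.io/apimachinery/pkg/types"
)

// doneMap tracks a done channel per Run, keyed by namespace/name instead of
// by pointer address, so the key is stable across reconcile loops.
type doneMap struct {
	mu   sync.Mutex
	done map[types.NamespacedName]chan struct{}
}

func newDoneMap() *doneMap {
	return &doneMap{done: map[types.NamespacedName]chan struct{}{}}
}

// get returns (creating if needed) the done channel for the given Run.
func (d *doneMap) get(n types.NamespacedName) chan struct{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	ch, ok := d.done[n]
	if !ok {
		ch = make(chan struct{})
		d.done[n] = ch
	}
	return ch
}

// release closes and removes the channel for a finished Run, waking any
// goroutine that is still waiting on its timeout.
func (d *doneMap) release(n types.NamespacedName) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if ch, ok := d.done[n]; ok {
		close(ch)
		delete(d.done, n)
	}
}
```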

Instead of fixing this separately for the TaskRun and PipelineRun
functions, I've collapsed these and the callback into one. Each controller
instantiates its own timeout handler, so there is no reason for the
timeout handler to have special knowledge of one vs the other.
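
As a rough sketch of what the collapsed entry point looks like conceptually (the name, signature, and callback wiring here are assumptions for illustration, not the exact API): a single wait function only needs the Run's key, start time, timeout, and the controller's enqueue callback, so it behaves identically for TaskRuns and PipelineRuns. In the actual change, the callback wired in via `SetCallbackFunc` is `impl.EnqueueKey` (see the controller.go diffs below).

```go
package timeoutsketch

import (
	"time"

	"k8s.io/apimachinery/pkg/types"
)

// wait blocks until either the Run's done channel is closed (the Run
// finished) or its timeout elapses, in which case it asks the controller to
// reconcile the key again so the Run can be marked as timed out. Nothing in
// here knows whether the caller is the TaskRun or the PipelineRun reconciler.
func wait(n types.NamespacedName, startTime time.Time, timeout time.Duration, done <-chan struct{}, enqueue func(types.NamespacedName)) {
	select {
	case <-done:
		// release closed the channel: the Run completed before timing out.
	case <-time.After(timeout - time.Since(startTime)):
		// The timeout fired: re-enqueue the key so the reconciler handles it.
		if enqueue != nil {
			enqueue(n)
		}
	}
}
```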

Fixes #3047

_Test_

I tried several different approaches to add a test case that would
reveal the underlying problem, but I now feel it's more hassle than
it's worth. Approaches:

1. Instantiate the controller in the reconciler tests with a custom
   timeout handler that has been overridden to use a custom logger,
   so we can check for the log indicating the timeout handler completed
2. Similar to (1), but instead of checking logs, just pass in a custom
   done channel and wait for it to close

Both (1) and (2) require changing the way that NewController works, i.e. the
way we always instantiate controllers. I tried working around this by
taking the same approach as `TestHandlePodCreationError` and
instantiating my own Reconciler, but it (a) wasn't instantiated properly
no matter what I tried (trying to use it caused panics) and (b) had a
confusingly different interface, exposing ReconcileKind instead of
Reconcile.

I tried some other approaches, but these went nowhere either; I don't
think it's worth adding a test to cover this, but if folks feel strongly,
I don't mind opening an issue to continue exploring it. I feel that this
bug is very specific to the implementation, and I'm not sure how valuable
a test that covers it would be. If we do pursue it, we might want to do it
at the level of an end-to-end test that actually checks the logs from a
real running controller.
bobcatfish committed Sep 9, 2020
1 parent 3a4d59c commit 2611ebf
Showing 12 changed files with 114 additions and 153 deletions.
11 changes: 5 additions & 6 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
@@ -17,14 +17,14 @@ limitations under the License.
 package v1beta1
 
 import (
-	"fmt"
 	"time"
 
 	"github.com/tektoncd/pipeline/pkg/apis/config"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	"knative.dev/pkg/apis"
 	duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
 )
@@ -98,10 +98,9 @@ func (pr *PipelineRun) IsCancelled() bool {
 	return pr.Spec.Status == PipelineRunSpecStatusCancelled
 }
 
-// GetRunKey return the pipelinerun key for timeout handler map
-func (pr *PipelineRun) GetRunKey() string {
-	// The address of the pointer is a threadsafe unique identifier for the pipelinerun
-	return fmt.Sprintf("%s/%p", pipeline.PipelineRunControllerName, pr)
+// GetNamespacedName returns a k8s namespaced name that identifies this PipelineRun
+func (pr *PipelineRun) GetNamespacedName() types.NamespacedName {
+	return types.NamespacedName{Namespace: pr.Namespace, Name: pr.Name}
 }
 
 // IsTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout
@@ -231,7 +230,7 @@ const (
 	PipelineRunReasonCancelled PipelineRunReason = "Cancelled"
 	// PipelineRunReasonTimedOut is the reason set when the PipelineRun has timed out
 	PipelineRunReasonTimedOut PipelineRunReason = "PipelineRunTimeout"
-	// ReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
+	// PipelineRunReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
 	// pipeline will stop once all running tasks complete their work
 	PipelineRunReasonStopping PipelineRunReason = "PipelineRunStopping"
 )
13 changes: 6 additions & 7 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
@@ -17,12 +17,10 @@ limitations under the License.
 package v1beta1_test
 
 import (
-	"fmt"
 	"testing"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
-	tb "github.com/tektoncd/pipeline/internal/builder/v1beta1"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
 	"github.com/tektoncd/pipeline/test/diff"
 	corev1 "k8s.io/api/core/v1"
@@ -167,11 +165,12 @@ func TestPipelineRunHasVolumeClaimTemplate(t *testing.T) {
 	}
 }
 
-func TestPipelineRunKey(t *testing.T) {
-	pr := tb.PipelineRun("prunname")
-	expectedKey := fmt.Sprintf("PipelineRun/%p", pr)
-	if pr.GetRunKey() != expectedKey {
-		t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, pr.GetRunKey())
+func TestGetNamespacedName(t *testing.T) {
+	pr := &v1beta1.PipelineRun{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "prunname"}}
+	n := pr.GetNamespacedName()
+	expected := "foo/prunname"
+	if n.String() != expected {
+		t.Fatalf("Expected name to be %s but got %s", expected, n.String())
 	}
 }
 
8 changes: 4 additions & 4 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
@@ -25,6 +25,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	"knative.dev/pkg/apis"
 	duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
 )
@@ -411,10 +412,9 @@ func (tr *TaskRun) GetTimeout() time.Duration {
 	return tr.Spec.Timeout.Duration
 }
 
-// GetRunKey return the taskrun key for timeout handler map
-func (tr *TaskRun) GetRunKey() string {
-	// The address of the pointer is a threadsafe unique identifier for the taskrun
-	return fmt.Sprintf("%s/%p", pipeline.TaskRunControllerName, tr)
+// GetNamespacedName returns a k8s namespaced name that identifies this TaskRun
+func (tr *TaskRun) GetNamespacedName() types.NamespacedName {
+	return types.NamespacedName{Namespace: tr.Namespace, Name: tr.Name}
 }
 
 // IsPartOfPipeline return true if TaskRun is a part of a Pipeline.
14 changes: 5 additions & 9 deletions pkg/apis/pipeline/v1beta1/taskrun_types_test.go
@@ -17,7 +17,6 @@ limitations under the License.
 package v1beta1_test
 
 import (
-	"fmt"
 	"testing"
 	"time"
 
@@ -172,14 +171,11 @@ func TestTaskRunHasVolumeClaimTemplate(t *testing.T) {
 }
 
 func TestTaskRunKey(t *testing.T) {
-	tr := &v1beta1.TaskRun{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "taskrunname",
-		},
-	}
-	expectedKey := fmt.Sprintf("TaskRun/%p", tr)
-	if tr.GetRunKey() != expectedKey {
-		t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, tr.GetRunKey())
+	tr := &v1beta1.TaskRun{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "trunname"}}
+	n := tr.GetNamespacedName()
+	expected := "foo/trunname"
+	if n.String() != expected {
+		t.Fatalf("Expected name to be %s but got %s", expected, n.String())
 	}
 }
 
4 changes: 2 additions & 2 deletions pkg/reconciler/pipelinerun/controller.go
@@ -86,8 +86,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
 		}
 	})
 
-	timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue)
-	timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)
+	timeoutHandler.SetCallbackFunc(impl.EnqueueKey)
+	timeoutHandler.CheckTimeouts(ctx, namespace, kubeclientset, pipelineclientset)
 
 	logger.Info("Setting up event handlers")
 	pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
6 changes: 3 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -140,11 +140,11 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
 		pr.Status.InitializeConditions()
 		// In case node time was not synchronized, when controller has been scheduled to other nodes.
 		if pr.Status.StartTime.Sub(pr.CreationTimestamp.Time) < 0 {
-			logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetRunKey(), pr.CreationTimestamp, pr.Status.StartTime)
+			logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetNamespacedName().String(), pr.CreationTimestamp, pr.Status.StartTime)
 			pr.Status.StartTime = &pr.CreationTimestamp
 		}
 		// start goroutine to track pipelinerun timeout only startTime is not set
-		go c.timeoutHandler.WaitPipelineRun(pr, pr.Status.StartTime)
+		go c.timeoutHandler.Wait(pr.GetNamespacedName(), *pr.Status.StartTime, *pr.Spec.Timeout)
 		// Emit events. During the first reconcile the status of the PipelineRun may change twice
 		// from not Started to Started and then to Running, so we need to sent the event here
 		// and at the end of 'Reconcile' again.
@@ -171,7 +171,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
 			logger.Errorf("Failed to delete StatefulSet for PipelineRun %s: %v", pr.Name, err)
 			return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
 		}
-		c.timeoutHandler.Release(pr)
+		c.timeoutHandler.Release(pr.GetNamespacedName())
 		if err := c.updateTaskRunsStatusDirectly(pr); err != nil {
 			logger.Errorf("Failed to update TaskRun status for PipelineRun %s: %v", pr.Name, err)
 			return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
4 changes: 2 additions & 2 deletions pkg/reconciler/taskrun/controller.go
@@ -87,8 +87,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
 		}
 	})
 
-	timeoutHandler.SetTaskRunCallbackFunc(impl.Enqueue)
-	timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)
+	timeoutHandler.SetCallbackFunc(impl.EnqueueKey)
+	timeoutHandler.CheckTimeouts(ctx, namespace, kubeclientset, pipelineclientset)
 
 	logger.Info("Setting up event handlers")
 	taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
10 changes: 5 additions & 5 deletions pkg/reconciler/taskrun/taskrun.go
@@ -93,7 +93,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
 		tr.Status.InitializeConditions()
 		// In case node time was not synchronized, when controller has been scheduled to other nodes.
 		if tr.Status.StartTime.Sub(tr.CreationTimestamp.Time) < 0 {
-			logger.Warnf("TaskRun %s createTimestamp %s is after the taskRun started %s", tr.GetRunKey(), tr.CreationTimestamp, tr.Status.StartTime)
+			logger.Warnf("TaskRun %s createTimestamp %s is after the taskRun started %s", tr.GetNamespacedName().String(), tr.CreationTimestamp, tr.Status.StartTime)
 			tr.Status.StartTime = &tr.CreationTimestamp
 		}
 		// Emit events. During the first reconcile the status of the TaskRun may change twice
@@ -120,7 +120,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
 			// send cloud events. So we stop here an return errors encountered this far.
 			return merr.ErrorOrNil()
 		}
-		c.timeoutHandler.Release(tr)
+		c.timeoutHandler.Release(tr.GetNamespacedName())
 		pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
 		if err == nil {
 			err = podconvert.StopSidecars(c.Images.NopImage, c.KubeClientSet, *pod)
@@ -379,7 +379,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun,
 			logger.Error("Failed to create task run pod for task %q: %v", tr.Name, newErr)
 			return newErr
 		}
-		go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime)
+		go c.timeoutHandler.Wait(tr.GetNamespacedName(), *tr.Status.StartTime, *tr.Spec.Timeout)
 	}
 	if err := c.tracker.Track(tr.GetBuildPodRef(), tr); err != nil {
 		logger.Errorf("Failed to create tracker for build pod %q for taskrun %q: %v", tr.Name, tr.Name, err)
@@ -460,9 +460,9 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1beta1.TaskRun) (*v1beta1.T
 func (c *Reconciler) handlePodCreationError(ctx context.Context, tr *v1beta1.TaskRun, err error) error {
 	var msg string
 	if isExceededResourceQuotaError(err) {
-		backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
+		backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr.GetNamespacedName(), *tr.Status.StartTime, *tr.Spec.Timeout)
 		if !currentlyBackingOff {
-			go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
+			go c.timeoutHandler.SetTimer(tr.GetNamespacedName(), time.Until(backoff.NextAttempt))
 		}
 		msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts)
 		tr.Status.SetCondition(&apis.Condition{
2 changes: 1 addition & 1 deletion pkg/reconciler/taskrun/taskrun_test.go
@@ -2075,7 +2075,7 @@ func TestHandlePodCreationError(t *testing.T) {
 	}
 
 	// Prevent backoff timer from starting
-	c.timeoutHandler.SetTaskRunCallbackFunc(nil)
+	c.timeoutHandler.SetCallbackFunc(nil)
 
 	testcases := []struct {
 		description string