Skip to content

Commit

Permalink
Run finally pipeline even if task is failed at the validation
Browse files Browse the repository at this point in the history
Presently if one of the task in pipeline is consuming result from the previous task
but the previous failed to produce the result then pipeline fails without running
the finally tasks. These changes handles tasks which got failed in the validation
step.

Signed-off-by: divyansh42 <diagrawa@redhat.com>
  • Loading branch information
divyansh42 committed Oct 10, 2024
1 parent 7d286a4 commit 5265441
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 35 deletions.
31 changes: 23 additions & 8 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,20 +834,35 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1.Pipeline
recorder := controller.GetEventRecorder(ctx)

// nextRpts holds a list of pipeline tasks which should be executed next
nextRpts, err := pipelineRunFacts.DAGExecutionQueue()
// tmpNextRpts holds the nextRpts temporarily,
// tmpNextRpts is later filtered to check for the missing result reference
// if the pipelineTask is valid then it is added to the nextRpts
tmpNextRpts, err := pipelineRunFacts.DAGExecutionQueue()
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return controller.NewPermanentError(err)
}

// Check for Missing Result References
err = resources.CheckMissingResultReferences(pipelineRunFacts.State, nextRpts)
if err != nil {
logger.Infof("Failed to resolve task result reference for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(v1.PipelineRunReasonInvalidTaskResultReference.String(), err.Error())
return controller.NewPermanentError(err)
var nextRpts resources.PipelineRunState
for _, nextRpt := range tmpNextRpts {
// Check for Missing Result References and
// store the faulty task in missingRefTask
missingRefTask, err := resources.CheckMissingResultReferences(pipelineRunFacts.State, nextRpt)
if err != nil {
logger.Infof("Failed to resolve task result reference for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(v1.PipelineRunReasonInvalidTaskResultReference.String(), err.Error())
// check if pipeline contains finally tasks
// return the permanent error only if there is no finally task
fTaskNames := pipelineRunFacts.GetFinalTaskNames()
pipelineRunFacts.ValidationFailedTask = append(pipelineRunFacts.ValidationFailedTask, missingRefTask)
if len(fTaskNames) == 0 {
return controller.NewPermanentError(err)
}
} else {
// if task is valid then add it to nextRpts for the further execution
nextRpts = append(nextRpts, nextRpt)
}
}

// GetFinalTasks only returns final tasks when a DAG is complete
fNextRpts := pipelineRunFacts.GetFinalTasks()
if len(fNextRpts) != 0 {
Expand Down
60 changes: 34 additions & 26 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (t *ResolvedPipelineTask) EvaluateCEL() error {

// isDone returns true only if the task is skipped, succeeded or failed
func (t ResolvedPipelineTask) isDone(facts *PipelineRunFacts) bool {
return t.Skip(facts).IsSkipped || t.isSuccessful() || t.isFailure()
return t.Skip(facts).IsSkipped || t.isSuccessful() || t.isFailure() || t.isValidationFailed(facts.ValidationFailedTask)
}

// IsRunning returns true only if the task is neither succeeded, cancelled nor failed
Expand Down Expand Up @@ -218,6 +218,16 @@ func (t ResolvedPipelineTask) isFailure() bool {
return t.haveAnyTaskRunsFailed() && isDone
}

// isValidationFailed return true if the task is failed at the validation step
func (t ResolvedPipelineTask) isValidationFailed(ftasks []*ResolvedPipelineTask) bool {
for _, ftask := range ftasks {
if ftask.ResolvedTask == t.ResolvedTask {
return true
}
}
return false
}

// isCancelledForTimeOut returns true only if the run is cancelled due to PipelineRun-controlled timeout
// If the PipelineTask has a Matrix, isCancelled returns true if any run is cancelled due to PipelineRun-controlled timeout and all other runs are done.
func (t ResolvedPipelineTask) isCancelledForTimeOut() bool {
Expand Down Expand Up @@ -825,35 +835,33 @@ func isCustomRunCancelledByPipelineRunTimeout(cr *v1beta1.CustomRun) bool {
// CheckMissingResultReferences returns an error if it is missing any result references.
// Missing result references can occur if task fails to produce a result but has
// OnError: continue (ie TestMissingResultWhenStepErrorIsIgnored)
func CheckMissingResultReferences(pipelineRunState PipelineRunState, targets PipelineRunState) error {
for _, target := range targets {
for _, resultRef := range v1.PipelineTaskResultRefs(target.PipelineTask) {
referencedPipelineTask, ok := pipelineRunState.ToMap()[resultRef.PipelineTask]
if !ok {
return fmt.Errorf("Result reference error: Could not find ref \"%s\" in internal pipelineRunState", resultRef.PipelineTask)
func CheckMissingResultReferences(pipelineRunState PipelineRunState, target *ResolvedPipelineTask) (*ResolvedPipelineTask, error) {
for _, resultRef := range v1.PipelineTaskResultRefs(target.PipelineTask) {
referencedPipelineTask, ok := pipelineRunState.ToMap()[resultRef.PipelineTask]
if !ok {
return target, fmt.Errorf("Result reference error: Could not find ref \"%s\" in internal pipelineRunState", resultRef.PipelineTask)
}
if referencedPipelineTask.IsCustomTask() {
if len(referencedPipelineTask.CustomRuns) == 0 {
return target, fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length CustomRuns", resultRef.PipelineTask)
}
if referencedPipelineTask.IsCustomTask() {
if len(referencedPipelineTask.CustomRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length CustomRuns", resultRef.PipelineTask)
}
customRun := referencedPipelineTask.CustomRuns[0]
_, err := findRunResultForParam(customRun, resultRef)
if err != nil {
return err
}
} else {
if len(referencedPipelineTask.TaskRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length TaskRuns", resultRef.PipelineTask)
}
taskRun := referencedPipelineTask.TaskRuns[0]
_, err := findTaskResultForParam(taskRun, resultRef)
if err != nil {
return err
}
customRun := referencedPipelineTask.CustomRuns[0]
_, err := findRunResultForParam(customRun, resultRef)
if err != nil {
return target, err
}
} else {
if len(referencedPipelineTask.TaskRuns) == 0 {
return target, fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length TaskRuns", resultRef.PipelineTask)
}
taskRun := referencedPipelineTask.TaskRuns[0]
_, err := findTaskResultForParam(taskRun, resultRef)
if err != nil {
return target, err
}
}
}
return nil
return target, nil
}

// createResultsCacheMatrixedTaskRuns creates a cache of results that have been fanned out from a
Expand Down
8 changes: 8 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ type PipelineRunFacts struct {
// The skip data is sensitive to changes in the state. The ResetSkippedCache method
// can be used to clean the cache and force re-computation when needed.
SkipCache map[string]TaskSkipStatus

// ValidationFailedTask are the tasks for which taskrun is not created as they
// never got added to the execution i.e. they failed in the validation step. One of
// the case of failing at the validation is during CheckMissingResultReferences method
// Tasks in ValidationFailedTask is added in method runNextSchedulableTask
ValidationFailedTask []*ResolvedPipelineTask
}

// PipelineRunTimeoutsState records information about start times and timeouts for the PipelineRun, so that the PipelineRunFacts
Expand Down Expand Up @@ -732,6 +738,8 @@ func (facts *PipelineRunFacts) getPipelineTasksCount() pipelineRunStatusCount {
} else {
s.Failed++
}
case t.isValidationFailed(facts.ValidationFailedTask):
s.Failed++
// increment skipped and skipped due to timeout counters since the task was skipped due to the pipeline, tasks, or finally timeout being reached before the task was launched
case t.Skip(facts).SkippingReason == v1.PipelineTimedOutSkip ||
t.Skip(facts).SkippingReason == v1.TasksTimedOutSkip ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,13 @@ func TestCheckMissingResultReferences(t *testing.T) {
wantErr: "Result reference error: Internal result ref \"lTask\" has zero-length TaskRuns",
}} {
t.Run(tt.name, func(t *testing.T) {
err := CheckMissingResultReferences(tt.pipelineRunState, tt.targets)
var err error
for _, target := range tt.targets {
_, tmpErr := CheckMissingResultReferences(tt.pipelineRunState, target)
if tmpErr != nil {
err = tmpErr
}
}
if (err != nil) && err.Error() != tt.wantErr {
t.Errorf("CheckMissingResultReferences() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
122 changes: 122 additions & 0 deletions test/pipelinefinally_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,98 @@ spec:
}
}

func TestPipelineLevelFinally_OneDAGNotProducingResult_SecondDAGUsingResult_Failure(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, namespace := setup(ctx, t)
knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf)
defer tearDown(ctx, t, c, namespace)

successTask := getSuccessTask(t, namespace)
successTask.Spec.Results = append(successTask.Spec.Results, v1.TaskResult{
Name: "result",
})
if _, err := c.V1TaskClient.Create(ctx, successTask, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create final Task: %s", err)
}

taskClaimingResultProductionButNotProducing := getSuccessTaskClaimProducingResultButNotProducing(t, namespace)
if _, err := c.V1TaskClient.Create(ctx, taskClaimingResultProductionButNotProducing, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task claiming result production but not producing task results: %s", err)
}

taskConsumingResultInParam := getTaskConsumingResults(t, namespace, "dagtask1-result")
if _, err := c.V1TaskClient.Create(ctx, taskConsumingResultInParam, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task consuming task results in param: %s", err)
}

pipeline := parse.MustParseV1Pipeline(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
finally:
- name: finaltask1
taskRef:
name: %s
tasks:
- name: dagtask1
taskRef:
name: %s
- name: dagtaskconsumingdagtask1
params:
- name: dagtask1-result
value: $(tasks.dagtask1.results.result)
taskRef:
name: %s
`, helpers.ObjectNameForTest(t), namespace, successTask.Name, taskClaimingResultProductionButNotProducing.Name, taskConsumingResultInParam.Name))
if _, err := c.V1PipelineClient.Create(ctx, pipeline, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline: %s", err)
}

pipelineRun := getPipelineRun(t, namespace, pipeline.Name)
if _, err := c.V1PipelineRunClient.Create(ctx, pipelineRun, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline Run `%s`: %s", pipelineRun.Name, err)
}

if err := WaitForPipelineRunState(ctx, c, pipelineRun.Name, timeout, PipelineRunFailed(pipelineRun.Name), "PipelineRunFailed", v1Version); err != nil {
t.Fatalf("Waiting for PipelineRun %s to fail: %v", pipelineRun.Name, err)
}

taskrunList, err := c.V1TaskRunClient.List(ctx, metav1.ListOptions{LabelSelector: "tekton.dev/pipelineRun=" + pipelineRun.Name})
if err != nil {
t.Fatalf("Error listing TaskRuns for PipelineRun %s: %s", pipelineRun.Name, err)
}

// expecting taskRuns for finaltask1 and dagtask
expectedTaskRunsCount := 2
if len(taskrunList.Items) != expectedTaskRunsCount {
var s []string
for _, n := range taskrunList.Items {
s = append(s, n.Labels["tekton.dev/pipelineTask"])
}
t.Fatalf("Error retrieving TaskRuns for PipelineRun %s. Expected %d taskRuns and found %d taskRuns for: %s",
pipelineRun.Name, expectedTaskRunsCount, len(taskrunList.Items), strings.Join(s, ", "))
}

// verify dag task failed, parallel dag task succeeded, and final task succeeded
for _, taskrunItem := range taskrunList.Items {
switch n := taskrunItem.Labels["tekton.dev/pipelineTask"]; {
case n == "dagtask":
if err := WaitForTaskRunState(ctx, c, taskrunItem.Name, TaskRunSucceed(taskrunItem.Name), "TaskRunSuccess", v1Version); err != nil {
t.Errorf("Error waiting for TaskRun to succeed: %v", err)
}
case n == "finaltask1":
if err := WaitForTaskRunState(ctx, c, taskrunItem.Name, TaskRunSucceed(taskrunItem.Name), "TaskRunSuccess", v1Version); err != nil {
t.Errorf("Error waiting for TaskRun to succeed: %v", err)
}
default:
t.Fatalf("Found unexpected taskRun %s", n)
}
}
}

func getSuccessTask(t *testing.T, namespace string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
Expand Down Expand Up @@ -760,6 +852,36 @@ spec:
`, helpers.ObjectNameForTest(t), namespace))
}

func getSuccessTaskClaimProducingResultButNotProducing(t *testing.T, namespace string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
steps:
- image: mirror.gcr.io/alpine
script: echo -n "Hello"
results:
- name: result
`, helpers.ObjectNameForTest(t), namespace))
}

func getTaskConsumingResults(t *testing.T, namespace string, paramName string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
steps:
- image: mirror.gcr.io/alpine
script: 'echo "Content of param: $(params.%s)" '
params:
- name: %s
`, helpers.ObjectNameForTest(t), namespace, paramName, paramName))
}

func getDelaySuccessTaskProducingResults(t *testing.T, namespace string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
Expand Down

0 comments on commit 5265441

Please sign in to comment.