Skip to content

Commit

Permalink
Introducing InternalTektonResultType as a ResultType
Browse files Browse the repository at this point in the history
In light of #3087 the need for a ResultType that is not exposed
as a TaskRunResult or PipelineResourceResult arises.
In #3087, a Step can emit a result indicating a Step timeout
has occurred. This is a result that should not be exposed hence
the need for a new ResultType called InternalTektonResultType.
This commit ensures results of this type are filtered out.

Introducing an InternalTektonResultType ensures a future proof
solution to internal results that should not be exposed.
Aside from the example in #3087, a present candidate is the
result written out by a Step containing a "StartedAt" key.
Currently this result is filtered out with a specific function.
Marking it as an InternalTektonResultType now allows for
this result to automatically be filtered out.

Additionally this commit brings about refactoring (and sometimes
renaming) of functions related to converting pod statuses to
taskrun statuses from pkg/reconciler/taskrun/taskrun.go to
pkg/pod/status/status.go. This is accompanied with moving unit
test cases from taskrun_test.go to status_test.go.
  • Loading branch information
Peaorl committed Sep 1, 2020
1 parent f36d9ad commit d1efa81
Show file tree
Hide file tree
Showing 6 changed files with 464 additions and 395 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
TaskRunResultType ResultType = "TaskRunResult"
// PipelineResourceResultType default pipeline result value
PipelineResourceResultType ResultType = "PipelineResourceResult"
// InternalTektonResultType default internal tekton result value
InternalTektonResultType ResultType = "InternalTektonResult"
// UnknownResultType default unknown result type value
UnknownResultType ResultType = ""
)
Expand Down
10 changes: 6 additions & 4 deletions pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func (e Entrypointer) Go() error {
// *but* we write postfile to make next steps bail too.
e.WritePostFile(e.PostFile, err)
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

return err
Expand All @@ -114,8 +115,9 @@ func (e Entrypointer) Go() error {
e.Args = append([]string{e.Entrypoint}, e.Args...)
}
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

err := e.Runner.Run(e.Args...)
Expand Down
136 changes: 101 additions & 35 deletions pkg/pod/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/names"
"github.com/tektoncd/pipeline/pkg/termination"
Expand Down Expand Up @@ -97,27 +98,57 @@ func SidecarsReady(podStatus corev1.PodStatus) bool {
}

// MakeTaskRunStatus returns a TaskRunStatus based on the Pod's status.
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) v1beta1.TaskRunStatus {
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) (v1beta1.TaskRunStatus, error) {
trs := &tr.Status
if trs.GetCondition(apis.ConditionSucceeded) == nil || trs.GetCondition(apis.ConditionSucceeded).Status == corev1.ConditionUnknown {
// If the taskRunStatus doesn't exist yet, it's because we just started running
MarkStatusRunning(trs, v1beta1.TaskRunReasonRunning.String(), "Not all Steps in the Task have finished executing")
}

// Complete if we did not find a step that is not complete, or the pod is in a definitely complete phase
complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}

trs.PodName = pod.Name
trs.Steps = []v1beta1.StepState{}
trs.Sidecars = []v1beta1.SidecarState{}

var merr *multierror.Error

for _, s := range pod.Status.ContainerStatuses {
if IsContainerStep(s.Name) {
if s.State.Terminated != nil && len(s.State.Terminated.Message) != 0 {
message, time, err := removeStartInfoFromTerminationMessage(s)
msg := s.State.Terminated.Message

results, err := termination.ParseMessage(msg)
if err != nil {
logger.Errorf("error setting the start time of step %q in taskrun %q: %w", s.Name, tr.Name, err)
}
if time != nil {
s.State.Terminated.StartedAt = *time
s.State.Terminated.Message = message
logger.Errorf("termination message could not be parsed as JSON: %v", err)
merr = multierror.Append(merr, err)
} else {
time, err := extractStartedAtTimeFromResults(results)
if err != nil {
logger.Errorf("error setting the start time of step %q in taskrun %q: %v", s.Name, tr.Name, err)
merr = multierror.Append(merr, err)
}
taskResults, pipelineResourceResults, filteredResults := filterResultsAndResources(results)
if tr.IsSuccessful() {
trs.TaskRunResults = append(trs.TaskRunResults, taskResults...)
trs.ResourcesResult = append(trs.ResourcesResult, pipelineResourceResults...)
}
if time != nil {
s.State.Terminated.StartedAt = *time
msg, err = createMessageFromResults(filteredResults)
if err != nil {
logger.Errorf("%v", err)
err = multierror.Append(merr, err)
}
s.State.Terminated.Message = msg
}
}
}
trs.Steps = append(trs.Steps, v1beta1.StepState{
Expand All @@ -135,51 +166,86 @@ func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev
})
}
}

// Complete if we did not find a step that is not complete, or the pod is in a definitely complete phase
complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}
trs.TaskRunResults = removeDuplicateResults(trs.TaskRunResults)

// Sort step states according to the order specified in the TaskRun spec's steps.
trs.Steps = sortTaskRunStepOrder(trs.Steps, taskSpec.Steps)

return *trs
return *trs, merr.ErrorOrNil()
}

// removeStartInfoFromTerminationMessage searches for a result called "StartedAt" in the JSON-formatted
// createMessageFromResults searches for a result called "StartedAt" in the JSON-formatted
// termination message of a step and returns the values to use for sets State.Terminated if it's
// found. The "StartedAt" result is also removed from the list of results in the container status.
func removeStartInfoFromTerminationMessage(s corev1.ContainerStatus) (string, *metav1.Time, error) {
r, err := termination.ParseMessage(s.State.Terminated.Message)
func createMessageFromResults(results []v1beta1.PipelineResourceResult) (string, error) {
if len(results) == 0 {
return "", nil
}
bytes, err := json.Marshal(results)
if err != nil {
return "", nil, fmt.Errorf("termination message could not be parsed as JSON: %w", err)
return "", fmt.Errorf("error marshalling remaining results back into termination message: %w", err)
}
return string(bytes), nil
}

func filterResultsAndResources(results []v1beta1.PipelineResourceResult) ([]v1beta1.TaskRunResult, []v1beta1.PipelineResourceResult, []v1beta1.PipelineResourceResult) {
var taskResults []v1beta1.TaskRunResult
var pipelineResourceResults []v1beta1.PipelineResourceResult
var filteredResults []v1beta1.PipelineResourceResult
for _, r := range results {
switch r.ResultType {
case v1beta1.TaskRunResultType:
taskRunResult := v1beta1.TaskRunResult{
Name: r.Key,
Value: r.Value,
}
taskResults = append(taskResults, taskRunResult)
filteredResults = append(filteredResults, r)
case v1beta1.InternalTektonResultType:
// Internal messages are ignored because they're not used as external result
continue
case v1beta1.PipelineResourceResultType:
fallthrough
default:
pipelineResourceResults = append(pipelineResourceResults, r)
filteredResults = append(filteredResults, r)
}
}

return taskResults, pipelineResourceResults, filteredResults
}

func removeDuplicateResults(taskRunResult []v1beta1.TaskRunResult) []v1beta1.TaskRunResult {
if len(taskRunResult) == 0 {
return nil
}

uniq := make([]v1beta1.TaskRunResult, 0)
latest := make(map[string]v1beta1.TaskRunResult, 0)
for _, res := range taskRunResult {
if _, seen := latest[res.Name]; !seen {
uniq = append(uniq, res)
}
latest[res.Name] = res
}
for i, res := range uniq {
uniq[i] = latest[res.Name]
}
for index, result := range r {
return uniq
}

func extractStartedAtTimeFromResults(results []v1beta1.PipelineResourceResult) (*metav1.Time, error) {
for _, result := range results {
if result.Key == "StartedAt" {
t, err := time.Parse(timeFormat, result.Value)
if err != nil {
return "", nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
return nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
}
message := ""
startedAt := metav1.NewTime(t)
// remove the entry for the starting time
r = append(r[:index], r[index+1:]...)
if len(r) == 0 {
message = ""
} else if bytes, err := json.Marshal(r); err != nil {
return "", nil, fmt.Errorf("error marshalling remaining results back into termination message: %w", err)
} else {
message = string(bytes)
}
return message, &startedAt, nil
return &startedAt, nil
}
}
return "", nil, nil
return nil, nil
}

func updateCompletedTaskRun(trs *v1beta1.TaskRunStatus, pod *corev1.Pod) {
Expand Down
Loading

0 comments on commit d1efa81

Please sign in to comment.