Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing InternalTektonResultType as a ResultType #3138

Merged
merged 1 commit into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/git-init/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func main() {
{
Key: "commit",
Value: commit,
ResourceRef: v1beta1.PipelineResourceRef{
ResourceRef: &v1beta1.PipelineResourceRef{
Name: resourceName,
},
ResourceName: resourceName,
},
{
Key: "url",
Value: fetchSpec.URL,
ResourceRef: v1beta1.PipelineResourceRef{
ResourceRef: &v1beta1.PipelineResourceRef{
Name: resourceName,
},
ResourceName: resourceName,
Expand Down
4 changes: 2 additions & 2 deletions cmd/imagedigestexporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ func main() {
Key: "digest",
Value: digest.String(),
ResourceName: imageResource.Name,
ResourceRef: v1beta1.PipelineResourceRef{
ResourceRef: &v1beta1.PipelineResourceRef{
Name: imageResource.Name,
},
})
output = append(output, v1beta1.PipelineResourceResult{
Key: "url",
Value: imageResource.URL,
ResourceName: imageResource.Name,
ResourceRef: v1beta1.PipelineResourceRef{
ResourceRef: &v1beta1.PipelineResourceRef{
Name: imageResource.Name,
},
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/pipeline/v1beta1/resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ type PipelineResourceResult struct {
Key string `json:"key"`
Value string `json:"value"`
ResourceName string `json:"resourceName,omitempty"`
// This field should be deprecated and removed in the next API version.
// The field ResourceRef should be deprecated and removed in the next API version.
// See https://github.com/tektoncd/pipeline/issues/2694 for more information.
ResourceRef PipelineResourceRef `json:"resourceRef,omitempty"`
ResultType ResultType `json:"type,omitempty"`
ResourceRef *PipelineResourceRef `json:"resourceRef,omitempty"`
ResultType ResultType `json:"type,omitempty"`
}

// ResultType used to find out whether a PipelineResourceResult is from a task result or not
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
TaskRunResultType ResultType = "TaskRunResult"
// PipelineResourceResultType default pipeline result value
PipelineResourceResultType ResultType = "PipelineResourceResult"
// InternalTektonResultType default internal tekton result value
InternalTektonResultType ResultType = "InternalTektonResult"
// UnknownResultType default unknown result type value
UnknownResultType ResultType = ""
)
Expand Down
10 changes: 8 additions & 2 deletions pkg/apis/pipeline/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func (e Entrypointer) Go() error {
// *but* we write postfile to make next steps bail too.
e.WritePostFile(e.PostFile, err)
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

return err
Expand All @@ -114,8 +115,9 @@ func (e Entrypointer) Go() error {
e.Args = append([]string{e.Entrypoint}, e.Args...)
}
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

err := e.Runner.Run(e.Args...)
Expand Down
185 changes: 137 additions & 48 deletions pkg/pod/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/names"
"github.com/tektoncd/pipeline/pkg/termination"
Expand Down Expand Up @@ -97,89 +98,177 @@ func SidecarsReady(podStatus corev1.PodStatus) bool {
}

// MakeTaskRunStatus returns a TaskRunStatus based on the Pod's status.
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) v1beta1.TaskRunStatus {
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) (v1beta1.TaskRunStatus, error) {
trs := &tr.Status
if trs.GetCondition(apis.ConditionSucceeded) == nil || trs.GetCondition(apis.ConditionSucceeded).Status == corev1.ConditionUnknown {
// If the taskRunStatus doesn't exist yet, it's because we just started running
MarkStatusRunning(trs, v1beta1.TaskRunReasonRunning.String(), "Not all Steps in the Task have finished executing")
}

complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}

trs.PodName = pod.Name
trs.Steps = []v1beta1.StepState{}
trs.Sidecars = []v1beta1.SidecarState{}

var stepStatuses []corev1.ContainerStatus
var sidecarStatuses []corev1.ContainerStatus
for _, s := range pod.Status.ContainerStatuses {
if IsContainerStep(s.Name) {
Peaorl marked this conversation as resolved.
Show resolved Hide resolved
if s.State.Terminated != nil && len(s.State.Terminated.Message) != 0 {
message, time, err := removeStartInfoFromTerminationMessage(s)
stepStatuses = append(stepStatuses, s)
} else if isContainerSidecar(s.Name) {
sidecarStatuses = append(sidecarStatuses, s)
}
}

var merr *multierror.Error
if err := setTaskRunStatusBasedOnStepStatus(logger, stepStatuses, &tr); err != nil {
merr = multierror.Append(merr, err)
}

setTaskRunStatusBasedOnSidecarStatus(sidecarStatuses, trs)

trs.TaskRunResults = removeDuplicateResults(trs.TaskRunResults)

// Sort step states according to the order specified in the TaskRun spec's steps.
trs.Steps = sortTaskRunStepOrder(trs.Steps, taskSpec.Steps)

return *trs, merr.ErrorOrNil()
}

func setTaskRunStatusBasedOnStepStatus(logger *zap.SugaredLogger, stepStatuses []corev1.ContainerStatus, tr *v1beta1.TaskRun) *multierror.Error {
trs := &tr.Status
var merr *multierror.Error

for _, s := range stepStatuses {
if s.State.Terminated != nil && len(s.State.Terminated.Message) != 0 {
msg := s.State.Terminated.Message

results, err := termination.ParseMessage(logger, msg)
if err != nil {
logger.Errorf("termination message could not be parsed as JSON: %v", err)
merr = multierror.Append(merr, err)
Peaorl marked this conversation as resolved.
Show resolved Hide resolved
} else {
time, err := extractStartedAtTimeFromResults(results)
if err != nil {
logger.Errorf("error setting the start time of step %q in taskrun %q: %w", s.Name, tr.Name, err)
logger.Errorf("error setting the start time of step %q in taskrun %q: %v", s.Name, tr.Name, err)
merr = multierror.Append(merr, err)
}
taskResults, pipelineResourceResults, filteredResults := filterResultsAndResources(results)
if tr.IsSuccessful() {
trs.TaskRunResults = append(trs.TaskRunResults, taskResults...)
trs.ResourcesResult = append(trs.ResourcesResult, pipelineResourceResults...)
}
msg, err = createMessageFromResults(filteredResults)
if err != nil {
logger.Errorf("%v", err)
err = multierror.Append(merr, err)
} else {
s.State.Terminated.Message = msg
}
if time != nil {
s.State.Terminated.StartedAt = *time
s.State.Terminated.Message = message
}
}
trs.Steps = append(trs.Steps, v1beta1.StepState{
ContainerState: *s.State.DeepCopy(),
Name: trimStepPrefix(s.Name),
ContainerName: s.Name,
ImageID: s.ImageID,
})
} else if isContainerSidecar(s.Name) {
trs.Sidecars = append(trs.Sidecars, v1beta1.SidecarState{
ContainerState: *s.State.DeepCopy(),
Name: TrimSidecarPrefix(s.Name),
ContainerName: s.Name,
ImageID: s.ImageID,
})
}
trs.Steps = append(trs.Steps, v1beta1.StepState{
ContainerState: *s.State.DeepCopy(),
Name: trimStepPrefix(s.Name),
ContainerName: s.Name,
ImageID: s.ImageID,
})
}

// Complete if we did not find a step that is not complete, or the pod is in a definitely complete phase
complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed
return merr

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}

func setTaskRunStatusBasedOnSidecarStatus(sidecarStatuses []corev1.ContainerStatus, trs *v1beta1.TaskRunStatus) {
for _, s := range sidecarStatuses {
trs.Sidecars = append(trs.Sidecars, v1beta1.SidecarState{
ContainerState: *s.State.DeepCopy(),
Name: TrimSidecarPrefix(s.Name),
ContainerName: s.Name,
ImageID: s.ImageID,
})
}
}

// Sort step states according to the order specified in the TaskRun spec's steps.
trs.Steps = sortTaskRunStepOrder(trs.Steps, taskSpec.Steps)
func createMessageFromResults(results []v1beta1.PipelineResourceResult) (string, error) {
if len(results) == 0 {
return "", nil
}
bytes, err := json.Marshal(results)
if err != nil {
return "", fmt.Errorf("error marshalling remaining results back into termination message: %w", err)
}
return string(bytes), nil
}

return *trs
func filterResultsAndResources(results []v1beta1.PipelineResourceResult) ([]v1beta1.TaskRunResult, []v1beta1.PipelineResourceResult, []v1beta1.PipelineResourceResult) {
var taskResults []v1beta1.TaskRunResult
var pipelineResourceResults []v1beta1.PipelineResourceResult
var filteredResults []v1beta1.PipelineResourceResult
for _, r := range results {
switch r.ResultType {
case v1beta1.TaskRunResultType:
taskRunResult := v1beta1.TaskRunResult{
Name: r.Key,
Value: r.Value,
}
taskResults = append(taskResults, taskRunResult)
filteredResults = append(filteredResults, r)
case v1beta1.InternalTektonResultType:
// Internal messages are ignored because they're not used as external result
continue
case v1beta1.PipelineResourceResultType:
fallthrough
default:
pipelineResourceResults = append(pipelineResourceResults, r)
filteredResults = append(filteredResults, r)
}
}

return taskResults, pipelineResourceResults, filteredResults
}

// removeStartInfoFromTerminationMessage searches for a result called "StartedAt" in the JSON-formatted
// termination message of a step and returns the values to use for sets State.Terminated if it's
// found. The "StartedAt" result is also removed from the list of results in the container status.
func removeStartInfoFromTerminationMessage(s corev1.ContainerStatus) (string, *metav1.Time, error) {
r, err := termination.ParseMessage(s.State.Terminated.Message)
if err != nil {
return "", nil, fmt.Errorf("termination message could not be parsed as JSON: %w", err)
func removeDuplicateResults(taskRunResult []v1beta1.TaskRunResult) []v1beta1.TaskRunResult {
if len(taskRunResult) == 0 {
return nil
}

uniq := make([]v1beta1.TaskRunResult, 0)
latest := make(map[string]v1beta1.TaskRunResult, 0)
for _, res := range taskRunResult {
if _, seen := latest[res.Name]; !seen {
uniq = append(uniq, res)
}
latest[res.Name] = res
}
for index, result := range r {
for i, res := range uniq {
uniq[i] = latest[res.Name]
}
return uniq
}

func extractStartedAtTimeFromResults(results []v1beta1.PipelineResourceResult) (*metav1.Time, error) {
for _, result := range results {
if result.Key == "StartedAt" {
t, err := time.Parse(timeFormat, result.Value)
if err != nil {
return "", nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
return nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
}
message := ""
startedAt := metav1.NewTime(t)
// remove the entry for the starting time
r = append(r[:index], r[index+1:]...)
if len(r) == 0 {
message = ""
} else if bytes, err := json.Marshal(r); err != nil {
return "", nil, fmt.Errorf("error marshalling remaining results back into termination message: %w", err)
} else {
message = string(bytes)
}
return message, &startedAt, nil
return &startedAt, nil
}
}
return "", nil, nil
return nil, nil
}

func updateCompletedTaskRun(trs *v1beta1.TaskRunStatus, pod *corev1.Pod) {
Expand Down
Loading