Skip to content

Commit

Permalink
fix(controller): Only clean-up pod when both main and wait containers…
Browse files Browse the repository at this point in the history
… have terminated. Fixes #5981 (#6033)

Signed-off-by: terrytangyuan <terrytangyuan@gmail.com>
  • Loading branch information
terrytangyuan committed Jun 1, 2021
1 parent 232d97e commit eaeaec7
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 14 deletions.
59 changes: 59 additions & 0 deletions test/e2e/resource_template_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// +build executor

package e2e

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

// ResourceTemplateSuite is the e2e test suite for workflows that use resource
// templates. It embeds fixtures.E2ESuite, which supplies the Given/When/Then
// helpers used by the test methods on this type.
type ResourceTemplateSuite struct {
fixtures.E2ESuite
}

// TestResourceTemplateWithWorkflow submits a Workflow whose single template is
// a resource template with action "create". The created manifest is itself a
// Workflow that runs the argoproj/argosay:v2 image, and the resource template's
// success/failure conditions are keyed on that child's status.phase. The test
// waits for the outer workflow and expects it to end in the Succeeded phase.
//
// NOTE(review): the YAML inside the raw string shows no indentation in this
// copy of the file — confirm the manifest's nesting against the repository
// before relying on it.
func (s *ResourceTemplateSuite) TestResourceTemplateWithWorkflow() {
s.Given().
Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: k8s-resource-tmpl-with-wf-
spec:
entrypoint: main
templates:
- name: main
resource:
action: create
successCondition: status.phase == Succeeded
failureCondition: status.phase == Failed
manifest: |
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: k8s-wf-resource-
spec:
entrypoint: main
templates:
- name: main
container:
image: argoproj/argosay:v2
command: ["/argosay"]
`).
When().
SubmitWorkflow().
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
// Only the final phase of the outer workflow is checked here.
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
})
}

func TestResourceTemplateSuite(t *testing.T) {
suite.Run(t, new(ResourceTemplateSuite))
}
26 changes: 12 additions & 14 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
newDaemonStatus = pointer.BoolPtr(true)
woc.log.Infof("Processing ready daemon pod: %v", pod.ObjectMeta.SelfLink)
}
woc.cleanUpPod(pod)
woc.cleanUpPod(pod, tmpl)
default:
newPhase = wfv1.NodeError
message = fmt.Sprintf("Unexpected pod phase for %s: %s", pod.ObjectMeta.Name, pod.Status.Phase)
Expand Down Expand Up @@ -1209,24 +1209,22 @@ func getExitCode(pod *apiv1.Pod) *int32 {
return nil
}

func (woc *wfOperationCtx) cleanUpPod(pod *apiv1.Pod) {
func podHasContainerNeedingTermination(pod *apiv1.Pod, tmpl wfv1.Template) bool {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName && c.State.Terminated == nil {
return // we must not do anything if the wait or main containers are still running
}
}
// the wait container has terminated, so all other containers should be killed
for _, c := range pod.Status.ContainerStatuses {
if c.State.Terminated != nil {
continue
// Only clean up pod when both the wait and the main containers are terminated
if c.Name == common.WaitContainerName || tmpl.IsMainContainerName(c.Name) {
if c.State.Terminated == nil {
return false
}
}
woc.queuePodForCleanup(pod.Name, terminateContainers)
return
}
return true
}

func (woc *wfOperationCtx) queuePodForCleanup(podName string, podCleanupAction podCleanupAction) {
woc.controller.queuePodForCleanup(woc.wf.Namespace, podName, podCleanupAction)
// cleanUpPod queues the pod for the terminateContainers cleanup action, but
// only when podHasContainerNeedingTermination reports true for this pod and
// template. Otherwise it does nothing.
func (woc *wfOperationCtx) cleanUpPod(pod *apiv1.Pod, tmpl wfv1.Template) {
	if !podHasContainerNeedingTermination(pod, tmpl) {
		return
	}
	namespace := woc.wf.Namespace
	woc.controller.queuePodForCleanup(namespace, pod.Name, terminateContainers)
}

// getLatestFinishedAt returns the latest finishAt timestamp from all the
Expand Down
65 changes: 65 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5880,6 +5880,71 @@ func TestParamAggregation(t *testing.T) {
}
}

// TestPodHasContainerNeedingTermination exercises
// podHasContainerNeedingTermination with an empty template against pods whose
// wait and main containers are in different combinations of running and
// terminated states, including pods that report no wait container at all.
func TestPodHasContainerNeedingTermination(t *testing.T) {
	// terminated builds a status for a container that has exited with code 1.
	terminated := func(name string) apiv1.ContainerStatus {
		return apiv1.ContainerStatus{
			Name:  name,
			State: apiv1.ContainerState{Terminated: &apiv1.ContainerStateTerminated{ExitCode: 1}},
		}
	}
	// running builds a status for a container that is still running.
	running := func(name string) apiv1.ContainerStatus {
		return apiv1.ContainerStatus{
			Name:  name,
			State: apiv1.ContainerState{Running: &apiv1.ContainerStateRunning{}},
		}
	}

	scenarios := []struct {
		desc     string
		statuses []apiv1.ContainerStatus
		want     bool
	}{
		{
			desc:     "wait terminated, main terminated",
			statuses: []apiv1.ContainerStatus{terminated(common.WaitContainerName), terminated(common.MainContainerName)},
			want:     true,
		},
		{
			desc:     "wait running, main terminated",
			statuses: []apiv1.ContainerStatus{running(common.WaitContainerName), terminated(common.MainContainerName)},
			want:     false,
		},
		{
			desc:     "wait terminated, main running",
			statuses: []apiv1.ContainerStatus{terminated(common.WaitContainerName), running(common.MainContainerName)},
			want:     false,
		},
		{
			desc:     "main running only",
			statuses: []apiv1.ContainerStatus{running(common.MainContainerName)},
			want:     false,
		},
		{
			desc:     "main terminated only",
			statuses: []apiv1.ContainerStatus{terminated(common.MainContainerName)},
			want:     true,
		},
	}

	tmpl := wfv1.Template{}
	for _, sc := range scenarios {
		pod := apiv1.Pod{Status: apiv1.PodStatus{ContainerStatuses: sc.statuses}}
		assert.Equal(t, sc.want, podHasContainerNeedingTermination(&pod, tmpl), sc.desc)
	}
}

func TestRetryOnDiffHost(t *testing.T) {
cancel, controller := newController()
defer cancel()
Expand Down

0 comments on commit eaeaec7

Please sign in to comment.