diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 495f67f439d8..3bedf24cbdab 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -826,6 +826,39 @@ func (s *FunctionalSuite) TestK8SJSONPatch() { }) } +func (s *FunctionalSuite) TestWorkflowPodSpecPatch() { + s.Given(). + Workflow(`apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: basic + labels: + argo-e2e: true +spec: + entrypoint: main + templates: + - name: main + container: + image: argoproj/argosay:v2 + args: + - echo + - ":) Hello Argo!" + podSpecPatch: '{"terminationGracePeriodSeconds":5, "containers":[{"name":"main", "resources":{"limits":{"cpu": "100m"}}}]}' +`). + When(). + SubmitWorkflow(). + WaitForWorkflow(). + Then(). + ExpectWorkflowNode(wfv1.SucceededPodNode, func(t *testing.T, n *wfv1.NodeStatus, p *corev1.Pod) { + assert.Equal(t, *p.Spec.TerminationGracePeriodSeconds, int64(5)) + for _, c := range p.Spec.Containers { + if c.Name == "main" { + assert.Equal(t, c.Resources.Limits.Cpu().String(), "100m") + } + } + }) +} + func TestFunctionalSuite(t *testing.T) { suite.Run(t, new(FunctionalSuite)) } diff --git a/workflow/executor/common/common.go b/workflow/executor/common/common.go index ca307a24a895..8134b1f629c2 100644 --- a/workflow/executor/common/common.go +++ b/workflow/executor/common/common.go @@ -18,10 +18,6 @@ const ( containerShimPrefix = "://" ) -// killGracePeriod is the time in seconds after sending SIGTERM before -// forcefully killing the sidecar with SIGKILL (value matches k8s) -const KillGracePeriod = 30 - // GetContainerID returns container ID of a ContainerStatus resource func GetContainerID(container *v1.ContainerStatus) string { i := strings.Index(container.ContainerID, containerShimPrefix) @@ -94,13 +90,13 @@ func TerminatePodWithContainerID(ctx context.Context, c KubernetesClientInterfac } // KillGracefully kills a container gracefully. -func KillGracefully(ctx context.Context, c KubernetesClientInterface, containerID string) error { +func KillGracefully(ctx context.Context, c KubernetesClientInterface, containerID string, terminationGracePeriodDuration time.Duration) error { log.Infof("SIGTERM containerID %q: %s", containerID, syscall.SIGTERM.String()) err := TerminatePodWithContainerID(ctx, c, containerID, syscall.SIGTERM) if err != nil { return err } - err = WaitForTermination(ctx, c, containerID, time.Second*KillGracePeriod) + err = WaitForTermination(ctx, c, containerID, terminationGracePeriodDuration*time.Second) if err == nil { log.Infof("ContainerID %q successfully killed", containerID) return nil @@ -110,7 +106,7 @@ func KillGracefully(ctx context.Context, c KubernetesClientInterface, containerI if err != nil { return err } - err = WaitForTermination(ctx, c, containerID, time.Second*KillGracePeriod) + err = WaitForTermination(ctx, c, containerID, terminationGracePeriodDuration*time.Second) if err != nil { return err } diff --git a/workflow/executor/common/common_test.go b/workflow/executor/common/common_test.go index 1b467957481f..590f1e800562 100644 --- a/workflow/executor/common/common_test.go +++ b/workflow/executor/common/common_test.go @@ -5,6 +5,7 @@ import ( "context" "syscall" "testing" + "time" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -121,3 +122,52 @@ func TestTerminatePodWithContainerID(t *testing.T) { err = TerminatePodWithContainerID(ctx, mock, "container-id", syscall.SIGTERM) assert.NoError(t, err) } + +// TestWaitForTermination ensure we SIGTERM container with input wait time +func TestWaitForTermination(t *testing.T) { + // Successfully SIGTERM Container + mock := &MockKC{ + getContainerStatusContainerStatus: &v1.ContainerStatus{ + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }, + } + ctx := context.Background() + err := WaitForTermination(ctx, mock, "container-id", time.Duration(2)*time.Second) + assert.NoError(t, err) + + // Fail SIGTERM Container + mock = &MockKC{ + getContainerStatusContainerStatus: &v1.ContainerStatus{ + State: v1.ContainerState{ + Terminated: nil, + }, + }, + } + err = WaitForTermination(ctx, mock, "container-id", time.Duration(1)*time.Second) + assert.EqualError(t, err, "timeout after 1s") +} + +// TestKillGracefully ensure we kill container gracefully with input wait time +func TestKillGracefully(t *testing.T) { + // Graceful SIGTERM Container + mock := &MockKC{ + getContainerStatusPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + Spec: v1.PodSpec{ + RestartPolicy: "Never", + }, + }, + getContainerStatusContainerStatus: &v1.ContainerStatus{ + State: v1.ContainerState{ + Terminated: nil, + }, + }, + } + ctx := context.Background() + err := KillGracefully(ctx, mock, "container-id", 1) + assert.EqualError(t, err, "timeout after 1s") +} diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index f0c09fc0fd4b..a2f1c3ec8a95 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -21,7 +21,6 @@ import ( "github.com/argoproj/argo/v3/util" "github.com/argoproj/argo/v3/util/file" "github.com/argoproj/argo/v3/workflow/common" - execcommon "github.com/argoproj/argo/v3/workflow/executor/common" ) type DockerExecutor struct{} @@ -173,7 +172,7 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerID string) error { } // killContainers kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period -func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string) error { +func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error { killArgs := append([]string{"kill", "--signal", "TERM"}, containerIDs...) // docker kill will return with an error if a container has terminated already, which is not an error in this case. // We therefore ignore any error. docker wait that follows will re-raise any other error with the container. @@ -188,7 +187,7 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string) error return errors.InternalWrapError(err) } // waitCh needs buffer of 1 so it can always send the result of waitCmd.Wait() without blocking. - // Otherwise, if the KillGracePeriod elapses and the forced kill branch is run, there would + // Otherwise, if the terminationGracePeriodSeconds elapses and the forced kill branch is run, there would // be no receiver for waitCh and the goroutine would block forever waitCh := make(chan error, 1) go func() { @@ -198,8 +197,8 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string) error select { case err = <-waitCh: // waitCmd completed - case <-time.After(execcommon.KillGracePeriod * time.Second): - log.Infof("Timed out (%ds) for containers to terminate gracefully. Killing forcefully", execcommon.KillGracePeriod) + case <-time.After(terminationGracePeriodDuration * time.Second): + log.Infof("Timed out (%ds) for containers to terminate gracefully. Killing forcefully", terminationGracePeriodDuration) forceKillArgs := append([]string{"kill", "--signal", "KILL"}, containerIDs...) forceKillCmd := exec.Command("docker", forceKillArgs...) log.Info(forceKillCmd.Args) diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index b2ef59349a84..10ef188ed4e9 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -106,7 +106,7 @@ type ContainerRuntimeExecutor interface { Wait(ctx context.Context, containerID string) error // Kill a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period - Kill(ctx context.Context, containerIDs []string) error + Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error } // NewExecutor instantiates a new workflow executor @@ -679,6 +679,16 @@ func (we *WorkflowExecutor) GetSecrets(ctx context.Context, namespace, name, key return val, nil } +// GetTerminationGracePeriodDuration returns the terminationGracePeriodSeconds of podSpec in Time.Duration format +func (we *WorkflowExecutor) GetTerminationGracePeriodDuration(ctx context.Context) (time.Duration, error) { + pod, err := we.getPod(ctx) + if err != nil { + return time.Duration(0), err + } + terminationGracePeriodDuration := time.Duration(*pod.Spec.TerminationGracePeriodSeconds) + return terminationGracePeriodDuration, nil +} + // GetMainContainerStatus returns the container status of the main container, nil if the main container does not exist func (we *WorkflowExecutor) GetMainContainerStatus(ctx context.Context) (*apiv1.ContainerStatus, error) { pod, err := we.getPod(ctx) @@ -1180,7 +1190,8 @@ func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, annotationsUpda _ = we.AddAnnotation(ctx, common.AnnotationKeyNodeMessage, message) log.Infof("Killing main container") mainContainerID, _ := we.GetMainContainerID(ctx) - err := we.RuntimeExecutor.Kill(ctx, []string{mainContainerID}) + terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx) + err := we.RuntimeExecutor.Kill(ctx, []string{mainContainerID}, terminationGracePeriodDuration) if err != nil { log.Warnf("Failed to kill main container: %v", err) } @@ -1214,7 +1225,8 @@ func (we *WorkflowExecutor) KillSidecars(ctx context.Context) error { if len(sidecarIDs) == 0 { return nil } - return we.RuntimeExecutor.Kill(ctx, sidecarIDs) + terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx) + return we.RuntimeExecutor.Kill(ctx, sidecarIDs, terminationGracePeriodDuration) } // LoadExecutionControl reads the execution control definition from the the Kubernetes downward api annotations volume file diff --git a/workflow/executor/k8sapi/client.go b/workflow/executor/k8sapi/client.go index e10a802bb78d..0b6cb38a025e 100644 --- a/workflow/executor/k8sapi/client.go +++ b/workflow/executor/k8sapi/client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "syscall" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -89,6 +90,6 @@ func (c *k8sAPIClient) KillContainer(pod *corev1.Pod, container *corev1.Containe return err } -func (c *k8sAPIClient) killGracefully(ctx context.Context, containerID string) error { - return execcommon.KillGracefully(ctx, c, containerID) +func (c *k8sAPIClient) killGracefully(ctx context.Context, containerID string, terminationGracePeriodDuration time.Duration) error { + return execcommon.KillGracefully(ctx, c, containerID, terminationGracePeriodDuration) } diff --git a/workflow/executor/k8sapi/k8sapi.go b/workflow/executor/k8sapi/k8sapi.go index 09759a352519..359f146a6fed 100644 --- a/workflow/executor/k8sapi/k8sapi.go +++ b/workflow/executor/k8sapi/k8sapi.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "time" log "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" @@ -66,10 +67,10 @@ func (k *K8sAPIExecutor) Wait(ctx context.Context, containerID string) error { } // Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period -func (k *K8sAPIExecutor) Kill(ctx context.Context, containerIDs []string) error { +func (k *K8sAPIExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error { log.Infof("Killing containers %s", containerIDs) for _, containerID := range containerIDs { - err := k.client.killGracefully(ctx, containerID) + err := k.client.killGracefully(ctx, containerID, terminationGracePeriodDuration) if err != nil { return err } diff --git a/workflow/executor/kubelet/client.go b/workflow/executor/kubelet/client.go index 27e790ff69b0..6933c9ef6757 100644 --- a/workflow/executor/kubelet/client.go +++ b/workflow/executor/kubelet/client.go @@ -294,8 +294,8 @@ func (k *kubeletClient) KillContainer(pod *corev1.Pod, container *corev1.Contain return err } -func (k *kubeletClient) KillGracefully(ctx context.Context, containerID string) error { - return execcommon.KillGracefully(ctx, k, containerID) +func (k *kubeletClient) KillGracefully(ctx context.Context, containerID string, terminationGracePeriodDuration time.Duration) error { + return execcommon.KillGracefully(ctx, k, containerID, terminationGracePeriodDuration) } func (k *kubeletClient) CopyArchive(ctx context.Context, containerID, sourcePath, destPath string) error { diff --git a/workflow/executor/kubelet/kubelet.go b/workflow/executor/kubelet/kubelet.go index fc5075550cad..9345245513f4 100644 --- a/workflow/executor/kubelet/kubelet.go +++ b/workflow/executor/kubelet/kubelet.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "time" log "github.com/sirupsen/logrus" @@ -62,9 +63,9 @@ func (k *KubeletExecutor) Wait(ctx context.Context, containerID string) error { } // Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period -func (k *KubeletExecutor) Kill(ctx context.Context, containerIDs []string) error { +func (k *KubeletExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error { for _, containerID := range containerIDs { - err := k.cli.KillGracefully(ctx, containerID) + err := k.cli.KillGracefully(ctx, containerID, terminationGracePeriodDuration) if err != nil { return err } diff --git a/workflow/executor/mocks/ContainerRuntimeExecutor.go b/workflow/executor/mocks/ContainerRuntimeExecutor.go index b2a217a8dfb3..4e80265654a9 100644 --- a/workflow/executor/mocks/ContainerRuntimeExecutor.go +++ b/workflow/executor/mocks/ContainerRuntimeExecutor.go @@ -8,6 +8,8 @@ import ( io "io" mock "github.com/stretchr/testify/mock" + + time "time" ) // ContainerRuntimeExecutor is an autogenerated mock type for the ContainerRuntimeExecutor type @@ -94,13 +96,13 @@ func (_m *ContainerRuntimeExecutor) GetOutputStream(ctx context.Context, contain return r0, r1 } -// Kill provides a mock function with given fields: ctx, containerIDs -func (_m *ContainerRuntimeExecutor) Kill(ctx context.Context, containerIDs []string) error { - ret := _m.Called(ctx, containerIDs) +// Kill provides a mock function with given fields: ctx, containerIDs, terminationGracePeriodDuration +func (_m *ContainerRuntimeExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error { + ret := _m.Called(ctx, containerIDs, terminationGracePeriodDuration) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []string) error); ok { - r0 = rf(ctx, containerIDs) + if rf, ok := ret.Get(0).(func(context.Context, []string, time.Duration) error); ok { + r0 = rf(ctx, containerIDs, terminationGracePeriodDuration) } else { r0 = ret.Error(0) } diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index 15afa5d27354..346cd6a5a418 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -243,13 +243,13 @@ func (p *PNSExecutor) GetExitCode(ctx context.Context, containerID string) (stri } // Kill a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period -func (p *PNSExecutor) Kill(ctx context.Context, containerIDs []string) error { +func (p *PNSExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error { var asyncErr error wg := sync.WaitGroup{} for _, cid := range containerIDs { wg.Add(1) go func(containerID string) { - err := p.killContainer(containerID) + err := p.killContainer(containerID, terminationGracePeriodDuration) if err != nil && asyncErr != nil { asyncErr = err } @@ -260,7 +260,7 @@ func (p *PNSExecutor) Kill(ctx context.Context, containerIDs []string) error { return asyncErr } -func (p *PNSExecutor) killContainer(containerID string) error { +func (p *PNSExecutor) killContainer(containerID string, terminationGracePeriodDuration time.Duration) error { pid, err := p.getContainerPID(containerID) if err != nil { log.Warnf("Ignoring kill container failure of %s: %v. Process assumed to have completed", containerID, err) @@ -274,8 +274,7 @@ func (p *PNSExecutor) killContainer(containerID string) error { if err != nil { log.Warnf("Failed to SIGTERM pid %d: %v", pid, err) } - - waitPIDOpts := executil.WaitPIDOpts{Timeout: execcommon.KillGracePeriod * time.Second} + waitPIDOpts := executil.WaitPIDOpts{Timeout: terminationGracePeriodDuration * time.Second} err = executil.WaitPID(pid, waitPIDOpts) if err == nil { log.Infof("PID %d completed", pid)