Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller): configurable terminationGracePeriodSeconds #4940

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,39 @@ func (s *FunctionalSuite) TestK8SJSONPatch() {
})
}

// TestWorkflowPodSpecPatch submits a workflow whose podSpecPatch sets
// terminationGracePeriodSeconds and a CPU limit on the "main" container, then
// checks both values on the pod handed to the ExpectWorkflowNode callback.
func (s *FunctionalSuite) TestWorkflowPodSpecPatch() {
	s.Given().
		Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: basic
labels:
argo-e2e: true
spec:
entrypoint: main
templates:
- name: main
container:
image: argoproj/argosay:v2
args:
- echo
- ":) Hello Argo!"
podSpecPatch: '{"terminationGracePeriodSeconds":5, "containers":[{"name":"main", "resources":{"limits":{"cpu": "100m"}}}]}'
`).
		When().
		SubmitWorkflow().
		WaitForWorkflow().
		Then().
		ExpectWorkflowNode(wfv1.SucceededPodNode, func(t *testing.T, n *wfv1.NodeStatus, p *corev1.Pod) {
			// Guard the pointer so an unset field fails the test instead of panicking.
			if assert.NotNil(t, p.Spec.TerminationGracePeriodSeconds) {
				// testify expects (expected, actual); keeping that order makes failure output read correctly.
				assert.Equal(t, int64(5), *p.Spec.TerminationGracePeriodSeconds)
			}
			for _, c := range p.Spec.Containers {
				if c.Name == "main" {
					assert.Equal(t, "100m", c.Resources.Limits.Cpu().String())
				}
			}
		})
}

func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}
10 changes: 3 additions & 7 deletions workflow/executor/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ const (
containerShimPrefix = "://"
)

// KillGracePeriod is the time in seconds after sending SIGTERM before
// forcefully killing the sidecar with SIGKILL (value matches k8s)
const KillGracePeriod = 30

// GetContainerID returns container ID of a ContainerStatus resource
func GetContainerID(container *v1.ContainerStatus) string {
i := strings.Index(container.ContainerID, containerShimPrefix)
Expand Down Expand Up @@ -94,13 +90,13 @@ func TerminatePodWithContainerID(ctx context.Context, c KubernetesClientInterfac
}

// KillGracefully kills a container gracefully.
func KillGracefully(ctx context.Context, c KubernetesClientInterface, containerID string) error {
func KillGracefully(ctx context.Context, c KubernetesClientInterface, containerID string, terminationGracePeriodDuration time.Duration) error {
log.Infof("SIGTERM containerID %q: %s", containerID, syscall.SIGTERM.String())
err := TerminatePodWithContainerID(ctx, c, containerID, syscall.SIGTERM)
if err != nil {
return err
}
err = WaitForTermination(ctx, c, containerID, time.Second*KillGracePeriod)
err = WaitForTermination(ctx, c, containerID, terminationGracePeriodDuration*time.Second)
if err == nil {
log.Infof("ContainerID %q successfully killed", containerID)
return nil
Expand All @@ -110,7 +106,7 @@ func KillGracefully(ctx context.Context, c KubernetesClientInterface, containerI
if err != nil {
return err
}
err = WaitForTermination(ctx, c, containerID, time.Second*KillGracePeriod)
err = WaitForTermination(ctx, c, containerID, terminationGracePeriodDuration*time.Second)
if err != nil {
return err
}
Expand Down
50 changes: 50 additions & 0 deletions workflow/executor/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -121,3 +122,52 @@ func TestTerminatePodWithContainerID(t *testing.T) {
err = TerminatePodWithContainerID(ctx, mock, "container-id", syscall.SIGTERM)
assert.NoError(t, err)
}

// TestWaitForTermination ensure we SIGTERM container with input wait time
func TestWaitForTermination(t *testing.T) {
// Successfully SIGTERM Container
mock := &MockKC{
getContainerStatusContainerStatus: &v1.ContainerStatus{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{},
},
},
}
ctx := context.Background()
err := WaitForTermination(ctx, mock, "container-id", time.Duration(2)*time.Second)
assert.NoError(t, err)

// Fail SIGTERM Container
mock = &MockKC{
getContainerStatusContainerStatus: &v1.ContainerStatus{
State: v1.ContainerState{
Terminated: nil,
},
},
}
err = WaitForTermination(ctx, mock, "container-id", time.Duration(1)*time.Second)
assert.EqualError(t, err, "timeout after 1s")
}

// TestKillGracefully ensure we kill container gracefully with input wait time
func TestKillGracefully(t *testing.T) {
// Graceful SIGTERM Container
mock := &MockKC{
getContainerStatusPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
Spec: v1.PodSpec{
RestartPolicy: "Never",
},
},
getContainerStatusContainerStatus: &v1.ContainerStatus{
State: v1.ContainerState{
Terminated: nil,
},
},
}
ctx := context.Background()
err := KillGracefully(ctx, mock, "container-id", 1)
assert.EqualError(t, err, "timeout after 1s")
}
9 changes: 4 additions & 5 deletions workflow/executor/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/argoproj/argo/v3/util"
"github.com/argoproj/argo/v3/util/file"
"github.com/argoproj/argo/v3/workflow/common"
execcommon "github.com/argoproj/argo/v3/workflow/executor/common"
)

type DockerExecutor struct{}
Expand Down Expand Up @@ -173,7 +172,7 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerID string) error {
}

// Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string) error {
func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error {
killArgs := append([]string{"kill", "--signal", "TERM"}, containerIDs...)
// docker kill will return with an error if a container has terminated already, which is not an error in this case.
// We therefore ignore any error. docker wait that follows will re-raise any other error with the container.
Expand All @@ -188,7 +187,7 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string) error
return errors.InternalWrapError(err)
}
// waitCh needs buffer of 1 so it can always send the result of waitCmd.Wait() without blocking.
// Otherwise, if the KillGracePeriod elapses and the forced kill branch is run, there would
// Otherwise, if the terminationGracePeriodSeconds elapses and the forced kill branch is run, there would
// be no receiver for waitCh and the goroutine would block forever
waitCh := make(chan error, 1)
go func() {
Expand All @@ -198,8 +197,8 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string) error
select {
case err = <-waitCh:
// waitCmd completed
case <-time.After(execcommon.KillGracePeriod * time.Second):
log.Infof("Timed out (%ds) for containers to terminate gracefully. Killing forcefully", execcommon.KillGracePeriod)
case <-time.After(terminationGracePeriodDuration * time.Second):
log.Infof("Timed out (%ds) for containers to terminate gracefully. Killing forcefully", terminationGracePeriodDuration)
forceKillArgs := append([]string{"kill", "--signal", "KILL"}, containerIDs...)
forceKillCmd := exec.Command("docker", forceKillArgs...)
log.Info(forceKillCmd.Args)
Expand Down
18 changes: 15 additions & 3 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type ContainerRuntimeExecutor interface {
Wait(ctx context.Context, containerID string) error

// Kill a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
Kill(ctx context.Context, containerIDs []string) error
Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error
}

// NewExecutor instantiates a new workflow executor
Expand Down Expand Up @@ -679,6 +679,16 @@ func (we *WorkflowExecutor) GetSecrets(ctx context.Context, namespace, name, key
return val, nil
}

// GetTerminationGracePeriodDuration returns the terminationGracePeriodSeconds of the
// executor's pod spec, converted to a time.Duration.
//
// NOTE: the returned Duration holds the raw number of seconds (not nanoseconds);
// the Kill paths that consume it multiply by time.Second themselves.
//
// If the pod cannot be fetched, a zero Duration and the error are returned.
// If the pod spec leaves terminationGracePeriodSeconds unset, a 30-second
// default is returned rather than dereferencing a nil pointer.
func (we *WorkflowExecutor) GetTerminationGracePeriodDuration(ctx context.Context) (time.Duration, error) {
	// defaultGracePeriodSeconds mirrors the Kubernetes default grace period.
	const defaultGracePeriodSeconds = 30

	pod, err := we.getPod(ctx)
	if err != nil {
		return time.Duration(0), err
	}
	if pod.Spec.TerminationGracePeriodSeconds == nil {
		return time.Duration(defaultGracePeriodSeconds), nil
	}
	return time.Duration(*pod.Spec.TerminationGracePeriodSeconds), nil
}

// GetMainContainerStatus returns the container status of the main container, nil if the main container does not exist
func (we *WorkflowExecutor) GetMainContainerStatus(ctx context.Context) (*apiv1.ContainerStatus, error) {
pod, err := we.getPod(ctx)
Expand Down Expand Up @@ -1180,7 +1190,8 @@ func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, annotationsUpda
_ = we.AddAnnotation(ctx, common.AnnotationKeyNodeMessage, message)
log.Infof("Killing main container")
mainContainerID, _ := we.GetMainContainerID(ctx)
err := we.RuntimeExecutor.Kill(ctx, []string{mainContainerID})
terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx)
err := we.RuntimeExecutor.Kill(ctx, []string{mainContainerID}, terminationGracePeriodDuration)
if err != nil {
log.Warnf("Failed to kill main container: %v", err)
}
Expand Down Expand Up @@ -1214,7 +1225,8 @@ func (we *WorkflowExecutor) KillSidecars(ctx context.Context) error {
if len(sidecarIDs) == 0 {
return nil
}
return we.RuntimeExecutor.Kill(ctx, sidecarIDs)
terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx)
return we.RuntimeExecutor.Kill(ctx, sidecarIDs, terminationGracePeriodDuration)
}

// LoadExecutionControl reads the execution control definition from the Kubernetes downward API annotations volume file
Expand Down
5 changes: 3 additions & 2 deletions workflow/executor/k8sapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"syscall"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -89,6 +90,6 @@ func (c *k8sAPIClient) KillContainer(pod *corev1.Pod, container *corev1.Containe
return err
}

func (c *k8sAPIClient) killGracefully(ctx context.Context, containerID string) error {
return execcommon.KillGracefully(ctx, c, containerID)
func (c *k8sAPIClient) killGracefully(ctx context.Context, containerID string, terminationGracePeriodDuration time.Duration) error {
return execcommon.KillGracefully(ctx, c, containerID, terminationGracePeriodDuration)
}
5 changes: 3 additions & 2 deletions workflow/executor/k8sapi/k8sapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -66,10 +67,10 @@ func (k *K8sAPIExecutor) Wait(ctx context.Context, containerID string) error {
}

// Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
func (k *K8sAPIExecutor) Kill(ctx context.Context, containerIDs []string) error {
func (k *K8sAPIExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error {
log.Infof("Killing containers %s", containerIDs)
for _, containerID := range containerIDs {
err := k.client.killGracefully(ctx, containerID)
err := k.client.killGracefully(ctx, containerID, terminationGracePeriodDuration)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions workflow/executor/kubelet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ func (k *kubeletClient) KillContainer(pod *corev1.Pod, container *corev1.Contain
return err
}

func (k *kubeletClient) KillGracefully(ctx context.Context, containerID string) error {
return execcommon.KillGracefully(ctx, k, containerID)
func (k *kubeletClient) KillGracefully(ctx context.Context, containerID string, terminationGracePeriodDuration time.Duration) error {
return execcommon.KillGracefully(ctx, k, containerID, terminationGracePeriodDuration)
}

func (k *kubeletClient) CopyArchive(ctx context.Context, containerID, sourcePath, destPath string) error {
Expand Down
5 changes: 3 additions & 2 deletions workflow/executor/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"time"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -62,9 +63,9 @@ func (k *KubeletExecutor) Wait(ctx context.Context, containerID string) error {
}

// Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
func (k *KubeletExecutor) Kill(ctx context.Context, containerIDs []string) error {
func (k *KubeletExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error {
for _, containerID := range containerIDs {
err := k.cli.KillGracefully(ctx, containerID)
err := k.cli.KillGracefully(ctx, containerID, terminationGracePeriodDuration)
if err != nil {
return err
}
Expand Down
12 changes: 7 additions & 5 deletions workflow/executor/mocks/ContainerRuntimeExecutor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,13 @@ func (p *PNSExecutor) GetExitCode(ctx context.Context, containerID string) (stri
}

// Kill a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
func (p *PNSExecutor) Kill(ctx context.Context, containerIDs []string) error {
func (p *PNSExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error {
var asyncErr error
wg := sync.WaitGroup{}
for _, cid := range containerIDs {
wg.Add(1)
go func(containerID string) {
err := p.killContainer(containerID)
err := p.killContainer(containerID, terminationGracePeriodDuration)
if err != nil && asyncErr != nil {
asyncErr = err
}
Expand All @@ -260,7 +260,7 @@ func (p *PNSExecutor) Kill(ctx context.Context, containerIDs []string) error {
return asyncErr
}

func (p *PNSExecutor) killContainer(containerID string) error {
func (p *PNSExecutor) killContainer(containerID string, terminationGracePeriodDuration time.Duration) error {
pid, err := p.getContainerPID(containerID)
if err != nil {
log.Warnf("Ignoring kill container failure of %s: %v. Process assumed to have completed", containerID, err)
Expand All @@ -274,8 +274,7 @@ func (p *PNSExecutor) killContainer(containerID string) error {
if err != nil {
log.Warnf("Failed to SIGTERM pid %d: %v", pid, err)
}

waitPIDOpts := executil.WaitPIDOpts{Timeout: execcommon.KillGracePeriod * time.Second}
waitPIDOpts := executil.WaitPIDOpts{Timeout: terminationGracePeriodDuration * time.Second}
err = executil.WaitPID(pid, waitPIDOpts)
if err == nil {
log.Infof("PID %d completed", pid)
Expand Down