Skip to content

Commit

Permalink
fix(controller): Update container signaler to POSIX compliance. Fixes #…
Browse files Browse the repository at this point in the history
…6030

Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Jun 3, 2021
1 parent 493595a commit 6952bb5
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 44 deletions.
18 changes: 16 additions & 2 deletions docs/sidecar-injection.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,21 @@ Key:
| Executor | Sidecar | Injected Sidecar |
|---|---|---|
| `docker` | Any | Any |
| `emissary` | Any | None |
| `emissary` | Any | Shell/Configuration |
| `k8sapi` | Shell | Shell |
| `kubelet` | Shell | Shell |
| `pns` | Any | Any |
| `pns` | Any | Any |

## Kill Command Configuration

> v3.1 and after
You can override the kill command by using a pod annotation, for example:

```yaml
spec:
podMetadata:
annotations:
workflows.argoproj.io/kill-cmd-vault-agent: '["sh", "-c", "kill -%d 1"]'
workflows.argoproj.io/kill-cmd-sidecar: '["sh", "-c", "kill -%d $(pidof entrypoint.sh)"]'
```
7 changes: 4 additions & 3 deletions test/e2e/fixtures/needs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ var (
met, _ := None(K8SAPI, Kubelet)(s)
return met, "base layer artifact support"
}
Docker = Executor("docker")
K8SAPI = Executor("k8sapi")
Kubelet = Executor("kubelet")
Docker = Executor("docker")
Emissary = Executor("emissary")
K8SAPI = Executor("k8sapi")
Kubelet = Executor("kubelet")
)

func Executor(e string) Need {
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,22 @@ func (s *SignalsSuite) TestSidecars() {

// make sure Istio/Anthos and other sidecar injectors will work
func (s *SignalsSuite) TestInjectedSidecar() {
s.Need(fixtures.None(fixtures.Emissary)) // emissary cannot kill this
s.Given().
Workflow("@testdata/sidecar-injected-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded, kill2xDuration)
}

func (s *SignalsSuite) TestInjectedSidecarKillAnnotation() {
s.Given().
Workflow("@testdata/sidecar-injected-kill-annotation-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded, kill2xDuration)
}

func TestSignalsSuite(t *testing.T) {
suite.Run(t, new(SignalsSuite))
}
25 changes: 25 additions & 0 deletions test/e2e/testdata/sidecar-injected-kill-annotation-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: sidecar-injected-kill-annotation-
spec:
entrypoint: main
podMetadata:
annotations:
workflows.argoproj.io/kill-cmd-sidecar: '["sh", "-c", "kill -s%d -- -1"]'
podSpecPatch: |
terminationGracePeriodSeconds: 3
containers:
- name: wait
- name: main
- name: sidecar
image: argoproj/argosay:v1
command:
- sh
- -c
args:
- "sleep 999"
templates:
- name: main
container:
image: argoproj/argosay:v1
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ const (
SecretVolMountPath = "/argo/secret"
)

// AnnotationKeyKillCmd specifies the command to use to kill to container, useful for injected sidecars
var AnnotationKeyKillCmd = func(containerName string) string { return workflow.WorkflowFullName + "/kill-cmd-" + containerName }

// GlobalVarWorkflowRootTags is a list of root tags in workflow which could be used for variable reference
var GlobalVarValidWorkflowVariablePrefix = []string{"item.", "steps.", "inputs.", "outputs.", "pod.", "workflow.", "tasks."}

Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (wfc *WorkflowController) signalContainers(namespace string, podName string
if c.Name == common.WaitContainerName || c.State.Terminated != nil {
continue
}
if err := signal.SignalContainer(wfc.restConfig, pod.Namespace, pod.Name, c.Name, sig); err != nil {
if err := signal.SignalContainer(wfc.restConfig, pod, c.Name, sig); err != nil {
return 0, err
}
}
Expand Down
97 changes: 62 additions & 35 deletions workflow/executor/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,55 +207,78 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro
return err
}

type ctrInfo struct {
containerID string
status string
createdAt time.Time
}

func (d *DockerExecutor) listContainers() (map[string]ctrInfo, error) {

output, err := common.RunCommand(
"docker",
"ps",
"--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up)
"--no-trunc", // display long container IDs
"--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST`
// https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37
"--filter=label=io.kubernetes.pod.namespace="+d.namespace,
"--filter=label=io.kubernetes.pod.name="+d.podName,
)
if err != nil {
return nil, err
}
containers := make(map[string]ctrInfo)
for _, l := range strings.Split(string(output), "\n") {
parts := strings.Split(strings.TrimSpace(l), "|")
if len(parts) != 4 {
continue
}
status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up,
containerName := parts[1]
if containerName == "POD" {
continue
}
containerID := parts[2]
if containerID == "" {
continue
}
createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3])
if err != nil {
return nil, err
}
containers[containerName] = ctrInfo{containerID: containerID, status: status, createdAt: createdAt}

}
return containers, nil
}

func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []string) error {
containerStatus := make(map[string]string)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
output, err := common.RunCommand(
"docker",
"ps",
"--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up)
"--no-trunc", // display long container IDs
"--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST`
// https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37
"--filter=label=io.kubernetes.pod.namespace="+d.namespace,
"--filter=label=io.kubernetes.pod.name="+d.podName,
)
containers, err := d.listContainers()
if err != nil {
return err
}
containerStatus := make(map[string]string)
for _, l := range strings.Split(string(output), "\n") {
parts := strings.Split(strings.TrimSpace(l), "|")
if len(parts) != 4 {
for containerName, info := range containers {
if d.containers[containerName] == info.containerID { // already found
continue
}
status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up,
containerName := parts[1]
if containerName == "POD" {
if info.createdAt.Before(started.Add(-15 * time.Second)) {
log.Infof("ignoring container %q created at %v, too long before process started", containerName, info.createdAt)
continue
}
containerID := parts[2]
createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3])
if err != nil {
return err
}
if containerID == "" || d.containers[containerName] == containerID {
if info.status == "Created" && containerStatus[containerName] != "" {
log.Infof("ignoring created container %q that would %s -> %s", containerName, containerStatus[containerName], info.status)
continue
}
if createdAt.Before(started.Add(-15 * time.Second)) {
log.Infof("ignoring container %q created at %v, too long before process started", containerName, createdAt)
continue
}
if status == "Created" && containerStatus[containerName] != "" {
log.Infof("ignoring created container %q that would %s -> %s", containerName, containerStatus[containerName], status)
continue
}
d.containers[containerName] = containerID
containerStatus[containerName] = status
log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, containerID, createdAt, status)
d.containers[containerName] = info.containerID
containerStatus[containerName] = info.status
log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, info.containerID, info.createdAt, info.status)
}
}
// sidecars start after the main containers, so we can't just exit once we know about all the main containers,
Expand Down Expand Up @@ -338,8 +361,12 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerNames []string, term
}

func (d *DockerExecutor) ListContainerNames(ctx context.Context) ([]string, error) {
containers, err := d.listContainers()
if err != nil {
return nil, err
}
var containerNames []string
for n := range d.containers {
for n := range containers {
containerNames = append(containerNames, n)
}
return containerNames, nil
Expand Down
31 changes: 28 additions & 3 deletions workflow/signal/signal.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,35 @@
package signal

import (
"encoding/json"
"fmt"
"strings"
"syscall"

corev1 "k8s.io/api/core/v1"

log "github.com/sirupsen/logrus"
"k8s.io/client-go/rest"

"github.com/argoproj/argo-workflows/v3/workflow/common"
)

func SignalContainer(restConfig *rest.Config, namespace string, pod string, container string, s syscall.Signal) error {
return ExecPodContainerAndGetOutput(restConfig, namespace, pod, container, "/bin/sh", "-c", fmt.Sprintf("kill -s%d -- -1", s))
func SignalContainer(restConfig *rest.Config, pod *corev1.Pod, container string, s syscall.Signal) error {
command := []string{"/bin/sh", "-c", "kill -%d 1"}

if v, ok := pod.Annotations[common.AnnotationKeyKillCmd(container)]; ok {
if err := json.Unmarshal([]byte(v), &command); err != nil {
return fmt.Errorf("failed to unmarshall kill command annotation %q: %w", v, err)
}
}

for i, v := range command {
if strings.Contains(v, "%d") {
command[i] = fmt.Sprintf(v, s)
}
}

return ExecPodContainerAndGetOutput(restConfig, pod.Namespace, pod.Name, container, command...)
}

func ExecPodContainerAndGetOutput(restConfig *rest.Config, namespace string, pod string, container string, command ...string) error {
Expand All @@ -20,6 +38,13 @@ func ExecPodContainerAndGetOutput(restConfig *rest.Config, namespace string, pod
return err
}
stdout, stderr, err := common.GetExecutorOutput(x)
log.WithFields(log.Fields{"stdout": stdout, "stderr": stderr}).WithError(err).Debug()
log.
WithField("namespace", namespace).
WithField("pod", pod).
WithField("container", container).
WithField("stdout", stdout).
WithField("stderr", stderr).
WithError(err).
Info("signaled container")
return err
}

0 comments on commit 6952bb5

Please sign in to comment.