From d8686ee1ade34d7d5ef687bcb638415756b2f364 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Wed, 9 Jun 2021 15:15:13 -0700 Subject: [PATCH] fix(executor): Fix docker not terminating. Fixes #6064 (#6083) --- workflow/executor/docker/docker.go | 104 ++++++++++++++++++----------- 1 file changed, 64 insertions(+), 40 deletions(-) diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index 877b3556b2e4..4e272cfb538c 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -30,12 +30,18 @@ var started = time.Now() type DockerExecutor struct { namespace string podName string - containers map[string]string // containerName -> containerID + containers map[string]ctr // containerName -> ctr +} + +type ctr struct { + containerID string + status string + createdAt time.Time } func NewDockerExecutor(namespace, podName string) (*DockerExecutor, error) { log.Infof("Creating a docker executor") - return &DockerExecutor{namespace, podName, make(map[string]string)}, nil + return &DockerExecutor{namespace, podName, make(map[string]ctr)}, nil } func (d *DockerExecutor) GetFileContents(containerName string, sourcePath string) (string, error) { @@ -207,55 +213,69 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro return err } +func (d *DockerExecutor) listContainers() (map[string]ctr, error) { + output, err := common.RunCommand( + "docker", + "ps", + "--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up) + "--no-trunc", // display long container IDs + "--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST` + // https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37 + "--filter=label=io.kubernetes.pod.namespace="+d.namespace, + "--filter=label=io.kubernetes.pod.name="+d.podName, + ) + if err != nil { + return nil, err + } + containers := make(map[string]ctr) + for _, l := range strings.Split(string(output), "\n") { + parts := strings.Split(strings.TrimSpace(l), "|") + if len(parts) != 4 { + continue + } + status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up, + containerName := parts[1] + if containerName == "POD" { + continue + } + containerID := parts[2] + if containerID == "" { + continue + } + createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3]) + if err != nil { + return nil, err + } + containers[containerName] = ctr{containerID: containerID, status: status, createdAt: createdAt} + } + log.WithField("containers", containers).Info("listed containers") + return containers, nil +} + func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []string) error { for { select { case <-ctx.Done(): return ctx.Err() default: - output, err := common.RunCommand( - "docker", - "ps", - "--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up) - "--no-trunc", // display long container IDs - "--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST` - // https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37 - "--filter=label=io.kubernetes.pod.namespace="+d.namespace, - "--filter=label=io.kubernetes.pod.name="+d.podName, - ) + containers, err := d.listContainers() if err != nil { return err } - containerStatus := make(map[string]string) - for _, l := range strings.Split(string(output), "\n") { - parts := strings.Split(strings.TrimSpace(l), "|") - if len(parts) != 4 { - continue - } - status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up, - containerName := parts[1] - if containerName == "POD" { + for containerName, c := range containers { + if d.containers[containerName].containerID == c.containerID { // already found continue } - containerID := parts[2] - createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3]) - if err != nil { - return err - } - if containerID == "" || d.containers[containerName] == containerID { - continue - } - if createdAt.Before(started.Add(-15 * time.Second)) { - log.Infof("ignoring container %q created at %v, too long before process started", containerName, createdAt) + if c.createdAt.Before(started.Add(-15 * time.Second)) { + log.Infof("ignoring container %q created at %v, too long before process started", containerName, c.createdAt) continue } - if status == "Created" && containerStatus[containerName] != "" { - log.Infof("ignoring created container %q that would %s -> %s", containerName, containerStatus[containerName], status) + if c.status == "Created" && d.containers[containerName].status != "" { + log.Infof("ignoring created container %q that would %s -> %s", containerName, d.containers[containerName].status, c.status) continue } - d.containers[containerName] = containerID - containerStatus[containerName] = status - log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, containerID, createdAt, status) + d.containers[containerName] = c + log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, c.containerID, c.createdAt, c.status) } } // sidecars start after the main containers, so we can't just exit once we know about all the main containers, @@ -270,7 +290,7 @@ func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames [] func (d *DockerExecutor) haveContainers(containerNames []string) bool { for _, n := range containerNames { - if d.containers[n] == "" { + if _, ok := d.containers[n]; !ok { return false } } @@ -278,8 +298,8 @@ func (d *DockerExecutor) haveContainers(containerNames []string) bool { } func (d *DockerExecutor) getContainerID(containerName string) (string, error) { - if containerID, ok := d.containers[containerName]; ok { - return containerID, nil + if ctr, ok := d.containers[containerName]; ok { + return ctr.containerID, nil } return "", errContainerNotExist } @@ -338,8 +358,12 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerNames []string, term } func (d *DockerExecutor) ListContainerNames(ctx context.Context) ([]string, error) { + containers, err := d.listContainers() + if err != nil { + return nil, err + } var containerNames []string - for n := range d.containers { + for n := range containers { containerNames = append(containerNames, n) } return containerNames, nil