From 65aa3dccef8e9e0798536929df56afdfde439084 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 3 Jun 2021 14:32:07 -0700 Subject: [PATCH 1/3] fix(executor): Fix docker not terminating. Fixes #6064 Signed-off-by: Alex Collins --- test/e2e/fixtures/needs.go | 6 +- workflow/executor/docker/docker.go | 97 +++++++++++++++++++----------- 2 files changed, 65 insertions(+), 38 deletions(-) diff --git a/test/e2e/fixtures/needs.go b/test/e2e/fixtures/needs.go index 3330f02f0491..9a5de7c963c5 100644 --- a/test/e2e/fixtures/needs.go +++ b/test/e2e/fixtures/needs.go @@ -14,9 +14,9 @@ var ( met, _ := None(K8SAPI, Kubelet)(s) return met, "base layer artifact support" } - Docker = Executor("docker") - K8SAPI = Executor("k8sapi") - Kubelet = Executor("kubelet") + Docker = Executor("docker") + K8SAPI = Executor("k8sapi") + Kubelet = Executor("kubelet") ) func Executor(e string) Need { diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index d2ca768a13df..19b20b0c315b 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -207,55 +207,78 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro return err } +type ctrInfo struct { + containerID string + status string + createdAt time.Time +} + +func (d *DockerExecutor) listContainers() (map[string]ctrInfo, error) { + + output, err := common.RunCommand( + "docker", + "ps", + "--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up) + "--no-trunc", // display long container IDs + "--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST` + // https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37 + "--filter=label=io.kubernetes.pod.namespace="+d.namespace, + "--filter=label=io.kubernetes.pod.name="+d.podName, + ) + if err != nil { + return nil, err + } + containers := make(map[string]ctrInfo) + for _, l := range strings.Split(string(output), "\n") { + parts := strings.Split(strings.TrimSpace(l), "|") + if len(parts) != 4 { + continue + } + status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up, + containerName := parts[1] + if containerName == "POD" { + continue + } + containerID := parts[2] + if containerID == "" { + continue + } + createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3]) + if err != nil { + return nil, err + } + containers[containerName] = ctrInfo{containerID: containerID, status: status, createdAt: createdAt} + + } + return containers, nil +} + func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []string) error { + containerStatus := make(map[string]string) for { select { case <-ctx.Done(): return ctx.Err() default: - output, err := common.RunCommand( - "docker", - "ps", - "--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up) - "--no-trunc", // display long container IDs - "--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST` - // https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37 - "--filter=label=io.kubernetes.pod.namespace="+d.namespace, - "--filter=label=io.kubernetes.pod.name="+d.podName, - ) + containers, err := d.listContainers() if err != nil { return err } - containerStatus := make(map[string]string) - for _, l := range strings.Split(string(output), "\n") { - parts := strings.Split(strings.TrimSpace(l), "|") - if len(parts) != 4 { + for containerName, info := range containers { + if d.containers[containerName] == info.containerID { // already found continue } - status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up, - containerName := parts[1] - if containerName == "POD" { + if info.createdAt.Before(started.Add(-15 * time.Second)) { + log.Infof("ignoring container %q created at %v, too long before process started", containerName, info.createdAt) continue } - containerID := parts[2] - createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3]) - if err != nil { - return err - } - if containerID == "" || d.containers[containerName] == containerID { + if info.status == "Created" && containerStatus[containerName] != "" { + log.Infof("ignoring created container %q that would %s -> %s", containerName, containerStatus[containerName], info.status) continue } - if createdAt.Before(started.Add(-15 * time.Second)) { - log.Infof("ignoring container %q created at %v, too long before process started", containerName, createdAt) - continue - } - if status == "Created" && containerStatus[containerName] != "" { - log.Infof("ignoring created container %q that would %s -> %s", containerName, containerStatus[containerName], status) - continue - } - d.containers[containerName] = containerID - containerStatus[containerName] = status - log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, containerID, createdAt, status) + d.containers[containerName] = info.containerID + containerStatus[containerName] = info.status + log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, info.containerID, info.createdAt, info.status) } } // sidecars start after the main containers, so we can't just exit once we know about all the main containers, @@ -338,8 +361,12 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerNames []string, term } func (d *DockerExecutor) ListContainerNames(ctx context.Context) ([]string, error) { + containers, err := d.listContainers() + if err != nil { + return nil, err + } var containerNames []string - for n := range d.containers { + for n := range containers { containerNames = append(containerNames, n) } return containerNames, nil From cee0d8a049ed91fc347ae67695ddba9b9473ba14 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 3 Jun 2021 14:32:43 -0700 Subject: [PATCH 2/3] ok Signed-off-by: Alex Collins --- test/e2e/fixtures/needs.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/fixtures/needs.go b/test/e2e/fixtures/needs.go index 9a5de7c963c5..3330f02f0491 100644 --- a/test/e2e/fixtures/needs.go +++ b/test/e2e/fixtures/needs.go @@ -14,9 +14,9 @@ var ( met, _ := None(K8SAPI, Kubelet)(s) return met, "base layer artifact support" } - Docker = Executor("docker") - K8SAPI = Executor("k8sapi") - Kubelet = Executor("kubelet") + Docker = Executor("docker") + K8SAPI = Executor("k8sapi") + Kubelet = Executor("kubelet") ) func Executor(e string) Need { From eb2ffa4104876e3bb2702aeb5030a5caabd775cf Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Sat, 5 Jun 2021 15:16:47 -0700 Subject: [PATCH 3/3] ok Signed-off-by: Alex Collins --- workflow/executor/docker/docker.go | 49 ++++++++++++++---------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index 19b20b0c315b..554bc4e720f1 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -30,12 +30,18 @@ var started = time.Now() type DockerExecutor struct { namespace string podName string - containers map[string]string // containerName -> containerID + containers map[string]ctr // containerName -> ctr +} + +type ctr struct { + containerID string + status string + createdAt time.Time } func NewDockerExecutor(namespace, podName string) (*DockerExecutor, error) { log.Infof("Creating a docker executor") - return &DockerExecutor{namespace, podName, make(map[string]string)}, nil + return &DockerExecutor{namespace, podName, make(map[string]ctr)}, nil } func (d *DockerExecutor) GetFileContents(containerName string, sourcePath string) (string, error) { @@ -207,14 +213,7 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro return err } -type ctrInfo struct { - containerID string - status string - createdAt time.Time -} - -func (d *DockerExecutor) listContainers() (map[string]ctrInfo, error) { - +func (d *DockerExecutor) listContainers() (map[string]ctr, error) { output, err := common.RunCommand( "docker", "ps", @@ -228,7 +227,7 @@ func (d *DockerExecutor) listContainers() (map[string]ctrInfo, error) { if err != nil { return nil, err } - containers := make(map[string]ctrInfo) + containers := make(map[string]ctr) for _, l := range strings.Split(string(output), "\n") { parts := strings.Split(strings.TrimSpace(l), "|") if len(parts) != 4 { @@ -247,14 +246,13 @@ func (d *DockerExecutor) listContainers() (map[string]ctrInfo, error) { if err != nil { return nil, err } - containers[containerName] = ctrInfo{containerID: containerID, status: status, createdAt: createdAt} - + containers[containerName] = ctr{containerID: containerID, status: status, createdAt: createdAt} } + log.WithField("containers", containers).Info("listed containers") return containers, nil } func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []string) error { - containerStatus := make(map[string]string) for { select { case <-ctx.Done(): @@ -264,21 +262,20 @@ func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames [] if err != nil { return err } - for containerName, info := range containers { - if d.containers[containerName] == info.containerID { // already found + for containerName, c := range containers { + if d.containers[containerName].containerID == c.containerID { // already found continue } - if info.createdAt.Before(started.Add(-15 * time.Second)) { - log.Infof("ignoring container %q created at %v, too long before process started", containerName, info.createdAt) + if c.createdAt.Before(started.Add(-15 * time.Second)) { + log.Infof("ignoring container %q created at %v, too long before process started", containerName, c.createdAt) continue } - if info.status == "Created" && containerStatus[containerName] != "" { - log.Infof("ignoring created container %q that would %s -> %s", containerName, containerStatus[containerName], info.status) + if c.status == "Created" && d.containers[containerName].status != "" { + log.Infof("ignoring created container %q that would %s -> %s", containerName, d.containers[containerName].status, c.status) continue } - d.containers[containerName] = info.containerID - containerStatus[containerName] = info.status - log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, info.containerID, info.createdAt, info.status) + d.containers[containerName] = c + log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, c.containerID, c.createdAt, c.status) } } // sidecars start after the main containers, so we can't just exit once we know about all the main containers, @@ -293,7 +290,7 @@ func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames [] func (d *DockerExecutor) haveContainers(containerNames []string) bool { for _, n := range containerNames { - if d.containers[n] == "" { + if _, ok := d.containers[n]; !ok { return false } } @@ -301,8 +298,8 @@ func (d *DockerExecutor) haveContainers(containerNames []string) bool { } func (d *DockerExecutor) getContainerID(containerName string) (string, error) { - if containerID, ok := d.containers[containerName]; ok { - return containerID, nil + if ctr, ok := d.containers[containerName]; ok { + return ctr.containerID, nil } return "", errContainerNotExist }