Skip to content

Commit

Permalink
fix(executor): Fix docker not terminating. Fixes #6064 (#6083)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored and sarabala1979 committed Jun 21, 2021
1 parent c2abdb8 commit d8686ee
Showing 1 changed file with 64 additions and 40 deletions.
104 changes: 64 additions & 40 deletions workflow/executor/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@ var started = time.Now()
type DockerExecutor struct {
namespace string
podName string
containers map[string]string // containerName -> containerID
containers map[string]ctr // containerName -> ctr
}

type ctr struct {
containerID string
status string
createdAt time.Time
}

func NewDockerExecutor(namespace, podName string) (*DockerExecutor, error) {
log.Infof("Creating a docker executor")
return &DockerExecutor{namespace, podName, make(map[string]string)}, nil
return &DockerExecutor{namespace, podName, make(map[string]ctr)}, nil
}

func (d *DockerExecutor) GetFileContents(containerName string, sourcePath string) (string, error) {
Expand Down Expand Up @@ -207,55 +213,69 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro
return err
}

func (d *DockerExecutor) listContainers() (map[string]ctr, error) {
output, err := common.RunCommand(
"docker",
"ps",
"--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up)
"--no-trunc", // display long container IDs
"--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST`
// https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37
"--filter=label=io.kubernetes.pod.namespace="+d.namespace,
"--filter=label=io.kubernetes.pod.name="+d.podName,
)
if err != nil {
return nil, err
}
containers := make(map[string]ctr)
for _, l := range strings.Split(string(output), "\n") {
parts := strings.Split(strings.TrimSpace(l), "|")
if len(parts) != 4 {
continue
}
status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up,
containerName := parts[1]
if containerName == "POD" {
continue
}
containerID := parts[2]
if containerID == "" {
continue
}
createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3])
if err != nil {
return nil, err
}
containers[containerName] = ctr{containerID: containerID, status: status, createdAt: createdAt}
}
log.WithField("containers", containers).Info("listed containers")
return containers, nil
}

func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []string) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
output, err := common.RunCommand(
"docker",
"ps",
"--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up)
"--no-trunc", // display long container IDs
"--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST`
// https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37
"--filter=label=io.kubernetes.pod.namespace="+d.namespace,
"--filter=label=io.kubernetes.pod.name="+d.podName,
)
containers, err := d.listContainers()
if err != nil {
return err
}
containerStatus := make(map[string]string)
for _, l := range strings.Split(string(output), "\n") {
parts := strings.Split(strings.TrimSpace(l), "|")
if len(parts) != 4 {
continue
}
status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up,
containerName := parts[1]
if containerName == "POD" {
for containerName, c := range containers {
if d.containers[containerName].containerID == c.containerID { // already found
continue
}
containerID := parts[2]
createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3])
if err != nil {
return err
}
if containerID == "" || d.containers[containerName] == containerID {
continue
}
if createdAt.Before(started.Add(-15 * time.Second)) {
log.Infof("ignoring container %q created at %v, too long before process started", containerName, createdAt)
if c.createdAt.Before(started.Add(-15 * time.Second)) {
log.Infof("ignoring container %q created at %v, too long before process started", containerName, c.createdAt)
continue
}
if status == "Created" && containerStatus[containerName] != "" {
log.Infof("ignoring created container %q that would %s -> %s", containerName, containerStatus[containerName], status)
if c.status == "Created" && d.containers[containerName].status != "" {
log.Infof("ignoring created container %q that would %s -> %s", containerName, d.containers[containerName].status, c.status)
continue
}
d.containers[containerName] = containerID
containerStatus[containerName] = status
log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, containerID, createdAt, status)
d.containers[containerName] = c
log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, c.containerID, c.createdAt, c.status)
}
}
// sidecars start after the main containers, so we can't just exit once we know about all the main containers,
Expand All @@ -270,16 +290,16 @@ func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []

func (d *DockerExecutor) haveContainers(containerNames []string) bool {
for _, n := range containerNames {
if d.containers[n] == "" {
if _, ok := d.containers[n]; !ok {
return false
}
}
return true
}

func (d *DockerExecutor) getContainerID(containerName string) (string, error) {
if containerID, ok := d.containers[containerName]; ok {
return containerID, nil
if ctr, ok := d.containers[containerName]; ok {
return ctr.containerID, nil
}
return "", errContainerNotExist
}
Expand Down Expand Up @@ -338,8 +358,12 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerNames []string, term
}

func (d *DockerExecutor) ListContainerNames(ctx context.Context) ([]string, error) {
containers, err := d.listContainers()
if err != nil {
return nil, err
}
var containerNames []string
for n := range d.containers {
for n := range containers {
containerNames = append(containerNames, n)
}
return containerNames, nil
Expand Down

0 comments on commit d8686ee

Please sign in to comment.