Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(executor): Fix docker not terminating. Fixes #6064 #6083

Merged
merged 7 commits into from
Jun 9, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 64 additions & 40 deletions workflow/executor/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@ var started = time.Now()
type DockerExecutor struct {
namespace string
podName string
containers map[string]string // containerName -> containerID
containers map[string]ctr // containerName -> ctr
}

type ctr struct {
containerID string
status string
createdAt time.Time
}

func NewDockerExecutor(namespace, podName string) (*DockerExecutor, error) {
log.Infof("Creating a docker executor")
return &DockerExecutor{namespace, podName, make(map[string]string)}, nil
return &DockerExecutor{namespace, podName, make(map[string]ctr)}, nil
}

func (d *DockerExecutor) GetFileContents(containerName string, sourcePath string) (string, error) {
Expand Down Expand Up @@ -207,55 +213,69 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro
return err
}

func (d *DockerExecutor) listContainers() (map[string]ctr, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lift-and-shift

output, err := common.RunCommand(
"docker",
"ps",
"--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up)
"--no-trunc", // display long container IDs
"--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST`
// https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37
"--filter=label=io.kubernetes.pod.namespace="+d.namespace,
"--filter=label=io.kubernetes.pod.name="+d.podName,
)
if err != nil {
return nil, err
}
containers := make(map[string]ctr)
for _, l := range strings.Split(string(output), "\n") {
parts := strings.Split(strings.TrimSpace(l), "|")
if len(parts) != 4 {
continue
}
status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up,
containerName := parts[1]
if containerName == "POD" {
continue
}
containerID := parts[2]
if containerID == "" {
continue
}
createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3])
if err != nil {
return nil, err
}
containers[containerName] = ctr{containerID: containerID, status: status, createdAt: createdAt}
}
log.WithField("containers", containers).Info("listed containers")
return containers, nil
}

func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []string) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
output, err := common.RunCommand(
"docker",
"ps",
"--all", // container could have already exited, but there could also have been two containers for the same pod (old container not yet cleaned-up)
"--no-trunc", // display long container IDs
"--format={{.Status}}|{{.Label \"io.kubernetes.container.name\"}}|{{.ID}}|{{.CreatedAt}}", // similar to `Up 3 hours,main,035a98c4e72e,2021-03-08 17:25:15 -0800 PST`
// https://github.com/kubernetes/kubernetes/blob/ca6bdba014f0a98efe0e0dd4e15f57d1c121d6c9/pkg/kubelet/dockertools/labels.go#L37
"--filter=label=io.kubernetes.pod.namespace="+d.namespace,
"--filter=label=io.kubernetes.pod.name="+d.podName,
)
containers, err := d.listContainers()
if err != nil {
return err
}
containerStatus := make(map[string]string)
for _, l := range strings.Split(string(output), "\n") {
parts := strings.Split(strings.TrimSpace(l), "|")
if len(parts) != 4 {
continue
}
status := strings.SplitN(parts[0], " ", 2)[0] // Created,Exited,Up,
containerName := parts[1]
if containerName == "POD" {
for containerName, c := range containers {
if d.containers[containerName].containerID == c.containerID { // already found
continue
}
containerID := parts[2]
createdAt, err := time.Parse("2006-01-02 15:04:05 -0700 MST", parts[3])
if err != nil {
return err
}
if containerID == "" || d.containers[containerName] == containerID {
continue
}
if createdAt.Before(started.Add(-15 * time.Second)) {
log.Infof("ignoring container %q created at %v, too long before process started", containerName, createdAt)
if c.createdAt.Before(started.Add(-15 * time.Second)) {
log.Infof("ignoring container %q created at %v, too long before process started", containerName, c.createdAt)
continue
}
if status == "Created" && containerStatus[containerName] != "" {
log.Infof("ignoring created container %q that would %s -> %s", containerName, containerStatus[containerName], status)
if c.status == "Created" && d.containers[containerName].status != "" {
log.Infof("ignoring created container %q that would %s -> %s", containerName, d.containers[containerName].status, c.status)
continue
}
d.containers[containerName] = containerID
containerStatus[containerName] = status
log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, containerID, createdAt, status)
d.containers[containerName] = c
log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, c.containerID, c.createdAt, c.status)
}
}
// sidecars start after the main containers, so we can't just exit once we know about all the main containers,
Expand All @@ -270,16 +290,16 @@ func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []

func (d *DockerExecutor) haveContainers(containerNames []string) bool {
for _, n := range containerNames {
if d.containers[n] == "" {
if _, ok := d.containers[n]; !ok {
return false
}
}
return true
}

func (d *DockerExecutor) getContainerID(containerName string) (string, error) {
if containerID, ok := d.containers[containerName]; ok {
return containerID, nil
if ctr, ok := d.containers[containerName]; ok {
return ctr.containerID, nil
}
return "", errContainerNotExist
}
Expand Down Expand Up @@ -338,8 +358,12 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerNames []string, term
}

func (d *DockerExecutor) ListContainerNames(ctx context.Context) ([]string, error) {
containers, err := d.listContainers()
if err != nil {
return nil, err
}
var containerNames []string
for n := range d.containers {
for n := range containers {
containerNames = append(containerNames, n)
}
return containerNames, nil
Expand Down