diff --git a/.changelog/16352.txt b/.changelog/16352.txt new file mode 100644 index 000000000000..e164d9003073 --- /dev/null +++ b/.changelog/16352.txt @@ -0,0 +1,3 @@ +```release-note:bug +docker: Fixed a bug where pause containers would be erroneously removed +``` diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index a041034d7358..8b043fed2d7f 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -20,6 +20,7 @@ import ( hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" plugin "github.com/hashicorp/go-plugin" + "github.com/hashicorp/go-set" "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/drivers/docker/docklog" @@ -84,6 +85,35 @@ const ( dockerLabelNodeID = "com.hashicorp.nomad.node_id" ) +type pauseContainerStore struct { + lock sync.Mutex + containerIDs *set.Set[string] +} + +func newPauseContainerStore() *pauseContainerStore { + return &pauseContainerStore{ + containerIDs: set.New[string](10), + } +} + +func (s *pauseContainerStore) add(id string) { + s.lock.Lock() + defer s.lock.Unlock() + s.containerIDs.Insert(id) +} + +func (s *pauseContainerStore) remove(id string) { + s.lock.Lock() + defer s.lock.Unlock() + s.containerIDs.Remove(id) +} + +func (s *pauseContainerStore) union(other *set.Set[string]) *set.Set[string] { + s.lock.Lock() + defer s.lock.Unlock() + return other.Union(s.containerIDs) +} + type Driver struct { // eventer is used to handle multiplexing of TaskEvents calls such that an // event can be broadcast to all callers @@ -104,6 +134,9 @@ type Driver struct { // tasks is the in memory datastore mapping taskIDs to taskHandles tasks *taskStore + // pauseContainers keeps track of pause container IDs in use by allocations + pauseContainers *pauseContainerStore + // coordinator is what tracks multiple image pulls against the same docker image coordinator *dockerCoordinator @@ -130,13 +163,16 @@ type Driver 
struct { // NewDockerDriver returns a docker implementation of a driver plugin func NewDockerDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin { logger = logger.Named(pluginName) - return &Driver{ - eventer: eventer.NewEventer(ctx, logger), - config: &DriverConfig{}, - tasks: newTaskStore(), - ctx: ctx, - logger: logger, - } + driver := &Driver{ + eventer: eventer.NewEventer(ctx, logger), + config: &DriverConfig{}, + tasks: newTaskStore(), + pauseContainers: newPauseContainerStore(), + ctx: ctx, + logger: logger, + } + go driver.recoverPauseContainers() + return driver } func (d *Driver) reattachToDockerLogger(reattachConfig *pstructs.ReattachConfig) (docklog.DockerLogger, *plugin.Client, error) { @@ -709,6 +745,39 @@ func (d *Driver) containerBinds(task *drivers.TaskConfig, driverConfig *TaskConf return binds, nil } +func (d *Driver) recoverPauseContainers() { + // On Client restart, we must rebuild the set of pause containers + // we are tracking. Basically just scan all containers and pull the ID from + // anything that has the Nomad Label and has Name with prefix "/nomad_init_". + + _, dockerClient, err := d.dockerClients() + if err != nil { + d.logger.Error("failed to recover pause containers", "error", err) + return + } + + containers, listErr := dockerClient.ListContainers(docker.ListContainersOptions{ + All: false, // running only + Filters: map[string][]string{ + "label": {dockerLabelAllocID}, + }, + }) + if listErr != nil { + d.logger.Error("failed to list pause containers", "error", listErr) + return + } + +CONTAINER: + for _, c := range containers { + for _, name := range c.Names { + if strings.HasPrefix(name, "/nomad_init_") { + d.pauseContainers.add(c.ID) + continue CONTAINER + } + } + } +} + var userMountToUnixMount = map[string]string{ // Empty string maps to `rprivate` for backwards compatibility in restored // older tasks, where mount propagation will not be present.
diff --git a/drivers/docker/network.go b/drivers/docker/network.go index 1cc57db77d8f..b25bb6f24ed7 100644 --- a/drivers/docker/network.go +++ b/drivers/docker/network.go @@ -83,10 +83,19 @@ func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreate return nil, false, err } + // keep track of this pause container for reconciliation + d.pauseContainers.add(container.ID) + return specFromContainer(container, createSpec.Hostname), true, nil } func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSpec) error { + id := spec.Labels[dockerNetSpecLabelKey] + + // no longer tracking this pause container; even if we fail here we should + // let the background reconciliation keep trying + d.pauseContainers.remove(id) + client, _, err := d.dockerClients() if err != nil { return fmt.Errorf("failed to connect to docker daemon: %s", err) @@ -94,7 +103,7 @@ func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSp if err := client.RemoveContainer(docker.RemoveContainerOptions{ Force: true, - ID: spec.Labels[dockerNetSpecLabelKey], + ID: id, }); err != nil { return err } diff --git a/drivers/docker/reconcile_dangling.go b/drivers/docker/reconcile_dangling.go index 16750fde60d9..292da062f43d 100644 --- a/drivers/docker/reconcile_dangling.go +++ b/drivers/docker/reconcile_dangling.go @@ -9,6 +9,7 @@ import ( docker "github.com/fsouza/go-dockerclient" hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-set" ) // containerReconciler detects and kills unexpectedly running containers. 
@@ -24,7 +25,7 @@ type containerReconciler struct { logger hclog.Logger isDriverHealthy func() bool - trackedContainers func() map[string]bool + trackedContainers func() *set.Set[string] isNomadContainer func(c docker.APIContainers) bool once sync.Once @@ -96,7 +97,7 @@ func (r *containerReconciler) removeDanglingContainersIteration() error { return fmt.Errorf("failed to find untracked containers: %v", err) } - if len(untracked) == 0 { + if untracked.Empty() { return nil } @@ -105,7 +106,7 @@ func (r *containerReconciler) removeDanglingContainersIteration() error { return nil } - for _, id := range untracked { + for _, id := range untracked.Slice() { ctx, cancel := r.dockerAPIQueryContext() err := client.RemoveContainer(docker.RemoveContainerOptions{ Context: ctx, @@ -125,8 +126,8 @@ func (r *containerReconciler) removeDanglingContainersIteration() error { // untrackedContainers returns the ids of containers that suspected // to have been started by Nomad but aren't tracked by this driver -func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutoffTime time.Time) ([]string, error) { - result := []string{} +func (r *containerReconciler) untrackedContainers(tracked *set.Set[string], cutoffTime time.Time) (*set.Set[string], error) { + result := set.New[string](10) ctx, cancel := r.dockerAPIQueryContext() defer cancel() @@ -142,7 +143,7 @@ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutof cutoff := cutoffTime.Unix() for _, c := range cc { - if tracked[c.ID] { + if tracked.Contains(c.ID) { continue } @@ -154,9 +155,8 @@ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutof continue } - result = append(result, c.ID) + result.Insert(c.ID) } - return result, nil } @@ -165,7 +165,7 @@ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutof // // We'll try hitting Docker API on subsequent iteration. 
func (r *containerReconciler) dockerAPIQueryContext() (context.Context, context.CancelFunc) { - // use a reasoanble floor to avoid very small limit + // use a reasonable floor to avoid very small limit timeout := 30 * time.Second if timeout < r.config.period { @@ -211,18 +211,15 @@ func hasNomadName(c docker.APIContainers) bool { return true } } - return false } -func (d *Driver) trackedContainers() map[string]bool { - d.tasks.lock.RLock() - defer d.tasks.lock.RUnlock() - - r := make(map[string]bool, len(d.tasks.store)) - for _, h := range d.tasks.store { - r[h.containerID] = true - } - - return r +// trackedContainers returns the set of container IDs of containers that were +// started by Driver and are expected to be running. This includes both normal +// Task containers, as well as infra pause containers. +func (d *Driver) trackedContainers() *set.Set[string] { + // collect the task containers + ids := d.tasks.IDs() + // now also accumulate pause containers + return d.pauseContainers.union(ids) } diff --git a/drivers/docker/reconcile_dangling_test.go b/drivers/docker/reconcile_dangling_test.go index 08befd827bde..b413f6051cfc 100644 --- a/drivers/docker/reconcile_dangling_test.go +++ b/drivers/docker/reconcile_dangling_test.go @@ -2,30 +2,30 @@ package docker import ( "encoding/json" + "fmt" "os" + "regexp" "testing" "time" docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/go-set" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/helper/uuid" - "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/shoenig/test/must" ) func fakeContainerList(t *testing.T) (nomadContainer, nonNomadContainer docker.APIContainers) { path := "./test-resources/docker/reconciler_containers_list.json" f, err := os.Open(path) - if err != nil { - t.Fatalf("failed to open file: %v", err) - } + must.NoError(t, err, must.Sprintf("failed to open %s", path)) var 
sampleContainerList []docker.APIContainers err = json.NewDecoder(f).Decode(&sampleContainerList) - if err != nil { - t.Fatalf("failed to decode container list: %v", err) - } + must.NoError(t, err, must.Sprint("failed to decode container list")) return sampleContainerList[0], sampleContainerList[1] } @@ -35,15 +35,15 @@ func Test_HasMount(t *testing.T) { nomadContainer, nonNomadContainer := fakeContainerList(t) - require.True(t, hasMount(nomadContainer, "/alloc")) - require.True(t, hasMount(nomadContainer, "/data")) - require.True(t, hasMount(nomadContainer, "/secrets")) - require.False(t, hasMount(nomadContainer, "/random")) + must.True(t, hasMount(nomadContainer, "/alloc")) + must.True(t, hasMount(nomadContainer, "/data")) + must.True(t, hasMount(nomadContainer, "/secrets")) + must.False(t, hasMount(nomadContainer, "/random")) - require.False(t, hasMount(nonNomadContainer, "/alloc")) - require.False(t, hasMount(nonNomadContainer, "/data")) - require.False(t, hasMount(nonNomadContainer, "/secrets")) - require.False(t, hasMount(nonNomadContainer, "/random")) + must.False(t, hasMount(nonNomadContainer, "/alloc")) + must.False(t, hasMount(nonNomadContainer, "/data")) + must.False(t, hasMount(nonNomadContainer, "/secrets")) + must.False(t, hasMount(nonNomadContainer, "/random")) } func Test_HasNomadName(t *testing.T) { @@ -51,41 +51,45 @@ func Test_HasNomadName(t *testing.T) { nomadContainer, nonNomadContainer := fakeContainerList(t) - require.True(t, hasNomadName(nomadContainer)) - require.False(t, hasNomadName(nonNomadContainer)) + must.True(t, hasNomadName(nomadContainer)) + must.False(t, hasNomadName(nonNomadContainer)) } -// TestDanglingContainerRemoval asserts containers without corresponding tasks +// TestDanglingContainerRemoval_normal asserts containers without corresponding tasks // are removed after the creation grace period. 
-func TestDanglingContainerRemoval(t *testing.T) { +func TestDanglingContainerRemoval_normal(t *testing.T) { ci.Parallel(t) testutil.DockerCompatible(t) // start two containers: one tracked nomad container, and one unrelated container task, cfg, _ := dockerTask(t) - require.NoError(t, task.EncodeConcreteDriverConfig(cfg)) + must.NoError(t, task.EncodeConcreteDriverConfig(cfg)) - client, d, handle, cleanup := dockerSetup(t, task, nil) - defer cleanup() - require.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) + dockerClient, d, handle, cleanup := dockerSetup(t, task, nil) + t.Cleanup(cleanup) - nonNomadContainer, err := client.CreateContainer(docker.CreateContainerOptions{ + // wait for task to start + must.NoError(t, d.WaitUntilStarted(task.ID, 5*time.Second)) + + nonNomadContainer, err := dockerClient.CreateContainer(docker.CreateContainerOptions{ Name: "mytest-image-" + uuid.Generate(), Config: &docker.Config{ Image: cfg.Image, Cmd: append([]string{cfg.Command}, cfg.Args...), }, }) - require.NoError(t, err) - defer client.RemoveContainer(docker.RemoveContainerOptions{ - ID: nonNomadContainer.ID, - Force: true, + must.NoError(t, err) + t.Cleanup(func() { + _ = dockerClient.RemoveContainer(docker.RemoveContainerOptions{ + ID: nonNomadContainer.ID, + Force: true, + }) }) - err = client.StartContainer(nonNomadContainer.ID, nil) - require.NoError(t, err) + err = dockerClient.StartContainer(nonNomadContainer.ID, nil) + must.NoError(t, err) - untrackedNomadContainer, err := client.CreateContainer(docker.CreateContainerOptions{ + untrackedNomadContainer, err := dockerClient.CreateContainer(docker.CreateContainerOptions{ Name: "mytest-image-" + uuid.Generate(), Config: &docker.Config{ Image: cfg.Image, @@ -95,45 +99,47 @@ func TestDanglingContainerRemoval(t *testing.T) { }, }, }) - require.NoError(t, err) - defer client.RemoveContainer(docker.RemoveContainerOptions{ - ID: untrackedNomadContainer.ID, - Force: true, + must.NoError(t, err) + t.Cleanup(func() { + _ = 
dockerClient.RemoveContainer(docker.RemoveContainerOptions{ + ID: untrackedNomadContainer.ID, + Force: true, + }) }) - err = client.StartContainer(untrackedNomadContainer.ID, nil) - require.NoError(t, err) + err = dockerClient.StartContainer(untrackedNomadContainer.ID, nil) + must.NoError(t, err) dd := d.Impl().(*Driver) reconciler := newReconciler(dd) - trackedContainers := map[string]bool{handle.containerID: true} + trackedContainers := set.From([]string{handle.containerID}) - tf := reconciler.trackedContainers() - require.Contains(t, tf, handle.containerID) - require.NotContains(t, tf, untrackedNomadContainer) - require.NotContains(t, tf, nonNomadContainer.ID) + tracked := reconciler.trackedContainers() + must.Contains[string](t, handle.containerID, tracked) + must.NotContains[string](t, untrackedNomadContainer.ID, tracked) + must.NotContains[string](t, nonNomadContainer.ID, tracked) // assert tracked containers should never be untracked untracked, err := reconciler.untrackedContainers(trackedContainers, time.Now()) - require.NoError(t, err) - require.NotContains(t, untracked, handle.containerID) - require.NotContains(t, untracked, nonNomadContainer.ID) - require.Contains(t, untracked, untrackedNomadContainer.ID) + must.NoError(t, err) + must.NotContains[string](t, handle.containerID, untracked) + must.NotContains[string](t, nonNomadContainer.ID, untracked) + must.Contains[string](t, untrackedNomadContainer.ID, untracked) // assert we recognize nomad containers with appropriate cutoff - untracked, err = reconciler.untrackedContainers(map[string]bool{}, time.Now()) - require.NoError(t, err) - require.Contains(t, untracked, handle.containerID) - require.Contains(t, untracked, untrackedNomadContainer.ID) - require.NotContains(t, untracked, nonNomadContainer.ID) + untracked, err = reconciler.untrackedContainers(set.New[string](0), time.Now()) + must.NoError(t, err) + must.Contains[string](t, handle.containerID, untracked) + must.Contains[string](t, 
untrackedNomadContainer.ID, untracked) + must.NotContains[string](t, nonNomadContainer.ID, untracked) // but ignore if creation happened before cutoff - untracked, err = reconciler.untrackedContainers(map[string]bool{}, time.Now().Add(-1*time.Minute)) - require.NoError(t, err) - require.NotContains(t, untracked, handle.containerID) - require.NotContains(t, untracked, untrackedNomadContainer.ID) - require.NotContains(t, untracked, nonNomadContainer.ID) + untracked, err = reconciler.untrackedContainers(set.New[string](0), time.Now().Add(-1*time.Minute)) + must.NoError(t, err) + must.NotContains[string](t, handle.containerID, untracked) + must.NotContains[string](t, untrackedNomadContainer.ID, untracked) + must.NotContains[string](t, nonNomadContainer.ID, untracked) // a full integration tests to assert that containers are removed prestineDriver := dockerDriverHarness(t, nil).Impl().(*Driver) @@ -144,18 +150,54 @@ func TestDanglingContainerRemoval(t *testing.T) { } nReconciler := newReconciler(prestineDriver) - require.NoError(t, nReconciler.removeDanglingContainersIteration()) + err = nReconciler.removeDanglingContainersIteration() + must.NoError(t, err) + + _, err = dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ID: nonNomadContainer.ID}) + must.NoError(t, err) + + _, err = dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ID: handle.containerID}) + must.ErrorContains(t, err, NoSuchContainerError) + + _, err = dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ID: untrackedNomadContainer.ID}) + must.ErrorContains(t, err, NoSuchContainerError) +} + +var ( + dockerNetRe = regexp.MustCompile(`/var/run/docker/netns/[[:xdigit:]]`) +) - _, err = client.InspectContainer(nonNomadContainer.ID) - require.NoError(t, err) +func TestDanglingContainerRemoval_network(t *testing.T) { + ci.Parallel(t) + testutil.DockerCompatible(t) + testutil.RequireLinux(t) // bridge implies linux - _, err = 
client.InspectContainer(handle.containerID) - require.Error(t, err) - require.Contains(t, err.Error(), NoSuchContainerError) + dd := dockerDriverHarness(t, nil).Impl().(*Driver) + reconciler := newReconciler(dd) - _, err = client.InspectContainer(untrackedNomadContainer.ID) - require.Error(t, err) - require.Contains(t, err.Error(), NoSuchContainerError) + // create a pause container + allocID := uuid.Generate() + spec, created, err := dd.CreateNetwork(allocID, &drivers.NetworkCreateRequest{ + Hostname: "hello", + }) + must.NoError(t, err) + must.True(t, created) + must.RegexMatch(t, dockerNetRe, spec.Path) + id := spec.Labels[dockerNetSpecLabelKey] + + // execute reconciliation + err = reconciler.removeDanglingContainersIteration() + must.NoError(t, err) + + dockerClient := newTestDockerClient(t) + c, iErr := dockerClient.InspectContainerWithOptions(docker.InspectContainerOptions{ID: id}) + must.NoError(t, iErr) + must.Eq(t, "running", c.State.Status) + fmt.Println("state", c.State) + + // cleanup pause container + err = dd.DestroyNetwork(allocID, spec) + must.NoError(t, err) } // TestDanglingContainerRemoval_Stopped asserts stopped containers without @@ -166,8 +208,8 @@ func TestDanglingContainerRemoval_Stopped(t *testing.T) { _, cfg, _ := dockerTask(t) - client := newTestDockerClient(t) - container, err := client.CreateContainer(docker.CreateContainerOptions{ + dockerClient := newTestDockerClient(t) + container, err := dockerClient.CreateContainer(docker.CreateContainerOptions{ Name: "mytest-image-" + uuid.Generate(), Config: &docker.Config{ Image: cfg.Image, @@ -177,33 +219,35 @@ func TestDanglingContainerRemoval_Stopped(t *testing.T) { }, }, }) - require.NoError(t, err) - defer client.RemoveContainer(docker.RemoveContainerOptions{ - ID: container.ID, - Force: true, + must.NoError(t, err) + t.Cleanup(func() { + _ = dockerClient.RemoveContainer(docker.RemoveContainerOptions{ + ID: container.ID, + Force: true, + }) }) - err = client.StartContainer(container.ID, 
nil) - require.NoError(t, err) + err = dockerClient.StartContainer(container.ID, nil) + must.NoError(t, err) - err = client.StopContainer(container.ID, 60) - require.NoError(t, err) + err = dockerClient.StopContainer(container.ID, 60) + must.NoError(t, err) dd := dockerDriverHarness(t, nil).Impl().(*Driver) reconciler := newReconciler(dd) // assert nomad container is tracked, and we ignore stopped one - tf := reconciler.trackedContainers() - require.NotContains(t, tf, container.ID) + tracked := reconciler.trackedContainers() + must.NotContains[string](t, container.ID, tracked) - untracked, err := reconciler.untrackedContainers(map[string]bool{}, time.Now()) - require.NoError(t, err) - require.NotContains(t, untracked, container.ID) + untracked, err := reconciler.untrackedContainers(set.New[string](0), time.Now()) + must.NoError(t, err) + must.NotContains[string](t, container.ID, untracked) // if we start container again, it'll be marked as untracked - require.NoError(t, client.StartContainer(container.ID, nil)) + must.NoError(t, dockerClient.StartContainer(container.ID, nil)) - untracked, err = reconciler.untrackedContainers(map[string]bool{}, time.Now()) - require.NoError(t, err) - require.Contains(t, untracked, container.ID) + untracked, err = reconciler.untrackedContainers(set.New[string](0), time.Now()) + must.NoError(t, err) + must.Contains[string](t, container.ID, untracked) } diff --git a/drivers/docker/state.go b/drivers/docker/state.go index 309eba957655..3365a88f70b0 100644 --- a/drivers/docker/state.go +++ b/drivers/docker/state.go @@ -2,6 +2,8 @@ package docker import ( "sync" + + "github.com/hashicorp/go-set" ) type taskStore struct { @@ -26,6 +28,17 @@ func (ts *taskStore) Get(id string) (*taskHandle, bool) { return t, ok } +func (ts *taskStore) IDs() *set.Set[string] { + ts.lock.RLock() + defer ts.lock.RUnlock() + + s := set.New[string](len(ts.store)) + for _, handle := range ts.store { + s.Insert(handle.containerID) + } + return s +} + func (ts 
*taskStore) Delete(id string) { ts.lock.Lock() defer ts.lock.Unlock()