docker: fix bug where network pause containers would be erroneously reconciled (#16352)

* docker: fix bug where network pause containers would be erroneously gc'd

* docker: cl: thread context from driver into pause container restoration
shoenig authored and philrenaud committed Mar 14, 2023
1 parent 4e0ae26 commit 4f3f1e0
Showing 6 changed files with 251 additions and 112 deletions.
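For orientation, here is a minimal sketch (not part of the commit) of the reconciliation logic this change affects: the dangling-container reconciler force-removes any running Nomad-labeled container whose ID is not in the tracked set. Before this fix the tracked set held only task containers, so network pause containers looked untracked and were removed; the fix unions pause container IDs into that set. Plain maps stand in for github.com/hashicorp/go-set, and the container IDs below are invented for illustration.

```go
package main

import "fmt"

// untracked returns the running container IDs the driver is not tracking;
// in the real reconciler these become candidates for forced removal.
func untracked(running []string, tracked map[string]struct{}) []string {
	var result []string
	for _, id := range running {
		if _, ok := tracked[id]; !ok {
			result = append(result, id)
		}
	}
	return result
}

func main() {
	running := []string{"task-container", "pause-container"}

	// Before: only task containers are tracked, so the pause container is
	// mistaken for a dangling container and removed.
	before := map[string]struct{}{"task-container": {}}
	fmt.Println(untracked(running, before)) // [pause-container]

	// After: pause container IDs are unioned into the tracked set, so
	// nothing is erroneously reconciled.
	after := map[string]struct{}{"task-container": {}, "pause-container": {}}
	fmt.Println(untracked(running, after)) // []
}
```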
3 changes: 3 additions & 0 deletions .changelog/16352.txt
@@ -0,0 +1,3 @@
```release-note:bug
docker: Fixed a bug where pause containers would be erroneously removed
```
87 changes: 80 additions & 7 deletions drivers/docker/driver.go
@@ -20,6 +20,7 @@ import (
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/drivers/docker/docklog"
@@ -84,6 +85,35 @@ const (
dockerLabelNodeID = "com.hashicorp.nomad.node_id"
)

type pauseContainerStore struct {
lock sync.Mutex
containerIDs *set.Set[string]
}

func newPauseContainerStore() *pauseContainerStore {
return &pauseContainerStore{
containerIDs: set.New[string](10),
}
}

func (s *pauseContainerStore) add(id string) {
s.lock.Lock()
defer s.lock.Unlock()
s.containerIDs.Insert(id)
}

func (s *pauseContainerStore) remove(id string) {
s.lock.Lock()
defer s.lock.Unlock()
s.containerIDs.Remove(id)
}

func (s *pauseContainerStore) union(other *set.Set[string]) *set.Set[string] {
s.lock.Lock()
defer s.lock.Unlock()
return other.Union(s.containerIDs)
}

type Driver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
@@ -104,6 +134,9 @@ type Driver struct {
// tasks is the in memory datastore mapping taskIDs to taskHandles
tasks *taskStore

// pauseContainers keeps track of pause container IDs in use by allocations
pauseContainers *pauseContainerStore

// coordinator is what tracks multiple image pulls against the same docker image
coordinator *dockerCoordinator

@@ -130,13 +163,16 @@ type Driver struct {
// NewDockerDriver returns a docker implementation of a driver plugin
func NewDockerDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &DriverConfig{},
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
driver := &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &DriverConfig{},
tasks: newTaskStore(),
pauseContainers: newPauseContainerStore(),
ctx: ctx,
logger: logger,
}
go driver.recoverPauseContainers(ctx)
return driver
}

func (d *Driver) reattachToDockerLogger(reattachConfig *pstructs.ReattachConfig) (docklog.DockerLogger, *plugin.Client, error) {
@@ -238,6 +274,9 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
}

d.tasks.Set(handle.Config.ID, h)

// find a pause container?

go h.run()

return nil
@@ -709,6 +748,40 @@ func (d *Driver) containerBinds(task *drivers.TaskConfig, driverConfig *TaskConf
return binds, nil
}

func (d *Driver) recoverPauseContainers(ctx context.Context) {
// On Client restart, we must rebuild the set of pause containers
// we are tracking. Scan the running containers and track the ID of any
// container that has the Nomad alloc ID label and a name with the
// prefix "/nomad_init_".

_, dockerClient, err := d.dockerClients()
if err != nil {
d.logger.Error("failed to recover pause containers", "error", err)
return
}

containers, listErr := dockerClient.ListContainers(docker.ListContainersOptions{
Context: ctx,
All: false, // running only
Filters: map[string][]string{
"label": {dockerLabelAllocID},
},
})
if listErr != nil {
d.logger.Error("failed to list pause containers", "error", err)
return
}

CONTAINER:
for _, c := range containers {
for _, name := range c.Names {
if strings.HasPrefix(name, "/nomad_init_") {
d.pauseContainers.add(c.ID)
continue CONTAINER
}
}
}
}

var userMountToUnixMount = map[string]string{
// Empty string maps to `rprivate` for backwards compatibility in restored
// older tasks, where mount propagation will not be present.
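As an aside, a hedged sketch of how the new pauseContainerStore could be exercised in a test. This is illustrative only: it assumes it compiles inside the drivers/docker package with the "testing" and go-set imports in scope, and uses only the go-set calls already visible in this diff (New, Insert, Contains, Slice). The commit's remaining changed files, including any tests, are not rendered on this page.

```go
// Illustrative test sketch, not taken from this commit.
func TestPauseContainerStore_addRemoveUnion(t *testing.T) {
	store := newPauseContainerStore()
	store.add("pause-abc")
	store.add("pause-def")
	store.remove("pause-def")

	// simulate the task container IDs the driver already tracks
	tasks := set.New[string](1)
	tasks.Insert("task-123")

	all := store.union(tasks)
	if !all.Contains("pause-abc") || !all.Contains("task-123") {
		t.Fatalf("expected both IDs to be tracked, got %v", all.Slice())
	}
	if all.Contains("pause-def") {
		t.Fatalf("removed ID should not be tracked, got %v", all.Slice())
	}
}
```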
11 changes: 10 additions & 1 deletion drivers/docker/network.go
@@ -83,18 +83,27 @@ func (d *Driver) CreateNetwork(allocID string, createSpec *drivers.NetworkCreate
return nil, false, err
}

// keep track of this pause container for reconciliation
d.pauseContainers.add(container.ID)

return specFromContainer(container, createSpec.Hostname), true, nil
}

func (d *Driver) DestroyNetwork(allocID string, spec *drivers.NetworkIsolationSpec) error {
id := spec.Labels[dockerNetSpecLabelKey]

// no longer tracking this pause container; even if we fail here we should
// let the background reconciliation keep trying
d.pauseContainers.remove(id)

client, _, err := d.dockerClients()
if err != nil {
return fmt.Errorf("failed to connect to docker daemon: %s", err)
}

if err := client.RemoveContainer(docker.RemoveContainerOptions{
Force: true,
ID: spec.Labels[dockerNetSpecLabelKey],
ID: id,
}); err != nil {
return err
}
37 changes: 17 additions & 20 deletions drivers/docker/reconcile_dangling.go
@@ -9,6 +9,7 @@ import (

docker "github.com/fsouza/go-dockerclient"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-set"
)

// containerReconciler detects and kills unexpectedly running containers.
@@ -24,7 +25,7 @@ type containerReconciler struct {
logger hclog.Logger

isDriverHealthy func() bool
trackedContainers func() map[string]bool
trackedContainers func() *set.Set[string]
isNomadContainer func(c docker.APIContainers) bool

once sync.Once
@@ -96,7 +97,7 @@ func (r *containerReconciler) removeDanglingContainersIteration() error {
return fmt.Errorf("failed to find untracked containers: %v", err)
}

if len(untracked) == 0 {
if untracked.Empty() {
return nil
}

@@ -105,7 +106,7 @@ func (r *containerReconciler) removeDanglingContainersIteration() error {
return nil
}

for _, id := range untracked {
for _, id := range untracked.Slice() {
ctx, cancel := r.dockerAPIQueryContext()
err := client.RemoveContainer(docker.RemoveContainerOptions{
Context: ctx,
@@ -125,8 +126,8 @@ func (r *containerReconciler) removeDanglingContainersIteration() error {

// untrackedContainers returns the IDs of containers that are suspected
// to have been started by Nomad but aren't tracked by this driver
func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutoffTime time.Time) ([]string, error) {
result := []string{}
func (r *containerReconciler) untrackedContainers(tracked *set.Set[string], cutoffTime time.Time) (*set.Set[string], error) {
result := set.New[string](10)

ctx, cancel := r.dockerAPIQueryContext()
defer cancel()
@@ -142,7 +143,7 @@ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutof
cutoff := cutoffTime.Unix()

for _, c := range cc {
if tracked[c.ID] {
if tracked.Contains(c.ID) {
continue
}

@@ -154,9 +155,8 @@ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutof
continue
}

result = append(result, c.ID)
result.Insert(c.ID)
}

return result, nil
}

@@ -165,7 +165,7 @@ func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutof
//
// We'll try hitting Docker API on subsequent iteration.
func (r *containerReconciler) dockerAPIQueryContext() (context.Context, context.CancelFunc) {
// use a reasoanble floor to avoid very small limit
// use a reasonable floor to avoid very small limit
timeout := 30 * time.Second

if timeout < r.config.period {
@@ -211,18 +211,15 @@ func hasNomadName(c docker.APIContainers) bool {
return true
}
}

return false
}

func (d *Driver) trackedContainers() map[string]bool {
d.tasks.lock.RLock()
defer d.tasks.lock.RUnlock()

r := make(map[string]bool, len(d.tasks.store))
for _, h := range d.tasks.store {
r[h.containerID] = true
}

return r
// trackedContainers returns the set of container IDs of containers that were
// started by the driver and are expected to be running. This includes both
// normal task containers and infra pause containers.
func (d *Driver) trackedContainers() *set.Set[string] {
// collect the task containers
ids := d.tasks.IDs()
// now also accumulate pause containers
return d.pauseContainers.union(ids)
}
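The new trackedContainers implementation above relies on d.tasks.IDs(), a helper defined in one of the changed files that is not rendered on this page. Based on the map-building code it replaces, a plausible shape for that helper is sketched below; the receiver and field names are taken from the removed code, and the actual implementation may differ.

```go
// Assumed sketch of taskStore.IDs(); the real helper lives in a file not shown
// in this excerpt. It snapshots the container IDs of all task handles while
// holding the store's read lock.
func (ts *taskStore) IDs() *set.Set[string] {
	ts.lock.RLock()
	defer ts.lock.RUnlock()

	ids := set.New[string](len(ts.store))
	for _, h := range ts.store {
		ids.Insert(h.containerID)
	}
	return ids
}
```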