Skip to content

Commit

Permalink
refactor reconciler code and address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahmood Ali committed Oct 17, 2019
1 parent c8ba2d1 commit 97f0875
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 121 deletions.
40 changes: 31 additions & 9 deletions drivers/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,14 @@ var (
hclspec.NewAttr("period", "string", false),
hclspec.NewLiteral(`"5m"`),
),
"creation_timeout": hclspec.NewDefault(
hclspec.NewAttr("creation_timeout", "string", false),
"creation_grace": hclspec.NewDefault(
hclspec.NewAttr("creation_grace", "string", false),
hclspec.NewLiteral(`"5m"`),
),
"dry_run": hclspec.NewDefault(
hclspec.NewAttr("dry_run", "bool", false),
hclspec.NewLiteral(`false`),
),
})

// configSpec is the hcl specification returned by the ConfigSchema RPC
Expand Down Expand Up @@ -510,14 +514,26 @@ type DockerVolumeDriverConfig struct {
Options hclutils.MapStrStr `codec:"options"`
}

// ContainerGCConfig controls the behavior of the GC reconciler to detects
// dangling nomad containers that aren't tracked due to docker/nomad bugs
type ContainerGCConfig struct {
// Enabled controls whether container reconciler is enabled
Enabled bool `codec:"enabled"`

// DryRun indicates that reconciler should log unexpectedly running containers
// if found without actually killing them
DryRun bool `codec:"dry_run"`

// PeriodStr controls the frequency of scanning containers
PeriodStr string `codec:"period"`
period time.Duration `codec:"-"`

CreationTimeoutStr string `codec:"creation_timeout"`
creationTimeout time.Duration `codec:"-"`
// CreationGraceStr is the duration allowed for a newly created container
// to live without being registered as a running task in nomad.
// A container is treated as leaked if it lived more than grace duration
// and haven't been registered in tasks.
CreationGraceStr string `codec:"creation_grace"`
CreationGrace time.Duration `codec:"-"`
}

type DriverConfig struct {
Expand Down Expand Up @@ -565,6 +581,8 @@ func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}

const danglingContainersCreationGraceMinimum = 1 * time.Minute

func (d *Driver) SetConfig(c *base.Config) error {
var config DriverConfig
if len(c.PluginConfig) != 0 {
Expand All @@ -590,12 +608,15 @@ func (d *Driver) SetConfig(c *base.Config) error {
d.config.GC.DanglingContainers.period = dur
}

if len(d.config.GC.DanglingContainers.CreationTimeoutStr) > 0 {
dur, err := time.ParseDuration(d.config.GC.DanglingContainers.CreationTimeoutStr)
if len(d.config.GC.DanglingContainers.CreationGraceStr) > 0 {
dur, err := time.ParseDuration(d.config.GC.DanglingContainers.CreationGraceStr)
if err != nil {
return fmt.Errorf("failed to parse 'container_delay' duration: %v", err)
return fmt.Errorf("failed to parse 'creation_grace' duration: %v", err)
}
if dur < danglingContainersCreationGraceMinimum {
return fmt.Errorf("creation_grace is less than minimum, %v", danglingContainersCreationGraceMinimum)
}
d.config.GC.DanglingContainers.creationTimeout = dur
d.config.GC.DanglingContainers.CreationGrace = dur
}

if c.AgentConfig != nil {
Expand All @@ -615,7 +636,8 @@ func (d *Driver) SetConfig(c *base.Config) error {

d.coordinator = newDockerCoordinator(coordinatorConfig)

go d.removeDanglingContainersGoroutine()
reconciler := newReconciler(d)
reconciler.Start()

return nil
}
Expand Down
118 changes: 72 additions & 46 deletions drivers/docker/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,111 @@ import (
"context"
"fmt"
"regexp"
"strings"
"time"

docker "github.com/fsouza/go-dockerclient"
hclog "github.com/hashicorp/go-hclog"
)

func (d *Driver) removeDanglingContainersGoroutine() {
if !d.config.GC.DanglingContainers.Enabled {
d.logger.Debug("skipping dangling containers handling; is disabled")
// containerReconciler detects and kills unexpectedly running containers.
//
// Due to Docker architecture and network based communication, it is
// possible for Docker to start a container successfully, but have the
// creation API call fail with a network error. containerReconciler
// scans for these untracked containers and kill them.
type containerReconciler struct {
ctx context.Context
config *ContainerGCConfig
client *docker.Client
logger hclog.Logger

isDriverHealthy func() bool
trackedContainers func() map[string]bool
isNomadContainer func(c docker.APIContainers) bool
}

func newReconciler(d *Driver) *containerReconciler {
return &containerReconciler{
ctx: d.ctx,
config: &d.config.GC.DanglingContainers,
client: client,
logger: d.logger,

isDriverHealthy: func() bool { return d.previouslyDetected() && d.fingerprintSuccessful() },
trackedContainers: d.trackedContainers,
isNomadContainer: isNomadContainer,
}
}

func (r *containerReconciler) Start() {
if !r.config.Enabled {
r.logger.Debug("skipping dangling containers handling; is disabled")
return
}

period := d.config.GC.DanglingContainers.period
go r.removeDanglingContainersGoroutine()
}

func (r *containerReconciler) removeDanglingContainersGoroutine() {
period := r.config.period

succeeded := true
lastIterSucceeded := true

// ensure that we wait for at least a period or creation timeout
// for first container GC iteration
// The initial period is a grace period for restore allocation
// before a driver may kill containers launched by an earlier nomad
// process.
initialDelay := period
if d.config.GC.DanglingContainers.creationTimeout > initialDelay {
initialDelay = d.config.GC.DanglingContainers.creationTimeout
if r.config.CreationGrace > initialDelay {
initialDelay = r.config.CreationGrace
}

timer := time.NewTimer(initialDelay)
for {
select {
case <-timer.C:
if d.previouslyDetected() && d.fingerprintSuccessful() {
err := d.removeDanglingContainersIteration()
if err != nil && succeeded {
d.logger.Warn("failed to remove dangling containers", "error", err)
if r.isDriverHealthy() {
err := r.removeDanglingContainersIteration()
if err != nil && lastIterSucceeded {
r.logger.Warn("failed to remove dangling containers", "error", err)
}
succeeded = (err == nil)
lastIterSucceeded = (err == nil)
}

timer.Reset(period)
case <-d.ctx.Done():
case <-r.ctx.Done():
return
}
}
}

func (d *Driver) removeDanglingContainersIteration() error {
tracked := d.trackedContainers()
untracked, err := d.untrackedContainers(tracked, d.config.GC.DanglingContainers.creationTimeout)
func (r *containerReconciler) removeDanglingContainersIteration() error {
cutoff := time.Now().Add(-r.config.CreationGrace)
tracked := r.trackedContainers()
untracked, err := r.untrackedContainers(tracked, cutoff)
if err != nil {
return fmt.Errorf("failed to find untracked containers: %v", err)
}

if len(untracked) == 0 {
return nil
}

if r.config.DryRun {
r.logger.Info("detected untracked containers", "container_ids", untracked)
return nil
}

for _, id := range untracked {
d.logger.Info("removing untracked container", "container_id", id)
err := client.RemoveContainer(docker.RemoveContainerOptions{
ID: id,
Force: true,
})
if err != nil {
d.logger.Warn("failed to remove untracked container", "container_id", id, "error", err)
r.logger.Warn("failed to remove untracked container", "container_id", id, "error", err)
} else {
r.logger.Info("removed untracked container", "container_id", id)
}
}

Expand All @@ -72,15 +117,17 @@ func (d *Driver) removeDanglingContainersIteration() error {

// untrackedContainers returns the ids of containers that suspected
// to have been started by Nomad but aren't tracked by this driver
func (d *Driver) untrackedContainers(tracked map[string]bool, creationTimeout time.Duration) ([]string, error) {
func (r *containerReconciler) untrackedContainers(tracked map[string]bool, cutoffTime time.Time) ([]string, error) {
result := []string{}

cc, err := client.ListContainers(docker.ListContainersOptions{})
cc, err := client.ListContainers(docker.ListContainersOptions{
All: false, // only reconcile running containers
})
if err != nil {
return nil, fmt.Errorf("failed to list containers: %v", err)
}

cutoff := time.Now().Add(-creationTimeout).Unix()
cutoff := cutoffTime.Unix()

for _, c := range cc {
if tracked[c.ID] {
Expand All @@ -91,7 +138,7 @@ func (d *Driver) untrackedContainers(tracked map[string]bool, creationTimeout ti
continue
}

if !d.isNomadContainer(c) {
if !r.isNomadContainer(c) {
continue
}

Expand All @@ -101,7 +148,7 @@ func (d *Driver) untrackedContainers(tracked map[string]bool, creationTimeout ti
return result, nil
}

func (d *Driver) isNomadContainer(c docker.APIContainers) bool {
func isNomadContainer(c docker.APIContainers) bool {
if _, ok := c.Labels["com.hashicorp.nomad.alloc_id"]; ok {
return true
}
Expand All @@ -116,18 +163,7 @@ func (d *Driver) isNomadContainer(c docker.APIContainers) bool {
return false
}

// double check before killing process
ctx, cancel := context.WithTimeout(d.ctx, 20*time.Second)
defer cancel()

ci, err := client.InspectContainerWithContext(c.ID, ctx)
if err != nil {
return false
}

env := ci.Config.Env
return hasEnvVar(env, "NOMAD_ALLOC_ID") &&
hasEnvVar(env, "NOMAD_GROUP_NAME")
return true
}

func hasMount(c docker.APIContainers, p string) bool {
Expand All @@ -152,16 +188,6 @@ func hasNomadName(c docker.APIContainers) bool {
return false
}

func hasEnvVar(vars []string, key string) bool {
for _, v := range vars {
if strings.HasPrefix(v, key+"=") {
return true
}
}

return false
}

func (d *Driver) trackedContainers() map[string]bool {
d.tasks.lock.RLock()
defer d.tasks.lock.RUnlock()
Expand Down
Loading

0 comments on commit 97f0875

Please sign in to comment.