diff --git a/client/config/config.go b/client/config/config.go index 3680fd709cd6..54936ec0b081 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -233,6 +233,29 @@ func (c *Config) ReadBoolDefault(id string, defaultValue bool) bool { return val } +// ReadDuration parses the specified option as a duration. +func (c *Config) ReadDuration(id string) (time.Duration, error) { + val, ok := c.Options[id] + if !ok { + return time.Duration(0), fmt.Errorf("Specified config is missing from options") + } + dval, err := time.ParseDuration(val) + if err != nil { + return time.Duration(0), fmt.Errorf("Failed to parse %s as time duration: %s", val, err) + } + return dval, nil +} + +// ReadDurationDefault tries to parse the specified option as a duration. If there is +// an error in parsing, the default option is returned. +func (c *Config) ReadDurationDefault(id string, defaultValue time.Duration) time.Duration { + val, err := c.ReadDuration(id) + if err != nil { + return defaultValue + } + return val +} + // ReadStringListToMap tries to parse the specified option as a comma separated list. // If there is an error in parsing, an empty list is returned. func (c *Config) ReadStringListToMap(key string) map[string]struct{} { diff --git a/client/driver/docker.go b/client/driver/docker.go index 50201134d584..e7ba88515f55 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -7,7 +7,6 @@ import ( "net" "os" "path/filepath" - "regexp" "runtime" "strconv" "strings" @@ -58,11 +57,7 @@ var ( recoverableErrTimeouts = func(err error) error { r := false if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") || - strings.Contains(err.Error(), "EOF") || - // TODO Remove when we implement global co-ordination among docker - // drivers to not remove images which are in use by instances of - // other drivers - strings.Contains(err.Error(), "no such image") { + strings.Contains(err.Error(), "EOF") { r = true } return structs.NewRecoverableError(err, r) @@ -96,6 +91,11 @@ const ( dockerCleanupImageConfigOption = "docker.cleanup.image" dockerCleanupImageConfigDefault = true + // dockerPullTimeoutConfigOption is the key for setting an images pull + // timeout + dockerImageRemoveDelayConfigOption = "docker.cleanup.image.delay" + dockerImageRemoveDelayConfigDefault = 3 * time.Minute + // dockerTimeout is the length of time a request can be outstanding before // it is timed out. dockerTimeout = 5 * time.Minute @@ -130,7 +130,7 @@ type DockerLoggingOpts struct { type DockerDriverConfig struct { ImageName string `mapstructure:"image"` // Container's Image Name - LoadImages []string `mapstructure:"load"` // LoadImage is array of paths to image archive files + LoadImage string `mapstructure:"load"` // LoadImage is a path to an image archive file Command string `mapstructure:"command"` // The Command to run when the container starts up Args []string `mapstructure:"args"` // The arguments to the Command IpcMode string `mapstructure:"ipc_mode"` // The IPC mode of the container - host and none @@ -191,11 +191,11 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke dconf.UTSMode = env.ReplaceEnv(dconf.UTSMode) dconf.Hostname = env.ReplaceEnv(dconf.Hostname) dconf.WorkDir = env.ReplaceEnv(dconf.WorkDir) + dconf.LoadImage = env.ReplaceEnv(dconf.LoadImage) dconf.Volumes = env.ParseAndReplace(dconf.Volumes) dconf.VolumeDriver = env.ReplaceEnv(dconf.VolumeDriver) dconf.DNSServers = env.ParseAndReplace(dconf.DNSServers) dconf.DNSSearchDomains = env.ParseAndReplace(dconf.DNSSearchDomains) - dconf.LoadImages = env.ParseAndReplace(dconf.LoadImages) for _, m := range dconf.LabelsRaw { for k, v := range m { @@ -241,6 +241,7 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke type dockerPID struct { Version string + Image string ImageID string ContainerID string KillTimeout time.Duration @@ -254,6 +255,8 @@ type DockerHandle struct { client *docker.Client waitClient *docker.Client logger *log.Logger + Image string + ImageID string containerID string version string clkSpeed float64 @@ -321,7 +324,7 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error { Required: true, }, "load": &fields.FieldSchema{ - Type: fields.TypeArray, + Type: fields.TypeString, }, "command": &fields.FieldSchema{ Type: fields.TypeString, @@ -416,6 +419,18 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation { return cstructs.FSIsolationImage } +// getDockerCoordinator returns the docker coordinator +func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordinator { + config := &dockerCoordinatorConfig{ + client: client, + cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault), + logger: d.logger, + removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault), + } + + return GetDockerCoordinator(config) +} + func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) { driverConfig, err := NewDockerDriverConfig(task, d.taskEnv) if err != nil { @@ -432,22 +447,14 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR } // Ensure the image is available - if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil { - return nil, err - } - - // Regardless of whether the image was downloaded already or not, store - // it as a created resource. Cleanup will soft fail if the image is - // still in use by another contianer. - dockerImage, err := client.InspectImage(driverConfig.ImageName) + id, err := d.createImage(driverConfig, client, ctx.TaskDir) if err != nil { - d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err) return nil, err } res := NewCreatedResources() - res.Add(dockerImageResKey, dockerImage.ID) - d.imageID = dockerImage.ID + res.Add(dockerImageResKey, id) + d.imageID = id return res, nil } @@ -535,6 +542,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle executor: exec, pluginClient: pluginClient, logger: d.logger, + Image: d.driverConfig.ImageName, + ImageID: d.imageID, containerID: container.ID, version: d.config.Version, killTimeout: GetKillTimeout(task.KillTimeout, maxKill), @@ -585,20 +594,9 @@ func (d *DockerDriver) cleanupImage(id string) error { return nil } - if err := client.RemoveImage(id); err != nil { - if err == docker.ErrNoSuchImage { - d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id) - return nil - } - if derr, ok := err.(*docker.Error); ok && derr.Status == 409 { - d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id) - return nil - } - // Retry on unknown errors - return structs.NewRecoverableError(err, true) - } + coordinator := d.getDockerCoordinator(client) + coordinator.RemoveImage(id) - d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id) return nil } @@ -946,35 +944,21 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas }, nil } -var ( - // imageNotFoundMatcher is a regex expression that matches the image not - // found error Docker returns. - imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`) -) - -// recoverablePullError wraps the error gotten when trying to pull and image if -// the error is recoverable. -func (d *DockerDriver) recoverablePullError(err error, image string) error { - recoverable := true - if imageNotFoundMatcher.MatchString(err.Error()) { - recoverable = false - } - return structs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable) -} - func (d *DockerDriver) Periodic() (bool, time.Duration) { return true, 15 * time.Second } // createImage creates a docker image either by pulling it from a registry or by // loading it from the file system -func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error { +func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) (string, error) { image := driverConfig.ImageName repo, tag := docker.ParseRepositoryTag(image) if tag == "" { tag = "latest" } + coordinator := d.getDockerCoordinator(client) + // We're going to check whether the image is already downloaded. If the tag // is "latest", or ForcePull is set, we have to check for a new version every time so we don't // bother to check and cache the id here. We'll download first, then cache. @@ -982,46 +966,38 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc d.logger.Printf("[DEBUG] driver.docker: force pull image '%s:%s' instead of inspecting local", repo, tag) } else if tag != "latest" { if dockerImage, _ := client.InspectImage(image); dockerImage != nil { - // Image exists, nothing to do - return nil + // Image exists so just increment its reference count + coordinator.IncrementImageReference(dockerImage.ID, image) + return dockerImage.ID, nil } } // Load the image if specified - if len(driverConfig.LoadImages) > 0 { + if driverConfig.LoadImage != "" { return d.loadImage(driverConfig, client, taskDir) } // Download the image - if err := d.pullImage(driverConfig, client, repo, tag); err != nil { - return err - } - return nil + return d.pullImage(driverConfig, client, repo, tag) } // pullImage creates an image by pulling it from a docker registry -func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docker.Client, repo string, tag string) error { - pullOptions := docker.PullImageOptions{ - Repository: repo, - Tag: tag, - } - - authOptions := docker.AuthConfiguration{} +func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docker.Client, repo, tag string) (id string, err error) { + var authOptions *docker.AuthConfiguration if len(driverConfig.Auth) != 0 { - authOptions = docker.AuthConfiguration{ + authOptions = &docker.AuthConfiguration{ Username: driverConfig.Auth[0].Username, Password: driverConfig.Auth[0].Password, Email: driverConfig.Auth[0].Email, ServerAddress: driverConfig.Auth[0].ServerAddress, } } else if authConfigFile := d.config.Read("docker.auth.config"); authConfigFile != "" { - authOptionsPtr, err := authOptionFrom(authConfigFile, repo) + authOptions, err := authOptionFrom(authConfigFile, repo) if err != nil { d.logger.Printf("[INFO] driver.docker: failed to find docker auth for repo %q: %v", repo, err) - return fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err) + return "", fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err) } - authOptions = *authOptionsPtr if authOptions.Email == "" && authOptions.Password == "" && authOptions.ServerAddress == "" && authOptions.Username == "" { d.logger.Printf("[DEBUG] driver.docker: did not find docker auth for repo %q", repo) @@ -1029,33 +1005,35 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke } d.emitEvent("Downloading image %s:%s", repo, tag) - err := client.PullImage(pullOptions, authOptions) + coordinator := d.getDockerCoordinator(client) + return coordinator.PullImage(driverConfig.ImageName, authOptions) +} + +// loadImage creates an image by loading it from the file system +func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docker.Client, + taskDir *allocdir.TaskDir) (id string, err error) { + + archive := filepath.Join(taskDir.LocalDir, driverConfig.LoadImage) + d.logger.Printf("[DEBUG] driver.docker: loading image from: %v", archive) + + f, err := os.Open(archive) if err != nil { - d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) - return d.recoverablePullError(err, driverConfig.ImageName) + return "", fmt.Errorf("unable to open image archive: %v", err) } - d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag) - return nil -} + if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil { + return "", err + } + f.Close() -// loadImage creates an image by loading it from the file system -func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error { - var errors multierror.Error - for _, image := range driverConfig.LoadImages { - archive := filepath.Join(taskDir.LocalDir, image) - d.logger.Printf("[DEBUG] driver.docker: loading image from: %v", archive) - f, err := os.Open(archive) - if err != nil { - errors.Errors = append(errors.Errors, fmt.Errorf("unable to open image archive: %v", err)) - continue - } - if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil { - errors.Errors = append(errors.Errors, err) - } - f.Close() + dockerImage, err := client.InspectImage(driverConfig.ImageName) + if err != nil { + return "", recoverableErrTimeouts(err) } - return errors.ErrorOrNil() + + coordinator := d.getDockerCoordinator(client) + coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName) + return dockerImage.ID, nil } // createContainer creates the container given the passed configuration. It @@ -1206,6 +1184,11 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er ver, _ := exec.Version() d.logger.Printf("[DEBUG] driver.docker: version of executor: %v", ver.Version) + // Increment the reference count since we successfully attached to this + // container + coordinator := d.getDockerCoordinator(client) + coordinator.IncrementImageReference(pid.ImageID, pid.Image) + // Return a driver handle h := &DockerHandle{ client: client, @@ -1234,6 +1217,8 @@ func (h *DockerHandle) ID() string { pid := dockerPID{ Version: h.version, ContainerID: h.containerID, + Image: h.Image, + ImageID: h.ImageID, KillTimeout: h.killTimeout, MaxKillTimeout: h.maxKillTimeout, PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go new file mode 100644 index 000000000000..32491d5f5c43 --- /dev/null +++ b/client/driver/docker_coordinator.go @@ -0,0 +1,288 @@ +package driver + +import ( + "context" + "fmt" + "log" + "regexp" + "sync" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/nomad/nomad/structs" +) + +var ( + // createCoordinator allows us to only create a single coordinator + createCoordinator sync.Once + + // globalCoordinator is the shared coordinator and should only be retreived + // using the GetDockerCoordinator() method. + globalCoordinator *dockerCoordinator + + // imageNotFoundMatcher is a regex expression that matches the image not + // found error Docker returns. + imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`) +) + +// pullFuture is a sharable future for retrieving a pulled images ID and any +// error that may have occured during the pull. +type pullFuture struct { + waitCh chan struct{} + + err error + imageID string +} + +// newPullFuture returns a new pull future +func newPullFuture() *pullFuture { + return &pullFuture{ + waitCh: make(chan struct{}), + } +} + +// wait waits till the future has a result +func (p *pullFuture) wait() *pullFuture { + <-p.waitCh + return p +} + +// result returns the results of the future and should only ever be called after +// wait returns. +func (p *pullFuture) result() (imageID string, err error) { + return p.imageID, p.err +} + +// set is used to set the results and unblock any waiter. This may only be +// called once. +func (p *pullFuture) set(imageID string, err error) { + p.imageID = imageID + p.err = err + close(p.waitCh) +} + +// DockerImageClient provides the methods required to do CRUD operations on the +// Docker images +type DockerImageClient interface { + PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error + InspectImage(id string) (*docker.Image, error) + RemoveImage(id string) error +} + +// dockerCoordinatorConfig is used to configure the Docker coordinator. +type dockerCoordinatorConfig struct { + // logger is the logger the coordinator should use + logger *log.Logger + + // cleanup marks whether images should be deleting when the reference count + // is zero + cleanup bool + + // client is the Docker client to use for communicating with Docker + client DockerImageClient + + // removeDelay is the delay between an image's reference count going to + // zero and the image actually being deleted. + removeDelay time.Duration +} + +// dockerCoordinator is used to coordinate actions against images to prevent +// racy deletions. It can be thought of as a reference counter on images. +type dockerCoordinator struct { + *dockerCoordinatorConfig + + // imageLock is used to lock access to all images + imageLock sync.Mutex + + // pullFutures is used to allow multiple callers to pull the same image but + // only have one request be sent to Docker + pullFutures map[string]*pullFuture + + // imageRefCount is the reference count of image IDs + imageRefCount map[string]int + + // deleteFuture is indexed by image ID and has a cancable delete future + deleteFuture map[string]context.CancelFunc +} + +// NewDockerCoordinator returns a new Docker coordinator +func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { + if config.client == nil { + return nil + } + + return &dockerCoordinator{ + dockerCoordinatorConfig: config, + pullFutures: make(map[string]*pullFuture), + imageRefCount: make(map[string]int), + deleteFuture: make(map[string]context.CancelFunc), + } +} + +// GetDockerCoordinator returns the shared dockerCoordinator instance +func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { + createCoordinator.Do(func() { + globalCoordinator = NewDockerCoordinator(config) + }) + + return globalCoordinator +} + +// PullImage is used to pull an image. It returns the pulled imaged ID or an +// error that occured during the pull +func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration) (imageID string, err error) { + // Lock while we look up the future + d.imageLock.Lock() + + // Get the future + future, ok := d.pullFutures[image] + if !ok { + // Make the future + future = newPullFuture() + d.pullFutures[image] = future + go d.pullImageImpl(image, authOptions, future) + } + d.imageLock.Unlock() + + // We unlock while we wait since this can take a while + id, err := future.wait().result() + + // If we are cleaning up, we increment the reference count on the image + if err == nil && d.cleanup { + d.IncrementImageReference(id, image) + } + + return id, err +} + +// pullImageImpl is the implementation of pulling an image. The results are +// returned via the passed future +func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) { + // Parse the repo and tag + repo, tag := docker.ParseRepositoryTag(image) + if tag == "" { + tag = "latest" + } + pullOptions := docker.PullImageOptions{ + Repository: repo, + Tag: tag, + } + + // Attempt to pull the image + var auth docker.AuthConfiguration + if authOptions != nil { + auth = *authOptions + } + err := d.client.PullImage(pullOptions, auth) + if err != nil { + d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) + future.set("", recoverablePullError(err, image)) + return + } + + d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag) + + dockerImage, err := d.client.InspectImage(image) + if err != nil { + d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", image, err) + future.set("", recoverableErrTimeouts(err)) + return + } + + future.set(dockerImage.ID, nil) + return +} + +// IncrementImageReference is used to increment an image reference count +func (d *dockerCoordinator) IncrementImageReference(id, image string) { + d.imageLock.Lock() + d.imageRefCount[id] += 1 + d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", image, id, d.imageRefCount[id]) + + // Cancel any pending delete + if cancel, ok := d.deleteFuture[id]; ok { + d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", image) + cancel() + } + d.imageLock.Unlock() +} + +// RemoveImage removes the given image. If there are any errors removing the +// image, the remove is retried internally. +func (d *dockerCoordinator) RemoveImage(id string) { + d.imageLock.Lock() + defer d.imageLock.Unlock() + + references, ok := d.imageRefCount[id] + if !ok { + d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", id) + return + } + + // Decrement the reference count + references-- + d.imageRefCount[id] = references + d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", id, references) + + // Nothing to do + if references != 0 { + return + } + + // Setup a future to delete the image + ctx, cancel := context.WithCancel(context.Background()) + d.deleteFuture[id] = cancel + go d.removeImageImpl(id, ctx) + + // Delete the key from the reference count + delete(d.imageRefCount, id) +} + +// removeImageImpl is used to remove an image. It wil wait the specified remove +// delay to remove the image. If the context is cancalled before that the image +// removal will be cancelled. +func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) { + // Sanity check + if !d.cleanup { + return + } + + // Wait for the delay or a cancellation event + select { + case <-ctx.Done(): + // We have been cancelled + return + case <-time.After(d.removeDelay): + } + + for i := 0; i < 3; i++ { + err := d.client.RemoveImage(id) + if err == nil { + break + } + + if err == docker.ErrNoSuchImage { + d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id) + return + } + if derr, ok := err.(*docker.Error); ok && derr.Status == 409 { + d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id) + return + } + + // Retry on unknown errors + d.logger.Printf("[DEBUG] driver.docker: failed to remove image %q (attempt %d): %v", id, i+1, err) + } + + d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id) +} + +// recoverablePullError wraps the error gotten when trying to pull and image if +// the error is recoverable. +func recoverablePullError(err error, image string) error { + recoverable := true + if imageNotFoundMatcher.MatchString(err.Error()) { + recoverable = false + } + return structs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable) +} diff --git a/client/driver/docker_coordinator_test.go b/client/driver/docker_coordinator_test.go new file mode 100644 index 000000000000..7ac3189e410e --- /dev/null +++ b/client/driver/docker_coordinator_test.go @@ -0,0 +1,211 @@ +package driver + +import ( + "fmt" + "testing" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" +) + +type mockImageClient struct { + pulled map[string]int + idToName map[string]string + removed map[string]int + pullDelay time.Duration +} + +func newMockImageClient(idToName map[string]string, pullDelay time.Duration) *mockImageClient { + return &mockImageClient{ + pulled: make(map[string]int), + removed: make(map[string]int), + idToName: idToName, + pullDelay: pullDelay, + } +} + +func (m *mockImageClient) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error { + time.Sleep(m.pullDelay) + m.pulled[opts.Repository]++ + return nil +} + +func (m *mockImageClient) InspectImage(id string) (*docker.Image, error) { + return &docker.Image{ + ID: m.idToName[id], + }, nil +} + +func (m *mockImageClient) RemoveImage(id string) error { + m.removed[id]++ + return nil +} + +func TestDockerCoordinator_ConcurrentPulls(t *testing.T) { + image := "foo" + imageID := structs.GenerateUUID() + mapping := map[string]string{imageID: image} + + // Add a delay so we can get multiple queued up + mock := newMockImageClient(mapping, 10*time.Millisecond) + config := &dockerCoordinatorConfig{ + logger: testLogger(), + cleanup: true, + client: mock, + removeDelay: 100 * time.Millisecond, + } + + // Create a coordinator + coordinator := NewDockerCoordinator(config) + + id := "" + for i := 0; i < 10; i++ { + id, _ = coordinator.PullImage(image, nil) + } + + if p := mock.pulled[image]; p != 1 { + t.Fatalf("Got multiple pulls %d", p) + } + + // Check the reference count + if r := coordinator.imageRefCount[id]; r != 10 { + t.Fatalf("Got reference count %d; want %d", r, 10) + } +} + +func TestDockerCoordinator_Pull_Remove(t *testing.T) { + image := "foo" + imageID := structs.GenerateUUID() + mapping := map[string]string{imageID: image} + + // Add a delay so we can get multiple queued up + mock := newMockImageClient(mapping, 10*time.Millisecond) + config := &dockerCoordinatorConfig{ + logger: testLogger(), + cleanup: true, + client: mock, + removeDelay: 1 * time.Millisecond, + } + + // Create a coordinator + coordinator := NewDockerCoordinator(config) + + id := "" + for i := 0; i < 10; i++ { + id, _ = coordinator.PullImage(image, nil) + } + + // Check the reference count + if r := coordinator.imageRefCount[id]; r != 10 { + t.Fatalf("Got reference count %d; want %d", r, 10) + } + + // Remove some + for i := 0; i < 8; i++ { + coordinator.RemoveImage(id) + } + + // Check the reference count + if r := coordinator.imageRefCount[id]; r != 2 { + t.Fatalf("Got reference count %d; want %d", r, 2) + } + + // Remove all + for i := 0; i < 2; i++ { + coordinator.RemoveImage(id) + } + + // Check the reference count + if r := coordinator.imageRefCount[id]; r != 0 { + t.Fatalf("Got reference count %d; want %d", r, 0) + } + + // Check that only one delete happened + testutil.WaitForResult(func() (bool, error) { + removes := mock.removed[id] + return removes == 1, fmt.Errorf("Wrong number of removes: %d", removes) + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + +func TestDockerCoordinator_Remove_Cancel(t *testing.T) { + image := "foo" + imageID := structs.GenerateUUID() + mapping := map[string]string{imageID: image} + + mock := newMockImageClient(mapping, 1*time.Millisecond) + config := &dockerCoordinatorConfig{ + logger: testLogger(), + cleanup: true, + client: mock, + removeDelay: 1 * time.Millisecond, + } + + // Create a coordinator + coordinator := NewDockerCoordinator(config) + + // Pull image + id, _ := coordinator.PullImage(image, nil) + + // Check the reference count + if r := coordinator.imageRefCount[id]; r != 1 { + t.Fatalf("Got reference count %d; want %d", r, 10) + } + + // Remove image + coordinator.RemoveImage(id) + + // Check the reference count + if r := coordinator.imageRefCount[id]; r != 0 { + t.Fatalf("Got reference count %d; want %d", r, 0) + } + + // Pull image again within delay + id, _ = coordinator.PullImage(image, nil) + + // Check the reference count + if r := coordinator.imageRefCount[id]; r != 1 { + t.Fatalf("Got reference count %d; want %d", r, 0) + } + + // Check that only no delete happened + if removes := mock.removed[id]; removes != 0 { + t.Fatalf("Image deleted when it shouldn't have") + } +} + +func TestDockerCoordinator_No_Cleanup(t *testing.T) { + image := "foo" + imageID := structs.GenerateUUID() + mapping := map[string]string{imageID: image} + + mock := newMockImageClient(mapping, 1*time.Millisecond) + config := &dockerCoordinatorConfig{ + logger: testLogger(), + cleanup: false, + client: mock, + removeDelay: 1 * time.Millisecond, + } + + // Create a coordinator + coordinator := NewDockerCoordinator(config) + + // Pull image + id, _ := coordinator.PullImage(image, nil) + + // Check the reference count + if r := coordinator.imageRefCount[id]; r != 0 { + t.Fatalf("Got reference count %d; want %d", r, 10) + } + + // Remove image + coordinator.RemoveImage(id) + + // Check that only no delete happened + if removes := mock.removed[id]; removes != 0 { + t.Fatalf("Image deleted when it shouldn't have") + } +} diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 68c5578e3052..d6ef97ef0d65 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -55,7 +55,7 @@ func dockerTask() (*structs.Task, int, int) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/nc", "args": []string{"-l", "127.0.0.1", "-p", "0"}, }, @@ -93,28 +93,41 @@ func dockerSetup(t *testing.T, task *structs.Task) (*docker.Client, DriverHandle return dockerSetupWithClient(t, task, client) } -func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Client) (*docker.Client, DriverHandle, func()) { +func testDockerDriverContexts(t *testing.T, task *structs.Task) *testContext { tctx := testDriverContexts(t, task) - tctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + + // Drop the delay + tctx.DriverCtx.config.Options = make(map[string]string) + tctx.DriverCtx.config.Options[dockerImageRemoveDelayConfigOption] = "1s" + + return tctx +} + +func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Client) (*docker.Client, DriverHandle, func()) { + tctx := testDockerDriverContexts(t, task) + //tctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} driver := NewDockerDriver(tctx.DriverCtx) copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar") - _, err := driver.Prestart(tctx.ExecCtx, task) + res, err := driver.Prestart(tctx.ExecCtx, task) if err != nil { tctx.AllocDir.Destroy() t.Fatalf("error in prestart: %v", err) } + handle, err := driver.Start(tctx.ExecCtx, task) if err != nil { tctx.AllocDir.Destroy() t.Fatalf("Failed to start driver: %s\nStack\n%s", err, debug.Stack()) } + if handle == nil { tctx.AllocDir.Destroy() t.Fatalf("handle is nil\nStack\n%s", debug.Stack()) } cleanup := func() { + driver.Cleanup(tctx.ExecCtx, res) handle.Kill() tctx.AllocDir.Destroy() } @@ -136,8 +149,8 @@ func newTestDockerClient(t *testing.T) *docker.Client { // This test should always pass, even if docker daemon is not available func TestDockerDriver_Fingerprint(t *testing.T) { - ctx := testDriverContexts(t, &structs.Task{Name: "foo", Driver: "docker", Resources: basicResources}) - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + ctx := testDockerDriverContexts(t, &structs.Task{Name: "foo", Driver: "docker", Resources: basicResources}) + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) node := &structs.Node{ @@ -165,7 +178,7 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { Name: "nc-demo", Driver: "docker", Config: map[string]interface{}{ - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "image": "busybox", "command": "/bin/nc", "args": []string{"-l", "127.0.0.1", "-p", "0"}, @@ -177,8 +190,8 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { Resources: basicResources, } - ctx := testDriverContexts(t, task) - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + ctx := testDockerDriverContexts(t, task) + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") @@ -212,7 +225,7 @@ func TestDockerDriver_Start_Wait(t *testing.T) { Name: "nc-demo", Driver: "docker", Config: map[string]interface{}{ - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "image": "busybox", "command": "/bin/echo", "args": []string{"hello"}, @@ -255,7 +268,7 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/echo", "args": []string{ "hello", @@ -271,8 +284,8 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) { }, } - ctx := testDriverContexts(t, task) - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + ctx := testDockerDriverContexts(t, task) + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) @@ -339,8 +352,8 @@ func TestDockerDriver_Start_BadPull_Recoverable(t *testing.T) { }, } - ctx := testDriverContexts(t, task) - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + ctx := testDockerDriverContexts(t, task) + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) @@ -371,7 +384,7 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/sh", "args": []string{ "-c", @@ -389,8 +402,8 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { }, } - ctx := testDriverContexts(t, task) - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + ctx := testDockerDriverContexts(t, task) + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") @@ -435,7 +448,7 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/sleep", "args": []string{"10"}, }, @@ -483,8 +496,8 @@ func TestDockerDriver_StartN(t *testing.T) { // Let's spin up a bunch of things for idx, task := range taskList { - ctx := testDriverContexts(t, task) - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + ctx := testDockerDriverContexts(t, task) + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") @@ -523,15 +536,15 @@ func TestDockerDriver_StartNVersions(t *testing.T) { task1, _, _ := dockerTask() task1.Config["image"] = "busybox" - task1.Config["load"] = []string{"busybox.tar"} + task1.Config["load"] = "busybox.tar" task2, _, _ := dockerTask() task2.Config["image"] = "busybox:musl" - task2.Config["load"] = []string{"busybox_musl.tar"} + task2.Config["load"] = "busybox_musl.tar" task3, _, _ := dockerTask() task3.Config["image"] = "busybox:glibc" - task3.Config["load"] = []string{"busybox_glibc.tar"} + task3.Config["load"] = "busybox_glibc.tar" taskList := []*structs.Task{task1, task2, task3} @@ -541,8 +554,8 @@ func TestDockerDriver_StartNVersions(t *testing.T) { // Let's spin up a bunch of things for idx, task := range taskList { - ctx := testDriverContexts(t, task) - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + ctx := testDockerDriverContexts(t, task) + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") @@ -599,7 +612,7 @@ func TestDockerDriver_NetworkMode_Host(t *testing.T) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/nc", "args": []string{"-l", "127.0.0.1", "-p", "0"}, "network_mode": expected, @@ -649,7 +662,7 @@ func TestDockerDriver_NetworkAliases_Bridge(t *testing.T) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/nc", "args": []string{"-l", "127.0.0.1", "-p", "0"}, "network_mode": network.Name, @@ -708,9 +721,9 @@ func TestDockerDriver_ForcePull_IsInvalidConfig(t *testing.T) { task, _, _ := dockerTask() task.Config["force_pull"] = "nothing" - ctx := testDriverContexts(t, task) + ctx := testDockerDriverContexts(t, task) defer ctx.AllocDir.Destroy() - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} driver := NewDockerDriver(ctx.DriverCtx) if _, err := driver.Prestart(ctx.ExecCtx, task); err == nil { @@ -897,7 +910,7 @@ func TestDockerDriver_User(t *testing.T) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/sleep", "args": []string{"10000"}, }, @@ -915,8 +928,8 @@ func TestDockerDriver_User(t *testing.T) { t.SkipNow() } - ctx := testDriverContexts(t, task) - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + ctx := testDockerDriverContexts(t, task) + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} driver := NewDockerDriver(ctx.DriverCtx) defer ctx.AllocDir.Destroy() copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar") @@ -945,7 +958,7 @@ func TestDockerDriver_CleanupContainer(t *testing.T) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/echo", "args": []string{"hello"}, }, @@ -993,7 +1006,7 @@ func TestDockerDriver_Stats(t *testing.T) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/sleep", "args": []string{"100"}, }, @@ -1045,7 +1058,7 @@ func TestDockerDriver_Signal(t *testing.T) { Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "/bin/sh", "args": []string{"local/test.sh"}, }, @@ -1059,8 +1072,8 @@ func TestDockerDriver_Signal(t *testing.T) { }, } - ctx := testDriverContexts(t, task) - ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} + ctx := testDockerDriverContexts(t, task) + //ctx.DriverCtx.config.Options = map[string]string{"docker.cleanup.image": "false"} defer ctx.AllocDir.Destroy() d := NewDockerDriver(ctx.DriverCtx) @@ -1140,7 +1153,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str Driver: "docker", Config: map[string]interface{}{ "image": "busybox", - "load": []string{"busybox.tar"}, + "load": "busybox.tar", "command": "touch", "args": []string{containerFile}, "volumes": []string{fmt.Sprintf("%s:${VOL_PATH}", hostpath)}, @@ -1308,7 +1321,7 @@ func TestDockerDriver_Cleanup(t *testing.T) { "image": imageName, }, } - tctx := testDriverContexts(t, task) + tctx := testDockerDriverContexts(t, task) defer tctx.AllocDir.Destroy() // Run Prestart @@ -1333,9 +1346,15 @@ func TestDockerDriver_Cleanup(t *testing.T) { } // Ensure image was removed - if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil { - t.Fatalf("image exists but should have been removed. Does another %v container exist?", imageName) - } + tu.WaitForResult(func() (bool, error) { + if _, err := client.InspectImage(driver.driverConfig.ImageName); err == nil { + return false, fmt.Errorf("image exists but should have been removed. Does another %v container exist?", imageName) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) // The image doesn't exist which shouldn't be an error when calling // Cleanup, so call it again to make sure. diff --git a/website/source/docs/drivers/docker.html.md b/website/source/docs/drivers/docker.html.md index 631bf6f24589..068c0228ecd5 100644 --- a/website/source/docs/drivers/docker.html.md +++ b/website/source/docs/drivers/docker.html.md @@ -410,6 +410,11 @@ options](/docs/agent/configuration/client.html#options): * `docker.cleanup.image` Defaults to `true`. Changing this to `false` will prevent Nomad from removing images from stopped tasks. +* `docker.cleanup.image.delay` A time duration that defaults to `3m`. The delay + controls how long Nomad will wait between an image being unused and deleting + it. If a tasks is received that uses the same image within the delay, the + image will be reused. + * `docker.volumes.enabled`: Defaults to `true`. Allows tasks to bind host paths (`volumes`) inside their container and use volume drivers (`volume_driver`). Binding relative paths is always allowed and will be resolved relative to the