Skip to content

Commit

Permalink
Merge pull request #2361 from hashicorp/f-docker-coordination
Browse files Browse the repository at this point in the history
Docker Image Coordinator and caching
  • Loading branch information
dadgar committed Feb 24, 2017
2 parents 6799c76 + 27b2c67 commit 5338daf
Show file tree
Hide file tree
Showing 6 changed files with 664 additions and 133 deletions.
23 changes: 23 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,29 @@ func (c *Config) ReadBoolDefault(id string, defaultValue bool) bool {
return val
}

// ReadDuration parses the specified option as a duration.
func (c *Config) ReadDuration(id string) (time.Duration, error) {
val, ok := c.Options[id]
if !ok {
return time.Duration(0), fmt.Errorf("Specified config is missing from options")
}
dval, err := time.ParseDuration(val)
if err != nil {
return time.Duration(0), fmt.Errorf("Failed to parse %s as time duration: %s", val, err)
}
return dval, nil
}

// ReadDurationDefault tries to parse the specified option as a duration. If there is
// an error in parsing, the default option is returned.
func (c *Config) ReadDurationDefault(id string, defaultValue time.Duration) time.Duration {
val, err := c.ReadDuration(id)
if err != nil {
return defaultValue
}
return val
}

// ReadStringListToMap tries to parse the specified option as a comma separated list.
// If there is an error in parsing, an empty list is returned.
func (c *Config) ReadStringListToMap(key string) map[string]struct{} {
Expand Down
165 changes: 75 additions & 90 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -58,11 +57,7 @@ var (
recoverableErrTimeouts = func(err error) error {
r := false
if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") ||
strings.Contains(err.Error(), "EOF") ||
// TODO Remove when we implement global co-ordination among docker
// drivers to not remove images which are in use by instances of
// other drivers
strings.Contains(err.Error(), "no such image") {
strings.Contains(err.Error(), "EOF") {
r = true
}
return structs.NewRecoverableError(err, r)
Expand Down Expand Up @@ -96,6 +91,11 @@ const (
dockerCleanupImageConfigOption = "docker.cleanup.image"
dockerCleanupImageConfigDefault = true

// dockerPullTimeoutConfigOption is the key for setting an images pull
// timeout
dockerImageRemoveDelayConfigOption = "docker.cleanup.image.delay"
dockerImageRemoveDelayConfigDefault = 3 * time.Minute

// dockerTimeout is the length of time a request can be outstanding before
// it is timed out.
dockerTimeout = 5 * time.Minute
Expand Down Expand Up @@ -130,7 +130,7 @@ type DockerLoggingOpts struct {

type DockerDriverConfig struct {
ImageName string `mapstructure:"image"` // Container's Image Name
LoadImages []string `mapstructure:"load"` // LoadImage is array of paths to image archive files
LoadImage string `mapstructure:"load"` // LoadImage is a path to an image archive file
Command string `mapstructure:"command"` // The Command to run when the container starts up
Args []string `mapstructure:"args"` // The arguments to the Command
IpcMode string `mapstructure:"ipc_mode"` // The IPC mode of the container - host and none
Expand Down Expand Up @@ -191,11 +191,11 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke
dconf.UTSMode = env.ReplaceEnv(dconf.UTSMode)
dconf.Hostname = env.ReplaceEnv(dconf.Hostname)
dconf.WorkDir = env.ReplaceEnv(dconf.WorkDir)
dconf.LoadImage = env.ReplaceEnv(dconf.LoadImage)
dconf.Volumes = env.ParseAndReplace(dconf.Volumes)
dconf.VolumeDriver = env.ReplaceEnv(dconf.VolumeDriver)
dconf.DNSServers = env.ParseAndReplace(dconf.DNSServers)
dconf.DNSSearchDomains = env.ParseAndReplace(dconf.DNSSearchDomains)
dconf.LoadImages = env.ParseAndReplace(dconf.LoadImages)

for _, m := range dconf.LabelsRaw {
for k, v := range m {
Expand Down Expand Up @@ -241,6 +241,7 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke

type dockerPID struct {
Version string
Image string
ImageID string
ContainerID string
KillTimeout time.Duration
Expand All @@ -254,6 +255,8 @@ type DockerHandle struct {
client *docker.Client
waitClient *docker.Client
logger *log.Logger
Image string
ImageID string
containerID string
version string
clkSpeed float64
Expand Down Expand Up @@ -321,7 +324,7 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error {
Required: true,
},
"load": &fields.FieldSchema{
Type: fields.TypeArray,
Type: fields.TypeString,
},
"command": &fields.FieldSchema{
Type: fields.TypeString,
Expand Down Expand Up @@ -416,6 +419,18 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation {
return cstructs.FSIsolationImage
}

// getDockerCoordinator returns the docker coordinator
func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordinator {
config := &dockerCoordinatorConfig{
client: client,
cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault),
logger: d.logger,
removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault),
}

return GetDockerCoordinator(config)
}

func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
if err != nil {
Expand All @@ -432,22 +447,14 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR
}

// Ensure the image is available
if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil {
return nil, err
}

// Regardless of whether the image was downloaded already or not, store
// it as a created resource. Cleanup will soft fail if the image is
// still in use by another contianer.
dockerImage, err := client.InspectImage(driverConfig.ImageName)
id, err := d.createImage(driverConfig, client, ctx.TaskDir)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err)
return nil, err
}

res := NewCreatedResources()
res.Add(dockerImageResKey, dockerImage.ID)
d.imageID = dockerImage.ID
res.Add(dockerImageResKey, id)
d.imageID = id
return res, nil
}

Expand Down Expand Up @@ -535,6 +542,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
executor: exec,
pluginClient: pluginClient,
logger: d.logger,
Image: d.driverConfig.ImageName,
ImageID: d.imageID,
containerID: container.ID,
version: d.config.Version,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
Expand Down Expand Up @@ -585,20 +594,9 @@ func (d *DockerDriver) cleanupImage(id string) error {
return nil
}

if err := client.RemoveImage(id); err != nil {
if err == docker.ErrNoSuchImage {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id)
return nil
}
if derr, ok := err.(*docker.Error); ok && derr.Status == 409 {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id)
return nil
}
// Retry on unknown errors
return structs.NewRecoverableError(err, true)
}
coordinator := d.getDockerCoordinator(client)
coordinator.RemoveImage(id)

d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id)
return nil
}

Expand Down Expand Up @@ -946,116 +944,96 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
}, nil
}

var (
// imageNotFoundMatcher is a regex expression that matches the image not
// found error Docker returns.
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)

// recoverablePullError wraps the error gotten when trying to pull and image if
// the error is recoverable.
func (d *DockerDriver) recoverablePullError(err error, image string) error {
recoverable := true
if imageNotFoundMatcher.MatchString(err.Error()) {
recoverable = false
}
return structs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}

func (d *DockerDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}

// createImage creates a docker image either by pulling it from a registry or by
// loading it from the file system
func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error {
func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) (string, error) {
image := driverConfig.ImageName
repo, tag := docker.ParseRepositoryTag(image)
if tag == "" {
tag = "latest"
}

coordinator := d.getDockerCoordinator(client)

// We're going to check whether the image is already downloaded. If the tag
// is "latest", or ForcePull is set, we have to check for a new version every time so we don't
// bother to check and cache the id here. We'll download first, then cache.
if driverConfig.ForcePull {
d.logger.Printf("[DEBUG] driver.docker: force pull image '%s:%s' instead of inspecting local", repo, tag)
} else if tag != "latest" {
if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
// Image exists, nothing to do
return nil
// Image exists so just increment its reference count
coordinator.IncrementImageReference(dockerImage.ID, image)
return dockerImage.ID, nil
}
}

// Load the image if specified
if len(driverConfig.LoadImages) > 0 {
if driverConfig.LoadImage != "" {
return d.loadImage(driverConfig, client, taskDir)
}

// Download the image
if err := d.pullImage(driverConfig, client, repo, tag); err != nil {
return err
}
return nil
return d.pullImage(driverConfig, client, repo, tag)
}

// pullImage creates an image by pulling it from a docker registry
func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docker.Client, repo string, tag string) error {
pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
}

authOptions := docker.AuthConfiguration{}
func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docker.Client, repo, tag string) (id string, err error) {
var authOptions *docker.AuthConfiguration
if len(driverConfig.Auth) != 0 {
authOptions = docker.AuthConfiguration{
authOptions = &docker.AuthConfiguration{
Username: driverConfig.Auth[0].Username,
Password: driverConfig.Auth[0].Password,
Email: driverConfig.Auth[0].Email,
ServerAddress: driverConfig.Auth[0].ServerAddress,
}
} else if authConfigFile := d.config.Read("docker.auth.config"); authConfigFile != "" {
authOptionsPtr, err := authOptionFrom(authConfigFile, repo)
authOptions, err := authOptionFrom(authConfigFile, repo)
if err != nil {
d.logger.Printf("[INFO] driver.docker: failed to find docker auth for repo %q: %v", repo, err)
return fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err)
return "", fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err)
}

authOptions = *authOptionsPtr
if authOptions.Email == "" && authOptions.Password == "" &&
authOptions.ServerAddress == "" && authOptions.Username == "" {
d.logger.Printf("[DEBUG] driver.docker: did not find docker auth for repo %q", repo)
}
}

d.emitEvent("Downloading image %s:%s", repo, tag)
err := client.PullImage(pullOptions, authOptions)
coordinator := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions)
}

// loadImage creates an image by loading it from the file system
func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docker.Client,
taskDir *allocdir.TaskDir) (id string, err error) {

archive := filepath.Join(taskDir.LocalDir, driverConfig.LoadImage)
d.logger.Printf("[DEBUG] driver.docker: loading image from: %v", archive)

f, err := os.Open(archive)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
return d.recoverablePullError(err, driverConfig.ImageName)
return "", fmt.Errorf("unable to open image archive: %v", err)
}

d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)
return nil
}
if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil {
return "", err
}
f.Close()

// loadImage creates an image by loading it from the file system
func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error {
var errors multierror.Error
for _, image := range driverConfig.LoadImages {
archive := filepath.Join(taskDir.LocalDir, image)
d.logger.Printf("[DEBUG] driver.docker: loading image from: %v", archive)
f, err := os.Open(archive)
if err != nil {
errors.Errors = append(errors.Errors, fmt.Errorf("unable to open image archive: %v", err))
continue
}
if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil {
errors.Errors = append(errors.Errors, err)
}
f.Close()
dockerImage, err := client.InspectImage(driverConfig.ImageName)
if err != nil {
return "", recoverableErrTimeouts(err)
}
return errors.ErrorOrNil()

coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName)
return dockerImage.ID, nil
}

// createContainer creates the container given the passed configuration. It
Expand Down Expand Up @@ -1206,6 +1184,11 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
ver, _ := exec.Version()
d.logger.Printf("[DEBUG] driver.docker: version of executor: %v", ver.Version)

// Increment the reference count since we successfully attached to this
// container
coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(pid.ImageID, pid.Image)

// Return a driver handle
h := &DockerHandle{
client: client,
Expand Down Expand Up @@ -1234,6 +1217,8 @@ func (h *DockerHandle) ID() string {
pid := dockerPID{
Version: h.version,
ContainerID: h.containerID,
Image: h.Image,
ImageID: h.ImageID,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
Expand Down
Loading

0 comments on commit 5338daf

Please sign in to comment.