Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docker Image Coordinator and caching #2361

Merged
merged 1 commit into from
Feb 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,29 @@ func (c *Config) ReadBoolDefault(id string, defaultValue bool) bool {
return val
}

// ReadDuration parses the specified option as a duration.
//
// It returns an error if id is not present in c.Options, or if the stored
// value is rejected by time.ParseDuration. Both errors name the offending
// option so the failure can be traced back to the configuration entry. The
// returned duration is zero whenever the error is non-nil.
func (c *Config) ReadDuration(id string) (time.Duration, error) {
	val, ok := c.Options[id]
	if !ok {
		// Include the key: callers may probe several options and the bare
		// message gives no hint which one was absent.
		return 0, fmt.Errorf("Specified config %q is missing from options", id)
	}

	dval, err := time.ParseDuration(val)
	if err != nil {
		return 0, fmt.Errorf("Failed to parse option %q value %q as time duration: %s", id, val, err)
	}
	return dval, nil
}

// ReadDurationDefault reads the specified option as a duration via
// ReadDuration. When ReadDuration reports any error (the option is missing or
// its value cannot be parsed), defaultValue is returned instead.
func (c *Config) ReadDurationDefault(id string, defaultValue time.Duration) time.Duration {
	if parsed, err := c.ReadDuration(id); err == nil {
		return parsed
	}
	return defaultValue
}

// ReadStringListToMap tries to parse the specified option as a comma separated list.
// If there is an error in parsing, an empty list is returned.
func (c *Config) ReadStringListToMap(key string) map[string]struct{} {
Expand Down
165 changes: 75 additions & 90 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -58,11 +57,7 @@ var (
recoverableErrTimeouts = func(err error) error {
r := false
if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") ||
strings.Contains(err.Error(), "EOF") ||
// TODO Remove when we implement global co-ordination among docker
// drivers to not remove images which are in use by instances of
// other drivers
strings.Contains(err.Error(), "no such image") {
strings.Contains(err.Error(), "EOF") {
r = true
}
return structs.NewRecoverableError(err, r)
Expand Down Expand Up @@ -96,6 +91,11 @@ const (
dockerCleanupImageConfigOption = "docker.cleanup.image"
dockerCleanupImageConfigDefault = true

// dockerImageRemoveDelayConfigOption is the key for setting the delay
// before an image is removed during cleanup
dockerImageRemoveDelayConfigOption = "docker.cleanup.image.delay"
dockerImageRemoveDelayConfigDefault = 3 * time.Minute

// dockerTimeout is the length of time a request can be outstanding before
// it is timed out.
dockerTimeout = 5 * time.Minute
Expand Down Expand Up @@ -130,7 +130,7 @@ type DockerLoggingOpts struct {

type DockerDriverConfig struct {
ImageName string `mapstructure:"image"` // Container's Image Name
LoadImages []string `mapstructure:"load"` // LoadImage is array of paths to image archive files
LoadImage string `mapstructure:"load"` // LoadImage is a path to an image archive file
Command string `mapstructure:"command"` // The Command to run when the container starts up
Args []string `mapstructure:"args"` // The arguments to the Command
IpcMode string `mapstructure:"ipc_mode"` // The IPC mode of the container - host and none
Expand Down Expand Up @@ -191,11 +191,11 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke
dconf.UTSMode = env.ReplaceEnv(dconf.UTSMode)
dconf.Hostname = env.ReplaceEnv(dconf.Hostname)
dconf.WorkDir = env.ReplaceEnv(dconf.WorkDir)
dconf.LoadImage = env.ReplaceEnv(dconf.LoadImage)
dconf.Volumes = env.ParseAndReplace(dconf.Volumes)
dconf.VolumeDriver = env.ReplaceEnv(dconf.VolumeDriver)
dconf.DNSServers = env.ParseAndReplace(dconf.DNSServers)
dconf.DNSSearchDomains = env.ParseAndReplace(dconf.DNSSearchDomains)
dconf.LoadImages = env.ParseAndReplace(dconf.LoadImages)

for _, m := range dconf.LabelsRaw {
for k, v := range m {
Expand Down Expand Up @@ -241,6 +241,7 @@ func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*Docke

type dockerPID struct {
Version string
Image string
ImageID string
ContainerID string
KillTimeout time.Duration
Expand All @@ -254,6 +255,8 @@ type DockerHandle struct {
client *docker.Client
waitClient *docker.Client
logger *log.Logger
Image string
ImageID string
containerID string
version string
clkSpeed float64
Expand Down Expand Up @@ -321,7 +324,7 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error {
Required: true,
},
"load": &fields.FieldSchema{
Type: fields.TypeArray,
Type: fields.TypeString,
},
"command": &fields.FieldSchema{
Type: fields.TypeString,
Expand Down Expand Up @@ -416,6 +419,18 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation {
return cstructs.FSIsolationImage
}

// getDockerCoordinator returns the docker coordinator obtained from
// GetDockerCoordinator. The coordinator config is built from the given client,
// the driver's logger, and the image cleanup flag and removal delay read from
// the driver's client configuration (falling back to their defaults).
func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordinator {
	cleanupEnabled := d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault)
	delay := d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault)

	return GetDockerCoordinator(&dockerCoordinatorConfig{
		client:      client,
		cleanup:     cleanupEnabled,
		logger:      d.logger,
		removeDelay: delay,
	})
}

func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
if err != nil {
Expand All @@ -432,22 +447,14 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR
}

// Ensure the image is available
if err := d.createImage(driverConfig, client, ctx.TaskDir); err != nil {
return nil, err
}

// Regardless of whether the image was downloaded already or not, store
// it as a created resource. Cleanup will soft fail if the image is
// still in use by another container.
dockerImage, err := client.InspectImage(driverConfig.ImageName)
id, err := d.createImage(driverConfig, client, ctx.TaskDir)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed getting image id for %q: %v", driverConfig.ImageName, err)
return nil, err
}

res := NewCreatedResources()
res.Add(dockerImageResKey, dockerImage.ID)
d.imageID = dockerImage.ID
res.Add(dockerImageResKey, id)
d.imageID = id
return res, nil
}

Expand Down Expand Up @@ -535,6 +542,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
executor: exec,
pluginClient: pluginClient,
logger: d.logger,
Image: d.driverConfig.ImageName,
ImageID: d.imageID,
containerID: container.ID,
version: d.config.Version,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
Expand Down Expand Up @@ -585,20 +594,9 @@ func (d *DockerDriver) cleanupImage(id string) error {
return nil
}

if err := client.RemoveImage(id); err != nil {
if err == docker.ErrNoSuchImage {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: does not exist", id)
return nil
}
if derr, ok := err.(*docker.Error); ok && derr.Status == 409 {
d.logger.Printf("[DEBUG] driver.docker: unable to cleanup image %q: still in use", id)
return nil
}
// Retry on unknown errors
return structs.NewRecoverableError(err, true)
}
coordinator := d.getDockerCoordinator(client)
coordinator.RemoveImage(id)

d.logger.Printf("[DEBUG] driver.docker: cleanup removed downloaded image: %q", id)
return nil
}

Expand Down Expand Up @@ -942,116 +940,96 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
}, nil
}

var (
// imageNotFoundMatcher is a regex expression that matches the image not
// found error Docker returns.
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)

// recoverablePullError wraps the error gotten when trying to pull an image if
// the error is recoverable.
func (d *DockerDriver) recoverablePullError(err error, image string) error {
recoverable := true
if imageNotFoundMatcher.MatchString(err.Error()) {
recoverable = false
}
return structs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}

// Periodic returns true together with a 15 second duration.
// NOTE(review): presumably this asks the caller to re-run the driver's
// periodic work at that interval; confirm against the interface it satisfies.
func (d *DockerDriver) Periodic() (bool, time.Duration) {
	const interval = 15 * time.Second
	return true, interval
}

// createImage creates a docker image either by pulling it from a registry or by
// loading it from the file system
func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error {
func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) (string, error) {
image := driverConfig.ImageName
repo, tag := docker.ParseRepositoryTag(image)
if tag == "" {
tag = "latest"
}

coordinator := d.getDockerCoordinator(client)

// We're going to check whether the image is already downloaded. If the tag
// is "latest", or ForcePull is set, we have to check for a new version every time so we don't
// bother to check and cache the id here. We'll download first, then cache.
if driverConfig.ForcePull {
d.logger.Printf("[DEBUG] driver.docker: force pull image '%s:%s' instead of inspecting local", repo, tag)
} else if tag != "latest" {
if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
// Image exists, nothing to do
return nil
// Image exists so just increment its reference count
coordinator.IncrementImageReference(dockerImage.ID, image)
return dockerImage.ID, nil
}
}

// Load the image if specified
if len(driverConfig.LoadImages) > 0 {
if driverConfig.LoadImage != "" {
return d.loadImage(driverConfig, client, taskDir)
}

// Download the image
if err := d.pullImage(driverConfig, client, repo, tag); err != nil {
return err
}
return nil
return d.pullImage(driverConfig, client, repo, tag)
}

// pullImage creates an image by pulling it from a docker registry
func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docker.Client, repo string, tag string) error {
pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
}

authOptions := docker.AuthConfiguration{}
func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docker.Client, repo, tag string) (id string, err error) {
var authOptions *docker.AuthConfiguration
if len(driverConfig.Auth) != 0 {
authOptions = docker.AuthConfiguration{
authOptions = &docker.AuthConfiguration{
Username: driverConfig.Auth[0].Username,
Password: driverConfig.Auth[0].Password,
Email: driverConfig.Auth[0].Email,
ServerAddress: driverConfig.Auth[0].ServerAddress,
}
} else if authConfigFile := d.config.Read("docker.auth.config"); authConfigFile != "" {
authOptionsPtr, err := authOptionFrom(authConfigFile, repo)
authOptions, err := authOptionFrom(authConfigFile, repo)
if err != nil {
d.logger.Printf("[INFO] driver.docker: failed to find docker auth for repo %q: %v", repo, err)
return fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err)
return "", fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err)
}

authOptions = *authOptionsPtr
if authOptions.Email == "" && authOptions.Password == "" &&
authOptions.ServerAddress == "" && authOptions.Username == "" {
d.logger.Printf("[DEBUG] driver.docker: did not find docker auth for repo %q", repo)
}
}

d.emitEvent("Downloading image %s:%s", repo, tag)
err := client.PullImage(pullOptions, authOptions)
coordinator := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions)
}

// loadImage creates an image by loading it from the file system
func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docker.Client,
taskDir *allocdir.TaskDir) (id string, err error) {

archive := filepath.Join(taskDir.LocalDir, driverConfig.LoadImage)
d.logger.Printf("[DEBUG] driver.docker: loading image from: %v", archive)

f, err := os.Open(archive)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
return d.recoverablePullError(err, driverConfig.ImageName)
return "", fmt.Errorf("unable to open image archive: %v", err)
}

d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)
return nil
}
if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil {
return "", err
}
f.Close()

// loadImage creates an image by loading it from the file system
func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docker.Client, taskDir *allocdir.TaskDir) error {
var errors multierror.Error
for _, image := range driverConfig.LoadImages {
archive := filepath.Join(taskDir.LocalDir, image)
d.logger.Printf("[DEBUG] driver.docker: loading image from: %v", archive)
f, err := os.Open(archive)
if err != nil {
errors.Errors = append(errors.Errors, fmt.Errorf("unable to open image archive: %v", err))
continue
}
if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil {
errors.Errors = append(errors.Errors, err)
}
f.Close()
dockerImage, err := client.InspectImage(driverConfig.ImageName)
if err != nil {
return "", recoverableErrTimeouts(err)
}
return errors.ErrorOrNil()

coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName)
return dockerImage.ID, nil
}

// createContainer creates the container given the passed configuration. It
Expand Down Expand Up @@ -1202,6 +1180,11 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
ver, _ := exec.Version()
d.logger.Printf("[DEBUG] driver.docker: version of executor: %v", ver.Version)

// Increment the reference count since we successfully attached to this
// container
coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(pid.ImageID, pid.Image)

// Return a driver handle
h := &DockerHandle{
client: client,
Expand Down Expand Up @@ -1230,6 +1213,8 @@ func (h *DockerHandle) ID() string {
pid := dockerPID{
Version: h.version,
ContainerID: h.containerID,
Image: h.Image,
ImageID: h.ImageID,
KillTimeout: h.killTimeout,
MaxKillTimeout: h.maxKillTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
Expand Down
Loading