From 9539687e72dd1e478b20ce4867cca1b4c18743d4 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Wed, 25 Apr 2018 17:05:14 -0400 Subject: [PATCH] client/driver: remove pull timeout due to race condition that can lead to unexpected timeouts If two jobs are pulling the same image simultaneously, which ever starts the pull first will set the pull timeout. This can lead to a poor UX where the first job requested a short timeout while the second job requested a longer timeout causing the pull to potentially timeout much sooner than expected by the second job. --- client/driver/docker.go | 13 +--------- client/driver/docker_coordinator.go | 19 ++++++--------- client/driver/docker_coordinator_test.go | 10 ++++---- client/driver/docker_progress.go | 31 ++++++++++++++---------- 4 files changed, 32 insertions(+), 41 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index 94fcae9c5653..cc31e547eae3 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -234,8 +234,6 @@ type DockerDriverConfig struct { ReadonlyRootfs bool `mapstructure:"readonly_rootfs"` // Mount the container’s root filesystem as read only AdvertiseIPv6Address bool `mapstructure:"advertise_ipv6_address"` // Flag to use the GlobalIPv6Address from the container as the detected IP CPUHardLimit bool `mapstructure:"cpu_hard_limit"` // Enforce CPU hard limit. - ImagePullTimeoutRaw string `mapstructure:"image_pull_timeout"` // - ImagePullTimeout time.Duration `mapstructure:"-"` // Timeout on the image pull after which the pull is cancelled } func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) { @@ -305,12 +303,6 @@ func (c *DockerDriverConfig) Validate() error { return err } c.Ulimit = ulimit - if len(c.ImagePullTimeoutRaw) > 0 { - c.ImagePullTimeout, err = time.ParseDuration(c.ImagePullTimeoutRaw) - if err != nil { - return err - } - } return nil } @@ -744,9 +736,6 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error { "cpu_hard_limit": { Type: fields.TypeBool, }, - "image_pull_timeout": { - Type: fields.TypeString, - }, }, } @@ -1558,7 +1547,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke d.emitEvent("Downloading image %s:%s", repo, tag) coordinator, callerID := d.getDockerCoordinator(client) - return coordinator.PullImage(driverConfig.ImageName, authOptions, driverConfig.ImagePullTimeout, callerID, d.emitEvent) + return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID, d.emitEvent) } // authBackend encapsulates a function that resolves registry credentials. diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index 2a6dcfeae798..9b61f43ec7da 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -143,7 +143,7 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { // PullImage is used to pull an image. It returns the pulled imaged ID or an // error that occurred during the pull -func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string, emitFn LogEventFn) (imageID string, err error) { +func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string, emitFn LogEventFn) (imageID string, err error) { // Get the future d.imageLock.Lock() future, ok := d.pullFutures[image] @@ -152,7 +152,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // Make the future future = newPullFuture() d.pullFutures[image] = future - go d.pullImageImpl(image, authOptions, pullTimeout, future) + go d.pullImageImpl(image, authOptions, future) } d.imageLock.Unlock() @@ -179,7 +179,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // pullImageImpl is the implementation of pulling an image. The results are // returned via the passed future -func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, future *pullFuture) { +func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) { defer d.clearPullLogger(image) // Parse the repo and tag repo, tag := docker.ParseRepositoryTag(image) @@ -188,11 +188,10 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if pullTimeout > 0 { - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(pullTimeout)) - } pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport) + defer pm.stop() + pullOptions := docker.PullImageOptions{ Repository: repo, Tag: tag, @@ -207,8 +206,6 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth auth = *authOptions } - pm.start() - defer pm.stop() err := d.client.PullImage(pullOptions, auth) if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded { @@ -395,15 +392,15 @@ func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{} } } -func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time) { +func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time, interval int64) { d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg) } -func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time) { +func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time, interval int64) { d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg) - if timestamp.Sub(pullStart) > dockerPullProgressEmitInterval { + if interval%int64(dockerPullProgressEmitInterval.Seconds()/dockerImageProgressReportInterval.Seconds()) == 0 { d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) } } diff --git a/client/driver/docker_coordinator_test.go b/client/driver/docker_coordinator_test.go index 7feef1f18a2f..c81cee99b9e9 100644 --- a/client/driver/docker_coordinator_test.go +++ b/client/driver/docker_coordinator_test.go @@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) { id := "" for i := 0; i < 10; i++ { go func() { - id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate(), nil) + id, _ = coordinator.PullImage(image, nil, uuid.Generate(), nil) }() } @@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) { callerIDs := make([]string, 10, 10) for i := 0; i < 10; i++ { callerIDs[i] = uuid.Generate() - id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i], nil) + id, _ = coordinator.PullImage(image, nil, callerIDs[i], nil) } // Check the reference count @@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, 0, callerID, nil) + id, _ := coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { } // Pull image again within delay - id, _ = coordinator.PullImage(image, nil, 0, callerID, nil) + id, _ = coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, 0, callerID, nil) + id, _ := coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 0 { diff --git a/client/driver/docker_progress.go b/client/driver/docker_progress.go index c62ebf877f45..8ae87f50085f 100644 --- a/client/driver/docker_progress.go +++ b/client/driver/docker_progress.go @@ -14,13 +14,13 @@ import ( ) const ( - // defaultPullActivityDeadline is the default value set in the imageProgressManager + // dockerPullActivityDeadline is the default value set in the imageProgressManager // when newImageProgressManager is called - defaultPullActivityDeadline = 2 * time.Minute + dockerPullActivityDeadline = 2 * time.Minute - // defaultImageProgressReportInterval is the default value set in the + // dockerImageProgressReportInterval is the default value set in the // imageProgressManager when newImageProgressManager is called - defaultImageProgressReportInterval = 10 * time.Second + dockerImageProgressReportInterval = 10 * time.Second ) // layerProgress tracks the state and downloaded bytes of a single layer within @@ -162,9 +162,9 @@ func (p *imageProgress) totalBytes() int64 { // progressReporterFunc defines the method for handeling inactivity and report // events from the imageProgressManager. The image name, current status message, -// timestamp of last received status update and timestamp of when the pull started -// are passed in. -type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time) +// timestamp of last received status update, timestamp of when the pull started +// and current report interation are passed in. +type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time, interval int64) // imageProgressManager tracks the progress of pulling a docker image from an // image repository. @@ -187,11 +187,11 @@ func newImageProgressManager( image string, cancel context.CancelFunc, inactivityFunc, reporter progressReporterFunc) *imageProgressManager { - return &imageProgressManager{ + pm := &imageProgressManager{ image: image, - activityDeadline: defaultPullActivityDeadline, + activityDeadline: dockerPullActivityDeadline, inactivityFunc: inactivityFunc, - reportInterval: defaultImageProgressReportInterval, + reportInterval: dockerImageProgressReportInterval, reporter: reporter, imageProgress: &imageProgress{ timestamp: time.Now(), @@ -200,23 +200,28 @@ func newImageProgressManager( cancel: cancel, stopCh: make(chan struct{}), } + + pm.start() + return pm } // start intiates the ticker to trigger the inactivity and reporter handlers func (pm *imageProgressManager) start() { pm.imageProgress.pullStart = time.Now() go func() { - ticker := time.NewTicker(defaultImageProgressReportInterval) + ticker := time.NewTicker(dockerImageProgressReportInterval) + var interval int64 for { + interval++ select { case <-ticker.C: msg, timestamp := pm.imageProgress.get() if time.Now().Sub(timestamp) > pm.activityDeadline { - pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart) + pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) pm.cancel() return } - pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart) + pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) case <-pm.stopCh: return }