diff --git a/client/driver/docker.go b/client/driver/docker.go index 94fcae9c5653..cc31e547eae3 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -234,8 +234,6 @@ type DockerDriverConfig struct { ReadonlyRootfs bool `mapstructure:"readonly_rootfs"` // Mount the container’s root filesystem as read only AdvertiseIPv6Address bool `mapstructure:"advertise_ipv6_address"` // Flag to use the GlobalIPv6Address from the container as the detected IP CPUHardLimit bool `mapstructure:"cpu_hard_limit"` // Enforce CPU hard limit. - ImagePullTimeoutRaw string `mapstructure:"image_pull_timeout"` // - ImagePullTimeout time.Duration `mapstructure:"-"` // Timeout on the image pull after which the pull is cancelled } func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) { @@ -305,12 +303,6 @@ func (c *DockerDriverConfig) Validate() error { return err } c.Ulimit = ulimit - if len(c.ImagePullTimeoutRaw) > 0 { - c.ImagePullTimeout, err = time.ParseDuration(c.ImagePullTimeoutRaw) - if err != nil { - return err - } - } return nil } @@ -744,9 +736,6 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error { "cpu_hard_limit": { Type: fields.TypeBool, }, - "image_pull_timeout": { - Type: fields.TypeString, - }, }, } @@ -1558,7 +1547,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke d.emitEvent("Downloading image %s:%s", repo, tag) coordinator, callerID := d.getDockerCoordinator(client) - return coordinator.PullImage(driverConfig.ImageName, authOptions, driverConfig.ImagePullTimeout, callerID, d.emitEvent) + return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID, d.emitEvent) } // authBackend encapsulates a function that resolves registry credentials. diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index 2a6dcfeae798..9b61f43ec7da 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -143,7 +143,7 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { // PullImage is used to pull an image. It returns the pulled imaged ID or an // error that occurred during the pull -func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string, emitFn LogEventFn) (imageID string, err error) { +func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string, emitFn LogEventFn) (imageID string, err error) { // Get the future d.imageLock.Lock() future, ok := d.pullFutures[image] @@ -152,7 +152,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // Make the future future = newPullFuture() d.pullFutures[image] = future - go d.pullImageImpl(image, authOptions, pullTimeout, future) + go d.pullImageImpl(image, authOptions, future) } d.imageLock.Unlock() @@ -179,7 +179,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // pullImageImpl is the implementation of pulling an image. The results are // returned via the passed future -func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, future *pullFuture) { +func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) { defer d.clearPullLogger(image) // Parse the repo and tag repo, tag := docker.ParseRepositoryTag(image) @@ -188,11 +188,10 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if pullTimeout > 0 { - ctx, cancel = context.WithDeadline(ctx, time.Now().Add(pullTimeout)) - } pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport) + defer pm.stop() + pullOptions := docker.PullImageOptions{ Repository: repo, Tag: tag, @@ -207,8 +206,6 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth auth = *authOptions } - pm.start() - defer pm.stop() err := d.client.PullImage(pullOptions, auth) if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded { @@ -395,15 +392,15 @@ func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{} } } -func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time) { +func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time, interval int64) { d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg) } -func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time) { +func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time, interval int64) { d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg) - if timestamp.Sub(pullStart) > dockerPullProgressEmitInterval { + if interval%int64(dockerPullProgressEmitInterval.Seconds()/dockerImageProgressReportInterval.Seconds()) == 0 { d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) } } diff --git a/client/driver/docker_coordinator_test.go b/client/driver/docker_coordinator_test.go index 7feef1f18a2f..c81cee99b9e9 100644 --- a/client/driver/docker_coordinator_test.go +++ b/client/driver/docker_coordinator_test.go @@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) { id := "" for i := 0; i < 10; i++ { go func() { - id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate(), nil) + id, _ = coordinator.PullImage(image, nil, uuid.Generate(), nil) }() } @@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) { callerIDs := make([]string, 10, 10) for i := 0; i < 10; i++ { callerIDs[i] = uuid.Generate() - id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i], nil) + id, _ = coordinator.PullImage(image, nil, callerIDs[i], nil) } // Check the reference count @@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, 0, callerID, nil) + id, _ := coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { } // Pull image again within delay - id, _ = coordinator.PullImage(image, nil, 0, callerID, nil) + id, _ = coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, 0, callerID, nil) + id, _ := coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 0 { diff --git a/client/driver/docker_progress.go b/client/driver/docker_progress.go index c62ebf877f45..8ae87f50085f 100644 --- a/client/driver/docker_progress.go +++ b/client/driver/docker_progress.go @@ -14,13 +14,13 @@ import ( ) const ( - // defaultPullActivityDeadline is the default value set in the imageProgressManager + // dockerPullActivityDeadline is the default value set in the imageProgressManager // when newImageProgressManager is called - defaultPullActivityDeadline = 2 * time.Minute + dockerPullActivityDeadline = 2 * time.Minute - // defaultImageProgressReportInterval is the default value set in the + // dockerImageProgressReportInterval is the default value set in the // imageProgressManager when newImageProgressManager is called - defaultImageProgressReportInterval = 10 * time.Second + dockerImageProgressReportInterval = 10 * time.Second ) // layerProgress tracks the state and downloaded bytes of a single layer within @@ -162,9 +162,9 @@ func (p *imageProgress) totalBytes() int64 { // progressReporterFunc defines the method for handeling inactivity and report // events from the imageProgressManager. The image name, current status message, -// timestamp of last received status update and timestamp of when the pull started -// are passed in. -type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time) +// timestamp of last received status update, timestamp of when the pull started +// and current report interation are passed in. +type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time, interval int64) // imageProgressManager tracks the progress of pulling a docker image from an // image repository. @@ -187,11 +187,11 @@ func newImageProgressManager( image string, cancel context.CancelFunc, inactivityFunc, reporter progressReporterFunc) *imageProgressManager { - return &imageProgressManager{ + pm := &imageProgressManager{ image: image, - activityDeadline: defaultPullActivityDeadline, + activityDeadline: dockerPullActivityDeadline, inactivityFunc: inactivityFunc, - reportInterval: defaultImageProgressReportInterval, + reportInterval: dockerImageProgressReportInterval, reporter: reporter, imageProgress: &imageProgress{ timestamp: time.Now(), @@ -200,23 +200,28 @@ func newImageProgressManager( cancel: cancel, stopCh: make(chan struct{}), } + + pm.start() + return pm } // start intiates the ticker to trigger the inactivity and reporter handlers func (pm *imageProgressManager) start() { pm.imageProgress.pullStart = time.Now() go func() { - ticker := time.NewTicker(defaultImageProgressReportInterval) + ticker := time.NewTicker(dockerImageProgressReportInterval) + var interval int64 for { + interval++ select { case <-ticker.C: msg, timestamp := pm.imageProgress.get() if time.Now().Sub(timestamp) > pm.activityDeadline { - pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart) + pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) pm.cancel() return } - pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart) + pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval) case <-pm.stopCh: return }