Skip to content

Commit

Permalink
client/driver: remove pull timeout due to race condition that can lea…
Browse files Browse the repository at this point in the history
…d to unexpected timeouts

If two jobs are pulling the same image simultaneously, whichever starts the pull first will set the pull timeout.
This can lead to a poor UX where the first job requested a short timeout while the second job requested a longer timeout,
causing the pull to potentially time out much sooner than expected by the second job.
  • Loading branch information
nickethier committed May 1, 2018
1 parent 475def4 commit a58a914
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 41 deletions.
13 changes: 1 addition & 12 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,6 @@ type DockerDriverConfig struct {
ReadonlyRootfs bool `mapstructure:"readonly_rootfs"` // Mount the container’s root filesystem as read only
AdvertiseIPv6Address bool `mapstructure:"advertise_ipv6_address"` // Flag to use the GlobalIPv6Address from the container as the detected IP
CPUHardLimit bool `mapstructure:"cpu_hard_limit"` // Enforce CPU hard limit.
ImagePullTimeoutRaw string `mapstructure:"image_pull_timeout"` //
ImagePullTimeout time.Duration `mapstructure:"-"` // Timeout on the image pull after which the pull is cancelled
}

func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) {
Expand Down Expand Up @@ -305,12 +303,6 @@ func (c *DockerDriverConfig) Validate() error {
return err
}
c.Ulimit = ulimit
if len(c.ImagePullTimeoutRaw) > 0 {
c.ImagePullTimeout, err = time.ParseDuration(c.ImagePullTimeoutRaw)
if err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -744,9 +736,6 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error {
"cpu_hard_limit": {
Type: fields.TypeBool,
},
"image_pull_timeout": {
Type: fields.TypeString,
},
},
}

Expand Down Expand Up @@ -1558,7 +1547,7 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
d.emitEvent("Downloading image %s:%s", repo, tag)
coordinator, callerID := d.getDockerCoordinator(client)

return coordinator.PullImage(driverConfig.ImageName, authOptions, driverConfig.ImagePullTimeout, callerID, d.emitEvent)
return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID, d.emitEvent)
}

// authBackend encapsulates a function that resolves registry credentials.
Expand Down
19 changes: 8 additions & 11 deletions client/driver/docker_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {

// PullImage is used to pull an image. It returns the pulled image ID or an
// error that occurred during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, callerID string, emitFn LogEventFn) (imageID string, err error) {
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string, emitFn LogEventFn) (imageID string, err error) {
// Get the future
d.imageLock.Lock()
future, ok := d.pullFutures[image]
Expand All @@ -152,7 +152,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf
// Make the future
future = newPullFuture()
d.pullFutures[image] = future
go d.pullImageImpl(image, authOptions, pullTimeout, future)
go d.pullImageImpl(image, authOptions, future)
}
d.imageLock.Unlock()

Expand All @@ -179,7 +179,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf

// pullImageImpl is the implementation of pulling an image. The results are
// returned via the passed future
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, pullTimeout time.Duration, future *pullFuture) {
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) {
defer d.clearPullLogger(image)
// Parse the repo and tag
repo, tag := docker.ParseRepositoryTag(image)
Expand All @@ -188,11 +188,10 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if pullTimeout > 0 {
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(pullTimeout))
}

pm := newImageProgressManager(image, cancel, d.handlePullInactivity, d.handlePullProgressReport)
defer pm.stop()

pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
Expand All @@ -207,8 +206,6 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth
auth = *authOptions
}

pm.start()
defer pm.stop()
err := d.client.PullImage(pullOptions, auth)

if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded {
Expand Down Expand Up @@ -395,15 +392,15 @@ func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{}
}
}

func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time) {
func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp, pullStart time.Time, interval int64) {
d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg)

}

func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time) {
func (d *dockerCoordinator) handlePullProgressReport(image, msg string, timestamp, pullStart time.Time, interval int64) {
d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg)

if timestamp.Sub(pullStart) > dockerPullProgressEmitInterval {
if interval%int64(dockerPullProgressEmitInterval.Seconds()/dockerImageProgressReportInterval.Seconds()) == 0 {
d.emitEvent(image, "Docker image %s pull progress: %s", image, msg)
}
}
Expand Down
10 changes: 5 additions & 5 deletions client/driver/docker_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
id := ""
for i := 0; i < 10; i++ {
go func() {
id, _ = coordinator.PullImage(image, nil, 0, uuid.Generate(), nil)
id, _ = coordinator.PullImage(image, nil, uuid.Generate(), nil)
}()
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) {
callerIDs := make([]string, 10, 10)
for i := 0; i < 10; i++ {
callerIDs[i] = uuid.Generate()
id, _ = coordinator.PullImage(image, nil, 0, callerIDs[i], nil)
id, _ = coordinator.PullImage(image, nil, callerIDs[i], nil)
}

// Check the reference count
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
callerID := uuid.Generate()

// Pull image
id, _ := coordinator.PullImage(image, nil, 0, callerID, nil)
id, _ := coordinator.PullImage(image, nil, callerID, nil)

// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
Expand All @@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
}

// Pull image again within delay
id, _ = coordinator.PullImage(image, nil, 0, callerID, nil)
id, _ = coordinator.PullImage(image, nil, callerID, nil)

// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) {
callerID := uuid.Generate()

// Pull image
id, _ := coordinator.PullImage(image, nil, 0, callerID, nil)
id, _ := coordinator.PullImage(image, nil, callerID, nil)

// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 0 {
Expand Down
31 changes: 18 additions & 13 deletions client/driver/docker_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
)

const (
// defaultPullActivityDeadline is the default value set in the imageProgressManager
// dockerPullActivityDeadline is the default value set in the imageProgressManager
// when newImageProgressManager is called
defaultPullActivityDeadline = 2 * time.Minute
dockerPullActivityDeadline = 2 * time.Minute

// defaultImageProgressReportInterval is the default value set in the
// dockerImageProgressReportInterval is the default value set in the
// imageProgressManager when newImageProgressManager is called
defaultImageProgressReportInterval = 10 * time.Second
dockerImageProgressReportInterval = 10 * time.Second
)

// layerProgress tracks the state and downloaded bytes of a single layer within
Expand Down Expand Up @@ -162,9 +162,9 @@ func (p *imageProgress) totalBytes() int64 {

// progressReporterFunc defines the method for handling inactivity and report
// events from the imageProgressManager. The image name, current status message,
// timestamp of last received status update and timestamp of when the pull started
// are passed in.
type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time)
// timestamp of last received status update, timestamp of when the pull started
// and current report iteration are passed in.
type progressReporterFunc func(image string, msg string, timestamp time.Time, pullStart time.Time, interval int64)

// imageProgressManager tracks the progress of pulling a docker image from an
// image repository.
Expand All @@ -187,11 +187,11 @@ func newImageProgressManager(
image string, cancel context.CancelFunc,
inactivityFunc, reporter progressReporterFunc) *imageProgressManager {

return &imageProgressManager{
pm := &imageProgressManager{
image: image,
activityDeadline: defaultPullActivityDeadline,
activityDeadline: dockerPullActivityDeadline,
inactivityFunc: inactivityFunc,
reportInterval: defaultImageProgressReportInterval,
reportInterval: dockerImageProgressReportInterval,
reporter: reporter,
imageProgress: &imageProgress{
timestamp: time.Now(),
Expand All @@ -200,23 +200,28 @@ func newImageProgressManager(
cancel: cancel,
stopCh: make(chan struct{}),
}

pm.start()
return pm
}

// start initiates the ticker to trigger the inactivity and reporter handlers
func (pm *imageProgressManager) start() {
pm.imageProgress.pullStart = time.Now()
go func() {
ticker := time.NewTicker(defaultImageProgressReportInterval)
ticker := time.NewTicker(dockerImageProgressReportInterval)
var interval int64
for {
interval++
select {
case <-ticker.C:
msg, timestamp := pm.imageProgress.get()
if time.Now().Sub(timestamp) > pm.activityDeadline {
pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart)
pm.inactivityFunc(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval)
pm.cancel()
return
}
pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart)
pm.reporter(pm.image, msg, timestamp, pm.imageProgress.pullStart, interval)
case <-pm.stopCh:
return
}
Expand Down

0 comments on commit a58a914

Please sign in to comment.