Skip to content

Commit

Permalink
Merge pull request #4192 from hashicorp/f-docker-progress-detection
Browse files Browse the repository at this point in the history
client/driver: docker progress detection and monitoring
  • Loading branch information
nickethier committed May 7, 2018
2 parents f92d364 + d442a44 commit 006c6f3
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ IMPROVEMENTS:
* command: Add -short option to init command that emits a minimal
jobspec [[GH-4239](https://github.com/hashicorp/nomad/issues/4239)]
* discovery: Support Consul gRPC health checks. [[GH-4251](https://github.com/hashicorp/nomad/issues/4251)]
* driver/docker: Add progress monitoring and inactivity detection to docker
image pulls [[GH-4192](https://github.com/hashicorp/nomad/issues/4192)]

## 0.8.3 (April 27, 2018)

Expand Down
3 changes: 2 additions & 1 deletion client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,7 +1546,8 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke

d.emitEvent("Downloading image %s:%s", repo, tag)
coordinator, callerID := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID)

return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID, d.emitEvent)
}

// authBackend encapsulates a function that resolves registry credentials.
Expand Down
76 changes: 73 additions & 3 deletions client/driver/docker_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ var (
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)

const (
// dockerPullProgressEmitInterval is the interval at which the pull progress
// is emitted to the allocation
dockerPullProgressEmitInterval = 2 * time.Minute
)

// pullFuture is a sharable future for retrieving a pulled images ID and any
// error that may have occurred during the pull.
type pullFuture struct {
Expand Down Expand Up @@ -98,6 +104,14 @@ type dockerCoordinator struct {
// only have one request be sent to Docker
pullFutures map[string]*pullFuture

// pullLoggers is used to track the LogEventFn for each alloc pulling an image.
// If multiple alloc's are attempting to pull the same image, each will need
// to register its own LogEventFn with the coordinator.
pullLoggers map[string][]LogEventFn

// pullLoggerLock is used to sync access to the pullLoggers map
pullLoggerLock sync.RWMutex

// imageRefCount is the reference count of image IDs
imageRefCount map[string]map[string]struct{}

Expand All @@ -114,6 +128,7 @@ func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
return &dockerCoordinator{
dockerCoordinatorConfig: config,
pullFutures: make(map[string]*pullFuture),
pullLoggers: make(map[string][]LogEventFn),
imageRefCount: make(map[string]map[string]struct{}),
deleteFuture: make(map[string]context.CancelFunc),
}
Expand All @@ -130,10 +145,11 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {

// PullImage is used to pull an image. It returns the pulled imaged ID or an
// error that occurred during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string) (imageID string, err error) {
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string, emitFn LogEventFn) (imageID string, err error) {
// Get the future
d.imageLock.Lock()
future, ok := d.pullFutures[image]
d.registerPullLogger(image, emitFn)
if !ok {
// Make the future
future = newPullFuture()
Expand Down Expand Up @@ -166,22 +182,41 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf
// pullImageImpl is the implementation of pulling an image. The results are
// returned via the passed future
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) {
defer d.clearPullLogger(image)
// Parse the repo and tag
repo, tag := docker.ParseRepositoryTag(image)
if tag == "" {
tag = "latest"
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pm := newImageProgressManager(image, cancel, d.handlePullInactivity,
d.handlePullProgressReport, d.handleSlowPullProgressReport)
defer pm.stop()

pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
Repository: repo,
Tag: tag,
OutputStream: pm,
RawJSONStream: true,
Context: ctx,
}

// Attempt to pull the image
var auth docker.AuthConfiguration
if authOptions != nil {
auth = *authOptions
}

err := d.client.PullImage(pullOptions, auth)

if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded {
d.logger.Printf("[ERR] driver.docker: timeout pulling container %s:%s", repo, tag)
future.set("", recoverablePullError(ctxErr, image))
return
}

if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
future.set("", recoverablePullError(err, image))
Expand Down Expand Up @@ -337,6 +372,41 @@ func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {
d.imageLock.Unlock()
}

func (d *dockerCoordinator) registerPullLogger(image string, logger LogEventFn) {
d.pullLoggerLock.Lock()
defer d.pullLoggerLock.Unlock()
if _, ok := d.pullLoggers[image]; !ok {
d.pullLoggers[image] = []LogEventFn{}
}
d.pullLoggers[image] = append(d.pullLoggers[image], logger)
}

func (d *dockerCoordinator) clearPullLogger(image string) {
d.pullLoggerLock.Lock()
defer d.pullLoggerLock.Unlock()
delete(d.pullLoggers, image)
}

func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{}) {
d.pullLoggerLock.RLock()
defer d.pullLoggerLock.RUnlock()
for i := range d.pullLoggers[image] {
go d.pullLoggers[image][i](message, args...)
}
}

func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp time.Time) {
d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg)
}

func (d *dockerCoordinator) handlePullProgressReport(image, msg string, _ time.Time) {
d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg)
}

func (d *dockerCoordinator) handleSlowPullProgressReport(image, msg string, _ time.Time) {
d.emitEvent(image, "Docker image %s pull progress: %s", image, msg)
}

// recoverablePullError wraps the error gotten when trying to pull and image if
// the error is recoverable.
func recoverablePullError(err error, image string) error {
Expand Down
10 changes: 5 additions & 5 deletions client/driver/docker_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
id := ""
for i := 0; i < 10; i++ {
go func() {
id, _ = coordinator.PullImage(image, nil, uuid.Generate())
id, _ = coordinator.PullImage(image, nil, uuid.Generate(), nil)
}()
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) {
callerIDs := make([]string, 10, 10)
for i := 0; i < 10; i++ {
callerIDs[i] = uuid.Generate()
id, _ = coordinator.PullImage(image, nil, callerIDs[i])
id, _ = coordinator.PullImage(image, nil, callerIDs[i], nil)
}

// Check the reference count
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
callerID := uuid.Generate()

// Pull image
id, _ := coordinator.PullImage(image, nil, callerID)
id, _ := coordinator.PullImage(image, nil, callerID, nil)

// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
Expand All @@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
}

// Pull image again within delay
id, _ = coordinator.PullImage(image, nil, callerID)
id, _ = coordinator.PullImage(image, nil, callerID, nil)

// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) {
callerID := uuid.Generate()

// Pull image
id, _ := coordinator.PullImage(image, nil, callerID)
id, _ := coordinator.PullImage(image, nil, callerID, nil)

// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 0 {
Expand Down
Loading

0 comments on commit 006c6f3

Please sign in to comment.