Skip to content

Commit

Permalink
Merge pull request #2484 from hashicorp/b-reference-restarts
Browse files Browse the repository at this point in the history
Proper reference counting through task restarts
  • Loading branch information
dadgar committed Mar 28, 2017
2 parents 53766df + 564367f commit 1f41efc
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 92 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func (c *Client) setupDrivers() error {

var avail []string
var skipped []string
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil, nil)
driverCtx := driver.NewDriverContext("", "", c.config, c.config.Node, c.logger, nil, nil)
for name := range driver.BuiltinDrivers {
// Skip fingerprinting drivers that are not in the whitelist if it is
// enabled.
Expand Down
33 changes: 17 additions & 16 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,16 +419,17 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation {
return cstructs.FSIsolationImage
}

// getDockerCoordinator returns the docker coordinator
func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordinator {
// getDockerCoordinator returns the docker coordinator and the caller ID to use when
// interacting with the coordinator
func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoordinator, string) {
config := &dockerCoordinatorConfig{
client: client,
cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault),
logger: d.logger,
removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault),
}

return GetDockerCoordinator(config)
return GetDockerCoordinator(config), fmt.Sprintf("%s-%s", d.DriverContext.allocID, d.DriverContext.taskName)
}

func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
Expand Down Expand Up @@ -474,7 +475,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
TaskEnv: d.taskEnv,
Task: task,
Driver: "docker",
AllocID: ctx.AllocID,
AllocID: d.DriverContext.allocID,
LogDir: ctx.TaskDir.LogDir,
TaskDir: ctx.TaskDir.Dir,
PortLowerBound: d.config.ClientMinPort,
Expand Down Expand Up @@ -588,14 +589,14 @@ func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error {
// cleanupImage removes a Docker image. No error is returned if the image
// doesn't exist or is still in use. Requires the global client to already be
// initialized.
func (d *DockerDriver) cleanupImage(id string) error {
func (d *DockerDriver) cleanupImage(imageID string) error {
if !d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) {
// Config says not to cleanup
return nil
}

coordinator := d.getDockerCoordinator(client)
coordinator.RemoveImage(id)
coordinator, callerID := d.getDockerCoordinator(client)
coordinator.RemoveImage(imageID, callerID)

return nil
}
Expand Down Expand Up @@ -914,7 +915,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas

config.Env = d.taskEnv.EnvList()

containerName := fmt.Sprintf("%s-%s", task.Name, ctx.AllocID)
containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID)
d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName)

var networkingConfig *docker.NetworkingConfig
Expand Down Expand Up @@ -952,7 +953,7 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc
tag = "latest"
}

coordinator := d.getDockerCoordinator(client)
coordinator, callerID := d.getDockerCoordinator(client)

// We're going to check whether the image is already downloaded. If the tag
// is "latest", or ForcePull is set, we have to check for a new version every time so we don't
Expand All @@ -962,7 +963,7 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc
} else if tag != "latest" {
if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
// Image exists so just increment its reference count
coordinator.IncrementImageReference(dockerImage.ID, image)
coordinator.IncrementImageReference(dockerImage.ID, image, callerID)
return dockerImage.ID, nil
}
}
Expand Down Expand Up @@ -1001,8 +1002,8 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
}

d.emitEvent("Downloading image %s:%s", repo, tag)
coordinator := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions)
coordinator, callerID := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID)
}

// loadImage creates an image by loading it from the file system
Expand All @@ -1027,8 +1028,8 @@ func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docke
return "", recoverableErrTimeouts(err)
}

coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName)
coordinator, callerID := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName, callerID)
return dockerImage.ID, nil
}

Expand Down Expand Up @@ -1186,8 +1187,8 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er

// Increment the reference count since we successfully attached to this
// container
coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(pid.ImageID, pid.Image)
coordinator, callerID := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(pid.ImageID, pid.Image, callerID)

// Return a driver handle
h := &DockerHandle{
Expand Down
58 changes: 34 additions & 24 deletions client/driver/docker_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type dockerCoordinator struct {
pullFutures map[string]*pullFuture

// imageRefCount is the reference count of image IDs
imageRefCount map[string]int
imageRefCount map[string]map[string]struct{}

// deleteFuture is indexed by image ID and has a cancable delete future
deleteFuture map[string]context.CancelFunc
Expand All @@ -114,7 +114,7 @@ func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
return &dockerCoordinator{
dockerCoordinatorConfig: config,
pullFutures: make(map[string]*pullFuture),
imageRefCount: make(map[string]int),
imageRefCount: make(map[string]map[string]struct{}),
deleteFuture: make(map[string]context.CancelFunc),
}
}
Expand All @@ -130,7 +130,7 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {

// PullImage is used to pull an image. It returns the pulled imaged ID or an
// error that occured during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration) (imageID string, err error) {
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string) (imageID string, err error) {
// Get the future
d.imageLock.Lock()
future, ok := d.pullFutures[image]
Expand All @@ -157,7 +157,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf

// If we are cleaning up, we increment the reference count on the image
if err == nil && d.cleanup {
d.incrementImageReferenceImpl(id, image)
d.incrementImageReferenceImpl(id, image, callerID)
}

return id, err
Expand Down Expand Up @@ -202,66 +202,76 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth
}

// IncrementImageReference is used to increment an image reference count
func (d *dockerCoordinator) IncrementImageReference(id, image string) {
func (d *dockerCoordinator) IncrementImageReference(imageID, imageName, callerID string) {
d.imageLock.Lock()
defer d.imageLock.Unlock()
d.incrementImageReferenceImpl(id, image)
if d.cleanup {
d.incrementImageReferenceImpl(imageID, imageName, callerID)
}
}

// incrementImageReferenceImpl assumes the lock is held
func (d *dockerCoordinator) incrementImageReferenceImpl(id, image string) {
func (d *dockerCoordinator) incrementImageReferenceImpl(imageID, imageName, callerID string) {
// Cancel any pending delete
if cancel, ok := d.deleteFuture[id]; ok {
d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", image)
if cancel, ok := d.deleteFuture[imageID]; ok {
d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", imageName)
cancel()
delete(d.deleteFuture, id)
delete(d.deleteFuture, imageID)
}

// Increment the reference
d.imageRefCount[id] += 1
d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", image, id, d.imageRefCount[id])
references, ok := d.imageRefCount[imageID]
if !ok {
references = make(map[string]struct{})
d.imageRefCount[imageID] = references
}

if _, ok := references[callerID]; !ok {
references[callerID] = struct{}{}
d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", imageName, imageID, len(references))
}
}

// RemoveImage removes the given image. If there are any errors removing the
// image, the remove is retried internally.
func (d *dockerCoordinator) RemoveImage(id string) {
func (d *dockerCoordinator) RemoveImage(imageID, callerID string) {
d.imageLock.Lock()
defer d.imageLock.Unlock()

if !d.cleanup {
return
}

references, ok := d.imageRefCount[id]
references, ok := d.imageRefCount[imageID]
if !ok {
d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", id)
d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", imageID)
return
}

// Decrement the reference count
references--
d.imageRefCount[id] = references
d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", id, references)
delete(references, callerID)
count := len(references)
d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", imageID, count)

// Nothing to do
if references != 0 {
if count != 0 {
return
}

// This should never be the case but we safefty guard so we don't leak a
// cancel.
if cancel, ok := d.deleteFuture[id]; ok {
d.logger.Printf("[ERR] driver.docker: image id %q has lingering delete future", id)
if cancel, ok := d.deleteFuture[imageID]; ok {
d.logger.Printf("[ERR] driver.docker: image id %q has lingering delete future", imageID)
cancel()
}

// Setup a future to delete the image
ctx, cancel := context.WithCancel(context.Background())
d.deleteFuture[id] = cancel
go d.removeImageImpl(id, ctx)
d.deleteFuture[imageID] = cancel
go d.removeImageImpl(imageID, ctx)

// Delete the key from the reference count
delete(d.imageRefCount, id)
delete(d.imageRefCount, imageID)
}

// removeImageImpl is used to remove an image. It wil wait the specified remove
Expand Down
56 changes: 30 additions & 26 deletions client/driver/docker_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
id := ""
for i := 0; i < 10; i++ {
go func() {
id, _ = coordinator.PullImage(image, nil)
id, _ = coordinator.PullImage(image, nil, structs.GenerateUUID())
}()
}

Expand All @@ -74,8 +74,8 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 10 {
return false, fmt.Errorf("Got reference count %d; want %d", r, 10)
if references := coordinator.imageRefCount[id]; len(references) != 10 {
return false, fmt.Errorf("Got reference count %d; want %d", len(references), 10)
}

// Ensure there is no pull future
Expand Down Expand Up @@ -107,33 +107,35 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) {
coordinator := NewDockerCoordinator(config)

id := ""
callerIDs := make([]string, 10, 10)
for i := 0; i < 10; i++ {
id, _ = coordinator.PullImage(image, nil)
callerIDs[i] = structs.GenerateUUID()
id, _ = coordinator.PullImage(image, nil, callerIDs[i])
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 10 {
t.Fatalf("Got reference count %d; want %d", r, 10)
if references := coordinator.imageRefCount[id]; len(references) != 10 {
t.Fatalf("Got reference count %d; want %d", len(references), 10)
}

// Remove some
for i := 0; i < 8; i++ {
coordinator.RemoveImage(id)
coordinator.RemoveImage(id, callerIDs[i])
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 2 {
t.Fatalf("Got reference count %d; want %d", r, 2)
if references := coordinator.imageRefCount[id]; len(references) != 2 {
t.Fatalf("Got reference count %d; want %d", len(references), 2)
}

// Remove all
for i := 0; i < 2; i++ {
coordinator.RemoveImage(id)
for i := 8; i < 10; i++ {
coordinator.RemoveImage(id, callerIDs[i])
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 0 {
t.Fatalf("Got reference count %d; want %d", r, 0)
if references := coordinator.imageRefCount[id]; len(references) != 0 {
t.Fatalf("Got reference count %d; want %d", len(references), 0)
}

// Check that only one delete happened
Expand Down Expand Up @@ -165,29 +167,30 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {

// Create a coordinator
coordinator := NewDockerCoordinator(config)
callerID := structs.GenerateUUID()

// Pull image
id, _ := coordinator.PullImage(image, nil)
id, _ := coordinator.PullImage(image, nil, callerID)

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 1 {
t.Fatalf("Got reference count %d; want %d", r, 10)
if references := coordinator.imageRefCount[id]; len(references) != 1 {
t.Fatalf("Got reference count %d; want %d", len(references), 1)
}

// Remove image
coordinator.RemoveImage(id)
coordinator.RemoveImage(id, callerID)

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 0 {
t.Fatalf("Got reference count %d; want %d", r, 0)
if references := coordinator.imageRefCount[id]; len(references) != 0 {
t.Fatalf("Got reference count %d; want %d", len(references), 0)
}

// Pull image again within delay
id, _ = coordinator.PullImage(image, nil)
id, _ = coordinator.PullImage(image, nil, callerID)

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 1 {
t.Fatalf("Got reference count %d; want %d", r, 0)
if references := coordinator.imageRefCount[id]; len(references) != 1 {
t.Fatalf("Got reference count %d; want %d", len(references), 1)
}

// Check that only no delete happened
Expand All @@ -211,17 +214,18 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) {

// Create a coordinator
coordinator := NewDockerCoordinator(config)
callerID := structs.GenerateUUID()

// Pull image
id, _ := coordinator.PullImage(image, nil)
id, _ := coordinator.PullImage(image, nil, callerID)

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 0 {
t.Fatalf("Got reference count %d; want %d", r, 10)
if references := coordinator.imageRefCount[id]; len(references) != 0 {
t.Fatalf("Got reference count %d; want %d", len(references), 0)
}

// Remove image
coordinator.RemoveImage(id)
coordinator.RemoveImage(id, callerID)

// Check that only no delete happened
if removes := mock.removed[id]; removes != 0 {
Expand Down
4 changes: 2 additions & 2 deletions client/driver/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
}

alloc := mock.Alloc()
execCtx := NewExecContext(taskDir, alloc.ID)
execCtx := NewExecContext(taskDir)
cleanup := func() {
allocDir.Destroy()
if filepath.IsAbs(hostpath) {
Expand All @@ -1195,7 +1195,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
emitter := func(m string, args ...interface{}) {
logger.Printf("[EVENT] "+m, args...)
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv, emitter)
driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, testLogger(), taskEnv, emitter)
driver := NewDockerDriver(driverCtx)
copyImage(t, taskDir, "busybox.tar")

Expand Down
Loading

0 comments on commit 1f41efc

Please sign in to comment.