Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper reference counting through task restarts #2484

Merged
merged 1 commit into from
Mar 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func (c *Client) setupDrivers() error {

var avail []string
var skipped []string
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil, nil)
driverCtx := driver.NewDriverContext("", "", c.config, c.config.Node, c.logger, nil, nil)
for name := range driver.BuiltinDrivers {
// Skip fingerprinting drivers that are not in the whitelist if it is
// enabled.
Expand Down
33 changes: 17 additions & 16 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,16 +419,17 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation {
return cstructs.FSIsolationImage
}

// getDockerCoordinator returns the docker coordinator
func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordinator {
// getDockerCoordinator returns the docker coordinator and the caller ID to use when
// interacting with the coordinator
func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoordinator, string) {
config := &dockerCoordinatorConfig{
client: client,
cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault),
logger: d.logger,
removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault),
}

return GetDockerCoordinator(config)
return GetDockerCoordinator(config), fmt.Sprintf("%s-%s", d.DriverContext.allocID, d.DriverContext.taskName)
}

func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
Expand Down Expand Up @@ -474,7 +475,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
TaskEnv: d.taskEnv,
Task: task,
Driver: "docker",
AllocID: ctx.AllocID,
AllocID: d.DriverContext.allocID,
LogDir: ctx.TaskDir.LogDir,
TaskDir: ctx.TaskDir.Dir,
PortLowerBound: d.config.ClientMinPort,
Expand Down Expand Up @@ -588,14 +589,14 @@ func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error {
// cleanupImage removes a Docker image. No error is returned if the image
// doesn't exist or is still in use. Requires the global client to already be
// initialized.
func (d *DockerDriver) cleanupImage(id string) error {
func (d *DockerDriver) cleanupImage(imageID string) error {
if !d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) {
// Config says not to cleanup
return nil
}

coordinator := d.getDockerCoordinator(client)
coordinator.RemoveImage(id)
coordinator, callerID := d.getDockerCoordinator(client)
coordinator.RemoveImage(imageID, callerID)

return nil
}
Expand Down Expand Up @@ -914,7 +915,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas

config.Env = d.taskEnv.EnvList()

containerName := fmt.Sprintf("%s-%s", task.Name, ctx.AllocID)
containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID)
d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName)

var networkingConfig *docker.NetworkingConfig
Expand Down Expand Up @@ -952,7 +953,7 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc
tag = "latest"
}

coordinator := d.getDockerCoordinator(client)
coordinator, callerID := d.getDockerCoordinator(client)

// We're going to check whether the image is already downloaded. If the tag
// is "latest", or ForcePull is set, we have to check for a new version every time so we don't
Expand All @@ -962,7 +963,7 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc
} else if tag != "latest" {
if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
// Image exists so just increment its reference count
coordinator.IncrementImageReference(dockerImage.ID, image)
coordinator.IncrementImageReference(dockerImage.ID, image, callerID)
return dockerImage.ID, nil
}
}
Expand Down Expand Up @@ -1001,8 +1002,8 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke
}

d.emitEvent("Downloading image %s:%s", repo, tag)
coordinator := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions)
coordinator, callerID := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID)
}

// loadImage creates an image by loading it from the file system
Expand All @@ -1027,8 +1028,8 @@ func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docke
return "", recoverableErrTimeouts(err)
}

coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName)
coordinator, callerID := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName, callerID)
return dockerImage.ID, nil
}

Expand Down Expand Up @@ -1186,8 +1187,8 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er

// Increment the reference count since we successfully attached to this
// container
coordinator := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(pid.ImageID, pid.Image)
coordinator, callerID := d.getDockerCoordinator(client)
coordinator.IncrementImageReference(pid.ImageID, pid.Image, callerID)

// Return a driver handle
h := &DockerHandle{
Expand Down
58 changes: 34 additions & 24 deletions client/driver/docker_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type dockerCoordinator struct {
pullFutures map[string]*pullFuture

// imageRefCount is the reference count of image IDs
imageRefCount map[string]int
imageRefCount map[string]map[string]struct{}

// deleteFuture is indexed by image ID and has a cancelable delete future
deleteFuture map[string]context.CancelFunc
Expand All @@ -114,7 +114,7 @@ func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
return &dockerCoordinator{
dockerCoordinatorConfig: config,
pullFutures: make(map[string]*pullFuture),
imageRefCount: make(map[string]int),
imageRefCount: make(map[string]map[string]struct{}),
deleteFuture: make(map[string]context.CancelFunc),
}
}
Expand All @@ -130,7 +130,7 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {

// PullImage is used to pull an image. It returns the pulled image ID or an
// error that occurred during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration) (imageID string, err error) {
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string) (imageID string, err error) {
// Get the future
d.imageLock.Lock()
future, ok := d.pullFutures[image]
Expand All @@ -157,7 +157,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf

// If we are cleaning up, we increment the reference count on the image
if err == nil && d.cleanup {
d.incrementImageReferenceImpl(id, image)
d.incrementImageReferenceImpl(id, image, callerID)
}

return id, err
Expand Down Expand Up @@ -202,66 +202,76 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth
}

// IncrementImageReference is used to increment an image reference count
func (d *dockerCoordinator) IncrementImageReference(id, image string) {
func (d *dockerCoordinator) IncrementImageReference(imageID, imageName, callerID string) {
d.imageLock.Lock()
defer d.imageLock.Unlock()
d.incrementImageReferenceImpl(id, image)
if d.cleanup {
d.incrementImageReferenceImpl(imageID, imageName, callerID)
}
}

// incrementImageReferenceImpl assumes the lock is held
func (d *dockerCoordinator) incrementImageReferenceImpl(id, image string) {
func (d *dockerCoordinator) incrementImageReferenceImpl(imageID, imageName, callerID string) {
// Cancel any pending delete
if cancel, ok := d.deleteFuture[id]; ok {
d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", image)
if cancel, ok := d.deleteFuture[imageID]; ok {
d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", imageName)
cancel()
delete(d.deleteFuture, id)
delete(d.deleteFuture, imageID)
}

// Increment the reference
d.imageRefCount[id] += 1
d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", image, id, d.imageRefCount[id])
references, ok := d.imageRefCount[imageID]
if !ok {
references = make(map[string]struct{})
d.imageRefCount[imageID] = references
}

if _, ok := references[callerID]; !ok {
references[callerID] = struct{}{}
d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", imageName, imageID, len(references))
}
}

// RemoveImage removes the given image. If there are any errors removing the
// image, the remove is retried internally.
func (d *dockerCoordinator) RemoveImage(id string) {
func (d *dockerCoordinator) RemoveImage(imageID, callerID string) {
d.imageLock.Lock()
defer d.imageLock.Unlock()

if !d.cleanup {
return
}

references, ok := d.imageRefCount[id]
references, ok := d.imageRefCount[imageID]
if !ok {
d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", id)
d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", imageID)
return
}

// Decrement the reference count
references--
d.imageRefCount[id] = references
d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", id, references)
delete(references, callerID)
count := len(references)
d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", imageID, count)

// Nothing to do
if references != 0 {
if count != 0 {
return
}

// This should never be the case but we safety guard so we don't leak a
// cancel.
if cancel, ok := d.deleteFuture[id]; ok {
d.logger.Printf("[ERR] driver.docker: image id %q has lingering delete future", id)
if cancel, ok := d.deleteFuture[imageID]; ok {
d.logger.Printf("[ERR] driver.docker: image id %q has lingering delete future", imageID)
cancel()
}

// Setup a future to delete the image
ctx, cancel := context.WithCancel(context.Background())
d.deleteFuture[id] = cancel
go d.removeImageImpl(id, ctx)
d.deleteFuture[imageID] = cancel
go d.removeImageImpl(imageID, ctx)

// Delete the key from the reference count
delete(d.imageRefCount, id)
delete(d.imageRefCount, imageID)
}

// removeImageImpl is used to remove an image. It will wait the specified remove
Expand Down
56 changes: 30 additions & 26 deletions client/driver/docker_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
id := ""
for i := 0; i < 10; i++ {
go func() {
id, _ = coordinator.PullImage(image, nil)
id, _ = coordinator.PullImage(image, nil, structs.GenerateUUID())
}()
}

Expand All @@ -74,8 +74,8 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 10 {
return false, fmt.Errorf("Got reference count %d; want %d", r, 10)
if references := coordinator.imageRefCount[id]; len(references) != 10 {
return false, fmt.Errorf("Got reference count %d; want %d", len(references), 10)
}

// Ensure there is no pull future
Expand Down Expand Up @@ -107,33 +107,35 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) {
coordinator := NewDockerCoordinator(config)

id := ""
callerIDs := make([]string, 10, 10)
for i := 0; i < 10; i++ {
id, _ = coordinator.PullImage(image, nil)
callerIDs[i] = structs.GenerateUUID()
id, _ = coordinator.PullImage(image, nil, callerIDs[i])
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 10 {
t.Fatalf("Got reference count %d; want %d", r, 10)
if references := coordinator.imageRefCount[id]; len(references) != 10 {
t.Fatalf("Got reference count %d; want %d", len(references), 10)
}

// Remove some
for i := 0; i < 8; i++ {
coordinator.RemoveImage(id)
coordinator.RemoveImage(id, callerIDs[i])
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 2 {
t.Fatalf("Got reference count %d; want %d", r, 2)
if references := coordinator.imageRefCount[id]; len(references) != 2 {
t.Fatalf("Got reference count %d; want %d", len(references), 2)
}

// Remove all
for i := 0; i < 2; i++ {
coordinator.RemoveImage(id)
for i := 8; i < 10; i++ {
coordinator.RemoveImage(id, callerIDs[i])
}

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 0 {
t.Fatalf("Got reference count %d; want %d", r, 0)
if references := coordinator.imageRefCount[id]; len(references) != 0 {
t.Fatalf("Got reference count %d; want %d", len(references), 0)
}

// Check that only one delete happened
Expand Down Expand Up @@ -165,29 +167,30 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {

// Create a coordinator
coordinator := NewDockerCoordinator(config)
callerID := structs.GenerateUUID()

// Pull image
id, _ := coordinator.PullImage(image, nil)
id, _ := coordinator.PullImage(image, nil, callerID)

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 1 {
t.Fatalf("Got reference count %d; want %d", r, 10)
if references := coordinator.imageRefCount[id]; len(references) != 1 {
t.Fatalf("Got reference count %d; want %d", len(references), 1)
}

// Remove image
coordinator.RemoveImage(id)
coordinator.RemoveImage(id, callerID)

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 0 {
t.Fatalf("Got reference count %d; want %d", r, 0)
if references := coordinator.imageRefCount[id]; len(references) != 0 {
t.Fatalf("Got reference count %d; want %d", len(references), 0)
}

// Pull image again within delay
id, _ = coordinator.PullImage(image, nil)
id, _ = coordinator.PullImage(image, nil, callerID)

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 1 {
t.Fatalf("Got reference count %d; want %d", r, 0)
if references := coordinator.imageRefCount[id]; len(references) != 1 {
t.Fatalf("Got reference count %d; want %d", len(references), 1)
}

// Check that no delete happened
Expand All @@ -211,17 +214,18 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) {

// Create a coordinator
coordinator := NewDockerCoordinator(config)
callerID := structs.GenerateUUID()

// Pull image
id, _ := coordinator.PullImage(image, nil)
id, _ := coordinator.PullImage(image, nil, callerID)

// Check the reference count
if r := coordinator.imageRefCount[id]; r != 0 {
t.Fatalf("Got reference count %d; want %d", r, 10)
if references := coordinator.imageRefCount[id]; len(references) != 0 {
t.Fatalf("Got reference count %d; want %d", len(references), 0)
}

// Remove image
coordinator.RemoveImage(id)
coordinator.RemoveImage(id, callerID)

// Check that no delete happened
if removes := mock.removed[id]; removes != 0 {
Expand Down
4 changes: 2 additions & 2 deletions client/driver/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
}

alloc := mock.Alloc()
execCtx := NewExecContext(taskDir, alloc.ID)
execCtx := NewExecContext(taskDir)
cleanup := func() {
allocDir.Destroy()
if filepath.IsAbs(hostpath) {
Expand All @@ -1195,7 +1195,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
emitter := func(m string, args ...interface{}) {
logger.Printf("[EVENT] "+m, args...)
}
driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv, emitter)
driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, testLogger(), taskEnv, emitter)
driver := NewDockerDriver(driverCtx)
copyImage(t, taskDir, "busybox.tar")

Expand Down
Loading