diff --git a/client/client.go b/client/client.go index 1dc8d19b83d4..c771065f1958 100644 --- a/client/client.go +++ b/client/client.go @@ -857,7 +857,7 @@ func (c *Client) setupDrivers() error { var avail []string var skipped []string - driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil, nil) + driverCtx := driver.NewDriverContext("", "", c.config, c.config.Node, c.logger, nil, nil) for name := range driver.BuiltinDrivers { // Skip fingerprinting drivers that are not in the whitelist if it is // enabled. diff --git a/client/driver/docker.go b/client/driver/docker.go index 844c9cff873d..a334876939d6 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -419,8 +419,9 @@ func (d *DockerDriver) FSIsolation() cstructs.FSIsolation { return cstructs.FSIsolationImage } -// getDockerCoordinator returns the docker coordinator -func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordinator { +// getDockerCoordinator returns the docker coordinator and the caller ID to use when +// interacting with the coordinator +func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoordinator, string) { config := &dockerCoordinatorConfig{ client: client, cleanup: d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault), @@ -428,7 +429,7 @@ func (d *DockerDriver) getDockerCoordinator(client *docker.Client) *dockerCoordi removeDelay: d.config.ReadDurationDefault(dockerImageRemoveDelayConfigOption, dockerImageRemoveDelayConfigDefault), } - return GetDockerCoordinator(config) + return GetDockerCoordinator(config), fmt.Sprintf("%s-%s", d.DriverContext.allocID, d.DriverContext.taskName) } func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) { @@ -474,7 +475,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle TaskEnv: d.taskEnv, Task: task, Driver: "docker", - AllocID: ctx.AllocID, + AllocID: d.DriverContext.allocID, LogDir: ctx.TaskDir.LogDir, TaskDir: ctx.TaskDir.Dir, PortLowerBound: d.config.ClientMinPort, @@ -588,14 +589,14 @@ func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error { // cleanupImage removes a Docker image. No error is returned if the image // doesn't exist or is still in use. Requires the global client to already be // initialized. -func (d *DockerDriver) cleanupImage(id string) error { +func (d *DockerDriver) cleanupImage(imageID string) error { if !d.config.ReadBoolDefault(dockerCleanupImageConfigOption, dockerCleanupImageConfigDefault) { // Config says not to cleanup return nil } - coordinator := d.getDockerCoordinator(client) - coordinator.RemoveImage(id) + coordinator, callerID := d.getDockerCoordinator(client) + coordinator.RemoveImage(imageID, callerID) return nil } @@ -914,7 +915,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas config.Env = d.taskEnv.EnvList() - containerName := fmt.Sprintf("%s-%s", task.Name, ctx.AllocID) + containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID) d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName) var networkingConfig *docker.NetworkingConfig @@ -952,7 +953,7 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc tag = "latest" } - coordinator := d.getDockerCoordinator(client) + coordinator, callerID := d.getDockerCoordinator(client) // We're going to check whether the image is already downloaded. If the tag // is "latest", or ForcePull is set, we have to check for a new version every time so we don't @@ -962,7 +963,7 @@ func (d *DockerDriver) createImage(driverConfig *DockerDriverConfig, client *doc } else if tag != "latest" { if dockerImage, _ := client.InspectImage(image); dockerImage != nil { // Image exists so just increment its reference count - coordinator.IncrementImageReference(dockerImage.ID, image) + coordinator.IncrementImageReference(dockerImage.ID, image, callerID) return dockerImage.ID, nil } } @@ -1001,8 +1002,8 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke } d.emitEvent("Downloading image %s:%s", repo, tag) - coordinator := d.getDockerCoordinator(client) - return coordinator.PullImage(driverConfig.ImageName, authOptions) + coordinator, callerID := d.getDockerCoordinator(client) + return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID) } // loadImage creates an image by loading it from the file system @@ -1027,8 +1028,8 @@ func (d *DockerDriver) loadImage(driverConfig *DockerDriverConfig, client *docke return "", recoverableErrTimeouts(err) } - coordinator := d.getDockerCoordinator(client) - coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName) + coordinator, callerID := d.getDockerCoordinator(client) + coordinator.IncrementImageReference(dockerImage.ID, driverConfig.ImageName, callerID) return dockerImage.ID, nil } @@ -1186,8 +1187,8 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er // Increment the reference count since we successfully attached to this // container - coordinator := d.getDockerCoordinator(client) - coordinator.IncrementImageReference(pid.ImageID, pid.Image) + coordinator, callerID := d.getDockerCoordinator(client) + coordinator.IncrementImageReference(pid.ImageID, pid.Image, callerID) // Return a driver handle h := &DockerHandle{ diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index 587499fdafa8..30a97ae87b1c 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -99,7 +99,7 @@ type dockerCoordinator struct { pullFutures map[string]*pullFuture // imageRefCount is the reference count of image IDs - imageRefCount map[string]int + imageRefCount map[string]map[string]struct{} // deleteFuture is indexed by image ID and has a cancable delete future deleteFuture map[string]context.CancelFunc @@ -114,7 +114,7 @@ func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { return &dockerCoordinator{ dockerCoordinatorConfig: config, pullFutures: make(map[string]*pullFuture), - imageRefCount: make(map[string]int), + imageRefCount: make(map[string]map[string]struct{}), deleteFuture: make(map[string]context.CancelFunc), } } @@ -130,7 +130,7 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { // PullImage is used to pull an image. It returns the pulled imaged ID or an // error that occured during the pull -func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration) (imageID string, err error) { +func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string) (imageID string, err error) { // Get the future d.imageLock.Lock() future, ok := d.pullFutures[image] @@ -157,7 +157,7 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // If we are cleaning up, we increment the reference count on the image if err == nil && d.cleanup { - d.incrementImageReferenceImpl(id, image) + d.incrementImageReferenceImpl(id, image, callerID) } return id, err @@ -202,29 +202,39 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth } // IncrementImageReference is used to increment an image reference count -func (d *dockerCoordinator) IncrementImageReference(id, image string) { +func (d *dockerCoordinator) IncrementImageReference(imageID, imageName, callerID string) { d.imageLock.Lock() defer d.imageLock.Unlock() - d.incrementImageReferenceImpl(id, image) + if d.cleanup { + d.incrementImageReferenceImpl(imageID, imageName, callerID) + } } // incrementImageReferenceImpl assumes the lock is held -func (d *dockerCoordinator) incrementImageReferenceImpl(id, image string) { +func (d *dockerCoordinator) incrementImageReferenceImpl(imageID, imageName, callerID string) { // Cancel any pending delete - if cancel, ok := d.deleteFuture[id]; ok { - d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", image) + if cancel, ok := d.deleteFuture[imageID]; ok { + d.logger.Printf("[DEBUG] driver.docker: cancelling removal of image %q", imageName) cancel() - delete(d.deleteFuture, id) + delete(d.deleteFuture, imageID) } // Increment the reference - d.imageRefCount[id] += 1 - d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", image, id, d.imageRefCount[id]) + references, ok := d.imageRefCount[imageID] + if !ok { + references = make(map[string]struct{}) + d.imageRefCount[imageID] = references + } + + if _, ok := references[callerID]; !ok { + references[callerID] = struct{}{} + d.logger.Printf("[DEBUG] driver.docker: image %q (%v) reference count incremented: %d", imageName, imageID, len(references)) + } } // RemoveImage removes the given image. If there are any errors removing the // image, the remove is retried internally. -func (d *dockerCoordinator) RemoveImage(id string) { +func (d *dockerCoordinator) RemoveImage(imageID, callerID string) { d.imageLock.Lock() defer d.imageLock.Unlock() @@ -232,36 +242,36 @@ func (d *dockerCoordinator) RemoveImage(id string) { return } - references, ok := d.imageRefCount[id] + references, ok := d.imageRefCount[imageID] if !ok { - d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", id) + d.logger.Printf("[WARN] driver.docker: RemoveImage on non-referenced counted image id %q", imageID) return } // Decrement the reference count - references-- - d.imageRefCount[id] = references - d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", id, references) + delete(references, callerID) + count := len(references) + d.logger.Printf("[DEBUG] driver.docker: image id %q reference count decremented: %d", imageID, count) // Nothing to do - if references != 0 { + if count != 0 { return } // This should never be the case but we safefty guard so we don't leak a // cancel. - if cancel, ok := d.deleteFuture[id]; ok { - d.logger.Printf("[ERR] driver.docker: image id %q has lingering delete future", id) + if cancel, ok := d.deleteFuture[imageID]; ok { + d.logger.Printf("[ERR] driver.docker: image id %q has lingering delete future", imageID) cancel() } // Setup a future to delete the image ctx, cancel := context.WithCancel(context.Background()) - d.deleteFuture[id] = cancel - go d.removeImageImpl(id, ctx) + d.deleteFuture[imageID] = cancel + go d.removeImageImpl(imageID, ctx) // Delete the key from the reference count - delete(d.imageRefCount, id) + delete(d.imageRefCount, imageID) } // removeImageImpl is used to remove an image. It wil wait the specified remove diff --git a/client/driver/docker_coordinator_test.go b/client/driver/docker_coordinator_test.go index 5501fe2061a4..7b17ea8567cf 100644 --- a/client/driver/docker_coordinator_test.go +++ b/client/driver/docker_coordinator_test.go @@ -63,7 +63,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) { id := "" for i := 0; i < 10; i++ { go func() { - id, _ = coordinator.PullImage(image, nil) + id, _ = coordinator.PullImage(image, nil, structs.GenerateUUID()) }() } @@ -74,8 +74,8 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) { } // Check the reference count - if r := coordinator.imageRefCount[id]; r != 10 { - return false, fmt.Errorf("Got reference count %d; want %d", r, 10) + if references := coordinator.imageRefCount[id]; len(references) != 10 { + return false, fmt.Errorf("Got reference count %d; want %d", len(references), 10) } // Ensure there is no pull future @@ -107,33 +107,35 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) { coordinator := NewDockerCoordinator(config) id := "" + callerIDs := make([]string, 10, 10) for i := 0; i < 10; i++ { - id, _ = coordinator.PullImage(image, nil) + callerIDs[i] = structs.GenerateUUID() + id, _ = coordinator.PullImage(image, nil, callerIDs[i]) } // Check the reference count - if r := coordinator.imageRefCount[id]; r != 10 { - t.Fatalf("Got reference count %d; want %d", r, 10) + if references := coordinator.imageRefCount[id]; len(references) != 10 { + t.Fatalf("Got reference count %d; want %d", len(references), 10) } // Remove some for i := 0; i < 8; i++ { - coordinator.RemoveImage(id) + coordinator.RemoveImage(id, callerIDs[i]) } // Check the reference count - if r := coordinator.imageRefCount[id]; r != 2 { - t.Fatalf("Got reference count %d; want %d", r, 2) + if references := coordinator.imageRefCount[id]; len(references) != 2 { + t.Fatalf("Got reference count %d; want %d", len(references), 2) } // Remove all - for i := 0; i < 2; i++ { - coordinator.RemoveImage(id) + for i := 8; i < 10; i++ { + coordinator.RemoveImage(id, callerIDs[i]) } // Check the reference count - if r := coordinator.imageRefCount[id]; r != 0 { - t.Fatalf("Got reference count %d; want %d", r, 0) + if references := coordinator.imageRefCount[id]; len(references) != 0 { + t.Fatalf("Got reference count %d; want %d", len(references), 0) } // Check that only one delete happened @@ -165,29 +167,30 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { // Create a coordinator coordinator := NewDockerCoordinator(config) + callerID := structs.GenerateUUID() // Pull image - id, _ := coordinator.PullImage(image, nil) + id, _ := coordinator.PullImage(image, nil, callerID) // Check the reference count - if r := coordinator.imageRefCount[id]; r != 1 { - t.Fatalf("Got reference count %d; want %d", r, 10) + if references := coordinator.imageRefCount[id]; len(references) != 1 { + t.Fatalf("Got reference count %d; want %d", len(references), 1) } // Remove image - coordinator.RemoveImage(id) + coordinator.RemoveImage(id, callerID) // Check the reference count - if r := coordinator.imageRefCount[id]; r != 0 { - t.Fatalf("Got reference count %d; want %d", r, 0) + if references := coordinator.imageRefCount[id]; len(references) != 0 { + t.Fatalf("Got reference count %d; want %d", len(references), 0) } // Pull image again within delay - id, _ = coordinator.PullImage(image, nil) + id, _ = coordinator.PullImage(image, nil, callerID) // Check the reference count - if r := coordinator.imageRefCount[id]; r != 1 { - t.Fatalf("Got reference count %d; want %d", r, 0) + if references := coordinator.imageRefCount[id]; len(references) != 1 { + t.Fatalf("Got reference count %d; want %d", len(references), 1) } // Check that only no delete happened @@ -211,17 +214,18 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) { // Create a coordinator coordinator := NewDockerCoordinator(config) + callerID := structs.GenerateUUID() // Pull image - id, _ := coordinator.PullImage(image, nil) + id, _ := coordinator.PullImage(image, nil, callerID) // Check the reference count - if r := coordinator.imageRefCount[id]; r != 0 { - t.Fatalf("Got reference count %d; want %d", r, 10) + if references := coordinator.imageRefCount[id]; len(references) != 0 { + t.Fatalf("Got reference count %d; want %d", len(references), 0) } // Remove image - coordinator.RemoveImage(id) + coordinator.RemoveImage(id, callerID) // Check that only no delete happened if removes := mock.removed[id]; removes != 0 { diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index fe725252fc8f..002b07d59c74 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -1177,7 +1177,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str } alloc := mock.Alloc() - execCtx := NewExecContext(taskDir, alloc.ID) + execCtx := NewExecContext(taskDir) cleanup := func() { allocDir.Destroy() if filepath.IsAbs(hostpath) { @@ -1195,7 +1195,7 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str emitter := func(m string, args ...interface{}) { logger.Printf("[EVENT] "+m, args...) } - driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv, emitter) + driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, testLogger(), taskEnv, emitter) driver := NewDockerDriver(driverCtx) copyImage(t, taskDir, "busybox.tar") diff --git a/client/driver/driver.go b/client/driver/driver.go index ee53e64424c3..a89db81679f6 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -201,6 +201,7 @@ type LogEventFn func(message string, args ...interface{}) // each time we do it. Used in conjection with Factory, above. type DriverContext struct { taskName string + allocID string config *config.Config logger *log.Logger node *structs.Node @@ -219,10 +220,11 @@ func NewEmptyDriverContext() *DriverContext { // This enables other packages to create DriverContexts but keeps the fields // private to the driver. If we want to change this later we can gorename all of // the fields in DriverContext. -func NewDriverContext(taskName string, config *config.Config, node *structs.Node, +func NewDriverContext(taskName, allocID string, config *config.Config, node *structs.Node, logger *log.Logger, taskEnv *env.TaskEnvironment, eventEmitter LogEventFn) *DriverContext { return &DriverContext{ taskName: taskName, + allocID: allocID, config: config, node: node, logger: logger, @@ -258,16 +260,12 @@ type DriverHandle interface { type ExecContext struct { // TaskDir contains information about the task directory structure. TaskDir *allocdir.TaskDir - - // Alloc ID - AllocID string } // NewExecContext is used to create a new execution context -func NewExecContext(td *allocdir.TaskDir, allocID string) *ExecContext { +func NewExecContext(td *allocdir.TaskDir) *ExecContext { return &ExecContext{ TaskDir: td, - AllocID: allocID, } } diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index 42fc842722a2..8cd44b331ae9 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -110,7 +110,7 @@ func testDriverContexts(t *testing.T, task *structs.Task) *testContext { return nil } - execCtx := NewExecContext(td, alloc.ID) + execCtx := NewExecContext(td) taskEnv, err := GetTaskEnv(td, cfg.Node, task, alloc, cfg, "") if err != nil { @@ -123,7 +123,7 @@ func testDriverContexts(t *testing.T, task *structs.Task) *testContext { emitter := func(m string, args ...interface{}) { logger.Printf("[EVENT] "+m, args...) } - driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, logger, taskEnv, emitter) + driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, logger, taskEnv, emitter) return &testContext{allocDir, driverCtx, execCtx} } diff --git a/client/driver/exec.go b/client/driver/exec.go index da1256be6edc..d94c82443fff 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -124,7 +124,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, Driver: "exec", - AllocID: ctx.AllocID, + AllocID: d.DriverContext.allocID, LogDir: ctx.TaskDir.LogDir, TaskDir: ctx.TaskDir.Dir, Task: task, diff --git a/client/driver/java.go b/client/driver/java.go index bbe03f2e65b1..4a90b4efad65 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -248,7 +248,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, Driver: "java", - AllocID: ctx.AllocID, + AllocID: d.DriverContext.allocID, Task: task, TaskDir: ctx.TaskDir.Dir, LogDir: ctx.TaskDir.LogDir, diff --git a/client/driver/lxc.go b/client/driver/lxc.go index 5b541a3f8d04..0d369d34cd0a 100644 --- a/client/driver/lxc.go +++ b/client/driver/lxc.go @@ -186,7 +186,7 @@ func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e lxcPath = path } - containerName := fmt.Sprintf("%s-%s", task.Name, ctx.AllocID) + containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID) c, err := lxc.NewContainer(containerName, lxcPath) if err != nil { return nil, fmt.Errorf("unable to initialize container: %v", err) diff --git a/client/driver/lxc_test.go b/client/driver/lxc_test.go index b6188bc6b6c5..3bb8dee86f6b 100644 --- a/client/driver/lxc_test.go +++ b/client/driver/lxc_test.go @@ -102,7 +102,7 @@ func TestLxcDriver_Start_Wait(t *testing.T) { }) // Look for mounted directories in their proper location - containerName := fmt.Sprintf("%s-%s", task.Name, ctx.ExecCtx.AllocID) + containerName := fmt.Sprintf("%s-%s", task.Name, ctx.DriverCtx.allocID) for _, mnt := range []string{"alloc", "local", "secrets"} { fullpath := filepath.Join(lxcHandle.lxcPath, containerName, "rootfs", mnt) stat, err := os.Stat(fullpath) diff --git a/client/driver/qemu.go b/client/driver/qemu.go index f4d2c1d06b76..ad90c80383c2 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -238,7 +238,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, Driver: "qemu", - AllocID: ctx.AllocID, + AllocID: d.DriverContext.allocID, Task: task, TaskDir: ctx.TaskDir.Dir, LogDir: ctx.TaskDir.LogDir, diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 802de073cc69..1500cf00f013 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -129,7 +129,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, Driver: "raw_exec", - AllocID: ctx.AllocID, + AllocID: d.DriverContext.allocID, Task: task, TaskDir: ctx.TaskDir.Dir, LogDir: ctx.TaskDir.LogDir, diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 0a15ac34cfaf..55e2a026c9de 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -257,17 +257,17 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e sanitizedName := strings.Replace(task.Name, "_", "-", -1) // Mount /alloc - allocVolName := fmt.Sprintf("%s-%s-alloc", ctx.AllocID, sanitizedName) + allocVolName := fmt.Sprintf("%s-%s-alloc", d.DriverContext.allocID, sanitizedName) cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", allocVolName, ctx.TaskDir.SharedAllocDir)) cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", allocVolName, allocdir.SharedAllocContainerPath)) // Mount /local - localVolName := fmt.Sprintf("%s-%s-local", ctx.AllocID, sanitizedName) + localVolName := fmt.Sprintf("%s-%s-local", d.DriverContext.allocID, sanitizedName) cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", localVolName, ctx.TaskDir.LocalDir)) cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", localVolName, allocdir.TaskLocalContainerPath)) // Mount /secrets - secretsVolName := fmt.Sprintf("%s-%s-secrets", ctx.AllocID, sanitizedName) + secretsVolName := fmt.Sprintf("%s-%s-secrets", d.DriverContext.allocID, sanitizedName) cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", secretsVolName, ctx.TaskDir.SecretsDir)) cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", secretsVolName, allocdir.TaskSecretsContainerPath)) @@ -281,7 +281,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e if len(parts) != 2 { return nil, fmt.Errorf("invalid rkt volume: %q", rawvol) } - volName := fmt.Sprintf("%s-%s-%d", ctx.AllocID, sanitizedName, i) + volName := fmt.Sprintf("%s-%s-%d", d.DriverContext.allocID, sanitizedName, i) cmdArgs = append(cmdArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", volName, parts[0])) cmdArgs = append(cmdArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, parts[1])) } @@ -413,7 +413,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, Driver: "rkt", - AllocID: ctx.AllocID, + AllocID: d.DriverContext.allocID, Task: task, TaskDir: ctx.TaskDir.Dir, LogDir: ctx.TaskDir.LogDir, diff --git a/client/task_runner.go b/client/task_runner.go index 56df71f619ae..7ef169e07b7a 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -280,7 +280,7 @@ func (r *TaskRunner) RestoreState() error { return err } - ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) + ctx := driver.NewExecContext(r.taskDir) handle, err := d.Open(ctx, snap.HandleID) // In the case it fails, we relaunch the task in the Run() method. @@ -378,7 +378,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) { r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg)) } - driverCtx := driver.NewDriverContext(r.task.Name, r.config, r.config.Node, r.logger, env, eventEmitter) + driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, env, eventEmitter) driver, err := driver.NewDriver(r.task.Driver, driverCtx) if err != nil { return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v", @@ -1061,7 +1061,7 @@ func (r *TaskRunner) cleanup() { res := r.getCreatedResources() - ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) + ctx := driver.NewExecContext(r.taskDir) attempts := 1 var cleanupErr error for retry := true; retry; attempts++ { @@ -1182,7 +1182,7 @@ func (r *TaskRunner) startTask() error { } // Run prestart - ctx := driver.NewExecContext(r.taskDir, r.alloc.ID) + ctx := driver.NewExecContext(r.taskDir) res, err := drv.Prestart(ctx, r.task) // Merge newly created resources into previously created resources