client: Add volume_hook for mounting volumes
endocrimes committed Jul 25, 2019
1 parent 7db1f0f · commit d2ee5fc
Showing 3 changed files with 130 additions and 0 deletions.
1 change: 1 addition & 0 deletions client/allocrunner/taskrunner/task_runner_hooks.go
@@ -61,6 +61,7 @@ func (tr *TaskRunner) initHooks() {
        newTaskDirHook(tr, hookLogger),
        newLogMonHook(tr.logmonHookConfig, hookLogger),
        newDispatchHook(tr.Alloc(), hookLogger),
        newVolumeHook(tr, hookLogger),
        newArtifactHook(tr, hookLogger),
        newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
        newDeviceHook(tr.devicemanager, hookLogger),
123 changes: 123 additions & 0 deletions client/allocrunner/taskrunner/volume_hook.go
@@ -0,0 +1,123 @@
package taskrunner

import (
    "context"
    "fmt"

    log "github.com/hashicorp/go-hclog"
    multierror "github.com/hashicorp/go-multierror"
    "github.com/hashicorp/nomad/client/allocrunner/interfaces"
    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/hashicorp/nomad/plugins/drivers"
)

type volumeHook struct {
    alloc  *structs.Allocation
    runner *TaskRunner
    logger log.Logger
}

func newVolumeHook(runner *TaskRunner, logger log.Logger) *volumeHook {
    h := &volumeHook{
        alloc:  runner.Alloc(),
        runner: runner,
    }
    h.logger = logger.Named(h.Name())
    return h
}

func (*volumeHook) Name() string {
    return "volumes"
}

func validateHostVolumes(requested map[string]*structs.VolumeRequest, client map[string]*structs.ClientHostVolumeConfig) error {
    var result error

    for n, req := range requested {
        if req.Volume.Type != "host" {
            continue
        }

        cfg, err := structs.ParseHostVolumeConfig(req.Config)
        if err != nil {
            result = multierror.Append(result, fmt.Errorf("failed to parse config for %s: %v", n, err))
            // The config could not be parsed, so there is no source to look up.
            continue
        }

        if _, ok := client[cfg.Source]; !ok {
            result = multierror.Append(result, fmt.Errorf("missing %s", cfg.Source))
        }
    }

    return result
}

func (h *volumeHook) hostVolumeMountConfigurations(vmounts []*structs.VolumeMount, volumes map[string]*structs.VolumeRequest, client map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) {
    var mounts []*drivers.MountConfig
    for _, m := range vmounts {
        req, ok := volumes[m.Volume]
        if !ok {
            // Should never happen unless the job was misvalidated on submission.
            return nil, fmt.Errorf("no group volume declaration found named: %s", m.Volume)
        }

        cfg, err := structs.ParseHostVolumeConfig(req.Config)
        if err != nil {
            return nil, fmt.Errorf("failed to parse config for %s: %v", m.Volume, err)
        }

        hostVolume, ok := client[cfg.Source]
        if !ok {
            // Should never happen unless the client's volumes were mutated during
            // the execution of this hook.
            return nil, fmt.Errorf("no host volume named: %s", cfg.Source)
        }

        mcfg := &drivers.MountConfig{
            HostPath: hostVolume.Source,
            TaskPath: m.Destination,
            Readonly: hostVolume.ReadOnly || req.Volume.ReadOnly || m.ReadOnly,
        }
        mounts = append(mounts, mcfg)
    }

    return mounts, nil
}

func (h *volumeHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
    volumes := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup).Volumes
    mounts := h.runner.hookResources.getMounts()

    hostVolumes := h.runner.clientConfig.Node.HostVolumes

    // Always validate volumes to ensure that we do not allow volumes to be used
    // if a host is restarted and loses the host volume configuration.
    if err := validateHostVolumes(volumes, hostVolumes); err != nil {
        h.logger.Error("Requested Host Volume does not exist", "existing", hostVolumes, "requested", volumes)
        return fmt.Errorf("host volume validation error: %v", err)
    }

    requestedMounts, err := h.hostVolumeMountConfigurations(req.Task.VolumeMounts, volumes, hostVolumes)
    if err != nil {
        h.logger.Error("Failed to generate volume mounts", "error", err)
        return err
    }

    // Because this hook also runs on restores, we only add mounts that do not
    // already exist. Although this loop is somewhat expensive, most individual
    // tasks have only a small number of mounts. We may want to revisit this
    // with a `hookdata` param so mounts are added exactly once.
REQUESTED:
    for _, m := range requestedMounts {
        for _, em := range mounts {
            if em.IsEqual(m) {
                continue REQUESTED
            }
        }

        mounts = append(mounts, m)
    }

    h.runner.hookResources.setMounts(mounts)
    return nil
}
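
For illustration only, and not part of this commit: a minimal, self-contained sketch of the dedupe-on-restore pattern that Prestart uses above, where a labeled continue skips any requested mount that an existing equal mount already covers. The mount type, field values, and paths below are stand-ins rather than the real drivers.MountConfig.

package main

import "fmt"

// mount is a stand-in for drivers.MountConfig, carrying only the fields
// that the IsEqual comparison in this commit looks at.
type mount struct {
    HostPath string
    TaskPath string
    Readonly bool
}

func (m mount) isEqual(o mount) bool {
    return m.HostPath == o.HostPath && m.TaskPath == o.TaskPath && m.Readonly == o.Readonly
}

func main() {
    // Mounts that already exist, e.g. restored from an earlier run of the hook.
    existing := []mount{{HostPath: "/srv/data", TaskPath: "/data", Readonly: true}}

    // Mounts requested by the task on this (re)start.
    requested := []mount{
        {HostPath: "/srv/data", TaskPath: "/data", Readonly: true},  // duplicate, skipped
        {HostPath: "/srv/logs", TaskPath: "/logs", Readonly: false}, // new, appended
    }

REQUESTED:
    for _, m := range requested {
        for _, em := range existing {
            if em.isEqual(m) {
                continue REQUESTED // already present; do not add it twice
            }
        }
        existing = append(existing, m)
    }

    fmt.Println(existing) // [{/srv/data /data true} {/srv/logs /logs false}]
}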
6 changes: 6 additions & 0 deletions plugins/drivers/driver.go
@@ -314,6 +314,12 @@ type MountConfig struct {
    Readonly bool
}

func (m *MountConfig) IsEqual(o *MountConfig) bool {
    return m.TaskPath == o.TaskPath &&
        m.HostPath == o.HostPath &&
        m.Readonly == o.Readonly
}

func (m *MountConfig) Copy() *MountConfig {
    if m == nil {
        return nil
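
As an aside, a hypothetical test sketch, not included in this commit, that exercises the new IsEqual helper. The values are made up, but the fields compared (HostPath, TaskPath, Readonly) are exactly the ones the method checks.

package drivers

import "testing"

func TestMountConfig_IsEqual(t *testing.T) {
    a := &MountConfig{HostPath: "/srv/data", TaskPath: "/data", Readonly: true}
    b := &MountConfig{HostPath: "/srv/data", TaskPath: "/data", Readonly: true}

    if !a.IsEqual(b) {
        t.Fatalf("expected %v to equal %v", a, b)
    }

    // Differing only in the read-only flag is enough to make the mounts unequal,
    // which keeps a read-write and a read-only mount of the same volume distinct.
    b.Readonly = false
    if a.IsEqual(b) {
        t.Fatalf("expected %v to differ from %v", a, b)
    }
}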
