Merge pull request #5923 from hashicorp/dani/rfc-host-volumes
Initial support for Host Volumes
endocrimes committed Aug 9, 2019
2 parents 2e0d67c + 22b16de commit a087e8e
Showing 23 changed files with 876 additions and 30 deletions.
17 changes: 17 additions & 0 deletions api/tasks.go
@@ -500,6 +500,21 @@ func (m *MigrateStrategy) Copy() *MigrateStrategy {
return nm
}

type Volume struct {
Name string
Type string
ReadOnly bool `mapstructure:"read_only"`
Hidden bool

Config map[string]interface{}
}

type VolumeMount struct {
Volume string
Destination string
ReadOnly bool `mapstructure:"read_only"`
}

// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name *string
@@ -508,6 +523,7 @@ type TaskGroup struct
Affinities []*Affinity
Tasks []*Task
Spreads []*Spread
Volumes map[string]*Volume
RestartPolicy *RestartPolicy
ReschedulePolicy *ReschedulePolicy
EphemeralDisk *EphemeralDisk
@@ -715,6 +731,7 @@ type Task struct {
Vault *Vault
Templates []*Template
DispatchPayload *DispatchPayloadConfig
VolumeMounts []*VolumeMount
Leader bool
ShutdownDelay time.Duration `mapstructure:"shutdown_delay"`
KillSignal string `mapstructure:"kill_signal"`
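Taken together, these api package additions let a job request a host volume at the group level and mount it per task. A minimal, hypothetical sketch of Go API usage (the "host" type string mirrors structs.VolumeTypeHost, and the "source" config key is an assumption based on the ParseHostVolumeConfig usage later in this diff):

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

func main() {
	groupName := "example"
	tg := &api.TaskGroup{
		Name: &groupName,
		// Group-level volume requests, keyed by alias.
		Volumes: map[string]*api.Volume{
			"data": {
				Name: "data",
				Type: "host", // mirrors structs.VolumeTypeHost
				// Opaque config; the "source" key (assumed) names a
				// host_volume stanza from the client's agent config.
				Config: map[string]interface{}{"source": "shared_data"},
			},
		},
		Tasks: []*api.Task{{
			Name: "app",
			VolumeMounts: []*api.VolumeMount{{
				Volume:      "data",          // alias of the group volume above
				Destination: "/var/lib/data", // mount path inside the task
				ReadOnly:    true,
			}},
		}},
	}

	fmt.Printf("group %q requests %d volume(s)\n", *tg.Name, len(tg.Volumes))
}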
1 change: 1 addition & 0 deletions client/allocrunner/taskrunner/task_runner_hooks.go
@@ -61,6 +61,7 @@ func (tr *TaskRunner) initHooks() {
newTaskDirHook(tr, hookLogger),
newLogMonHook(tr.logmonHookConfig, hookLogger),
newDispatchHook(tr.Alloc(), hookLogger),
newVolumeHook(tr, hookLogger),
newArtifactHook(tr, hookLogger),
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
newDeviceHook(tr.devicemanager, hookLogger),
127 changes: 127 additions & 0 deletions client/allocrunner/taskrunner/volume_hook.go
@@ -0,0 +1,127 @@
package taskrunner

import (
"context"
"fmt"

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)

type volumeHook struct {
alloc *structs.Allocation
runner *TaskRunner
logger log.Logger
}

func newVolumeHook(runner *TaskRunner, logger log.Logger) *volumeHook {
h := &volumeHook{
alloc: runner.Alloc(),
runner: runner,
}
h.logger = logger.Named(h.Name())
return h
}

func (*volumeHook) Name() string {
return "volumes"
}

func validateHostVolumes(requestedByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) error {
var result error

for n, req := range requestedByAlias {
if req.Type != structs.VolumeTypeHost {
continue
}

cfg, err := structs.ParseHostVolumeConfig(req.Config)
if err != nil {
result = multierror.Append(result, fmt.Errorf("failed to parse config for %s: %v", n, err))
continue
}

_, ok := clientVolumesByName[cfg.Source]
if !ok {
result = multierror.Append(result, fmt.Errorf("missing %s", cfg.Source))
}
}

return result
}

// hostVolumeMountConfigurations takes the user's requested volume mounts,
// volumes, and the client host volume configuration and converts them into a
// format that can be used by drivers.
func (h *volumeHook) hostVolumeMountConfigurations(taskMounts []*structs.VolumeMount, taskVolumesByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) {
var mounts []*drivers.MountConfig
for _, m := range taskMounts {
req, ok := taskVolumesByAlias[m.Volume]
if !ok {
// Should never happen unless we failed to validate this at job submission
return nil, fmt.Errorf("No group volume declaration found named: %s", m.Volume)
}

cfg, err := structs.ParseHostVolumeConfig(req.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse config for %s: %v", m.Volume, err)
}

hostVolume, ok := clientVolumesByName[cfg.Source]
if !ok {
// Should never happen unless the client volumes were mutated during
// the execution of this hook.
return nil, fmt.Errorf("No host volume named: %s", cfg.Source)
}

mcfg := &drivers.MountConfig{
HostPath: hostVolume.Path,
TaskPath: m.Destination,
Readonly: hostVolume.ReadOnly || req.ReadOnly || m.ReadOnly,
}
mounts = append(mounts, mcfg)
}

return mounts, nil
}

func (h *volumeHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
volumes := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup).Volumes
mounts := h.runner.hookResources.getMounts()

hostVolumes := h.runner.clientConfig.Node.HostVolumes

// Always validate volumes to ensure that we do not allow volumes to be used
// if a host is restarted and loses the host volume configuration.
if err := validateHostVolumes(volumes, hostVolumes); err != nil {
h.logger.Error("Requested Host Volume does not exist", "existing", hostVolumes, "requested", volumes)
return fmt.Errorf("host volume validation error: %v", err)
}

requestedMounts, err := h.hostVolumeMountConfigurations(req.Task.VolumeMounts, volumes, hostVolumes)
if err != nil {
h.logger.Error("Failed to generate volume mounts", "error", err)
return err
}

// Because this hook also runs on restores, we only add mounts that do not
// already exist. Although this loop is somewhat expensive, most individual
// tasks have only a small number of mounts. We may want to revisit this
// with a `hookdata` param that lets us mount only once.
REQUESTED:
for _, m := range requestedMounts {
for _, em := range mounts {
if em.IsEqual(m) {
continue REQUESTED
}
}

mounts = append(mounts, m)
}

h.runner.hookResources.setMounts(mounts)
return nil
}
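Two details of the hook are worth noting: the effective read-only flag is the logical OR of the client's host volume config, the group's volume request, and the task's mount flag; and validateHostVolumes guards against a client that restarted without its host_volume configuration. Since validateHostVolumes is unexported, a usage sketch would live in a taskrunner package test; a hypothetical example (the "source" config key is an assumption):

func TestValidateHostVolumes_MissingSource(t *testing.T) {
	requested := map[string]*structs.VolumeRequest{
		"data": {
			Name:   "data",
			Type:   structs.VolumeTypeHost,
			Config: map[string]interface{}{"source": "shared_data"}, // assumed key
		},
	}

	// No host volumes on the client, e.g. after a host restart that lost its
	// host_volume stanzas: validation must fail rather than silently mount.
	err := validateHostVolumes(requested, map[string]*structs.ClientHostVolumeConfig{})
	require.Error(t, err) // multierror containing "missing shared_data"
}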
10 changes: 10 additions & 0 deletions client/client.go
@@ -1266,6 +1266,16 @@ func (c *Client) setupNode() error {
if node.Name == "" {
node.Name, _ = os.Hostname()
}
// TODO(dani): Fingerprint these to handle volumes that don't exist/have bad perms.
if node.HostVolumes == nil {
if l := len(c.config.HostVolumes); l != 0 {
node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig, l)
for k, v := range c.config.HostVolumes {
node.HostVolumes[k] = v.Copy()
}
}
}

if node.Name == "" {
node.Name = node.ID
}
4 changes: 4 additions & 0 deletions client/config/config.go
@@ -246,6 +246,9 @@ type Config struct {
// AutoFetchCNIDir is the destination dir to use when auto-downloading CNI plugins.
// This directory will be appended to the CNIPath so it is searched last
AutoFetchCNIDir string

// HostVolumes is the set of configured host volumes
HostVolumes map[string]*structs.ClientHostVolumeConfig
}

func (c *Config) Copy() *Config {
@@ -254,6 +257,7 @@
nc.Node = nc.Node.Copy()
nc.Servers = helper.CopySliceString(nc.Servers)
nc.Options = helper.CopyMapStringString(nc.Options)
nc.HostVolumes = structs.CopyMapStringClientHostVolumeConfig(nc.HostVolumes)
nc.ConsulConfig = c.ConsulConfig.Copy()
nc.VaultConfig = c.VaultConfig.Copy()
return nc
6 changes: 6 additions & 0 deletions command/agent/agent.go
@@ -470,6 +470,12 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
conf.ClientMinPort = uint(agentConfig.Client.ClientMinPort)
conf.DisableRemoteExec = agentConfig.Client.DisableRemoteExec

hvMap := make(map[string]*structs.ClientHostVolumeConfig, len(agentConfig.Client.HostVolumes))
for _, v := range agentConfig.Client.HostVolumes {
hvMap[v.Name] = v
}
conf.HostVolumes = hvMap

// Setup the node
conf.Node = new(structs.Node)
conf.Node.Datacenter = agentConfig.Datacenter
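One consequence of this slice-to-map conversion: if two host_volume stanzas share a name, the later entry silently wins. A small sketch of that map semantics, with hypothetical values:

vols := []*structs.ClientHostVolumeConfig{
	{Name: "tmp", Path: "/tmp"},
	{Name: "tmp", Path: "/mnt/tmp"}, // duplicate name: overwrites the entry above
}
hvMap := make(map[string]*structs.ClientHostVolumeConfig, len(vols))
for _, v := range vols {
	hvMap[v.Name] = v
}
// hvMap["tmp"].Path == "/mnt/tmp"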
10 changes: 10 additions & 0 deletions command/agent/config.go
@@ -245,6 +245,10 @@ type ClientConfig struct {
// ServerJoin contains information that is used to attempt to join servers
ServerJoin *ServerJoin `hcl:"server_join"`

// HostVolumes contains information about the volumes an operator has made
// available to jobs running on this node.
HostVolumes []*structs.ClientHostVolumeConfig `hcl:"host_volume"`

// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`

@@ -1333,6 +1337,12 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin)
}

if len(a.HostVolumes) == 0 && len(b.HostVolumes) != 0 {
result.HostVolumes = structs.CopySliceClientHostVolumeConfig(b.HostVolumes)
} else if len(b.HostVolumes) != 0 {
result.HostVolumes = structs.HostVolumeSliceMerge(a.HostVolumes, b.HostVolumes)
}

return &result
}

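The merge keeps a's volumes when b has none, copies b's wholesale when a has none, and otherwise defers to structs.HostVolumeSliceMerge. A hypothetical sketch, assuming the merge lets same-named entries from b (the overriding config) win:

a := []*structs.ClientHostVolumeConfig{{Name: "tmp", Path: "/tmp"}}
b := []*structs.ClientHostVolumeConfig{{Name: "tmp", Path: "/mnt/tmp"}}

merged := structs.HostVolumeSliceMerge(a, b)
// Assumed result: a single "tmp" entry whose Path is "/mnt/tmp".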
5 changes: 5 additions & 0 deletions command/agent/config_parse.go
@@ -138,6 +138,11 @@ func extraKeys(c *Config) error {
// stats is an unused key, continue to silently ignore it
removeEqualFold(&c.Client.ExtraKeysHCL, "stats")

// Remove HostVolume extra keys
for _, hv := range c.Client.HostVolumes {
removeEqualFold(&c.Client.ExtraKeysHCL, hv.Name)
}

for _, k := range []string{"enabled_schedulers", "start_join", "retry_join", "server_join"} {
removeEqualFold(&c.ExtraKeysHCL, k)
removeEqualFold(&c.ExtraKeysHCL, "server")
4 changes: 4 additions & 0 deletions command/agent/config_parse_test.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/stretchr/testify/require"
)
@@ -81,6 +82,9 @@ var basicConfig = &Config{
GCMaxAllocs: 50,
NoHostUUID: helper.BoolToPtr(false),
DisableRemoteExec: true,
HostVolumes: []*structs.ClientHostVolumeConfig{
{Name: "tmp", Path: "/tmp"},
},
},
Server: &ServerConfig{
Enabled: true,
32 changes: 32 additions & 0 deletions command/agent/job_endpoint.go
@@ -183,6 +183,7 @@ func (s *HTTPServer) ValidateJobRequest(resp http.ResponseWriter, req *http.Requ
}

job := ApiJobToStructJob(validateRequest.Job)

args := structs.JobValidateRequest{
Job: job,
WriteRequest: structs.WriteRequest{
@@ -728,6 +729,26 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
}
}

if l := len(taskGroup.Volumes); l != 0 {
tg.Volumes = make(map[string]*structs.VolumeRequest, l)
for k, v := range taskGroup.Volumes {
if v.Type != structs.VolumeTypeHost {
// Ignore non-host volumes for now.
continue
}

vol := &structs.VolumeRequest{
Name: v.Name,
Type: v.Type,
ReadOnly: v.ReadOnly,
Hidden: v.Hidden,
Config: v.Config,
}

tg.Volumes[k] = vol
}
}

if taskGroup.Update != nil {
tg.Update = &structs.UpdateStrategy{
Stagger: *taskGroup.Update.Stagger,
@@ -775,6 +796,17 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
structsTask.Constraints = ApiConstraintsToStructs(apiTask.Constraints)
structsTask.Affinities = ApiAffinitiesToStructs(apiTask.Affinities)

if l := len(apiTask.VolumeMounts); l != 0 {
structsTask.VolumeMounts = make([]*structs.VolumeMount, l)
for i, mount := range apiTask.VolumeMounts {
structsTask.VolumeMounts[i] = &structs.VolumeMount{
Volume: mount.Volume,
Destination: mount.Destination,
ReadOnly: mount.ReadOnly,
}
}
}

if l := len(apiTask.Services); l != 0 {
structsTask.Services = make([]*structs.Service, l)
for i, service := range apiTask.Services {
4 changes: 4 additions & 0 deletions command/agent/testdata/basic.hcl
@@ -89,6 +89,10 @@ client {
gc_max_allocs = 50
no_host_uuid = false
disable_remote_exec = true

host_volume "tmp" {
path = "/tmp"
}
}

server {
(Diffs for the remaining 12 changed files are not shown here.)
