Skip to content

Commit

Permalink
csi: validate updates to volumes on re-registration
Browse files Browse the repository at this point in the history
The CSI `CreateVolume` RPC is idempotent given that the topology,
capabilities, and parameters are unchanged. CSI volumes have many
user-defined fields that are immutable once set, and many fields that
are not user-settable.

Update the `Register` RPC so that updating a volume via the API merges
onto any existing volume without touching Nomad-controlled fields,
while validating it with the same strict requirements expected for
idempotent `CreateVolume` RPCs.
  • Loading branch information
tgross committed Mar 2, 2022
1 parent 907c795 commit 2311782
Show file tree
Hide file tree
Showing 10 changed files with 393 additions and 26 deletions.
4 changes: 4 additions & 0 deletions client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum
resp.CapacityBytes = cresp.Volume.CapacityBytes
resp.VolumeContext = cresp.Volume.VolumeContext

// Note: we safely throw away cresp.Volume.ContentSource here
// because it's just round-tripping the value set by the user in
// the server RPC call

resp.Topologies = make([]*nstructs.CSITopology, len(cresp.Volume.AccessibleTopology))
for _, topo := range cresp.Volume.AccessibleTopology {
resp.Topologies = append(resp.Topologies,
Expand Down
49 changes: 43 additions & 6 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest,

vol.Provider = plugin.Provider
vol.ProviderVersion = plugin.Version

return plugin, nil
}

Expand All @@ -263,7 +264,15 @@ func (v *CSIVolume) controllerValidateVolume(req *structs.CSIVolumeRegisterReque
return v.srv.RPC(method, cReq, cResp)
}

// Register registers a new volume
// Register registers a new volume or updates an existing volume. Note
// that most user-defined CSIVolume fields are immutable once the
// volume has been created.
//
// If the user needs to change fields because they've misconfigured
// the registration of the external volume, we expect that claims
// won't work either, and the user can deregister the volume and try
// again with the right settings. This lets us be as strict with
// validation here as the CreateVolume CSI RPC is expected to be.
func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *structs.CSIVolumeRegisterResponse) error {
if done, err := v.srv.forward("CSIVolume.Register", args, args, reply); done {
return err
Expand Down Expand Up @@ -302,11 +311,39 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
return err
}

// The topologies for the volume have already been set when it was
// created, so we accept the user's description of that topology
if vol.Topologies == nil || len(vol.Topologies) == 0 {
if vol.RequestedTopologies != nil {
vol.Topologies = vol.RequestedTopologies.Required
snap, err := v.srv.State().Snapshot()
if err != nil {
return err
}
ws := memdb.NewWatchSet()
existingVol, err := snap.CSIVolumeByID(ws, vol.Namespace, vol.ID)
if err != nil {
return err
}

// CSIVolume has many user-defined fields which are immutable
// once set, and many fields that are controlled by Nomad and
// are not user-settable. We merge onto a copy of the existing
// volume to allow a user to submit a volume spec for `volume
// create` and reuse it for updates in `volume register`
// without having to manually remove the fields unused by
// register (and similar use cases with API consumers such as
// Terraform).
if existingVol != nil {
existingVol = existingVol.Copy()
err = existingVol.Merge(existingVol)
if err != nil {
return err
}
vol = existingVol
} else {
// The topologies for the volume have already been set when it
// was created, so for newly register volumes we accept the
// user's description of that topology
if vol.Topologies == nil || len(vol.Topologies) == 0 {
if vol.RequestedTopologies != nil {
vol.Topologies = vol.RequestedTopologies.Required
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ func (n *nomadFSM) applyCSIVolumeRegister(buf []byte, index uint64) interface{}
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_register"}, time.Now())

if err := n.state.CSIVolumeRegister(index, req.Volumes); err != nil {
if err := n.state.UpsertCSIVolume(index, req.Volumes); err != nil {
n.logger.Error("CSIVolumeRegister failed", "error", err)
return err
}
Expand Down
22 changes: 12 additions & 10 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,8 +2143,8 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string)
return iter, nil
}

// CSIVolumeRegister adds a volume to the server store, failing if it already exists
func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error {
// UpsertCSIVolume inserts a volume in the state store.
func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()

Expand All @@ -2155,7 +2155,6 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace)
}

// Check for volume existence
obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID)
if err != nil {
return fmt.Errorf("volume existence check error: %v", err)
Expand All @@ -2164,17 +2163,20 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
// Allow some properties of a volume to be updated in place, but
// prevent accidentally overwriting important properties, or
// overwriting a volume in use
old, ok := obj.(*structs.CSIVolume)
if ok &&
old.InUse() ||
old.ExternalID != v.ExternalID ||
old := obj.(*structs.CSIVolume)
if old.ExternalID != v.ExternalID ||
old.PluginID != v.PluginID ||
old.Provider != v.Provider {
return fmt.Errorf("volume exists: %s", v.ID)
return fmt.Errorf("volume identity cannot be updated: %s", v.ID)
}
s.CSIVolumeDenormalize(nil, old.Copy())
if old.InUse() {
return fmt.Errorf("volume cannot be updated while in use")
}
}

if v.CreateIndex == 0 {
v.CreateIndex = old.CreateIndex
v.ModifyIndex = index
} else {
v.CreateIndex = index
v.ModifyIndex = index
}
Expand Down
8 changes: 8 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2684,6 +2684,10 @@ func TestStateStore_CSIVolume(t *testing.T) {
v0.Schedulable = true
v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v0.RequestedCapabilities = []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}

index++
v1 := structs.NewCSIVolume("foo", index)
Expand All @@ -2693,6 +2697,10 @@ func TestStateStore_CSIVolume(t *testing.T) {
v1.Schedulable = true
v1.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v1.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v1.RequestedCapabilities = []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}

index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0, v1})
Expand Down
2 changes: 1 addition & 1 deletion nomad/state/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestBadCSIState(t testing.TB, store *StateStore) error {
}
vol = vol.Copy() // canonicalize

err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
err = store.UpsertCSIVolume(index, []*structs.CSIVolume{vol})
if err != nil {
return err
}
Expand Down
137 changes: 137 additions & 0 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package structs

import (
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -180,6 +181,22 @@ func (o *CSIMountOptions) Merge(p *CSIMountOptions) {
}
}

func (o *CSIMountOptions) Equal(p *CSIMountOptions) bool {
if o == nil && p == nil {
return true
}
if o == nil || p == nil {
return false
}

if o.FSType != p.FSType {
return false
}

return helper.CompareSliceSetString(
o.MountFlags, p.MountFlags)
}

// CSIMountOptions implements the Stringer and GoStringer interfaces to prevent
// accidental leakage of sensitive mount flags via logs.
var _ fmt.Stringer = &CSIMountOptions{}
Expand Down Expand Up @@ -707,6 +724,126 @@ func (v *CSIVolume) Validate() error {
return nil
}

// Merge updates the mutable fields of a volume with those from
// another volume. CSIVolume has many user-defined fields which are
// immutable once set, and many fields that are not
// user-settable. Merge will return an error if we try to mutate the
// user-defined immutable fields after they're set, but silently
// ignore fields that are controlled by Nomad.
func (v *CSIVolume) Merge(other *CSIVolume) error {
if other == nil {
return nil
}

var errs *multierror.Error

if v.Name != other.Name && other.Name != "" {
errs = multierror.Append(errs, errors.New("volume name cannot be updated"))
}
if v.ExternalID != other.ExternalID && other.ExternalID != "" {
errs = multierror.Append(errs, errors.New(
"volume external ID cannot be updated"))
}
if v.PluginID != other.PluginID {
errs = multierror.Append(errs, errors.New(
"volume plugin ID cannot be updated"))
}
if v.CloneID != other.CloneID && other.CloneID != "" {
errs = multierror.Append(errs, errors.New(
"volume clone ID cannot be updated"))
}
if v.SnapshotID != other.SnapshotID && other.SnapshotID != "" {
errs = multierror.Append(errs, errors.New(
"volume snapshot ID cannot be updated"))
}

// must be compatible with capacity range
// TODO: when ExpandVolume is implemented we'll need to update
// this logic https://github.com/hashicorp/nomad/issues/10324
if other.RequestedCapacityMax < v.Capacity ||
other.RequestedCapacityMin > v.Capacity {
errs = multierror.Append(errs, errors.New(
"volume requested capacity update was not compatible with existing capacity"))
} else {
v.RequestedCapacityMin = other.RequestedCapacityMin
v.RequestedCapacityMax = other.RequestedCapacityMax
}

// must be compatible with volume_capabilities
if v.AccessMode != CSIVolumeAccessModeUnknown ||
v.AttachmentMode != CSIVolumeAttachmentModeUnknown {
var ok bool
for _, cap := range other.RequestedCapabilities {
if cap.AccessMode == v.AccessMode &&
cap.AttachmentMode == v.AttachmentMode {
ok = true
break
}
}
if ok {
v.RequestedCapabilities = other.RequestedCapabilities
} else {
errs = multierror.Append(errs, errors.New(
"volume requested capabilities update was not compatible with existing capability in use"))
}
}

// topologies are immutable, so topology request changes must be
// compatible with the existing topology, if any
if len(v.Topologies) > 0 {
if other.RequestedTopologies == nil {
if v.RequestedTopologies != nil {
errs = multierror.Append(errs, errors.New(
"volume topology request update was not compatible with existing topology"))
}
} else {
var err error
for _, otherTopo := range other.RequestedTopologies.Required {
if !otherTopo.MatchFound(v.Topologies) {
err = errors.New(
"volume topology requirement update was not compatible with existing topology")
break
}
}
for _, otherTopo := range other.RequestedTopologies.Preferred {
if !otherTopo.MatchFound(v.Topologies) {
err = errors.New(
"volume topology preference update was not compatible with existing topology")
break
}
}
if err != nil {
errs = multierror.Append(errs, err)
} else {
v.RequestedTopologies = other.RequestedTopologies
}
}
}

// MountOptions are immutable once set
if v.MountOptions.Equal(other.MountOptions) {
v.MountOptions = other.MountOptions
} else {
errs = multierror.Append(errs, errors.New(
"volume mount options cannot be updated"))
}

// Secrets can be updated freely
v.Secrets = other.Secrets

// must be compatible with parameters set by from CreateVolumeResponse
if !helper.CompareMapStringString(v.Parameters, other.Parameters) {
errs = multierror.Append(errs, errors.New(
"volume parameters cannot be updated"))
}

// Context is mutable and will be used during controller
// validation
v.Context = other.Context

return errs.ErrorOrNil()
}

// Request and response wrappers
type CSIVolumeRegisterRequest struct {
Volumes []*CSIVolume
Expand Down
Loading

0 comments on commit 2311782

Please sign in to comment.