Skip to content

Commit

Permalink
csi: validate updates to volumes on re-registration
Browse files Browse the repository at this point in the history
The CSI `CreateVolume` RPC is idempotent given that the topology,
capabilities, and parameters are unchanged. Update our state store
validation so that updating the volume via `nomad volume register`
meets the same requirements.
  • Loading branch information
tgross committed Mar 1, 2022
1 parent 907c795 commit fd88e6f
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 18 deletions.
21 changes: 11 additions & 10 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2155,7 +2155,6 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace)
}

// Check for volume existence
obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID)
if err != nil {
return fmt.Errorf("volume existence check error: %v", err)
Expand All @@ -2164,17 +2163,19 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
// Allow some properties of a volume to be updated in place, but
// prevent accidentally overwriting important properties, or
// overwriting a volume in use
old, ok := obj.(*structs.CSIVolume)
if ok &&
old.InUse() ||
old.ExternalID != v.ExternalID ||
old.PluginID != v.PluginID ||
old.Provider != v.Provider {
return fmt.Errorf("volume exists: %s", v.ID)
old := obj.(*structs.CSIVolume)
err := v.ValidateUpdate(old)
if err != nil {
return err
}
s.CSIVolumeDenormalize(nil, old.Copy())
if old.InUse() {
return fmt.Errorf("volume cannot be updated while in use")
}
}

if v.CreateIndex == 0 {
v.CreateIndex = old.CreateIndex
v.ModifyIndex = index
} else {
v.CreateIndex = index
v.ModifyIndex = index
}
Expand Down
8 changes: 8 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2684,6 +2684,10 @@ func TestStateStore_CSIVolume(t *testing.T) {
v0.Schedulable = true
v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v0.RequestedCapabilities = []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}

index++
v1 := structs.NewCSIVolume("foo", index)
Expand All @@ -2693,6 +2697,10 @@ func TestStateStore_CSIVolume(t *testing.T) {
v1.Schedulable = true
v1.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v1.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v1.RequestedCapabilities = []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}

index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0, v1})
Expand Down
91 changes: 91 additions & 0 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,97 @@ func (v *CSIVolume) Validate() error {
return nil
}

// ValidateUpdate validates that a volume registration update is
// compatible with the CSI specification's CreateVolume RPC, so that
// we can update volumes via Register with the same requirements as
// Create. CreateVolume must be idempotent, and if a volume with the
// same name already exists, is is accessible from
// accessibility_requirements, and is compatible with the specified
// capacity_range, volume_capabilities and parameters in the
// CreateVolumeRequest, the plugin must return OK
func (v *CSIVolume) ValidateUpdate(old *CSIVolume) error {
if old == nil {
return nil
}

errs := []string{}

if v.ExternalID != old.ExternalID {
errs = append(errs, "volume external ID cannot be updated")
}
if v.Name != old.Name {
errs = append(errs, "volume name cannot be updated")
}
if v.PluginID != old.PluginID {
errs = append(errs, "volume plugin ID cannot be updated")
}
if v.Provider != old.Provider {
errs = append(errs, "volume plugin provider cannot be updated")
}

// must be accessible from new topology requests
if len(v.Topologies) != len(old.Topologies) {
errs = append(errs, "volume topology update was not compatible with existing topology")
} else {
for _, topo := range v.Topologies {
if !topo.MatchFound(old.Topologies) {
errs = append(errs, "volume topology update was not compatible with existing topology")
break
}
}
}
if v.RequestedTopologies != nil {
for _, newTopo := range v.RequestedTopologies.Required {
if !newTopo.MatchFound(old.Topologies) {
errs = append(errs, "volume topology requirement update was not compatible with existing topology")
break
}
}
for _, newTopo := range v.RequestedTopologies.Preferred {
if !newTopo.MatchFound(old.Topologies) {
errs = append(errs, "volume topology preference update was not compatible with existing topology")
break
}
}
}

// must be compatible with capacity range
// TODO: when ExpandVolume is implemented we'll need to update
// this logic https://github.com/hashicorp/nomad/issues/10324
if v.RequestedCapacityMax < old.Capacity ||
v.RequestedCapacityMin > old.Capacity {
errs = append(errs, "volume requested capacity update was not compatible with existing capacity")
}

// must be compatible with volume_capabilities
if old.AccessMode != CSIVolumeAccessModeUnknown ||
old.AttachmentMode != CSIVolumeAttachmentModeUnknown {
var ok bool
for _, cap := range v.RequestedCapabilities {
if cap.AccessMode == old.AccessMode &&
cap.AttachmentMode == old.AttachmentMode {
ok = true
break
}
}
if !ok {
errs = append(errs, "volume requested capabilities update was not compatible with existing capability")
}
}

// must be compatible with parameters set by from CreateVolumeResponse
if !helper.CompareMapStringString(v.Parameters, old.Parameters) {
errs = append(errs, "volume parameters cannot be updated")
}

if len(errs) > 0 {
return fmt.Errorf("validation: %s", strings.Join(errs, ", "))
}

return nil

}

// Request and response wrappers
type CSIVolumeRegisterRequest struct {
Volumes []*CSIVolume
Expand Down
134 changes: 134 additions & 0 deletions nomad/structs/csi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,140 @@ func TestCSIVolume_Validate(t *testing.T) {

}

func TestCSIVolume_ValidateUpdate(t *testing.T) {

testCases := []struct {
name string
old *CSIVolume
v *CSIVolume
expected string
}{
{
name: "invalid topology update",
old: &CSIVolume{
Topologies: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R2"}},
},
},
v: &CSIVolume{
Topologies: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R3"}},
},
},
expected: "validation: volume topology update was not compatible with existing topology",
},
{
name: "invalid topology requirement update",
old: &CSIVolume{
Topologies: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
},
},
v: &CSIVolume{
Topologies: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
},
RequestedTopologies: &CSITopologyRequest{
Required: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R3"}},
},
},
},
expected: "validation: volume topology requirement update was not compatible with existing topology",
},
{
name: "invalid topology preference update",
old: &CSIVolume{
Topologies: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
},
},
v: &CSIVolume{
Topologies: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
},
RequestedTopologies: &CSITopologyRequest{
Preferred: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R3"}},
},
},
},
expected: "validation: volume topology preference update was not compatible with existing topology",
},
{
name: "invalid capacity update",
old: &CSIVolume{Capacity: 100},
v: &CSIVolume{
RequestedCapacityMax: 300, RequestedCapacityMin: 200},
expected: "validation: volume requested capacity update was not compatible with existing capacity",
},
{
name: "invalid capability update",
old: &CSIVolume{
AccessMode: CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: CSIVolumeAttachmentModeFilesystem,
},
v: &CSIVolume{
RequestedCapabilities: []*CSIVolumeCapability{
{
AccessMode: CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: CSIVolumeAttachmentModeFilesystem,
},
},
},
expected: "validation: volume requested capabilities update was not compatible with existing capability",
},
{
name: "valid update",
old: &CSIVolume{
Topologies: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R2"}},
},
AccessMode: CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: CSIVolumeAttachmentModeFilesystem,
},
v: &CSIVolume{
Topologies: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R2"}},
},
RequestedTopologies: &CSITopologyRequest{
Required: []*CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R2"}},
},
Preferred: []*CSITopology{
{Segments: map[string]string{"rack": "R2"}},
},
},
RequestedCapabilities: []*CSIVolumeCapability{
{
AccessMode: CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: CSIVolumeAttachmentModeFilesystem,
},
{
AccessMode: CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: CSIVolumeAttachmentModeFilesystem,
},
},
},
},
}
for _, tc := range testCases {
err := tc.v.ValidateUpdate(tc.old)
if tc.expected == "" {
require.NoError(t, err, tc.name)
} else {
require.EqualError(t, err, tc.expected, tc.name)
}
}
}

func TestCSIPluginJobs(t *testing.T) {
plug := NewCSIPlugin("foo", 1000)
controller := &Job{
Expand Down
13 changes: 13 additions & 0 deletions nomad/structs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ func (t *CSITopology) Equal(o *CSITopology) bool {
return helper.CompareMapStringString(t.Segments, o.Segments)
}

func (t *CSITopology) MatchFound(o []*CSITopology) bool {
if t == nil || o == nil || len(o) == 0 {
return false
}

for _, other := range o {
if t.Equal(other) {
return true
}
}
return false
}

// CSITopologyRequest are the topologies submitted as options to the
// storage provider at the time the volume was created. The storage
// provider will return a single topology.
Expand Down
9 changes: 1 addition & 8 deletions scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,7 @@ func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) {
// volume MUST be accessible from at least one of the
// requisite topologies."
if len(vol.Topologies) > 0 {
var ok bool
for _, requiredTopo := range vol.Topologies {
if requiredTopo.Equal(plugin.NodeInfo.AccessibleTopology) {
ok = true
break
}
}
if !ok {
if !plugin.NodeInfo.AccessibleTopology.MatchFound(vol.Topologies) {
return false, FilterConstraintsCSIPluginTopology
}
}
Expand Down

0 comments on commit fd88e6f

Please sign in to comment.