diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go index 2438edce0a1b..abbe0b8e3cf6 100644 --- a/client/csi_endpoint.go +++ b/client/csi_endpoint.go @@ -216,6 +216,10 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum resp.CapacityBytes = cresp.Volume.CapacityBytes resp.VolumeContext = cresp.Volume.VolumeContext + // Note: we safely throw away cresp.Volume.ContentSource here + // because it's just round-tripping the value set by the user in + // the server RPC call + resp.Topologies = make([]*nstructs.CSITopology, len(cresp.Volume.AccessibleTopology)) for _, topo := range cresp.Volume.AccessibleTopology { resp.Topologies = append(resp.Topologies, diff --git a/command/alloc_status_test.go b/command/alloc_status_test.go index 69626b50e3c8..fca8a2a19133 100644 --- a/command/alloc_status_test.go +++ b/command/alloc_status_test.go @@ -479,7 +479,7 @@ func TestAllocStatusCommand_CSIVolumes(t *testing.T) { Segments: map[string]string{"foo": "bar"}, }}, }} - err = state.CSIVolumeRegister(1002, vols) + err = state.UpsertCSIVolume(1002, vols) require.NoError(t, err) // Upsert the job and alloc diff --git a/command/volume_status_test.go b/command/volume_status_test.go index afd35213c4de..313d57502007 100644 --- a/command/volume_status_test.go +++ b/command/volume_status_test.go @@ -46,7 +46,7 @@ func TestCSIVolumeStatusCommand_AutocompleteArgs(t *testing.T) { PluginID: "glade", } - require.NoError(t, state.CSIVolumeRegister(1000, []*structs.CSIVolume{vol})) + require.NoError(t, state.UpsertCSIVolume(1000, []*structs.CSIVolume{vol})) prefix := vol.ID[:len(vol.ID)-5] args := complete.Args{Last: prefix} diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 092fc16ffd0a..b57ae126ece4 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -238,6 +238,7 @@ func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, vol.Provider = plugin.Provider vol.ProviderVersion = plugin.Version + return plugin, nil } @@ -263,7 +264,15 @@ func (v *CSIVolume) controllerValidateVolume(req *structs.CSIVolumeRegisterReque return v.srv.RPC(method, cReq, cResp) } -// Register registers a new volume +// Register registers a new volume or updates an existing volume. Note +// that most user-defined CSIVolume fields are immutable once the +// volume has been created. +// +// If the user needs to change fields because they've misconfigured +// the registration of the external volume, we expect that claims +// won't work either, and the user can deregister the volume and try +// again with the right settings. This lets us be as strict with +// validation here as the CreateVolume CSI RPC is expected to be. 
func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *structs.CSIVolumeRegisterResponse) error { if done, err := v.srv.forward("CSIVolume.Register", args, args, reply); done { return err @@ -302,9 +311,35 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru return err } - // The topologies for the volume have already been set when it was - // created, so we accept the user's description of that topology - if vol.Topologies == nil || len(vol.Topologies) == 0 { + snap, err := v.srv.State().Snapshot() + if err != nil { + return err + } + ws := memdb.NewWatchSet() + existingVol, err := snap.CSIVolumeByID(ws, vol.Namespace, vol.ID) + if err != nil { + return err + } + + // CSIVolume has many user-defined fields which are immutable + // once set, and many fields that are controlled by Nomad and + // are not user-settable. We merge onto a copy of the existing + // volume to allow a user to submit a volume spec for `volume + // create` and reuse it for updates in `volume register` + // without having to manually remove the fields unused by + // register (and similar use cases with API consumers such as + // Terraform). + if existingVol != nil { + existingVol = existingVol.Copy() + err = existingVol.Merge(vol) + if err != nil { + return err + } + *vol = *existingVol + } else if vol.Topologies == nil || len(vol.Topologies) == 0 { + // The topologies for the volume have already been set when it + // was created, so for newly registered volumes we accept the + // user's description of that topology if vol.RequestedTopologies != nil { vol.Topologies = vol.RequestedTopologies.Required } diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index e4235ee213e8..61f828f1126b 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -45,7 +45,7 @@ func TestCSIVolumeEndpoint_Get(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, }}, }} - err := state.CSIVolumeRegister(999, vols) + err := state.UpsertCSIVolume(999, vols) require.NoError(t, err) // Create the register request @@ -95,7 +95,7 @@ func TestCSIVolumeEndpoint_Get_ACL(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, }}, }} - err := state.CSIVolumeRegister(999, vols) + err := state.UpsertCSIVolume(999, vols) require.NoError(t, err) // Create the register request @@ -286,7 +286,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) { }}, }} index++ - err = state.CSIVolumeRegister(index, vols) + err = state.UpsertCSIVolume(index, vols) require.NoError(t, err) // Verify that the volume exists, and is healthy @@ -425,7 +425,7 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, }}, }} - err = state.CSIVolumeRegister(1003, vols) + err = state.UpsertCSIVolume(1003, vols) require.NoError(t, err) alloc := mock.BatchAlloc() @@ -535,7 +535,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { } index++ - err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err = state.UpsertCSIVolume(index, []*structs.CSIVolume{vol}) require.NoError(t, err) // setup: create an alloc that will claim our volume @@ -642,7 +642,7 @@ func TestCSIVolumeEndpoint_List(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, }}, }} - err = state.CSIVolumeRegister(1002, vols) + err = state.UpsertCSIVolume(1002, vols) require.NoError(t, err) // Query everything in the namespace @@ -721,7 +721,7 @@ func TestCSIVolumeEndpoint_ListAllNamespaces(t
*testing.T) { }}, }, } - err = state.CSIVolumeRegister(1001, vols) + err = state.UpsertCSIVolume(1001, vols) require.NoError(t, err) // Lookup volumes in all namespaces @@ -956,7 +956,7 @@ func TestCSIVolumeEndpoint_Delete(t *testing.T) { Secrets: structs.CSISecrets{"mysecret": "secretvalue"}, }} index++ - err = state.CSIVolumeRegister(index, vols) + err = state.UpsertCSIVolume(index, vols) require.NoError(t, err) // Delete volumes @@ -1174,7 +1174,7 @@ func TestCSIVolumeEndpoint_CreateSnapshot(t *testing.T) { ExternalID: "vol-12345", }} index++ - require.NoError(t, state.CSIVolumeRegister(index, vols)) + require.NoError(t, state.UpsertCSIVolume(index, vols)) // Create the snapshot request req1 := &structs.CSISnapshotCreateRequest{ @@ -1649,7 +1649,7 @@ func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) { ControllerRequired: false, }, } - err = state.CSIVolumeRegister(1002, vols) + err = state.UpsertCSIVolume(1002, vols) require.NoError(t, err) // has controller diff --git a/nomad/fsm.go b/nomad/fsm.go index 6fcd0a04478a..c288fc09b4c2 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1260,7 +1260,7 @@ func (n *nomadFSM) applyCSIVolumeRegister(buf []byte, index uint64) interface{} } defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_register"}, time.Now()) - if err := n.state.CSIVolumeRegister(index, req.Volumes); err != nil { + if err := n.state.UpsertCSIVolume(index, req.Volumes); err != nil { n.logger.Error("CSIVolumeRegister failed", "error", err) return err } diff --git a/nomad/search_endpoint_test.go b/nomad/search_endpoint_test.go index f4a4fa8ef1ec..cbd634b3e74d 100644 --- a/nomad/search_endpoint_test.go +++ b/nomad/search_endpoint_test.go @@ -728,7 +728,7 @@ func TestSearch_PrefixSearch_CSIVolume(t *testing.T) { testutil.WaitForLeader(t, s.RPC) id := uuid.Generate() - err := s.fsm.State().CSIVolumeRegister(1000, []*structs.CSIVolume{{ + err := s.fsm.State().UpsertCSIVolume(1000, []*structs.CSIVolume{{ ID: id, Namespace: structs.DefaultNamespace, PluginID: "glade", @@ -1348,7 +1348,7 @@ func TestSearch_FuzzySearch_CSIVolume(t *testing.T) { testutil.WaitForLeader(t, s.RPC) id := uuid.Generate() - err := s.fsm.State().CSIVolumeRegister(1000, []*structs.CSIVolume{{ + err := s.fsm.State().UpsertCSIVolume(1000, []*structs.CSIVolume{{ ID: id, Namespace: structs.DefaultNamespace, PluginID: "glade", diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index b8215b4baedc..9974ec3014fb 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2143,8 +2143,8 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) return iter, nil } -// CSIVolumeRegister adds a volume to the server store, failing if it already exists -func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error { +// UpsertCSIVolume inserts a volume in the state store. 
+func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) error { txn := s.db.WriteTxn(index) defer txn.Abort() @@ -2155,7 +2155,6 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace) } - // Check for volume existence obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID) if err != nil { return fmt.Errorf("volume existence check error: %v", err) @@ -2164,17 +2163,20 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum // Allow some properties of a volume to be updated in place, but // prevent accidentally overwriting important properties, or // overwriting a volume in use - old, ok := obj.(*structs.CSIVolume) - if ok && - old.InUse() || - old.ExternalID != v.ExternalID || + old := obj.(*structs.CSIVolume) + if old.ExternalID != v.ExternalID || old.PluginID != v.PluginID || old.Provider != v.Provider { - return fmt.Errorf("volume exists: %s", v.ID) + return fmt.Errorf("volume identity cannot be updated: %s", v.ID) + } + s.CSIVolumeDenormalize(nil, old.Copy()) + if old.InUse() { + return fmt.Errorf("volume cannot be updated while in use") } - } - if v.CreateIndex == 0 { + v.CreateIndex = old.CreateIndex + v.ModifyIndex = index + } else { v.CreateIndex = index v.ModifyIndex = index } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 9f4dd3930a7b..32987465a28c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2684,6 +2684,10 @@ func TestStateStore_CSIVolume(t *testing.T) { v0.Schedulable = true v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem + v0.RequestedCapabilities = []*structs.CSIVolumeCapability{{ + AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + }} index++ v1 := structs.NewCSIVolume("foo", index) @@ -2693,20 +2697,24 @@ func TestStateStore_CSIVolume(t *testing.T) { v1.Schedulable = true v1.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter v1.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem + v1.RequestedCapabilities = []*structs.CSIVolumeCapability{{ + AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + }} index++ - err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0, v1}) + err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0, v1}) require.NoError(t, err) // volume registration is idempotent, unless identies are changed index++ - err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0, v1}) + err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0, v1}) require.NoError(t, err) index++ v2 := v0.Copy() v2.PluginID = "new-id" - err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v2}) + err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v2}) require.Error(t, err, fmt.Sprintf("volume exists: %s", v0.ID)) ws := memdb.NewWatchSet() @@ -2786,7 +2794,7 @@ func TestStateStore_CSIVolume(t *testing.T) { // registration is an error when the volume is in use index++ - err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0}) + err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0}) require.Error(t, err, "volume re-registered while in use") // as is deregistration index++ @@ -3126,7 +3134,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) { Namespace: 
structs.DefaultNamespace, PluginID: plugID, } - err = store.CSIVolumeRegister(nextIndex(store), []*structs.CSIVolume{vol}) + err = store.UpsertCSIVolume(nextIndex(store), []*structs.CSIVolume{vol}) require.NoError(t, err) err = store.DeleteJob(nextIndex(store), structs.DefaultNamespace, controllerJobID) diff --git a/nomad/state/testing.go b/nomad/state/testing.go index c7a2f3e8e27c..b288f7d3ffe5 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -305,7 +305,7 @@ func TestBadCSIState(t testing.TB, store *StateStore) error { } vol = vol.Copy() // canonicalize - err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err = store.UpsertCSIVolume(index, []*structs.CSIVolume{vol}) if err != nil { return err } diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index 3d7c5178fc5a..799a6498e5d9 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -1,6 +1,7 @@ package structs import ( + "errors" "fmt" "strings" "time" @@ -180,6 +181,22 @@ func (o *CSIMountOptions) Merge(p *CSIMountOptions) { } } +func (o *CSIMountOptions) Equal(p *CSIMountOptions) bool { + if o == nil && p == nil { + return true + } + if o == nil || p == nil { + return false + } + + if o.FSType != p.FSType { + return false + } + + return helper.CompareSliceSetString( + o.MountFlags, p.MountFlags) +} + // CSIMountOptions implements the Stringer and GoStringer interfaces to prevent // accidental leakage of sensitive mount flags via logs. var _ fmt.Stringer = &CSIMountOptions{} @@ -707,6 +724,126 @@ func (v *CSIVolume) Validate() error { return nil } +// Merge updates the mutable fields of a volume with those from +// another volume. CSIVolume has many user-defined fields which are +// immutable once set, and many fields that are not +// user-settable. Merge will return an error if we try to mutate the +// user-defined immutable fields after they're set, but silently +// ignore fields that are controlled by Nomad. 
+func (v *CSIVolume) Merge(other *CSIVolume) error { + if other == nil { + return nil + } + + var errs *multierror.Error + + if v.Name != other.Name && other.Name != "" { + errs = multierror.Append(errs, errors.New("volume name cannot be updated")) + } + if v.ExternalID != other.ExternalID && other.ExternalID != "" { + errs = multierror.Append(errs, errors.New( + "volume external ID cannot be updated")) + } + if v.PluginID != other.PluginID { + errs = multierror.Append(errs, errors.New( + "volume plugin ID cannot be updated")) + } + if v.CloneID != other.CloneID && other.CloneID != "" { + errs = multierror.Append(errs, errors.New( + "volume clone ID cannot be updated")) + } + if v.SnapshotID != other.SnapshotID && other.SnapshotID != "" { + errs = multierror.Append(errs, errors.New( + "volume snapshot ID cannot be updated")) + } + + // must be compatible with capacity range + // TODO: when ExpandVolume is implemented we'll need to update + // this logic https://github.com/hashicorp/nomad/issues/10324 + if other.RequestedCapacityMax < v.Capacity || + other.RequestedCapacityMin > v.Capacity { + errs = multierror.Append(errs, errors.New( + "volume requested capacity update was not compatible with existing capacity")) + } else { + v.RequestedCapacityMin = other.RequestedCapacityMin + v.RequestedCapacityMax = other.RequestedCapacityMax + } + + // must be compatible with volume_capabilities + if v.AccessMode != CSIVolumeAccessModeUnknown || + v.AttachmentMode != CSIVolumeAttachmentModeUnknown { + var ok bool + for _, cap := range other.RequestedCapabilities { + if cap.AccessMode == v.AccessMode && + cap.AttachmentMode == v.AttachmentMode { + ok = true + break + } + } + if ok { + v.RequestedCapabilities = other.RequestedCapabilities + } else { + errs = multierror.Append(errs, errors.New( + "volume requested capabilities update was not compatible with existing capability in use")) + } + } + + // topologies are immutable, so topology request changes must be + // compatible with the existing topology, if any + if len(v.Topologies) > 0 { + if other.RequestedTopologies == nil { + if v.RequestedTopologies != nil { + errs = multierror.Append(errs, errors.New( + "volume topology request update was not compatible with existing topology")) + } + } else { + var err error + for _, otherTopo := range other.RequestedTopologies.Required { + if !otherTopo.MatchFound(v.Topologies) { + err = errors.New( + "volume topology requirement update was not compatible with existing topology") + break + } + } + for _, otherTopo := range other.RequestedTopologies.Preferred { + if !otherTopo.MatchFound(v.Topologies) { + err = errors.New( + "volume topology preference update was not compatible with existing topology") + break + } + } + if err != nil { + errs = multierror.Append(errs, err) + } else { + v.RequestedTopologies = other.RequestedTopologies + } + } + } + + // MountOptions are immutable once set + if v.MountOptions.Equal(other.MountOptions) { + v.MountOptions = other.MountOptions + } else { + errs = multierror.Append(errs, errors.New( + "volume mount options cannot be updated")) + } + + // Secrets can be updated freely + v.Secrets = other.Secrets + + // must be compatible with parameters set by the CreateVolumeResponse + if !helper.CompareMapStringString(v.Parameters, other.Parameters) { + errs = multierror.Append(errs, errors.New( + "volume parameters cannot be updated")) + } + + // Context is mutable and will be used during controller + // validation + v.Context = other.Context + + return errs.ErrorOrNil() +} + //
Request and response wrappers type CSIVolumeRegisterRequest struct { Volumes []*CSIVolume diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index 855a65871b47..df09685e703e 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -569,6 +569,179 @@ func TestCSIVolume_Validate(t *testing.T) { } +func TestCSIVolume_Merge(t *testing.T) { + + testCases := []struct { + name string + v *CSIVolume + update *CSIVolume + expected string + expectFn func(t *testing.T, v *CSIVolume) + }{ + { + name: "invalid capacity update", + v: &CSIVolume{Capacity: 100}, + update: &CSIVolume{ + RequestedCapacityMax: 300, RequestedCapacityMin: 200}, + expected: "volume requested capacity update was not compatible with existing capacity", + expectFn: func(t *testing.T, v *CSIVolume) { + require.NotEqual(t, 300, v.RequestedCapacityMax) + require.NotEqual(t, 200, v.RequestedCapacityMin) + }, + }, + { + name: "invalid capability update", + v: &CSIVolume{ + AccessMode: CSIVolumeAccessModeMultiNodeReader, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + }, + update: &CSIVolume{ + RequestedCapabilities: []*CSIVolumeCapability{ + { + AccessMode: CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + }, + }, + }, + expected: "volume requested capabilities update was not compatible with existing capability in use", + }, + { + name: "invalid topology update - removed", + v: &CSIVolume{ + RequestedTopologies: &CSITopologyRequest{ + Required: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + }, + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + }, + update: &CSIVolume{}, + expected: "volume topology request update was not compatible with existing topology", + expectFn: func(t *testing.T, v *CSIVolume) { + require.Len(t, v.Topologies, 1) + }, + }, + { + name: "invalid topology requirement update", + v: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + }, + update: &CSIVolume{ + RequestedTopologies: &CSITopologyRequest{ + Required: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R3"}}, + }, + }, + }, + expected: "volume topology requirement update was not compatible with existing topology", + expectFn: func(t *testing.T, v *CSIVolume) { + require.Len(t, v.Topologies, 1) + require.Equal(t, "R1", v.Topologies[0].Segments["rack"]) + }, + }, + { + name: "invalid topology preference update", + v: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + }, + update: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + RequestedTopologies: &CSITopologyRequest{ + Preferred: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R3"}}, + }, + }, + }, + expected: "volume topology preference update was not compatible with existing topology", + }, + { + name: "invalid mount options update", + v: &CSIVolume{ + MountOptions: &CSIMountOptions{ + FSType: "ext4", + MountFlags: []string{"noatime"}, + }, + }, + update: &CSIVolume{ + MountOptions: &CSIMountOptions{ + FSType: "ext4", + MountFlags: []string{"noatime", "ro"}, + }, + }, + expected: "volume mount options cannot be updated", + }, + { + name: "valid update", + v: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2"}}, + }, + AccessMode: 
CSIVolumeAccessModeMultiNodeReader, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + MountOptions: &CSIMountOptions{ + FSType: "ext4", + MountFlags: []string{"noatime"}, + }, + }, + update: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2"}}, + }, + MountOptions: &CSIMountOptions{ + FSType: "ext4", + MountFlags: []string{"noatime"}, + }, + RequestedTopologies: &CSITopologyRequest{ + Required: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2"}}, + }, + Preferred: []*CSITopology{ + {Segments: map[string]string{"rack": "R2"}}, + }, + }, + RequestedCapabilities: []*CSIVolumeCapability{ + { + AccessMode: CSIVolumeAccessModeMultiNodeReader, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + }, + { + AccessMode: CSIVolumeAccessModeMultiNodeReader, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + }, + }, + }, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + err := tc.v.Merge(tc.update) + if tc.expected == "" { + require.NoError(t, err) + } else { + if tc.expectFn != nil { + tc.expectFn(t, tc.v) + } + require.Error(t, err, tc.expected) + require.Contains(t, err.Error(), tc.expected) + } + }) + } +} + func TestCSIPluginJobs(t *testing.T) { plug := NewCSIPlugin("foo", 1000) controller := &Job{ diff --git a/nomad/structs/node.go b/nomad/structs/node.go index 6a8f9cd47a56..ed3802a8fb64 100644 --- a/nomad/structs/node.go +++ b/nomad/structs/node.go @@ -62,6 +62,19 @@ func (t *CSITopology) Equal(o *CSITopology) bool { return helper.CompareMapStringString(t.Segments, o.Segments) } +func (t *CSITopology) MatchFound(o []*CSITopology) bool { + if t == nil || o == nil || len(o) == 0 { + return false + } + + for _, other := range o { + if t.Equal(other) { + return true + } + } + return false +} + // CSITopologyRequest are the topologies submitted as options to the // storage provider at the time the volume was created. The storage // provider will return a single topology.
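Reviewer sketch (not part of the patch): the `CSITopology.MatchFound` helper added above is what `scheduler/feasible.go` switches to in the next file. A minimal illustration of its semantics, written in the style of the `nomad/structs` package tests and assuming testify's `require`; the test name and segment values are made up for illustration:

```go
func TestCSITopology_MatchFound_Sketch(t *testing.T) {
	// MatchFound reports whether the receiver's segments exactly match
	// at least one candidate topology; a nil receiver, nil list, or
	// empty list all report false.
	topo := &CSITopology{Segments: map[string]string{"rack": "R1"}}

	require.True(t, topo.MatchFound([]*CSITopology{
		{Segments: map[string]string{"rack": "R1"}},
		{Segments: map[string]string{"rack": "R2"}},
	}))

	require.False(t, topo.MatchFound([]*CSITopology{
		{Segments: map[string]string{"rack": "R3"}},
	}))
	require.False(t, topo.MatchFound(nil))

	var nilTopo *CSITopology
	require.False(t, nilTopo.MatchFound([]*CSITopology{topo}))
}
```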
diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 1652faa403ea..47f1c970ad83 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -32,7 +32,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { vol := testVolume(plugin, alloc, node.ID) index++ - err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol}) require.NoError(err) claim := &structs.CSIVolumeClaim{ @@ -78,7 +78,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { watcher.SetEnabled(true, srv.State(), "") index++ - err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err = srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol}) require.NoError(err) // we should get or start up a watcher when we get an update for @@ -167,7 +167,7 @@ func TestVolumeWatch_StartStop(t *testing.T) { // register a volume vol := testVolume(plugin, alloc1, node.ID) index++ - err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err = srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol}) require.NoError(err) // assert we get a watcher; there are no claims so it should immediately stop @@ -254,7 +254,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) { // register a volume without claims vol := mock.CSIVolume(plugin) index++ - err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol}) require.NoError(err) // watcher should be started but immediately stopped diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 3c478eb695d6..2fcf1a325a6d 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -318,14 +318,7 @@ func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) { // volume MUST be accessible from at least one of the // requisite topologies." 
if len(vol.Topologies) > 0 { - var ok bool - for _, requiredTopo := range vol.Topologies { - if requiredTopo.Equal(plugin.NodeInfo.AccessibleTopology) { - ok = true - break - } - } - if !ok { + if !plugin.NodeInfo.AccessibleTopology.MatchFound(vol.Topologies) { return false, FilterConstraintsCSIPluginTopology } } diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 26e2e79072da..1689fd415b22 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -342,7 +342,7 @@ func TestCSIVolumeChecker(t *testing.T) { {Segments: map[string]string{"rack": "R1"}}, {Segments: map[string]string{"rack": "R2"}}, } - err := state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err := state.UpsertCSIVolume(index, []*structs.CSIVolume{vol}) require.NoError(t, err) index++ @@ -353,14 +353,14 @@ func TestCSIVolumeChecker(t *testing.T) { vol2.Namespace = structs.DefaultNamespace vol2.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter vol2.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem - err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol2}) + err = state.UpsertCSIVolume(index, []*structs.CSIVolume{vol2}) require.NoError(t, err) index++ vid3 := "volume-id[0]" vol3 := vol.Copy() vol3.ID = vid3 - err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol3}) + err = state.UpsertCSIVolume(index, []*structs.CSIVolume{vol3}) require.NoError(t, err) index++ diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index ecb2784c6bf8..713c6a94e82f 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -6133,7 +6133,7 @@ func TestServiceSched_CSIVolumesPerAlloc(t *testing.T) { // once its been fixed shared.AccessMode = structs.CSIVolumeAccessModeMultiNodeReader - require.NoError(h.State.CSIVolumeRegister( + require.NoError(h.State.UpsertCSIVolume( h.NextIndex(), []*structs.CSIVolume{shared, vol0, vol1, vol2})) // Create a job that uses both @@ -6226,7 +6226,7 @@ func TestServiceSched_CSIVolumesPerAlloc(t *testing.T) { vol4.ID = "volume-unique[3]" vol5 := vol0.Copy() vol5.ID = "volume-unique[4]" - require.NoError(h.State.CSIVolumeRegister( + require.NoError(h.State.UpsertCSIVolume( h.NextIndex(), []*structs.CSIVolume{vol4, vol5})) // Process again with failure fixed. It should create a new plan diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index 66ff92840d28..2f36e0014543 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -245,7 +245,7 @@ func TestServiceStack_Select_CSI(t *testing.T) { v.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter v.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem v.PluginID = "bar" - err := state.CSIVolumeRegister(999, []*structs.CSIVolume{v}) + err := state.UpsertCSIVolume(999, []*structs.CSIVolume{v}) require.NoError(t, err) // Create a node with healthy fingerprints for both controller and node plugins
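Taken together, these changes turn `volume register` into an update path: the server copies the stored volume, merges the submitted spec onto the copy with `CSIVolume.Merge`, and rejects changes to immutable fields, while the state store's `UpsertCSIVolume` preserves the original create index and refuses identity changes or updates to in-use volumes. A sketch of the intended `Merge` behavior follows, again in the `nomad/structs` test style with testify's `require`; the field values (plugin ID, secrets, capacity numbers, snapshot ID) are illustrative assumptions, not values taken from this change:

```go
func TestCSIVolume_Merge_RegisterFlow_Sketch(t *testing.T) {
	// A volume that was previously created and claimed, so its access and
	// attachment modes are already pinned by Nomad.
	existing := &CSIVolume{
		PluginID:       "plugin-id1", // immutable: must match on re-register
		Capacity:       1000,
		AccessMode:     CSIVolumeAccessModeMultiNodeReader,
		AttachmentMode: CSIVolumeAttachmentModeFilesystem,
	}

	// The spec re-submitted via `volume register`: secrets, context, and
	// capacity bounds / capabilities compatible with the existing volume
	// are allowed to change.
	update := &CSIVolume{
		PluginID:             "plugin-id1",
		Secrets:              CSISecrets{"token": "rotated"},
		Context:              map[string]string{"mount": "/dev/xvdb"},
		RequestedCapacityMin: 500,
		RequestedCapacityMax: 2000,
		RequestedCapabilities: []*CSIVolumeCapability{{
			AccessMode:     CSIVolumeAccessModeMultiNodeReader,
			AttachmentMode: CSIVolumeAttachmentModeFilesystem,
		}},
	}

	// Mirrors what the Register RPC now does: merge onto a copy of the
	// stored volume so Nomad-controlled fields are preserved.
	merged := existing.Copy()
	require.NoError(t, merged.Merge(update))
	require.Equal(t, CSISecrets{"token": "rotated"}, merged.Secrets)
	require.Equal(t, int64(2000), merged.RequestedCapacityMax)

	// Changing an immutable field is collected into the returned multierror.
	badUpdate := update.Copy()
	badUpdate.SnapshotID = "snap-12345"
	merged = existing.Copy()
	err := merged.Merge(badUpdate)
	require.Error(t, err)
	require.Contains(t, err.Error(), "volume snapshot ID cannot be updated")
}
```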