From 2c23e2546a6919ce34f66fe10e1feaa1a538c697 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 3 Mar 2022 11:18:09 -0500 Subject: [PATCH] csi: allow updates to volumes on re-registration CSI `CreateVolume` RPC is idempotent given that the topology, capabilities, and parameters are unchanged. CSI volumes have many user-defined fields that are immutable once set, and many fields that are not user-settable. Update the `Register` RPC so that updating a volume via the API merges onto any existing volume without touching Nomad-controlled fields, while validating it with the same strict requirements expected for idempotent `CreateVolume` RPCs. --- nomad/csi_endpoint.go | 58 ++++++++++-- nomad/csi_endpoint_test.go | 1 - nomad/state/state_store.go | 18 ++-- nomad/state/state_store_test.go | 8 ++ nomad/structs/csi.go | 137 ++++++++++++++++++++++++++++ nomad/structs/csi_test.go | 157 ++++++++++++++++++++++++++++++++ 6 files changed, 361 insertions(+), 18 deletions(-) diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 092fc16ffd0a..cc14295d6686 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -238,6 +238,7 @@ func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, vol.Provider = plugin.Provider vol.ProviderVersion = plugin.Version + return plugin, nil } @@ -263,7 +264,15 @@ func (v *CSIVolume) controllerValidateVolume(req *structs.CSIVolumeRegisterReque return v.srv.RPC(method, cReq, cResp) } -// Register registers a new volume +// Register registers a new volume or updates an existing volume. Note +// that most user-defined CSIVolume fields are immutable once the +// volume has been created. +// +// If the user needs to change fields because they've misconfigured +// the registration of the external volume, we expect that claims +// won't work either, and the user can deregister the volume and try +// again with the right settings. This lets us be as strict with +// validation here as the CreateVolume CSI RPC is expected to be. func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *structs.CSIVolumeRegisterResponse) error { if done, err := v.srv.forward("CSIVolume.Register", args, args, reply); done { return err @@ -289,26 +298,57 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru // We also validate that the plugin exists for each plugin, and validate the // capabilities when the plugin has a controller. for _, vol := range args.Volumes { - vol.Namespace = args.RequestNamespace() + + snap, err := v.srv.State().Snapshot() + if err != nil { + return err + } + // TODO: allow volume spec file to set namespace + // https://github.com/hashicorp/nomad/issues/11196 + if vol.Namespace == "" { + vol.Namespace = args.RequestNamespace() + } if err = vol.Validate(); err != nil { return err } - plugin, err := v.pluginValidateVolume(args, vol) + ws := memdb.NewWatchSet() + existingVol, err := snap.CSIVolumeByID(ws, vol.Namespace, vol.ID) if err != nil { return err } - if err := v.controllerValidateVolume(args, vol, plugin); err != nil { - return err - } - // The topologies for the volume have already been set when it was - // created, so we accept the user's description of that topology - if vol.Topologies == nil || len(vol.Topologies) == 0 { + // CSIVolume has many user-defined fields which are immutable + // once set, and many fields that are controlled by Nomad and + // are not user-settable. We merge onto a copy of the existing + // volume to allow a user to submit a volume spec for `volume + // create` and reuse it for updates in `volume register` + // without having to manually remove the fields unused by + // register (and similar use cases with API consumers such as + // Terraform). + if existingVol != nil { + existingVol = existingVol.Copy() + err = existingVol.Merge(vol) + if err != nil { + return err + } + *vol = *existingVol + } else if vol.Topologies == nil || len(vol.Topologies) == 0 { + // The topologies for the volume have already been set + // when it was created, so for newly register volumes + // we accept the user's description of that topology if vol.RequestedTopologies != nil { vol.Topologies = vol.RequestedTopologies.Required } } + + plugin, err := v.pluginValidateVolume(args, vol) + if err != nil { + return err + } + if err := v.controllerValidateVolume(args, vol, plugin); err != nil { + return err + } } resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args) diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 61f828f1126b..835da2b70003 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -145,7 +145,6 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) { // Create the volume vols := []*structs.CSIVolume{{ ID: id0, - Namespace: "notTheNamespace", PluginID: "minnie", AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, // legacy field ignored AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice, // legacy field ignored diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6b501bc80016..9974ec3014fb 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2155,7 +2155,6 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace) } - // Check for volume existence obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID) if err != nil { return fmt.Errorf("volume existence check error: %v", err) @@ -2164,17 +2163,20 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) // Allow some properties of a volume to be updated in place, but // prevent accidentally overwriting important properties, or // overwriting a volume in use - old, ok := obj.(*structs.CSIVolume) - if ok && - old.InUse() || - old.ExternalID != v.ExternalID || + old := obj.(*structs.CSIVolume) + if old.ExternalID != v.ExternalID || old.PluginID != v.PluginID || old.Provider != v.Provider { - return fmt.Errorf("volume exists: %s", v.ID) + return fmt.Errorf("volume identity cannot be updated: %s", v.ID) + } + s.CSIVolumeDenormalize(nil, old.Copy()) + if old.InUse() { + return fmt.Errorf("volume cannot be updated while in use") } - } - if v.CreateIndex == 0 { + v.CreateIndex = old.CreateIndex + v.ModifyIndex = index + } else { v.CreateIndex = index v.ModifyIndex = index } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 547faee59c5f..32987465a28c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2684,6 +2684,10 @@ func TestStateStore_CSIVolume(t *testing.T) { v0.Schedulable = true v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem + v0.RequestedCapabilities = []*structs.CSIVolumeCapability{{ + AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + }} index++ v1 := structs.NewCSIVolume("foo", index) @@ -2693,6 +2697,10 @@ func TestStateStore_CSIVolume(t *testing.T) { v1.Schedulable = true v1.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter v1.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem + v1.RequestedCapabilities = []*structs.CSIVolumeCapability{{ + AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + }} index++ err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0, v1}) diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index 3d7c5178fc5a..8104dc017ad3 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -1,6 +1,7 @@ package structs import ( + "errors" "fmt" "strings" "time" @@ -180,6 +181,22 @@ func (o *CSIMountOptions) Merge(p *CSIMountOptions) { } } +func (o *CSIMountOptions) Equal(p *CSIMountOptions) bool { + if o == nil && p == nil { + return true + } + if o == nil || p == nil { + return false + } + + if o.FSType != p.FSType { + return false + } + + return helper.CompareSliceSetString( + o.MountFlags, p.MountFlags) +} + // CSIMountOptions implements the Stringer and GoStringer interfaces to prevent // accidental leakage of sensitive mount flags via logs. var _ fmt.Stringer = &CSIMountOptions{} @@ -707,6 +724,126 @@ func (v *CSIVolume) Validate() error { return nil } +// Merge updates the mutable fields of a volume with those from +// another volume. CSIVolume has many user-defined fields which are +// immutable once set, and many fields that are not +// user-settable. Merge will return an error if we try to mutate the +// user-defined immutable fields after they're set, but silently +// ignore fields that are controlled by Nomad. +func (v *CSIVolume) Merge(other *CSIVolume) error { + if other == nil { + return nil + } + + var errs *multierror.Error + + if v.Name != other.Name && other.Name != "" { + errs = multierror.Append(errs, errors.New("volume name cannot be updated")) + } + if v.ExternalID != other.ExternalID && other.ExternalID != "" { + errs = multierror.Append(errs, errors.New( + "volume external ID cannot be updated")) + } + if v.PluginID != other.PluginID { + errs = multierror.Append(errs, errors.New( + "volume plugin ID cannot be updated")) + } + if v.CloneID != other.CloneID && other.CloneID != "" { + errs = multierror.Append(errs, errors.New( + "volume clone ID cannot be updated")) + } + if v.SnapshotID != other.SnapshotID && other.SnapshotID != "" { + errs = multierror.Append(errs, errors.New( + "volume snapshot ID cannot be updated")) + } + + // must be compatible with capacity range + // TODO: when ExpandVolume is implemented we'll need to update + // this logic https://github.com/hashicorp/nomad/issues/10324 + if v.Capacity != 0 { + if other.RequestedCapacityMax < v.Capacity || + other.RequestedCapacityMin > v.Capacity { + errs = multierror.Append(errs, errors.New( + "volume requested capacity update was not compatible with existing capacity")) + } else { + v.RequestedCapacityMin = other.RequestedCapacityMin + v.RequestedCapacityMax = other.RequestedCapacityMax + } + } + + // must be compatible with volume_capabilities + if v.AccessMode != CSIVolumeAccessModeUnknown || + v.AttachmentMode != CSIVolumeAttachmentModeUnknown { + var ok bool + for _, cap := range other.RequestedCapabilities { + if cap.AccessMode == v.AccessMode && + cap.AttachmentMode == v.AttachmentMode { + ok = true + break + } + } + if ok { + v.RequestedCapabilities = other.RequestedCapabilities + } else { + errs = multierror.Append(errs, errors.New( + "volume requested capabilities update was not compatible with existing capability in use")) + } + } else { + v.RequestedCapabilities = other.RequestedCapabilities + } + + // topologies are immutable, so topology request changes must be + // compatible with the existing topology, if any + if len(v.Topologies) > 0 { + if other.RequestedTopologies == nil { + if v.RequestedTopologies != nil { + errs = multierror.Append(errs, errors.New( + "volume topology request update was not compatible with existing topology")) + } + } else { + var err error + for _, otherTopo := range other.RequestedTopologies.Required { + if !otherTopo.MatchFound(v.Topologies) { + err = errors.New( + "volume topology requirement update was not compatible with existing topology") + break + } + } + for _, otherTopo := range other.RequestedTopologies.Preferred { + if !otherTopo.MatchFound(v.Topologies) { + err = errors.New( + "volume topology preference update was not compatible with existing topology") + break + } + } + if err != nil { + errs = multierror.Append(errs, err) + } else { + v.RequestedTopologies = other.RequestedTopologies + } + } + } + + // MountOptions can be updated so long as the volume isn't in use, + // but the caller will reject updating an in-use volume + v.MountOptions = other.MountOptions + + // Secrets can be updated freely + v.Secrets = other.Secrets + + // must be compatible with parameters set by from CreateVolumeResponse + + if len(other.Parameters) != 0 && !helper.CompareMapStringString(v.Parameters, other.Parameters) { + errs = multierror.Append(errs, errors.New( + "volume parameters cannot be updated")) + } + + // Context is mutable and will be used during controller + // validation + v.Context = other.Context + return errs.ErrorOrNil() +} + // Request and response wrappers type CSIVolumeRegisterRequest struct { Volumes []*CSIVolume diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index 855a65871b47..0dec0a2f2718 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -569,6 +569,163 @@ func TestCSIVolume_Validate(t *testing.T) { } +func TestCSIVolume_Merge(t *testing.T) { + + testCases := []struct { + name string + v *CSIVolume + update *CSIVolume + expected string + expectFn func(t *testing.T, v *CSIVolume) + }{ + { + name: "invalid capacity update", + v: &CSIVolume{Capacity: 100}, + update: &CSIVolume{ + RequestedCapacityMax: 300, RequestedCapacityMin: 200}, + expected: "volume requested capacity update was not compatible with existing capacity", + expectFn: func(t *testing.T, v *CSIVolume) { + require.NotEqual(t, 300, v.RequestedCapacityMax) + require.NotEqual(t, 200, v.RequestedCapacityMin) + }, + }, + { + name: "invalid capability update", + v: &CSIVolume{ + AccessMode: CSIVolumeAccessModeMultiNodeReader, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + }, + update: &CSIVolume{ + RequestedCapabilities: []*CSIVolumeCapability{ + { + AccessMode: CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + }, + }, + }, + expected: "volume requested capabilities update was not compatible with existing capability in use", + }, + { + name: "invalid topology update - removed", + v: &CSIVolume{ + RequestedTopologies: &CSITopologyRequest{ + Required: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + }, + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + }, + update: &CSIVolume{}, + expected: "volume topology request update was not compatible with existing topology", + expectFn: func(t *testing.T, v *CSIVolume) { + require.Len(t, v.Topologies, 1) + }, + }, + { + name: "invalid topology requirement update", + v: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + }, + update: &CSIVolume{ + RequestedTopologies: &CSITopologyRequest{ + Required: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R3"}}, + }, + }, + }, + expected: "volume topology requirement update was not compatible with existing topology", + expectFn: func(t *testing.T, v *CSIVolume) { + require.Len(t, v.Topologies, 1) + require.Equal(t, "R1", v.Topologies[0].Segments["rack"]) + }, + }, + { + name: "invalid topology preference update", + v: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + }, + update: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + RequestedTopologies: &CSITopologyRequest{ + Preferred: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R3"}}, + }, + }, + }, + expected: "volume topology preference update was not compatible with existing topology", + }, + { + name: "valid update", + v: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2"}}, + }, + AccessMode: CSIVolumeAccessModeMultiNodeReader, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + MountOptions: &CSIMountOptions{ + FSType: "ext4", + MountFlags: []string{"noatime"}, + }, + }, + update: &CSIVolume{ + Topologies: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2"}}, + }, + MountOptions: &CSIMountOptions{ + FSType: "ext4", + MountFlags: []string{"noatime"}, + }, + RequestedTopologies: &CSITopologyRequest{ + Required: []*CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2"}}, + }, + Preferred: []*CSITopology{ + {Segments: map[string]string{"rack": "R2"}}, + }, + }, + RequestedCapabilities: []*CSIVolumeCapability{ + { + AccessMode: CSIVolumeAccessModeMultiNodeReader, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + }, + { + AccessMode: CSIVolumeAccessModeMultiNodeReader, + AttachmentMode: CSIVolumeAttachmentModeFilesystem, + }, + }, + }, + }, + } + for _, tc := range testCases { + tc = tc + t.Run(tc.name, func(t *testing.T) { + err := tc.v.Merge(tc.update) + if tc.expected == "" { + require.NoError(t, err) + } else { + if tc.expectFn != nil { + tc.expectFn(t, tc.v) + } + require.Error(t, err, tc.expected) + require.Contains(t, err.Error(), tc.expected) + } + }) + } +} + func TestCSIPluginJobs(t *testing.T) { plug := NewCSIPlugin("foo", 1000) controller := &Job{