Skip to content

Commit

Permalink
CSI: allow updates to volumes on re-registration (#12167)
Browse files Browse the repository at this point in the history
CSI `CreateVolume` RPC is idempotent given that the topology,
capabilities, and parameters are unchanged. CSI volumes have many
user-defined fields that are immutable once set, and many fields that
are not user-settable.

Update the `Register` RPC so that updating a volume via the API merges
onto any existing volume without touching Nomad-controlled fields,
while validating it with the same strict requirements expected for
idempotent `CreateVolume` RPCs.

Also, clarify that this state store method is used for everything, not just
for the `Register` RPC.
  • Loading branch information
tgross committed Mar 7, 2022
1 parent 711a9d9 commit 7d0f87b
Show file tree
Hide file tree
Showing 20 changed files with 428 additions and 59 deletions.
3 changes: 3 additions & 0 deletions .changelog/12167.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
csi: Allow volumes to be re-registered to be updated while not in use
```
4 changes: 4 additions & 0 deletions client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum
resp.CapacityBytes = cresp.Volume.CapacityBytes
resp.VolumeContext = cresp.Volume.VolumeContext

// Note: we safely throw away cresp.Volume.ContentSource here
// because it's just round-tripping the value set by the user in
// the server RPC call

resp.Topologies = make([]*nstructs.CSITopology, len(cresp.Volume.AccessibleTopology))
for _, topo := range cresp.Volume.AccessibleTopology {
resp.Topologies = append(resp.Topologies,
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func TestAllocStatusCommand_CSIVolumes(t *testing.T) {
Segments: map[string]string{"foo": "bar"},
}},
}}
err = state.CSIVolumeRegister(1002, vols)
err = state.UpsertCSIVolume(1002, vols)
require.NoError(t, err)

// Upsert the job and alloc
Expand Down
1 change: 1 addition & 0 deletions command/volume_register_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (c *VolumeRegisterCommand) csiRegister(client *api.Client, ast *ast.File) i
return 1
}

c.Ui.Output(fmt.Sprintf("Volume %q registered", vol.ID))
return 0
}

Expand Down
2 changes: 1 addition & 1 deletion command/volume_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestCSIVolumeStatusCommand_AutocompleteArgs(t *testing.T) {
PluginID: "glade",
}

require.NoError(t, state.CSIVolumeRegister(1000, []*structs.CSIVolume{vol}))
require.NoError(t, state.UpsertCSIVolume(1000, []*structs.CSIVolume{vol}))

prefix := vol.ID[:len(vol.ID)-5]
args := complete.Args{Last: prefix}
Expand Down
58 changes: 49 additions & 9 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest,

vol.Provider = plugin.Provider
vol.ProviderVersion = plugin.Version

return plugin, nil
}

Expand All @@ -265,7 +266,15 @@ func (v *CSIVolume) controllerValidateVolume(req *structs.CSIVolumeRegisterReque
return v.srv.RPC(method, cReq, cResp)
}

// Register registers a new volume
// Register registers a new volume or updates an existing volume. Note
// that most user-defined CSIVolume fields are immutable once the
// volume has been created.
//
// If the user needs to change fields because they've misconfigured
// the registration of the external volume, we expect that claims
// won't work either, and the user can deregister the volume and try
// again with the right settings. This lets us be as strict with
// validation here as the CreateVolume CSI RPC is expected to be.
func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *structs.CSIVolumeRegisterResponse) error {
if done, err := v.srv.forward("CSIVolume.Register", args, args, reply); done {
return err
Expand All @@ -291,26 +300,57 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
// We also validate that the plugin exists for each plugin, and validate the
// capabilities when the plugin has a controller.
for _, vol := range args.Volumes {
vol.Namespace = args.RequestNamespace()

snap, err := v.srv.State().Snapshot()
if err != nil {
return err
}
// TODO: allow volume spec file to set namespace
// https://github.com/hashicorp/nomad/issues/11196
if vol.Namespace == "" {
vol.Namespace = args.RequestNamespace()
}
if err = vol.Validate(); err != nil {
return err
}

plugin, err := v.pluginValidateVolume(args, vol)
ws := memdb.NewWatchSet()
existingVol, err := snap.CSIVolumeByID(ws, vol.Namespace, vol.ID)
if err != nil {
return err
}
if err := v.controllerValidateVolume(args, vol, plugin); err != nil {
return err
}

// The topologies for the volume have already been set when it was
// created, so we accept the user's description of that topology
if vol.Topologies == nil || len(vol.Topologies) == 0 {
// CSIVolume has many user-defined fields which are immutable
// once set, and many fields that are controlled by Nomad and
// are not user-settable. We merge onto a copy of the existing
// volume to allow a user to submit a volume spec for `volume
// create` and reuse it for updates in `volume register`
// without having to manually remove the fields unused by
// register (and similar use cases with API consumers such as
// Terraform).
if existingVol != nil {
existingVol = existingVol.Copy()
err = existingVol.Merge(vol)
if err != nil {
return err
}
*vol = *existingVol
} else if vol.Topologies == nil || len(vol.Topologies) == 0 {
// The topologies for the volume have already been set
// when it was created, so for newly register volumes
// we accept the user's description of that topology
if vol.RequestedTopologies != nil {
vol.Topologies = vol.RequestedTopologies.Required
}
}

plugin, err := v.pluginValidateVolume(args, vol)
if err != nil {
return err
}
if err := v.controllerValidateVolume(args, vol, plugin); err != nil {
return err
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
Expand Down
21 changes: 10 additions & 11 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestCSIVolumeEndpoint_Get(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}}
err := state.CSIVolumeRegister(999, vols)
err := state.UpsertCSIVolume(999, vols)
require.NoError(t, err)

// Create the register request
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestCSIVolumeEndpoint_Get_ACL(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}}
err := state.CSIVolumeRegister(999, vols)
err := state.UpsertCSIVolume(999, vols)
require.NoError(t, err)

// Create the register request
Expand Down Expand Up @@ -145,7 +145,6 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) {
// Create the volume
vols := []*structs.CSIVolume{{
ID: id0,
Namespace: "notTheNamespace",
PluginID: "minnie",
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, // legacy field ignored
AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice, // legacy field ignored
Expand Down Expand Up @@ -286,7 +285,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
}},
}}
index++
err = state.CSIVolumeRegister(index, vols)
err = state.UpsertCSIVolume(index, vols)
require.NoError(t, err)

// Verify that the volume exists, and is healthy
Expand Down Expand Up @@ -425,7 +424,7 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}}
err = state.CSIVolumeRegister(1003, vols)
err = state.UpsertCSIVolume(1003, vols)
require.NoError(t, err)

alloc := mock.BatchAlloc()
Expand Down Expand Up @@ -535,7 +534,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
}

index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{vol})
require.NoError(t, err)

// setup: create an alloc that will claim our volume
Expand Down Expand Up @@ -642,7 +641,7 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}}
err = state.CSIVolumeRegister(1002, vols)
err = state.UpsertCSIVolume(1002, vols)
require.NoError(t, err)

// Query everything in the namespace
Expand Down Expand Up @@ -721,7 +720,7 @@ func TestCSIVolumeEndpoint_ListAllNamespaces(t *testing.T) {
}},
},
}
err = state.CSIVolumeRegister(1001, vols)
err = state.UpsertCSIVolume(1001, vols)
require.NoError(t, err)

// Lookup volumes in all namespaces
Expand Down Expand Up @@ -972,7 +971,7 @@ func TestCSIVolumeEndpoint_Delete(t *testing.T) {
Secrets: structs.CSISecrets{"mysecret": "secretvalue"},
}}
index++
err = state.CSIVolumeRegister(index, vols)
err = state.UpsertCSIVolume(index, vols)
require.NoError(t, err)

// Delete volumes
Expand Down Expand Up @@ -1191,7 +1190,7 @@ func TestCSIVolumeEndpoint_CreateSnapshot(t *testing.T) {
ExternalID: "vol-12345",
}}
index++
require.NoError(t, state.CSIVolumeRegister(index, vols))
require.NoError(t, state.UpsertCSIVolume(index, vols))

// Create the snapshot request
req1 := &structs.CSISnapshotCreateRequest{
Expand Down Expand Up @@ -1665,7 +1664,7 @@ func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {
ControllerRequired: false,
},
}
err = state.CSIVolumeRegister(1002, vols)
err = state.UpsertCSIVolume(1002, vols)
require.NoError(t, err)

// has controller
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ func (n *nomadFSM) applyCSIVolumeRegister(buf []byte, index uint64) interface{}
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_register"}, time.Now())

if err := n.state.CSIVolumeRegister(index, req.Volumes); err != nil {
if err := n.state.UpsertCSIVolume(index, req.Volumes); err != nil {
n.logger.Error("CSIVolumeRegister failed", "error", err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/search_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ func TestSearch_PrefixSearch_CSIVolume(t *testing.T) {
testutil.WaitForLeader(t, s.RPC)

id := uuid.Generate()
err := s.fsm.State().CSIVolumeRegister(1000, []*structs.CSIVolume{{
err := s.fsm.State().UpsertCSIVolume(1000, []*structs.CSIVolume{{
ID: id,
Namespace: structs.DefaultNamespace,
PluginID: "glade",
Expand Down Expand Up @@ -1348,7 +1348,7 @@ func TestSearch_FuzzySearch_CSIVolume(t *testing.T) {
testutil.WaitForLeader(t, s.RPC)

id := uuid.Generate()
err := s.fsm.State().CSIVolumeRegister(1000, []*structs.CSIVolume{{
err := s.fsm.State().UpsertCSIVolume(1000, []*structs.CSIVolume{{
ID: id,
Namespace: structs.DefaultNamespace,
PluginID: "glade",
Expand Down
22 changes: 12 additions & 10 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,8 +2143,8 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string)
return iter, nil
}

// CSIVolumeRegister adds a volume to the server store, failing if it already exists
func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error {
// UpsertCSIVolume inserts a volume in the state store.
func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()

Expand All @@ -2155,7 +2155,6 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace)
}

// Check for volume existence
obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID)
if err != nil {
return fmt.Errorf("volume existence check error: %v", err)
Expand All @@ -2164,17 +2163,20 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
// Allow some properties of a volume to be updated in place, but
// prevent accidentally overwriting important properties, or
// overwriting a volume in use
old, ok := obj.(*structs.CSIVolume)
if ok &&
old.InUse() ||
old.ExternalID != v.ExternalID ||
old := obj.(*structs.CSIVolume)
if old.ExternalID != v.ExternalID ||
old.PluginID != v.PluginID ||
old.Provider != v.Provider {
return fmt.Errorf("volume exists: %s", v.ID)
return fmt.Errorf("volume identity cannot be updated: %s", v.ID)
}
s.CSIVolumeDenormalize(nil, old.Copy())
if old.InUse() {
return fmt.Errorf("volume cannot be updated while in use")
}
}

if v.CreateIndex == 0 {
v.CreateIndex = old.CreateIndex
v.ModifyIndex = index
} else {
v.CreateIndex = index
v.ModifyIndex = index
}
Expand Down
18 changes: 13 additions & 5 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2684,6 +2684,10 @@ func TestStateStore_CSIVolume(t *testing.T) {
v0.Schedulable = true
v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v0.RequestedCapabilities = []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}

index++
v1 := structs.NewCSIVolume("foo", index)
Expand All @@ -2693,20 +2697,24 @@ func TestStateStore_CSIVolume(t *testing.T) {
v1.Schedulable = true
v1.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v1.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v1.RequestedCapabilities = []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}

index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0, v1})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0, v1})
require.NoError(t, err)

// volume registration is idempotent, unless identies are changed
index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0, v1})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0, v1})
require.NoError(t, err)

index++
v2 := v0.Copy()
v2.PluginID = "new-id"
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v2})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v2})
require.Error(t, err, fmt.Sprintf("volume exists: %s", v0.ID))

ws := memdb.NewWatchSet()
Expand Down Expand Up @@ -2786,7 +2794,7 @@ func TestStateStore_CSIVolume(t *testing.T) {

// registration is an error when the volume is in use
index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0})
require.Error(t, err, "volume re-registered while in use")
// as is deregistration
index++
Expand Down Expand Up @@ -3126,7 +3134,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) {
Namespace: structs.DefaultNamespace,
PluginID: plugID,
}
err = store.CSIVolumeRegister(nextIndex(store), []*structs.CSIVolume{vol})
err = store.UpsertCSIVolume(nextIndex(store), []*structs.CSIVolume{vol})
require.NoError(t, err)

err = store.DeleteJob(nextIndex(store), structs.DefaultNamespace, controllerJobID)
Expand Down
2 changes: 1 addition & 1 deletion nomad/state/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestBadCSIState(t testing.TB, store *StateStore) error {
}
vol = vol.Copy() // canonicalize

err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
err = store.UpsertCSIVolume(index, []*structs.CSIVolume{vol})
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 7d0f87b

Please sign in to comment.