Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: allow updates to volumes on re-registration #12167

Merged
merged 5 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12167.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
csi: Allow volumes to be updated via re-registration while not in use
```
4 changes: 4 additions & 0 deletions client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum
resp.CapacityBytes = cresp.Volume.CapacityBytes
resp.VolumeContext = cresp.Volume.VolumeContext

// Note: we safely throw away cresp.Volume.ContentSource here
// because it's just round-tripping the value set by the user in
// the server RPC call

resp.Topologies = make([]*nstructs.CSITopology, len(cresp.Volume.AccessibleTopology))
for _, topo := range cresp.Volume.AccessibleTopology {
resp.Topologies = append(resp.Topologies,
Expand Down
2 changes: 1 addition & 1 deletion command/alloc_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func TestAllocStatusCommand_CSIVolumes(t *testing.T) {
Segments: map[string]string{"foo": "bar"},
}},
}}
err = state.CSIVolumeRegister(1002, vols)
err = state.UpsertCSIVolume(1002, vols)
require.NoError(t, err)

// Upsert the job and alloc
Expand Down
1 change: 1 addition & 0 deletions command/volume_register_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (c *VolumeRegisterCommand) csiRegister(client *api.Client, ast *ast.File) i
return 1
}

c.Ui.Output(fmt.Sprintf("Volume %q registered", vol.ID))
return 0
}

Expand Down
2 changes: 1 addition & 1 deletion command/volume_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestCSIVolumeStatusCommand_AutocompleteArgs(t *testing.T) {
PluginID: "glade",
}

require.NoError(t, state.CSIVolumeRegister(1000, []*structs.CSIVolume{vol}))
require.NoError(t, state.UpsertCSIVolume(1000, []*structs.CSIVolume{vol}))

prefix := vol.ID[:len(vol.ID)-5]
args := complete.Args{Last: prefix}
Expand Down
58 changes: 49 additions & 9 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest,

vol.Provider = plugin.Provider
vol.ProviderVersion = plugin.Version

return plugin, nil
}

Expand All @@ -263,7 +264,15 @@ func (v *CSIVolume) controllerValidateVolume(req *structs.CSIVolumeRegisterReque
return v.srv.RPC(method, cReq, cResp)
}

// Register registers a new volume
// Register registers a new volume or updates an existing volume. Note
// that most user-defined CSIVolume fields are immutable once the
// volume has been created.
//
// If the user needs to change fields because they've misconfigured
// the registration of the external volume, we expect that claims
// won't work either, and the user can deregister the volume and try
// again with the right settings. This lets us be as strict with
// validation here as the CreateVolume CSI RPC is expected to be.
func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *structs.CSIVolumeRegisterResponse) error {
if done, err := v.srv.forward("CSIVolume.Register", args, args, reply); done {
return err
Expand All @@ -289,26 +298,57 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
// We also validate that the plugin exists for each plugin, and validate the
// capabilities when the plugin has a controller.
for _, vol := range args.Volumes {
vol.Namespace = args.RequestNamespace()

snap, err := v.srv.State().Snapshot()
if err != nil {
return err
}
// TODO: allow volume spec file to set namespace
// https://github.com/hashicorp/nomad/issues/11196
if vol.Namespace == "" {
vol.Namespace = args.RequestNamespace()
}
if err = vol.Validate(); err != nil {
return err
}

plugin, err := v.pluginValidateVolume(args, vol)
ws := memdb.NewWatchSet()
existingVol, err := snap.CSIVolumeByID(ws, vol.Namespace, vol.ID)
if err != nil {
return err
}
if err := v.controllerValidateVolume(args, vol, plugin); err != nil {
return err
}

// The topologies for the volume have already been set when it was
// created, so we accept the user's description of that topology
if vol.Topologies == nil || len(vol.Topologies) == 0 {
// CSIVolume has many user-defined fields which are immutable
// once set, and many fields that are controlled by Nomad and
// are not user-settable. We merge onto a copy of the existing
// volume to allow a user to submit a volume spec for `volume
// create` and reuse it for updates in `volume register`
// without having to manually remove the fields unused by
// register (and similar use cases with API consumers such as
// Terraform).
if existingVol != nil {
existingVol = existingVol.Copy()
err = existingVol.Merge(vol)
if err != nil {
return err
}
*vol = *existingVol
} else if vol.Topologies == nil || len(vol.Topologies) == 0 {
// The topologies for the volume have already been set
// when it was created, so for newly registered volumes
// we accept the user's description of that topology
if vol.RequestedTopologies != nil {
vol.Topologies = vol.RequestedTopologies.Required
}
}

plugin, err := v.pluginValidateVolume(args, vol)
if err != nil {
return err
}
if err := v.controllerValidateVolume(args, vol, plugin); err != nil {
return err
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
Expand Down
21 changes: 10 additions & 11 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestCSIVolumeEndpoint_Get(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}}
err := state.CSIVolumeRegister(999, vols)
err := state.UpsertCSIVolume(999, vols)
require.NoError(t, err)

// Create the register request
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestCSIVolumeEndpoint_Get_ACL(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}}
err := state.CSIVolumeRegister(999, vols)
err := state.UpsertCSIVolume(999, vols)
require.NoError(t, err)

// Create the register request
Expand Down Expand Up @@ -145,7 +145,6 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) {
// Create the volume
vols := []*structs.CSIVolume{{
ID: id0,
Namespace: "notTheNamespace",
PluginID: "minnie",
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, // legacy field ignored
AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice, // legacy field ignored
Expand Down Expand Up @@ -286,7 +285,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
}},
}}
index++
err = state.CSIVolumeRegister(index, vols)
err = state.UpsertCSIVolume(index, vols)
require.NoError(t, err)

// Verify that the volume exists, and is healthy
Expand Down Expand Up @@ -425,7 +424,7 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}}
err = state.CSIVolumeRegister(1003, vols)
err = state.UpsertCSIVolume(1003, vols)
require.NoError(t, err)

alloc := mock.BatchAlloc()
Expand Down Expand Up @@ -535,7 +534,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
}

index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{vol})
require.NoError(t, err)

// setup: create an alloc that will claim our volume
Expand Down Expand Up @@ -642,7 +641,7 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}}
err = state.CSIVolumeRegister(1002, vols)
err = state.UpsertCSIVolume(1002, vols)
require.NoError(t, err)

// Query everything in the namespace
Expand Down Expand Up @@ -721,7 +720,7 @@ func TestCSIVolumeEndpoint_ListAllNamespaces(t *testing.T) {
}},
},
}
err = state.CSIVolumeRegister(1001, vols)
err = state.UpsertCSIVolume(1001, vols)
require.NoError(t, err)

// Lookup volumes in all namespaces
Expand Down Expand Up @@ -956,7 +955,7 @@ func TestCSIVolumeEndpoint_Delete(t *testing.T) {
Secrets: structs.CSISecrets{"mysecret": "secretvalue"},
}}
index++
err = state.CSIVolumeRegister(index, vols)
err = state.UpsertCSIVolume(index, vols)
require.NoError(t, err)

// Delete volumes
Expand Down Expand Up @@ -1174,7 +1173,7 @@ func TestCSIVolumeEndpoint_CreateSnapshot(t *testing.T) {
ExternalID: "vol-12345",
}}
index++
require.NoError(t, state.CSIVolumeRegister(index, vols))
require.NoError(t, state.UpsertCSIVolume(index, vols))

// Create the snapshot request
req1 := &structs.CSISnapshotCreateRequest{
Expand Down Expand Up @@ -1649,7 +1648,7 @@ func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {
ControllerRequired: false,
},
}
err = state.CSIVolumeRegister(1002, vols)
err = state.UpsertCSIVolume(1002, vols)
require.NoError(t, err)

// has controller
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ func (n *nomadFSM) applyCSIVolumeRegister(buf []byte, index uint64) interface{}
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_register"}, time.Now())

if err := n.state.CSIVolumeRegister(index, req.Volumes); err != nil {
if err := n.state.UpsertCSIVolume(index, req.Volumes); err != nil {
n.logger.Error("CSIVolumeRegister failed", "error", err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/search_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ func TestSearch_PrefixSearch_CSIVolume(t *testing.T) {
testutil.WaitForLeader(t, s.RPC)

id := uuid.Generate()
err := s.fsm.State().CSIVolumeRegister(1000, []*structs.CSIVolume{{
err := s.fsm.State().UpsertCSIVolume(1000, []*structs.CSIVolume{{
ID: id,
Namespace: structs.DefaultNamespace,
PluginID: "glade",
Expand Down Expand Up @@ -1348,7 +1348,7 @@ func TestSearch_FuzzySearch_CSIVolume(t *testing.T) {
testutil.WaitForLeader(t, s.RPC)

id := uuid.Generate()
err := s.fsm.State().CSIVolumeRegister(1000, []*structs.CSIVolume{{
err := s.fsm.State().UpsertCSIVolume(1000, []*structs.CSIVolume{{
ID: id,
Namespace: structs.DefaultNamespace,
PluginID: "glade",
Expand Down
22 changes: 12 additions & 10 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,8 +2143,8 @@ func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string)
return iter, nil
}

// CSIVolumeRegister adds a volume to the server store, failing if it already exists
func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error {
// UpsertCSIVolume inserts or updates a volume in the state store.
func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()

Expand All @@ -2155,7 +2155,6 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace)
}

// Check for volume existence
obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID)
if err != nil {
return fmt.Errorf("volume existence check error: %v", err)
Expand All @@ -2164,17 +2163,20 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
// Allow some properties of a volume to be updated in place, but
// prevent accidentally overwriting important properties, or
// overwriting a volume in use
old, ok := obj.(*structs.CSIVolume)
if ok &&
old.InUse() ||
old.ExternalID != v.ExternalID ||
old := obj.(*structs.CSIVolume)
if old.ExternalID != v.ExternalID ||
old.PluginID != v.PluginID ||
old.Provider != v.Provider {
return fmt.Errorf("volume exists: %s", v.ID)
return fmt.Errorf("volume identity cannot be updated: %s", v.ID)
}
tgross marked this conversation as resolved.
Show resolved Hide resolved
s.CSIVolumeDenormalize(nil, old.Copy())
if old.InUse() {
Comment on lines +2172 to +2173
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does passing a Copy to CSIVolumeDenormalize means that old is not denormalized at this point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! We denormalize-on-read rather than having to push volume updates on every allocation status update. The volume only gets written with those denormalized updates on claim transitions (which happens in either csi_hook.go on the client or in volumewatcher on the leader).

return fmt.Errorf("volume cannot be updated while in use")
}
}

if v.CreateIndex == 0 {
v.CreateIndex = old.CreateIndex
v.ModifyIndex = index
} else {
v.CreateIndex = index
v.ModifyIndex = index
}
Expand Down
18 changes: 13 additions & 5 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2684,6 +2684,10 @@ func TestStateStore_CSIVolume(t *testing.T) {
v0.Schedulable = true
v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v0.RequestedCapabilities = []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}

index++
v1 := structs.NewCSIVolume("foo", index)
Expand All @@ -2693,20 +2697,24 @@ func TestStateStore_CSIVolume(t *testing.T) {
v1.Schedulable = true
v1.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v1.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v1.RequestedCapabilities = []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}

index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0, v1})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0, v1})
require.NoError(t, err)

// volume registration is idempotent, unless identities are changed
index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0, v1})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0, v1})
require.NoError(t, err)

index++
v2 := v0.Copy()
v2.PluginID = "new-id"
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v2})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v2})
require.Error(t, err, fmt.Sprintf("volume exists: %s", v0.ID))

ws := memdb.NewWatchSet()
Expand Down Expand Up @@ -2786,7 +2794,7 @@ func TestStateStore_CSIVolume(t *testing.T) {

// registration is an error when the volume is in use
index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{v0})
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0})
require.Error(t, err, "volume re-registered while in use")
// as is deregistration
index++
Expand Down Expand Up @@ -3126,7 +3134,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) {
Namespace: structs.DefaultNamespace,
PluginID: plugID,
}
err = store.CSIVolumeRegister(nextIndex(store), []*structs.CSIVolume{vol})
err = store.UpsertCSIVolume(nextIndex(store), []*structs.CSIVolume{vol})
require.NoError(t, err)

err = store.DeleteJob(nextIndex(store), structs.DefaultNamespace, controllerJobID)
Expand Down
2 changes: 1 addition & 1 deletion nomad/state/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestBadCSIState(t testing.TB, store *StateStore) error {
}
vol = vol.Copy() // canonicalize

err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
err = store.UpsertCSIVolume(index, []*structs.CSIVolume{vol})
if err != nil {
return err
}
Expand Down
Loading