diff --git a/.changelog/18570.txt b/.changelog/18570.txt new file mode 100644 index 000000000000..844fb52a4191 --- /dev/null +++ b/.changelog/18570.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: check controller plugin health early during volume register/create +``` diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 671d47232825..f31376eba465 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -225,7 +225,7 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol return v.srv.blockingRPC(&opts) } -func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, vol *structs.CSIVolume) (*structs.CSIPlugin, error) { +func (v *CSIVolume) pluginValidateVolume(vol *structs.CSIVolume) (*structs.CSIPlugin, error) { state := v.srv.fsm.State() plugin, err := state.CSIPluginByID(nil, vol.PluginID) @@ -236,6 +236,10 @@ func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, return nil, fmt.Errorf("no CSI plugin named: %s could be found", vol.PluginID) } + if plugin.ControllerRequired && plugin.ControllersHealthy < 1 { + return nil, fmt.Errorf("no healthy controllers for CSI plugin: %s", vol.PluginID) + } + vol.Provider = plugin.Provider vol.ProviderVersion = plugin.Version @@ -322,6 +326,11 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru return err } + plugin, err := v.pluginValidateVolume(vol) + if err != nil { + return err + } + // CSIVolume has many user-defined fields which are immutable // once set, and many fields that are controlled by Nomad and // are not user-settable. We merge onto a copy of the existing @@ -346,10 +355,6 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru } } - plugin, err := v.pluginValidateVolume(args, vol) - if err != nil { - return err - } if err := v.controllerValidateVolume(args, vol, plugin); err != nil { return err } @@ -1033,7 +1038,7 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs. if err = vol.Validate(); err != nil { return err } - plugin, err := v.pluginValidateVolume(regArgs, vol) + plugin, err := v.pluginValidateVolume(vol) if err != nil { return err } diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 0a1a0d3201f3..5edde7e18b55 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -123,6 +123,83 @@ func TestCSIVolumeEndpoint_Get_ACL(t *testing.T) { require.Equal(t, vols[0].ID, resp.Volume.ID) } +func TestCSIVolume_pluginValidateVolume(t *testing.T) { + // bare minimum server for this method + store := state.TestStateStore(t) + srv := &Server{ + fsm: &nomadFSM{state: store}, + } + // has our method under test + csiVolume := &CSIVolume{srv: srv} + // volume for which we will request a valid plugin + vol := &structs.CSIVolume{PluginID: "neat-plugin"} + + // plugin not found + got, err := csiVolume.pluginValidateVolume(vol) + must.Nil(t, got, must.Sprint("nonexistent plugin should be nil")) + must.ErrorContains(t, err, "no CSI plugin named") + + // we'll upsert this plugin after optionally modifying it + basePlug := &structs.CSIPlugin{ + ID: vol.PluginID, + // these should be set on the volume after success + Provider: "neat-provider", + Version: "v0", + // explicit zero values, because these modify behavior we care about + ControllerRequired: false, + ControllersHealthy: 0, + } + + cases := []struct { + name string + updatePlugin func(*structs.CSIPlugin) + expectErr string + }{ + { + name: "controller not required", + }, + { + name: "controller unhealthy", + updatePlugin: func(p *structs.CSIPlugin) { + p.ControllerRequired = true + }, + expectErr: "no healthy controllers", + }, + { + name: "controller healthy", + updatePlugin: func(p *structs.CSIPlugin) { + p.ControllerRequired = true + p.ControllersHealthy = 1 + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + vol := vol.Copy() + plug := basePlug.Copy() + + if tc.updatePlugin != nil { + tc.updatePlugin(plug) + } + must.NoError(t, store.UpsertCSIPlugin(1000, plug)) + + got, err := csiVolume.pluginValidateVolume(vol) + + if tc.expectErr == "" { + must.NoError(t, err) + must.NotNil(t, got, must.Sprint("plugin should not be nil")) + must.Eq(t, vol.Provider, plug.Provider) + must.Eq(t, vol.ProviderVersion, plug.Version) + } else { + must.Error(t, err, must.Sprint("expect error:", tc.expectErr)) + must.ErrorContains(t, err, tc.expectErr) + must.Nil(t, got, must.Sprint("plugin should be nil")) + } + }) + } +} + func TestCSIVolumeEndpoint_Register(t *testing.T) { ci.Parallel(t) srv, shutdown := TestServer(t, func(c *Config) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 09ea4dbb655d..fd1fa16e9d8b 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2275,7 +2275,6 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) old.Provider != v.Provider { return fmt.Errorf("volume identity cannot be updated: %s", v.ID) } - s.CSIVolumeDenormalize(nil, old.Copy()) if old.InUse() { return fmt.Errorf("volume cannot be updated while in use") }