From 0d344a75408907ded549986063d3a6cd362078b5 Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Mon, 25 Sep 2023 14:40:06 -0500 Subject: [PATCH 1/2] csi: check controller health early backported bugfix from c6dbba7cde911bb08f1f8da445a44a0125cd2047 this makes both create and register get the same error, early in the process, when a controller plugin will be needed but is offline. also remove a stray ineffective CSIVolumeDenormalize --- nomad/csi_endpoint.go | 17 ++++++--- nomad/csi_endpoint_test.go | 77 ++++++++++++++++++++++++++++++++++++++ nomad/state/state_store.go | 1 - 3 files changed, 88 insertions(+), 7 deletions(-) diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 773546d37871..d65cdee7d644 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -255,7 +255,7 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol return v.srv.blockingRPC(&opts) } -func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, vol *structs.CSIVolume) (*structs.CSIPlugin, error) { +func (v *CSIVolume) pluginValidateVolume(vol *structs.CSIVolume) (*structs.CSIPlugin, error) { state := v.srv.fsm.State() plugin, err := state.CSIPluginByID(nil, vol.PluginID) @@ -266,6 +266,10 @@ func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, return nil, fmt.Errorf("no CSI plugin named: %s could be found", vol.PluginID) } + if plugin.ControllerRequired && plugin.ControllersHealthy < 1 { + return nil, fmt.Errorf("no healthy controllers for CSI plugin: %s", vol.PluginID) + } + vol.Provider = plugin.Provider vol.ProviderVersion = plugin.Version @@ -346,6 +350,11 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru return err } + plugin, err := v.pluginValidateVolume(vol) + if err != nil { + return err + } + // CSIVolume has many user-defined fields which are immutable // once set, and many fields that are controlled by Nomad and // are not user-settable. We merge onto a copy of the existing @@ -370,10 +379,6 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru } } - plugin, err := v.pluginValidateVolume(args, vol) - if err != nil { - return err - } if err := v.controllerValidateVolume(args, vol, plugin); err != nil { return err } @@ -1035,7 +1040,7 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs. if err = vol.Validate(); err != nil { return err } - plugin, err := v.pluginValidateVolume(regArgs, vol) + plugin, err := v.pluginValidateVolume(vol) if err != nil { return err } diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index defe787940d7..370daded6436 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -122,6 +122,83 @@ func TestCSIVolumeEndpoint_Get_ACL(t *testing.T) { require.Equal(t, vols[0].ID, resp.Volume.ID) } +func TestCSIVolume_pluginValidateVolume(t *testing.T) { + // bare minimum server for this method + store := state.TestStateStore(t) + srv := &Server{ + fsm: &nomadFSM{state: store}, + } + // has our method under test + csiVolume := &CSIVolume{srv: srv} + // volume for which we will request a valid plugin + vol := &structs.CSIVolume{PluginID: "neat-plugin"} + + // plugin not found + got, err := csiVolume.pluginValidateVolume(vol) + must.Nil(t, got, must.Sprint("nonexistent plugin should be nil")) + must.ErrorContains(t, err, "no CSI plugin named") + + // we'll upsert this plugin after optionally modifying it + basePlug := &structs.CSIPlugin{ + ID: vol.PluginID, + // these should be set on the volume after success + Provider: "neat-provider", + Version: "v0", + // explicit zero values, because these modify behavior we care about + ControllerRequired: false, + ControllersHealthy: 0, + } + + cases := []struct { + name string + updatePlugin func(*structs.CSIPlugin) + expectErr string + }{ + { + name: "controller not required", + }, + { + name: "controller unhealthy", + updatePlugin: func(p *structs.CSIPlugin) { + p.ControllerRequired = true + }, + expectErr: "no healthy controllers", + }, + { + name: "controller healthy", + updatePlugin: func(p *structs.CSIPlugin) { + p.ControllerRequired = true + p.ControllersHealthy = 1 + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + vol := vol.Copy() + plug := basePlug.Copy() + + if tc.updatePlugin != nil { + tc.updatePlugin(plug) + } + must.NoError(t, store.UpsertCSIPlugin(1000, plug)) + + got, err := csiVolume.pluginValidateVolume(vol) + + if tc.expectErr == "" { + must.NoError(t, err) + must.NotNil(t, got, must.Sprint("plugin should not be nil")) + must.Eq(t, vol.Provider, plug.Provider) + must.Eq(t, vol.ProviderVersion, plug.Version) + } else { + must.Error(t, err, must.Sprint("expect error:", tc.expectErr)) + must.ErrorContains(t, err, tc.expectErr) + must.Nil(t, got, must.Sprint("plugin should be nil")) + } + }) + } +} + func TestCSIVolumeEndpoint_Register(t *testing.T) { ci.Parallel(t) srv, shutdown := TestServer(t, func(c *Config) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 39a799ea61ca..57fb121520a4 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2275,7 +2275,6 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume) old.Provider != v.Provider { return fmt.Errorf("volume identity cannot be updated: %s", v.ID) } - s.CSIVolumeDenormalize(nil, old.Copy()) if old.InUse() { return fmt.Errorf("volume cannot be updated while in use") } From fee8e2373052f348b883f141c711bf1532239d59 Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Mon, 25 Sep 2023 15:17:09 -0500 Subject: [PATCH 2/2] add changelog --- .changelog/18570.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/18570.txt diff --git a/.changelog/18570.txt b/.changelog/18570.txt new file mode 100644 index 000000000000..844fb52a4191 --- /dev/null +++ b/.changelog/18570.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: check controller plugin health early during volume register/create +```