From 507eafa6152d83c1c7f5125352213094cfb0328b Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Tue, 4 Apr 2023 23:41:04 +0000 Subject: [PATCH 1/4] backport of commit 9ae2c9da8b1af531fd78ebacfaa092d5d961de99 --- client/allocrunner/csi_hook.go | 16 +++++--- client/allocrunner/csi_hook_test.go | 4 ++ client/dynamicplugins/registry.go | 40 +++++++++++++++++++ client/pluginmanager/csimanager/interface.go | 4 ++ client/pluginmanager/csimanager/manager.go | 15 ++++++- .../pluginmanager/csimanager/manager_test.go | 38 ++++++++++++++++++ 6 files changed, 110 insertions(+), 7 deletions(-) diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index e9af3ad1b422..6012fe910e72 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -9,7 +9,7 @@ import ( hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" - + "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" @@ -85,11 +85,15 @@ func (c *csiHook) Prerun() error { mounts := make(map[string]*csimanager.MountInfo, len(volumes)) for alias, pair := range volumes { - // We use this context only to attach hclog to the gRPC - // context. The lifetime is the lifetime of the gRPC stream, - // not specific RPC timeouts, but we manage the stream - // lifetime via Close in the pluginmanager. - mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, pair.volume.PluginID) + // make sure the plugin is ready or becomes so quickly. + plugin := pair.volume.PluginID + pType := dynamicplugins.PluginTypeCSINode + if err := c.csimanager.WaitForPlugin(c.shutdownCtx, pType, plugin); err != nil { + return err + } + c.logger.Debug("found CSI plugin", "type", pType, "name", plugin) + + mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, plugin) if err != nil { return err } diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index 3aa6f6c07d1a..b960345af71c 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -417,6 +417,10 @@ type mockPluginManager struct { mounter mockVolumeMounter } +func (mgr mockPluginManager) WaitForPlugin(ctx context.Context, pluginType, pluginID string) error { + return nil +} + func (mgr mockPluginManager) MounterForPlugin(ctx context.Context, pluginID string) (csimanager.VolumeMounter, error) { return mgr.mounter, nil } diff --git a/client/dynamicplugins/registry.go b/client/dynamicplugins/registry.go index 4515c64fb684..fd84356fd957 100644 --- a/client/dynamicplugins/registry.go +++ b/client/dynamicplugins/registry.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "sync" + "time" ) const ( @@ -22,6 +23,7 @@ type Registry interface { RegisterPlugin(info *PluginInfo) error DeregisterPlugin(ptype, name, allocID string) error + WaitForPlugin(ctx context.Context, ptype, pname string) (*PluginInfo, error) ListPlugins(ptype string) []*PluginInfo DispensePlugin(ptype, name string) (interface{}, error) PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) @@ -301,6 +303,44 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo { return plugins } +// WaitForPlugin repeatedly checks until a plugin with a given type and name +// becomes available or its context is canceled or times out. +// Callers should pass in a context with a sensible timeout +// for the plugin they're expecting to find. +func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) (*PluginInfo, error) { + // these numbers are almost arbitrary... + delay := 200 // milliseconds between checks, will backoff + maxDelay := 5000 // up to 5 seconds between each check + + // put a long upper bound on total time, + // just in case callers don't follow directions. + ctx, cancel := context.WithTimeout(ctx, 24*time.Hour) + defer cancel() + + for { + for _, p := range d.ListPlugins(ptype) { + if p.Name == name { + return p, nil + } + } + + //log.Printf("WaitForPlugin delay: %d", delay) // TODO: get a logger in here? + select { + case <-ctx.Done(): + // an externally-defined timeout wins the day + return nil, ctx.Err() + case <-time.After(time.Duration(delay) * time.Millisecond): + // continue after our internal delay + } + if delay < maxDelay { + delay += delay + } + if delay > maxDelay { + delay = maxDelay + } + } +} + func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}, error) { d.pluginsLock.Lock() defer d.pluginsLock.Unlock() diff --git a/client/pluginmanager/csimanager/interface.go b/client/pluginmanager/csimanager/interface.go index 0e4a1f03a118..479b32172b57 100644 --- a/client/pluginmanager/csimanager/interface.go +++ b/client/pluginmanager/csimanager/interface.go @@ -59,6 +59,10 @@ type Manager interface { // PluginManager returns a PluginManager for use by the node fingerprinter. PluginManager() pluginmanager.PluginManager + // WaitForPlugin waits for the plugin to become available, + // or until its context is canceled or times out. + WaitForPlugin(ctx context.Context, pluginType, pluginID string) error + // MounterForPlugin returns a VolumeMounter for the plugin ID associated // with the volume. Returns an error if this plugin isn't registered. MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) diff --git a/client/pluginmanager/csimanager/manager.go b/client/pluginmanager/csimanager/manager.go index 178b0fe1513e..8a11f01930be 100644 --- a/client/pluginmanager/csimanager/manager.go +++ b/client/pluginmanager/csimanager/manager.go @@ -39,7 +39,7 @@ func New(config *Config) Manager { } return &csiManager{ - logger: config.Logger, + logger: config.Logger.Named("csiManager"), eventer: config.TriggerNodeEvent, registry: config.DynamicRegistry, instances: make(map[string]map[string]*instanceManager), @@ -75,6 +75,19 @@ func (c *csiManager) PluginManager() pluginmanager.PluginManager { return c } +// WaitForPlugin waits for a specific plugin to be registered and available, +// unless the context is canceled, or it takes longer than a minute. +func (c *csiManager) WaitForPlugin(ctx context.Context, pType, pID string) error { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + p, err := c.registry.WaitForPlugin(ctx, pType, pID) + if err != nil { + return fmt.Errorf("%s plugin '%s' did not become ready: %w", pType, pID, err) + } + c.ensureInstance(p) + return nil +} + func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) { c.instancesLock.RLock() defer c.instancesLock.RUnlock() diff --git a/client/pluginmanager/csimanager/manager_test.go b/client/pluginmanager/csimanager/manager_test.go index ee469d9d171d..4b05513f09c5 100644 --- a/client/pluginmanager/csimanager/manager_test.go +++ b/client/pluginmanager/csimanager/manager_test.go @@ -1,15 +1,18 @@ package csimanager import ( + "context" "fmt" "sync" "testing" "time" + "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/pluginmanager" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -93,6 +96,41 @@ func TestManager_DeregisterPlugin(t *testing.T) { }, 5*time.Second, 10*time.Millisecond) } +func TestManager_WaitForPlugin(t *testing.T) { + ci.Parallel(t) + + registry := setupRegistry(nil) + t.Cleanup(registry.Shutdown) + pm := testManager(t, registry, 5*time.Second) // resync period can be long. + t.Cleanup(pm.Shutdown) + pm.Run() + + t.Run("never happens", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + t.Cleanup(cancel) + + err := pm.WaitForPlugin(ctx, "bad-type", "bad-name") + must.Error(t, err) + must.ErrorContains(t, err, "did not become ready: context deadline exceeded") + }) + + t.Run("ok after delay", func(t *testing.T) { + plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + + // register the plugin in the near future + time.AfterFunc(100*time.Millisecond, func() { + err := registry.RegisterPlugin(plugin) + must.NoError(t, err) + }) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + t.Cleanup(cancel) + + err := pm.WaitForPlugin(ctx, plugin.Type, plugin.Name) + must.NoError(t, err) + }) +} + // TestManager_MultiplePlugins ensures that multiple plugins with the same // name but different types (as found with monolith plugins) don't interfere // with each other. From 2ff5f3b6a1a6b1518e2efc939dd3cd4f24a56336 Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Wed, 5 Apr 2023 22:46:19 +0000 Subject: [PATCH 2/4] backport of commit b772ad57868fb34863efbbbbbac504a75d7d4b1b --- client/dynamicplugins/registry.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/dynamicplugins/registry.go b/client/dynamicplugins/registry.go index fd84356fd957..8e04ff80ad02 100644 --- a/client/dynamicplugins/registry.go +++ b/client/dynamicplugins/registry.go @@ -317,6 +317,7 @@ func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) ctx, cancel := context.WithTimeout(ctx, 24*time.Hour) defer cancel() + timer := time.NewTimer(time.Duration(delay) * time.Millisecond) for { for _, p := range d.ListPlugins(ptype) { if p.Name == name { @@ -329,7 +330,7 @@ func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) case <-ctx.Done(): // an externally-defined timeout wins the day return nil, ctx.Err() - case <-time.After(time.Duration(delay) * time.Millisecond): + case <-timer.C: // continue after our internal delay } if delay < maxDelay { @@ -338,6 +339,7 @@ func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) if delay > maxDelay { delay = maxDelay } + timer.Reset(time.Duration(delay) * time.Millisecond) } } From a103ae9680f3e41d662ac16d527f8bdfc9ccec4c Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Thu, 6 Apr 2023 18:03:36 +0000 Subject: [PATCH 3/4] backport of commit 1813c9b7240a5ee3f91e1f0abd57536ce1ae58b4 --- client/dynamicplugins/registry.go | 34 +++++++++++++++++----- client/pluginmanager/csimanager/manager.go | 4 ++- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/client/dynamicplugins/registry.go b/client/dynamicplugins/registry.go index 8e04ff80ad02..497459731d9a 100644 --- a/client/dynamicplugins/registry.go +++ b/client/dynamicplugins/registry.go @@ -10,6 +10,8 @@ import ( "fmt" "sync" "time" + + "github.com/hashicorp/nomad/helper" ) const ( @@ -308,6 +310,23 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo { // Callers should pass in a context with a sensible timeout // for the plugin they're expecting to find. func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) (*PluginInfo, error) { + // this is our actual goal, which may be run repeatedly + findPlugin := func() *PluginInfo { + for _, p := range d.ListPlugins(ptype) { + if p.Name == name { + return p + } + } + return nil + } + + // try immediately first, before any timers get involved + if p := findPlugin(); p != nil { + return p, nil + } + + // next, loop until found or context is done + // these numbers are almost arbitrary... delay := 200 // milliseconds between checks, will backoff maxDelay := 5000 // up to 5 seconds between each check @@ -317,15 +336,9 @@ func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) ctx, cancel := context.WithTimeout(ctx, 24*time.Hour) defer cancel() - timer := time.NewTimer(time.Duration(delay) * time.Millisecond) + timer, stop := helper.NewSafeTimer(time.Duration(delay) * time.Millisecond) + defer stop() for { - for _, p := range d.ListPlugins(ptype) { - if p.Name == name { - return p, nil - } - } - - //log.Printf("WaitForPlugin delay: %d", delay) // TODO: get a logger in here? select { case <-ctx.Done(): // an externally-defined timeout wins the day @@ -333,6 +346,11 @@ func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) case <-timer.C: // continue after our internal delay } + + if p := findPlugin(); p != nil { + return p, nil + } + if delay < maxDelay { delay += delay } diff --git a/client/pluginmanager/csimanager/manager.go b/client/pluginmanager/csimanager/manager.go index 8a11f01930be..59879e350b22 100644 --- a/client/pluginmanager/csimanager/manager.go +++ b/client/pluginmanager/csimanager/manager.go @@ -39,7 +39,7 @@ func New(config *Config) Manager { } return &csiManager{ - logger: config.Logger.Named("csiManager"), + logger: config.Logger.Named("csi_manager"), eventer: config.TriggerNodeEvent, registry: config.DynamicRegistry, instances: make(map[string]map[string]*instanceManager), @@ -84,6 +84,8 @@ func (c *csiManager) WaitForPlugin(ctx context.Context, pType, pID string) error if err != nil { return fmt.Errorf("%s plugin '%s' did not become ready: %w", pType, pID, err) } + c.instancesLock.Lock() + defer c.instancesLock.Unlock() c.ensureInstance(p) return nil } From b7c17e37707801c0f33d292c779c847068f8092c Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Mon, 10 Apr 2023 20:45:01 +0000 Subject: [PATCH 4/4] backport of commit 10a2678a7a9b53ab070afc848671f237e6242318 --- .changelog/16809.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/16809.txt diff --git a/.changelog/16809.txt b/.changelog/16809.txt new file mode 100644 index 000000000000..2d0d1927b67c --- /dev/null +++ b/.changelog/16809.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: gracefully recover tasks that use csi node plugins +```