Skip to content

Commit

Permalink
Backport of gracefully recover tasks that use csi node plugins into r…
Browse files Browse the repository at this point in the history
…elease/1.4.x (#16847)

This pull request was automerged via backport-assistant
  • Loading branch information
hc-github-team-nomad-core committed Apr 11, 2023
1 parent 006f35c commit b3085e1
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .changelog/16809.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: gracefully recover tasks that use csi node plugins
```
16 changes: 10 additions & 6 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -85,11 +85,15 @@ func (c *csiHook) Prerun() error {
mounts := make(map[string]*csimanager.MountInfo, len(volumes))
for alias, pair := range volumes {

// We use this context only to attach hclog to the gRPC
// context. The lifetime is the lifetime of the gRPC stream,
// not specific RPC timeouts, but we manage the stream
// lifetime via Close in the pluginmanager.
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, pair.volume.PluginID)
// make sure the plugin is ready or becomes so quickly.
plugin := pair.volume.PluginID
pType := dynamicplugins.PluginTypeCSINode
if err := c.csimanager.WaitForPlugin(c.shutdownCtx, pType, plugin); err != nil {
return err
}
c.logger.Debug("found CSI plugin", "type", pType, "name", plugin)

mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, plugin)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions client/allocrunner/csi_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,10 @@ type mockPluginManager struct {
mounter mockVolumeMounter
}

func (mgr mockPluginManager) WaitForPlugin(ctx context.Context, pluginType, pluginID string) error {
return nil
}

func (mgr mockPluginManager) MounterForPlugin(ctx context.Context, pluginID string) (csimanager.VolumeMounter, error) {
return mgr.mounter, nil
}
Expand Down
60 changes: 60 additions & 0 deletions client/dynamicplugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/hashicorp/nomad/helper"
)

const (
Expand All @@ -22,6 +25,7 @@ type Registry interface {
RegisterPlugin(info *PluginInfo) error
DeregisterPlugin(ptype, name, allocID string) error

WaitForPlugin(ctx context.Context, ptype, pname string) (*PluginInfo, error)
ListPlugins(ptype string) []*PluginInfo
DispensePlugin(ptype, name string) (interface{}, error)
PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error)
Expand Down Expand Up @@ -301,6 +305,62 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo {
return plugins
}

// WaitForPlugin repeatedly checks until a plugin with a given type and name
// becomes available or its context is canceled or times out.
// Callers should pass in a context with a sensible timeout
// for the plugin they're expecting to find.
func (d *dynamicRegistry) WaitForPlugin(ctx context.Context, ptype, name string) (*PluginInfo, error) {
// this is our actual goal, which may be run repeatedly
findPlugin := func() *PluginInfo {
for _, p := range d.ListPlugins(ptype) {
if p.Name == name {
return p
}
}
return nil
}

// try immediately first, before any timers get involved
if p := findPlugin(); p != nil {
return p, nil
}

// next, loop until found or context is done

// these numbers are almost arbitrary...
delay := 200 // milliseconds between checks, will backoff
maxDelay := 5000 // up to 5 seconds between each check

// put a long upper bound on total time,
// just in case callers don't follow directions.
ctx, cancel := context.WithTimeout(ctx, 24*time.Hour)
defer cancel()

timer, stop := helper.NewSafeTimer(time.Duration(delay) * time.Millisecond)
defer stop()
for {
select {
case <-ctx.Done():
// an externally-defined timeout wins the day
return nil, ctx.Err()
case <-timer.C:
// continue after our internal delay
}

if p := findPlugin(); p != nil {
return p, nil
}

if delay < maxDelay {
delay += delay
}
if delay > maxDelay {
delay = maxDelay
}
timer.Reset(time.Duration(delay) * time.Millisecond)
}
}

func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}, error) {
d.pluginsLock.Lock()
defer d.pluginsLock.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions client/pluginmanager/csimanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type Manager interface {
// PluginManager returns a PluginManager for use by the node fingerprinter.
PluginManager() pluginmanager.PluginManager

// WaitForPlugin waits for the plugin to become available,
// or until its context is canceled or times out.
WaitForPlugin(ctx context.Context, pluginType, pluginID string) error

// MounterForPlugin returns a VolumeMounter for the plugin ID associated
// with the volume. Returns an error if this plugin isn't registered.
MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error)
Expand Down
17 changes: 16 additions & 1 deletion client/pluginmanager/csimanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func New(config *Config) Manager {
}

return &csiManager{
logger: config.Logger,
logger: config.Logger.Named("csi_manager"),
eventer: config.TriggerNodeEvent,
registry: config.DynamicRegistry,
instances: make(map[string]map[string]*instanceManager),
Expand Down Expand Up @@ -75,6 +75,21 @@ func (c *csiManager) PluginManager() pluginmanager.PluginManager {
return c
}

// WaitForPlugin waits for a specific plugin to be registered and available,
// unless the context is canceled, or it takes longer than a minute.
func (c *csiManager) WaitForPlugin(ctx context.Context, pType, pID string) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
p, err := c.registry.WaitForPlugin(ctx, pType, pID)
if err != nil {
return fmt.Errorf("%s plugin '%s' did not become ready: %w", pType, pID, err)
}
c.instancesLock.Lock()
defer c.instancesLock.Unlock()
c.ensureInstance(p)
return nil
}

func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) {
c.instancesLock.RLock()
defer c.instancesLock.RUnlock()
Expand Down
38 changes: 38 additions & 0 deletions client/pluginmanager/csimanager/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package csimanager

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -93,6 +96,41 @@ func TestManager_DeregisterPlugin(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond)
}

func TestManager_WaitForPlugin(t *testing.T) {
ci.Parallel(t)

registry := setupRegistry(nil)
t.Cleanup(registry.Shutdown)
pm := testManager(t, registry, 5*time.Second) // resync period can be long.
t.Cleanup(pm.Shutdown)
pm.Run()

t.Run("never happens", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
t.Cleanup(cancel)

err := pm.WaitForPlugin(ctx, "bad-type", "bad-name")
must.Error(t, err)
must.ErrorContains(t, err, "did not become ready: context deadline exceeded")
})

t.Run("ok after delay", func(t *testing.T) {
plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController)

// register the plugin in the near future
time.AfterFunc(100*time.Millisecond, func() {
err := registry.RegisterPlugin(plugin)
must.NoError(t, err)
})

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
t.Cleanup(cancel)

err := pm.WaitForPlugin(ctx, plugin.Type, plugin.Name)
must.NoError(t, err)
})
}

// TestManager_MultiplePlugins ensures that multiple plugins with the same
// name but different types (as found with monolith plugins) don't interfere
// with each other.
Expand Down

0 comments on commit b3085e1

Please sign in to comment.