Merge pull request #12636 from hashicorp/backport/csi-plugin-client-restarts/perfectly-super-garfish

This pull request was automerged via backport-assistant
hc-github-team-nomad-core committed Apr 19, 2022
2 parents 4c26684 + 2376468 commit 2285548
Showing 7 changed files with 201 additions and 146 deletions.
7 changes: 7 additions & 0 deletions .changelog/12057.txt
@@ -0,0 +1,7 @@
```release-note:bug
csi: Fixed a bug where the plugin instance manager would not retry the initial gRPC connection to plugins
```

```release-note:bug
csi: Fixed a bug where the plugin supervisor would not restart the task if it failed to connect to the plugin
```
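Both fixes lean on the fact that a gRPC client can be constructed without blocking on the initial connection: the dial happens lazily and is retried with backoff, so constructing the client no longer needs to fail just because the plugin's socket isn't there yet. A minimal sketch of that lazy-dial pattern (the socket path and helper name here are illustrative, not Nomad's own code):

```go
package main

import (
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// newLazyConn returns immediately even if nothing is listening on the socket
// yet: without grpc.WithBlock(), the connection is established lazily and
// retried with backoff, so the constructor has no connection error to return.
func newLazyConn(socketPath string) (*grpc.ClientConn, error) {
	return grpc.Dial(
		"unix://"+socketPath, // assumed socket path; Nomad derives its own
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
}

func main() {
	conn, err := newLazyConn("/tmp/csi.sock")
	if err != nil {
		panic(err) // only a malformed target or option fails here
	}
	defer conn.Close()

	// The connection may still be idle or connecting; individual RPCs (or a
	// health probe, as in the supervisor hook below) carry their own
	// deadlines and are retried by the caller.
	fmt.Println("initial state:", conn.GetState())
}
```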
90 changes: 53 additions & 37 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
@@ -38,6 +38,7 @@ type csiPluginSupervisorHook struct {

// eventEmitter is used to emit events to the task
eventEmitter ti.EventEmitter
lifecycle ti.TaskLifecycle

shutdownCtx context.Context
shutdownCancelFn context.CancelFunc
@@ -54,6 +55,7 @@ type csiPluginSupervisorHookConfig struct {
clientStateDirPath string
events ti.EventEmitter
runner *TaskRunner
lifecycle ti.TaskLifecycle
capabilities *drivers.Capabilities
logger hclog.Logger
}
@@ -90,6 +92,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi
hook := &csiPluginSupervisorHook{
alloc: config.runner.Alloc(),
runner: config.runner,
lifecycle: config.lifecycle,
logger: config.logger,
task: task,
mountPoint: pluginRoot,
@@ -201,27 +204,41 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
}()

socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)

client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()

t := time.NewTimer(0)

// We're in Poststart at this point, so if we can't connect within
// this deadline, assume it's broken so we can restart the task
startCtx, startCancelFn := context.WithTimeout(ctx, 30*time.Second)
defer startCancelFn()

var err error
var pluginHealthy bool

// Step 1: Wait for the plugin to initially become available.
WAITFORREADY:
for {
select {
case <-ctx.Done():
case <-startCtx.Done():
h.kill(ctx, fmt.Errorf("CSI plugin failed probe: %v", err))
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
pluginHealthy, err = h.supervisorLoopOnce(startCtx, client)
if err != nil || !pluginHealthy {
h.logger.Debug("CSI Plugin not ready", "error", err)

// Plugin is not yet returning healthy, because we want to optimise for
// quickly bringing a plugin online, we use a short timeout here.
// TODO(dani): Test with more plugins and adjust.
h.logger.Debug("CSI plugin not ready", "error", err)
// Use only a short delay here to optimize for quickly
// bringing up a plugin
t.Reset(5 * time.Second)
continue
}

// Mark the plugin as healthy in a task event
h.logger.Debug("CSI plugin is ready")
h.previousHealthState = pluginHealthy
event := structs.NewTaskEvent(structs.TaskPluginHealthy)
event.SetMessage(fmt.Sprintf("plugin: %s", h.task.CSIPluginConfig.ID))
@@ -232,15 +249,14 @@ WAITFORREADY:
}

// Step 2: Register the plugin with the catalog.
deregisterPluginFn, err := h.registerPlugin(socketPath)
deregisterPluginFn, err := h.registerPlugin(client, socketPath)
if err != nil {
h.logger.Error("CSI Plugin registration failed", "error", err)
event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
event.SetMessage(fmt.Sprintf("failed to register plugin: %s, reason: %v", h.task.CSIPluginConfig.ID, err))
h.eventEmitter.EmitEvent(event)
h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err))
return
}

// Step 3: Start the lightweight supervisor loop.
// Step 3: Start the lightweight supervisor loop. At this point, failures
// don't cause the task to restart
t.Reset(0)
for {
select {
@@ -249,9 +265,9 @@
deregisterPluginFn()
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
if err != nil {
h.logger.Error("CSI Plugin fingerprinting failed", "error", err)
h.logger.Error("CSI plugin fingerprinting failed", "error", err)
}

// The plugin has transitioned to a healthy state. Emit an event.
@@ -265,7 +281,7 @@ WAITFORREADY:
if h.previousHealthState && !pluginHealthy {
event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
if err != nil {
event.SetMessage(fmt.Sprintf("error: %v", err))
event.SetMessage(fmt.Sprintf("Error: %v", err))
} else {
event.SetMessage("Unknown Reason")
}
@@ -281,16 +297,9 @@ WAITFORREADY:
}
}

func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), error) {

func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) {
// At this point we know the plugin is ready and we can fingerprint it
// to get its vendor name and version
client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
if err != nil {
return nil, fmt.Errorf("failed to create csi client: %v", err)
}
defer client.Close()

info, err := client.PluginInfo()
if err != nil {
return nil, fmt.Errorf("failed to probe plugin: %v", err)
@@ -354,21 +363,13 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), err
}, nil
}

func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, socketPath string) (bool, error) {
_, err := os.Stat(socketPath)
if err != nil {
return false, fmt.Errorf("failed to stat socket: %v", err)
}
func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) {
probeCtx, probeCancelFn := context.WithTimeout(ctx, 5*time.Second)
defer probeCancelFn()

client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
healthy, err := client.PluginProbe(probeCtx)
if err != nil {
return false, fmt.Errorf("failed to create csi client: %v", err)
}
defer client.Close()

healthy, err := client.PluginProbe(ctx)
if err != nil {
return false, fmt.Errorf("failed to probe plugin: %v", err)
return false, err
}

return healthy, nil
@@ -387,6 +388,21 @@ func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskSt
return nil
}

func (h *csiPluginSupervisorHook) kill(ctx context.Context, reason error) {
h.logger.Error("killing task because plugin failed", "error", reason)
event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
event.SetMessage(fmt.Sprintf("Error: %v", reason.Error()))
h.eventEmitter.EmitEvent(event)

if err := h.lifecycle.Kill(ctx,
structs.NewTaskEvent(structs.TaskKilling).
SetFailsTask().
SetDisplayMessage("CSI plugin did not become healthy before timeout"),
); err != nil {
h.logger.Error("failed to kill task", "kill_reason", reason, "error", err)
}
}

func ensureMountpointInserted(mounts []*drivers.MountConfig, mount *drivers.MountConfig) []*drivers.MountConfig {
for _, mnt := range mounts {
if mnt.IsEqual(mount) {
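The hunks above restructure the hook around a probe-until-deadline pattern: retry a cheap health probe on a short timer, and if the plugin never answers before the 30-second start deadline, kill the task so the client can restart it. A condensed, self-contained sketch of that pattern (the probe and kill callbacks are illustrative stand-ins, not the hook's real interfaces):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// superviseStartup retries probe on a short timer until it reports healthy,
// or calls kill and gives up once startDeadline expires. It mirrors the
// "Step 1" loop in the hook above with the CSI details stripped away.
func superviseStartup(
	ctx context.Context,
	probe func(context.Context) (bool, error),
	kill func(error),
	startDeadline, retryDelay time.Duration,
) error {
	startCtx, cancel := context.WithTimeout(ctx, startDeadline)
	defer cancel()

	t := time.NewTimer(0)
	defer t.Stop()

	var lastErr error
	for {
		select {
		case <-startCtx.Done():
			kill(fmt.Errorf("plugin failed probe: %v", lastErr))
			return startCtx.Err()
		case <-t.C:
			healthy, err := probe(startCtx)
			if err != nil || !healthy {
				lastErr = err
				t.Reset(retryDelay) // short retry: bring the plugin up quickly
				continue
			}
			return nil // healthy: caller registers the plugin and keeps polling
		}
	}
}

func main() {
	// Simulate a plugin that never comes up; the supervisor kills the task
	// after the start deadline instead of waiting forever.
	probe := func(context.Context) (bool, error) { return false, errors.New("socket not ready") }
	kill := func(reason error) { fmt.Println("killing task:", reason) }

	_ = superviseStartup(context.Background(), probe, kill, 2*time.Second, 500*time.Millisecond)
}
```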
1 change: 1 addition & 0 deletions client/allocrunner/taskrunner/task_runner_hooks.go
@@ -76,6 +76,7 @@ func (tr *TaskRunner) initHooks() {
clientStateDirPath: tr.clientConfig.StateDir,
events: tr,
runner: tr,
lifecycle: tr,
capabilities: tr.driverCapabilities,
logger: hookLogger,
}))
6 changes: 3 additions & 3 deletions client/client.go
@@ -390,11 +390,11 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.dynamicRegistry =
dynamicplugins.NewRegistry(c.stateDB, map[string]dynamicplugins.PluginDispenser{
dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller"))
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")), nil
},
dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client"))
}, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")), nil
},
})

// Setup the clients RPC server
7 changes: 1 addition & 6 deletions client/pluginmanager/csimanager/instance.go
@@ -73,12 +73,7 @@ func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater U
}

func (i *instanceManager) run() {
c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
if err != nil {
i.logger.Error("failed to setup instance manager client", "error", err)
close(i.shutdownCh)
return
}
c := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
i.client = c
i.fp.client = c
