csi: plugin instance manager should retry creating gRPC client
Nomad communicates with CSI plugin tasks via gRPC. The plugin
supervisor hook uses this to ping the plugin for health checks which
it emits as task events. After the first successful health check, the
plugin supervisor registers the plugin in the client's dynamic plugin
registry, which in turn creates a CSI plugin manager instance that has
its own gRPC client for fingerprinting the plugin and sending mount
requests.
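
(As orientation for readers outside the codebase, here is a minimal
sketch of that kind of liveness probe, written against the CSI spec's
Go bindings rather than Nomad's internal csi package; the socket path
and names are illustrative, not Nomad's.)

```go
package main

import (
	"context"
	"log"
	"time"

	csipb "github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// probe pings the plugin's CSI identity service once, roughly the
// health check the supervisor hook performs and emits as task events.
func probe(ctx context.Context, conn *grpc.ClientConn) (bool, error) {
	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	resp, err := csipb.NewIdentityClient(conn).Probe(ctx, &csipb.ProbeRequest{})
	if err != nil {
		return false, err
	}
	// The CSI spec allows Ready to be unset, which means "ready".
	if resp.GetReady() == nil {
		return true, nil
	}
	return resp.GetReady().GetValue(), nil
}

func main() {
	// CSI plugins serve gRPC on a unix socket; this path is illustrative.
	conn, err := grpc.Dial("unix:///csi/csi.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	healthy, err := probe(context.Background(), conn)
	log.Println("plugin healthy:", healthy, "err:", err)
}
```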

If the plugin manager instance fails to connect to the plugin on its
first attempt, it exits. The plugin supervisor hook is unaware that the
connection failed so long as its own pings continue to work, so a
transient failure during plugin startup can mislead the supervisor hook
into treating the plugin as up (and the allocation as not needing a
restart) even though no fingerprinter was ever started. Update the
plugin manager instance to retry the gRPC client connection until it
succeeds.
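
The shape of that retry, as a minimal standalone sketch using a plain
google.golang.org/grpc client rather than Nomad's csi wrapper (the
one-second dial budget and five-second retry interval mirror the values
in the diff below, but everything else is illustrative):

```go
package pluginclient

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// connectWithRetry keeps dialing the plugin socket until it succeeds or
// the context is canceled, rather than giving up on the first failure.
func connectWithRetry(ctx context.Context, target string) (*grpc.ClientConn, error) {
	t := time.NewTimer(0)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-t.C:
			// Block on the dial so a connection failure surfaces here as
			// an error instead of later, on the first RPC.
			dialCtx, cancel := context.WithTimeout(ctx, time.Second)
			conn, err := grpc.DialContext(dialCtx, target,
				grpc.WithTransportCredentials(insecure.NewCredentials()),
				grpc.WithBlock())
			cancel()
			if err != nil {
				t.Reset(5 * time.Second)
				continue
			}
			return conn, nil
		}
	}
}
```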

Includes two other small improvements:
* The plugin supervisor hook creates a new gRPC client for every probe
  and then throws it away. Instead, reuse the client as we do for the
  plugin manager.
* The gRPC client constructor has a 1 second timeout. Clarify that this
  timeout applies only to establishing the connection, not to the rest
  of the client's lifetime (see the sketch after this list).
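
A sketch of the timeout distinction, assuming the constructor dials the
way a plain gRPC client would (DialContext, WithBlock, and Close are
real grpc-go APIs; the function and package names are illustrative):

```go
package pluginclient

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// dial bounds only connection establishment with the 1-second timeout.
// The returned *grpc.ClientConn outlives dialCtx and stays usable for
// RPCs until conn.Close() is called.
func dial(socketPath string) (*grpc.ClientConn, error) {
	dialCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel() // releases the dial deadline; does not close the conn
	return grpc.DialContext(dialCtx, "unix://"+socketPath,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock()) // error if not connected within one second
}
```

Reusing one connection for every probe (the first item above) then just
means holding on to the returned conn instead of re-dialing per probe.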
tgross committed Feb 11, 2022
1 parent 72e19c3 commit 5c1fe6d
Showing 2 changed files with 52 additions and 35 deletions.
63 changes: 37 additions & 26 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
@@ -201,14 +201,14 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
 	}()

 	socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)
-	t := time.NewTimer(0)
-
-	var client csi.CSIPlugin
-	defer func() {
-		if client != nil {
-			client.Close()
-		}
-	}()
+	client := h.newClient(ctx, socketPath)
+	if client == nil {
+		return // only hit this on shutdown
+	}
+	defer client.Close()
+
+	t := time.NewTimer(0)

 	// Step 1: Wait for the plugin to initially become available.
 WAITFORREADY:
@@ -217,7 +217,7 @@ WAITFORREADY:
 		case <-ctx.Done():
 			return
 		case <-t.C:
-			pluginHealthy, err := h.supervisorLoopOnce(ctx, client, socketPath)
+			pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
 			if err != nil || !pluginHealthy {
 				h.logger.Debug("CSI plugin not ready", "error", err)

@@ -256,7 +256,7 @@ WAITFORREADY:
 			deregisterPluginFn()
 			return
 		case <-t.C:
-			pluginHealthy, err := h.supervisorLoopOnce(ctx, client, socketPath)
+			pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
 			if err != nil {
 				h.logger.Error("CSI plugin fingerprinting failed", "error", err)
 			}
@@ -288,6 +288,33 @@ WAITFORREADY:
 	}
 }

+func (h *csiPluginSupervisorHook) newClient(ctx context.Context, socketPath string) csi.CSIPlugin {
+	t := time.NewTimer(0)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-t.C:
+			_, err := os.Stat(socketPath)
+			if err != nil {
+				h.logger.Debug("CSI plugin not ready: failed to stat socket", "error", err)
+				t.Reset(5 * time.Second)
+				continue
+			}
+			client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
+				"plugin.name", h.task.CSIPluginConfig.ID,
+				"plugin.type", h.task.CSIPluginConfig.Type))
+			if err != nil {
+				h.logger.Debug("CSI plugin not ready: failed to create gRPC client", "error", err)
+				t.Reset(5 * time.Second)
+				continue
+			}
+			return client
+		}
+	}
+}
+
 func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) {
 	// At this point we know the plugin is ready and we can fingerprint it
 	// to get its vendor name and version
@@ -354,17 +381,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
 	}, nil
 }

-func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin, socketPath string) (bool, error) {
-	_, err := os.Stat(socketPath)
-	if err != nil {
-		return false, fmt.Errorf("failed to stat socket: %v", err)
-	}
-
-	client, err = h.newClient(socketPath)
-	if err != nil {
-		return false, fmt.Errorf("failed to create csi client: %v", err)
-	}
-
+func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) {
 	healthy, err := client.PluginProbe(ctx)
 	if err != nil {
 		return false, fmt.Errorf("failed to probe plugin: %v", err)
@@ -373,12 +390,6 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client
 	return healthy, nil
 }

-func (h *csiPluginSupervisorHook) newClient(socketPath string) (csi.CSIPlugin, error) {
-	return csi.NewClient(socketPath, h.logger.Named("csi_client").With(
-		"plugin.name", h.task.CSIPluginConfig.ID,
-		"plugin.type", h.task.CSIPluginConfig.Type))
-}
-
 // Stop is called after the task has exited and will not be started
 // again. It is the only hook guaranteed to be executed whenever
 // TaskRunner.Run is called (and not gracefully shutting down).
24 changes: 15 additions & 9 deletions client/pluginmanager/csimanager/instance.go
@@ -10,6 +10,7 @@ import (
 )

 const managerFingerprintInterval = 30 * time.Second
+const managerFingerprintRetryInterval = 5 * time.Second

 // instanceManager is used to manage the fingerprinting and supervision of a
 // single CSI Plugin.
@@ -73,15 +74,6 @@ func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater U
 }

 func (i *instanceManager) run() {
-	c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
-	if err != nil {
-		i.logger.Error("failed to setup instance manager client", "error", err)
-		close(i.shutdownCh)
-		return
-	}
-	i.client = c
-	i.fp.client = c
-
 	go i.setupVolumeManager()
 	go i.runLoop()
 }
@@ -96,6 +88,9 @@ func (i *instanceManager) setupVolumeManager() {
 	case <-i.shutdownCtx.Done():
 		return
 	case <-i.fp.hadFirstSuccessfulFingerprintCh:
+		// the runLoop goroutine populates i.client but we never get
+		// the first fingerprint until after it's been populated, so
+		// this is safe
 		i.volumeManager = newVolumeManager(i.logger, i.eventer, i.client, i.mountPoint, i.containerMountPoint, i.fp.requiresStaging)
 		i.logger.Debug("volume manager setup complete")
 		close(i.volumeManagerSetupCh)
@@ -142,6 +137,17 @@ func (i *instanceManager) runLoop() {
 			return

 		case <-timer.C:
+			if i.client == nil {
+				c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
+				if err != nil {
+					i.logger.Debug("failed to setup instance manager client", "error", err)
+					timer.Reset(managerFingerprintRetryInterval)
+					continue
+				}
+				i.client = c
+				i.fp.client = c
+			}
+
 			ctx, cancelFn := i.requestCtxWithTimeout(managerFingerprintInterval)
 			info := i.fp.fingerprint(ctx)
 			cancelFn()
