From 7ab05b03e64d2528625e593a42de6a04f88015c5 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 11 Feb 2022 14:21:47 -0500 Subject: [PATCH] csi: plugin instance manager should retry creating gRPC client Nomad communicates with CSI plugin tasks via gRPC. The plugin supervisor hook uses this to ping the plugin for health checks which it emits as task events. After the first successful health check the plugin supervisor registers the plugin in the client's dynamic plugin registry, which in turn creates a CSI plugin manager instance that has its own gRPC client for fingerprinting the plugin and sending mount requests. If the plugin manager instance fails to connect to the plugin on its first attempt, it exits. The plugin supervisor hook is unaware that connection failed so long as its own pings continue to work. A transient failure during plugin startup may mislead the plugin supervisor hook into thinking the plugin is up (so there's no need to restart the allocation) but no fingerprinter is started. Update the plugin manager instance to retry the gRPC client connection until success. Move the client creation into the fingerprinter so that the volume manager is guaranteed to have a constructed client once fingerprinting is complete. Includes two other small improvements: * The plugin supervisor hook creates a new gRPC client for every probe and then throws it away. Instead, reuse the client as we do for the plugin manager. * The gRPC client constructor has a 1 second timeout. Clarify that this timeout applies to the connection and not the rest of the client lifetime. 
--- .../taskrunner/plugin_supervisor_hook.go | 63 +++++++++++-------- .../pluginmanager/csimanager/fingerprint.go | 45 +++++++++++++ client/pluginmanager/csimanager/instance.go | 18 ++---- 3 files changed, 87 insertions(+), 39 deletions(-) diff --git a/client/allocrunner/taskrunner/plugin_supervisor_hook.go b/client/allocrunner/taskrunner/plugin_supervisor_hook.go index 7cea2522ffd0..a77897b647fd 100644 --- a/client/allocrunner/taskrunner/plugin_supervisor_hook.go +++ b/client/allocrunner/taskrunner/plugin_supervisor_hook.go @@ -201,14 +201,14 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) { }() socketPath := filepath.Join(h.mountPoint, structs.CSISocketName) - t := time.NewTimer(0) - var client csi.CSIPlugin - defer func() { - if client != nil { - client.Close() - } - }() + client := h.newClient(ctx, socketPath) + if client == nil { + return // only hit this on shutdown + } + defer client.Close() + + t := time.NewTimer(0) // Step 1: Wait for the plugin to initially become available. 
WAITFORREADY: @@ -217,7 +217,7 @@ WAITFORREADY: case <-ctx.Done(): return case <-t.C: - pluginHealthy, err := h.supervisorLoopOnce(ctx, client, socketPath) + pluginHealthy, err := h.supervisorLoopOnce(ctx, client) if err != nil || !pluginHealthy { h.logger.Debug("CSI plugin not ready", "error", err) @@ -256,7 +256,7 @@ WAITFORREADY: deregisterPluginFn() return case <-t.C: - pluginHealthy, err := h.supervisorLoopOnce(ctx, client, socketPath) + pluginHealthy, err := h.supervisorLoopOnce(ctx, client) if err != nil { h.logger.Error("CSI plugin fingerprinting failed", "error", err) } @@ -288,6 +288,33 @@ WAITFORREADY: } } +func (h *csiPluginSupervisorHook) newClient(ctx context.Context, socketPath string) csi.CSIPlugin { + t := time.NewTimer(0) + + for { + select { + case <-ctx.Done(): + return nil + case <-t.C: + _, err := os.Stat(socketPath) + if err != nil { + h.logger.Debug("CSI plugin not ready: failed to stat socket", "error", err) + t.Reset(5 * time.Second) + continue + } + client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With( + "plugin.name", h.task.CSIPluginConfig.ID, + "plugin.type", h.task.CSIPluginConfig.Type)) + if err != nil { + h.logger.Debug("CSI plugin not ready: failed to create gRPC client", "error", err) + t.Reset(5 * time.Second) + continue + } + return client + } + } +} + func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) { // At this point we know the plugin is ready and we can fingerprint it // to get its vendor name and version @@ -354,17 +381,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat }, nil } -func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin, socketPath string) (bool, error) { - _, err := os.Stat(socketPath) - if err != nil { - return false, fmt.Errorf("failed to stat socket: %v", err) - } - - client, err = h.newClient(socketPath) - if err != nil { - return false, 
fmt.Errorf("failed to create csi client: %v", err) - } - +func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) { healthy, err := client.PluginProbe(ctx) if err != nil { return false, fmt.Errorf("failed to probe plugin: %v", err) @@ -373,12 +390,6 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client return healthy, nil } -func (h *csiPluginSupervisorHook) newClient(socketPath string) (csi.CSIPlugin, error) { - return csi.NewClient(socketPath, h.logger.Named("csi_client").With( - "plugin.name", h.task.CSIPluginConfig.ID, - "plugin.type", h.task.CSIPluginConfig.Type)) -} - // Stop is called after the task has exited and will not be started // again. It is the only hook guaranteed to be executed whenever // TaskRunner.Run is called (and not gracefully shutting down). diff --git a/client/pluginmanager/csimanager/fingerprint.go b/client/pluginmanager/csimanager/fingerprint.go index 981bb63c6752..8f0b48531424 100644 --- a/client/pluginmanager/csimanager/fingerprint.go +++ b/client/pluginmanager/csimanager/fingerprint.go @@ -3,6 +3,8 @@ package csimanager import ( "context" "fmt" + "os" + "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -39,6 +41,18 @@ type pluginFingerprinter struct { } func (p *pluginFingerprinter) fingerprint(ctx context.Context) *structs.CSIInfo { + + // to construct the gRPC client we have to connect, but if we + // shutdown before we've done so, we end up with no client and + // fingerprinting will panic + if p.client == nil { + p.client = p.newClient(ctx, p.info.ConnectionInfo.SocketPath) + if p.client == nil { + p.logger.Debug("plugin fingerprint shutdown while establishing client") + return nil + } + } + if p.basicInfo == nil { info, err := p.buildBasicFingerprint(ctx) if err != nil { @@ -175,6 +189,37 @@ func (p *pluginFingerprinter) buildNodeFingerprint(ctx context.Context, base *st return fp, nil } +func (p 
*pluginFingerprinter) newClient(ctx context.Context, socketPath string) csi.CSIPlugin { + t := time.NewTimer(0) + + for { + select { + case <-ctx.Done(): + return nil + case <-t.C: + _, err := os.Stat(socketPath) + if err != nil { + p.logger.Debug("CSI plugin not ready: failed to stat socket", "error", err) + t.Reset(5 * time.Second) + continue + } + client, err := csi.NewClient(socketPath, p.logger) + if err != nil { + p.logger.Debug("CSI plugin not ready: failed to create gRPC client", "error", err) + t.Reset(5 * time.Second) + continue + } + return client + } + } +} + +func (p *pluginFingerprinter) close() { + if p.client != nil { + p.client.Close() + } +} + func structCSITopologyFromCSITopology(a *csi.Topology) *structs.CSITopology { if a == nil { return nil diff --git a/client/pluginmanager/csimanager/instance.go b/client/pluginmanager/csimanager/instance.go index 062b73972783..ee8990b2142b 100644 --- a/client/pluginmanager/csimanager/instance.go +++ b/client/pluginmanager/csimanager/instance.go @@ -10,6 +10,7 @@ import ( ) const managerFingerprintInterval = 30 * time.Second +const managerFingerprintRetryInterval = 5 * time.Second // instanceManager is used to manage the fingerprinting and supervision of a // single CSI Plugin. 
@@ -73,15 +74,6 @@ func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater U } func (i *instanceManager) run() { - c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger) - if err != nil { - i.logger.Error("failed to setup instance manager client", "error", err) - close(i.shutdownCh) - return - } - i.client = c - i.fp.client = c - go i.setupVolumeManager() go i.runLoop() } @@ -96,6 +88,9 @@ func (i *instanceManager) setupVolumeManager() { case <-i.shutdownCtx.Done(): return case <-i.fp.hadFirstSuccessfulFingerprintCh: + // the fingerprinter creates the client so we can't access it + // until we've received on this channel + i.client = i.fp.client i.volumeManager = newVolumeManager(i.logger, i.eventer, i.client, i.mountPoint, i.containerMountPoint, i.fp.requiresStaging) i.logger.Debug("volume manager setup complete") close(i.volumeManagerSetupCh) @@ -124,10 +119,7 @@ func (i *instanceManager) runLoop() { for { select { case <-i.shutdownCtx.Done(): - if i.client != nil { - i.client.Close() - i.client = nil - } + i.fp.close() // run one last fingerprint so that we mark the plugin as unhealthy. // the client has been closed so this will return quickly with the