csi: plugin instance manager should retry creating gRPC client
Nomad communicates with CSI plugin tasks via gRPC. The plugin
supervisor hook uses this to ping the plugin for health checks which
it emits as task events. After the first successful health check, the
plugin supervisor registers the plugin in the client's dynamic plugin
registry, which in turn creates a CSI plugin manager instance that has
its own gRPC client for fingerprinting the plugin and sending mount
requests.

If the plugin manager instance fails to connect to the plugin on its
first attempt, it exits. The plugin supervisor hook is unaware that the
connection failed so long as its own pings continue to work. A
transient failure during plugin startup may mislead the plugin
supervisor hook into thinking the plugin is up (so there's no need to
restart the allocation), but no fingerprinter is started. Update the
plugin manager instance to retry the gRPC client connection until
success. Move the client creation into the fingerprinter so that
the volume manager is guaranteed to have a constructed client once
fingerprinting is complete.
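
Both retry loops added here follow the same shape: wait for the plugin's
socket to exist, try to construct the client, and back off for 5 seconds on
any failure until the context is cancelled. Condensed into a sketch that
mirrors the newClient helpers in the diff below (csi.NewClient, csi.CSIPlugin,
and hclog.Logger are the Nomad-internal types used there; the function name
waitForClient is illustrative only):

func waitForClient(ctx context.Context, logger hclog.Logger, socketPath string) csi.CSIPlugin {
	t := time.NewTimer(0)
	for {
		select {
		case <-ctx.Done():
			return nil // shutting down; callers must tolerate a nil client
		case <-t.C:
			if _, err := os.Stat(socketPath); err != nil {
				logger.Debug("CSI plugin not ready: failed to stat socket", "error", err)
				t.Reset(5 * time.Second)
				continue
			}
			client, err := csi.NewClient(socketPath, logger)
			if err != nil {
				logger.Debug("CSI plugin not ready: failed to create gRPC client", "error", err)
				t.Reset(5 * time.Second)
				continue
			}
			return client // connected; fingerprinting can proceed
		}
	}
}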

Includes two other small improvements:
* The plugin supervisor hook creates a new gRPC client for every probe
  and then throws it away. Instead, reuse the client as we do for the
  plugin manager.
* The gRPC client constructor has a 1 second timeout. Clarify that this
  timeout applies to the connection and not the rest of the client
  lifetime.
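
For context on that last item: the constructor itself lives in Nomad's csi
package and is not one of the three files shown below. The behavior being
clarified is standard gRPC behavior: a timeout on a blocking dial bounds
only the initial connection, not later RPCs made on the returned client. A
minimal, hypothetical sketch (names and dial options are illustrative, not
Nomad's actual code):

import (
	"context"
	"time"

	"google.golang.org/grpc"
)

// newConn dials a local CSI plugin socket. The 1-second context applies
// only to this blocking dial; cancelling it afterwards does not close the
// connection, and RPCs made later on the returned *grpc.ClientConn use
// their own per-call contexts and deadlines.
func newConn(socketPath string) (*grpc.ClientConn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	return grpc.DialContext(ctx, "unix://"+socketPath,
		grpc.WithInsecure(), // plugin sockets are local unix domain sockets
		grpc.WithBlock(),    // block so the 1s timeout covers connection setup
	)
}
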
tgross committed Feb 14, 2022
1 parent 72e19c3 commit 7ab05b0
Showing 3 changed files with 87 additions and 39 deletions.
63 changes: 37 additions & 26 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
@@ -201,14 +201,14 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
}()

socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)
t := time.NewTimer(0)

var client csi.CSIPlugin
defer func() {
if client != nil {
client.Close()
}
}()
client := h.newClient(ctx, socketPath)
if client == nil {
return // only hit this on shutdown
}
defer client.Close()

t := time.NewTimer(0)

// Step 1: Wait for the plugin to initially become available.
WAITFORREADY:
@@ -217,7 +217,7 @@ WAITFORREADY:
case <-ctx.Done():
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, client, socketPath)
pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
if err != nil || !pluginHealthy {
h.logger.Debug("CSI plugin not ready", "error", err)

@@ -256,7 +256,7 @@ WAITFORREADY:
deregisterPluginFn()
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, client, socketPath)
pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
if err != nil {
h.logger.Error("CSI plugin fingerprinting failed", "error", err)
}
@@ -288,6 +288,33 @@ WAITFORREADY:
}
}

func (h *csiPluginSupervisorHook) newClient(ctx context.Context, socketPath string) csi.CSIPlugin {
t := time.NewTimer(0)

for {
select {
case <-ctx.Done():
return nil
case <-t.C:
_, err := os.Stat(socketPath)
if err != nil {
h.logger.Debug("CSI plugin not ready: failed to stat socket", "error", err)
t.Reset(5 * time.Second)
continue
}
client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
if err != nil {
h.logger.Debug("CSI plugin not ready: failed to create gRPC client", "error", err)
t.Reset(5 * time.Second)
continue
}
return client
}
}
}

func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) {
// At this point we know the plugin is ready and we can fingerprint it
// to get its vendor name and version
@@ -354,17 +381,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
}, nil
}

func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin, socketPath string) (bool, error) {
_, err := os.Stat(socketPath)
if err != nil {
return false, fmt.Errorf("failed to stat socket: %v", err)
}

client, err = h.newClient(socketPath)
if err != nil {
return false, fmt.Errorf("failed to create csi client: %v", err)
}

func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) {
healthy, err := client.PluginProbe(ctx)
if err != nil {
return false, fmt.Errorf("failed to probe plugin: %v", err)
@@ -373,12 +390,6 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client
return healthy, nil
}

func (h *csiPluginSupervisorHook) newClient(socketPath string) (csi.CSIPlugin, error) {
return csi.NewClient(socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
}

// Stop is called after the task has exited and will not be started
// again. It is the only hook guaranteed to be executed whenever
// TaskRunner.Run is called (and not gracefully shutting down).
45 changes: 45 additions & 0 deletions client/pluginmanager/csimanager/fingerprint.go
@@ -3,6 +3,8 @@ package csimanager
import (
"context"
"fmt"
"os"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/dynamicplugins"
@@ -39,6 +41,18 @@ type pluginFingerprinter struct {
}

func (p *pluginFingerprinter) fingerprint(ctx context.Context) *structs.CSIInfo {

// to construct the gRPC client we have to connect, but if we
// shutdown before we've done so, we end up with no client and
// fingerprinting will panic
if p.client == nil {
p.client = p.newClient(ctx, p.info.ConnectionInfo.SocketPath)
if p.client == nil {
p.logger.Debug("plugin fingerprint shutdown while establishing client")
return nil
}
}

if p.basicInfo == nil {
info, err := p.buildBasicFingerprint(ctx)
if err != nil {
@@ -175,6 +189,37 @@ func (p *pluginFingerprinter) buildNodeFingerprint(ctx context.Context, base *st
return fp, nil
}

func (p *pluginFingerprinter) newClient(ctx context.Context, socketPath string) csi.CSIPlugin {
t := time.NewTimer(0)

for {
select {
case <-ctx.Done():
return nil
case <-t.C:
_, err := os.Stat(socketPath)
if err != nil {
p.logger.Debug("CSI plugin not ready: failed to stat socket", "error", err)
t.Reset(5 * time.Second)
continue
}
client, err := csi.NewClient(socketPath, p.logger)
if err != nil {
p.logger.Debug("CSI plugin not ready: failed to create gRPC client", "error", err)
t.Reset(5 * time.Second)
continue
}
return client
}
}
}

func (p *pluginFingerprinter) close() {
if p.client != nil {
p.client.Close()
}
}

func structCSITopologyFromCSITopology(a *csi.Topology) *structs.CSITopology {
if a == nil {
return nil
18 changes: 5 additions & 13 deletions client/pluginmanager/csimanager/instance.go
@@ -10,6 +10,7 @@ import (
)

const managerFingerprintInterval = 30 * time.Second
const managerFingerprintRetryInterval = 5 * time.Second

// instanceManager is used to manage the fingerprinting and supervision of a
// single CSI Plugin.
@@ -73,15 +74,6 @@ func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater U
}

func (i *instanceManager) run() {
c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
if err != nil {
i.logger.Error("failed to setup instance manager client", "error", err)
close(i.shutdownCh)
return
}
i.client = c
i.fp.client = c

go i.setupVolumeManager()
go i.runLoop()
}
@@ -96,6 +88,9 @@ func (i *instanceManager) setupVolumeManager() {
case <-i.shutdownCtx.Done():
return
case <-i.fp.hadFirstSuccessfulFingerprintCh:
// the fingerprinter creates the client so we can't access it
// until we've received on this channel
i.client = i.fp.client
i.volumeManager = newVolumeManager(i.logger, i.eventer, i.client, i.mountPoint, i.containerMountPoint, i.fp.requiresStaging)
i.logger.Debug("volume manager setup complete")
close(i.volumeManagerSetupCh)
@@ -124,10 +119,7 @@ func (i *instanceManager) runLoop() {
for {
select {
case <-i.shutdownCtx.Done():
if i.client != nil {
i.client.Close()
i.client = nil
}
i.fp.close()

// run one last fingerprint so that we mark the plugin as unhealthy.
// the client has been closed so this will return quickly with the
