From b775a73ded3c03a94986d8b63f579c3a8ea19f0e Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 15 Feb 2022 16:57:29 -0500 Subject: [PATCH] CSI: make gRPC client creation more robust (#12057) Nomad communicates with CSI plugin tasks via gRPC. The plugin supervisor hook uses this to ping the plugin for health checks which it emits as task events. After the first successful health check the plugin supervisor registers the plugin in the client's dynamic plugin registry, which in turn creates a CSI plugin manager instance that has its own gRPC client for fingerprinting the plugin and sending mount requests. If the plugin manager instance fails to connect to the plugin on its first attempt, it exits. The plugin supervisor hook is unaware that connection failed so long as its own pings continue to work. A transient failure during plugin startup may mislead the plugin supervisor hook into thinking the plugin is up (so there's no need to restart the allocation) but no fingerprinter is started. * Refactors the gRPC client to connect on first use. This provides the plugin manager instance the ability to retry the gRPC client connection until success. * Add a 30s timeout to the plugin supervisor so that we don't poll forever waiting for a plugin that will never come back up. Minor improvements: * The plugin supervisor hook creates a new gRPC client for every probe and then throws it away. Instead, reuse the client as we do for the plugin manager. * The gRPC client constructor has a 1 second timeout. Clarify that this timeout applies to the connection and not the rest of the client lifetime. --- .changelog/12057.txt | 7 + .../taskrunner/plugin_supervisor_hook.go | 90 +++++---- .../taskrunner/task_runner_hooks.go | 1 + client/client.go | 6 +- client/pluginmanager/csimanager/instance.go | 7 +- plugins/csi/client.go | 186 ++++++++++-------- plugins/csi/client_test.go | 50 +++-- 7 files changed, 201 insertions(+), 146 deletions(-) create mode 100644 .changelog/12057.txt diff --git a/.changelog/12057.txt b/.changelog/12057.txt new file mode 100644 index 000000000000..8e74a8fe045b --- /dev/null +++ b/.changelog/12057.txt @@ -0,0 +1,7 @@ +```release-note:bug +csi: Fixed a bug where the plugin instance manager would not retry the initial gRPC connection to plugins +``` + +```release-note:bug +csi: Fixed a bug where the plugin supervisor would not restart the task if it failed to connect to the plugin +``` diff --git a/client/allocrunner/taskrunner/plugin_supervisor_hook.go b/client/allocrunner/taskrunner/plugin_supervisor_hook.go index 9adb8d53bf50..679fb2f73967 100644 --- a/client/allocrunner/taskrunner/plugin_supervisor_hook.go +++ b/client/allocrunner/taskrunner/plugin_supervisor_hook.go @@ -38,6 +38,7 @@ type csiPluginSupervisorHook struct { // eventEmitter is used to emit events to the task eventEmitter ti.EventEmitter + lifecycle ti.TaskLifecycle shutdownCtx context.Context shutdownCancelFn context.CancelFunc @@ -54,6 +55,7 @@ type csiPluginSupervisorHookConfig struct { clientStateDirPath string events ti.EventEmitter runner *TaskRunner + lifecycle ti.TaskLifecycle capabilities *drivers.Capabilities logger hclog.Logger } @@ -90,6 +92,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi hook := &csiPluginSupervisorHook{ alloc: config.runner.Alloc(), runner: config.runner, + lifecycle: config.lifecycle, logger: config.logger, task: task, mountPoint: pluginRoot, @@ -201,27 +204,41 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) { }() socketPath := filepath.Join(h.mountPoint, structs.CSISocketName) + + client := csi.NewClient(socketPath, h.logger.Named("csi_client").With( + "plugin.name", h.task.CSIPluginConfig.ID, + "plugin.type", h.task.CSIPluginConfig.Type)) + defer client.Close() + t := time.NewTimer(0) + // We're in Poststart at this point, so if we can't connect within + // this deadline, assume it's broken so we can restart the task + startCtx, startCancelFn := context.WithTimeout(ctx, 30*time.Second) + defer startCancelFn() + + var err error + var pluginHealthy bool + // Step 1: Wait for the plugin to initially become available. WAITFORREADY: for { select { - case <-ctx.Done(): + case <-startCtx.Done(): + h.kill(ctx, fmt.Errorf("CSI plugin failed probe: %v", err)) return case <-t.C: - pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath) + pluginHealthy, err = h.supervisorLoopOnce(startCtx, client) if err != nil || !pluginHealthy { - h.logger.Debug("CSI Plugin not ready", "error", err) - - // Plugin is not yet returning healthy, because we want to optimise for - // quickly bringing a plugin online, we use a short timeout here. - // TODO(dani): Test with more plugins and adjust. + h.logger.Debug("CSI plugin not ready", "error", err) + // Use only a short delay here to optimize for quickly + // bringing up a plugin t.Reset(5 * time.Second) continue } // Mark the plugin as healthy in a task event + h.logger.Debug("CSI plugin is ready") h.previousHealthState = pluginHealthy event := structs.NewTaskEvent(structs.TaskPluginHealthy) event.SetMessage(fmt.Sprintf("plugin: %s", h.task.CSIPluginConfig.ID)) @@ -232,15 +249,14 @@ WAITFORREADY: } // Step 2: Register the plugin with the catalog. - deregisterPluginFn, err := h.registerPlugin(socketPath) + deregisterPluginFn, err := h.registerPlugin(client, socketPath) if err != nil { - h.logger.Error("CSI Plugin registration failed", "error", err) - event := structs.NewTaskEvent(structs.TaskPluginUnhealthy) - event.SetMessage(fmt.Sprintf("failed to register plugin: %s, reason: %v", h.task.CSIPluginConfig.ID, err)) - h.eventEmitter.EmitEvent(event) + h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err)) + return } - // Step 3: Start the lightweight supervisor loop. + // Step 3: Start the lightweight supervisor loop. At this point, failures + // don't cause the task to restart t.Reset(0) for { select { @@ -249,9 +265,9 @@ WAITFORREADY: deregisterPluginFn() return case <-t.C: - pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath) + pluginHealthy, err := h.supervisorLoopOnce(ctx, client) if err != nil { - h.logger.Error("CSI Plugin fingerprinting failed", "error", err) + h.logger.Error("CSI plugin fingerprinting failed", "error", err) } // The plugin has transitioned to a healthy state. Emit an event. @@ -265,7 +281,7 @@ WAITFORREADY: if h.previousHealthState && !pluginHealthy { event := structs.NewTaskEvent(structs.TaskPluginUnhealthy) if err != nil { - event.SetMessage(fmt.Sprintf("error: %v", err)) + event.SetMessage(fmt.Sprintf("Error: %v", err)) } else { event.SetMessage("Unknown Reason") } @@ -281,16 +297,9 @@ WAITFORREADY: } } -func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), error) { - +func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) { // At this point we know the plugin is ready and we can fingerprint it // to get its vendor name and version - client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type)) - if err != nil { - return nil, fmt.Errorf("failed to create csi client: %v", err) - } - defer client.Close() - info, err := client.PluginInfo() if err != nil { return nil, fmt.Errorf("failed to probe plugin: %v", err) @@ -354,21 +363,13 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), err }, nil } -func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, socketPath string) (bool, error) { - _, err := os.Stat(socketPath) - if err != nil { - return false, fmt.Errorf("failed to stat socket: %v", err) - } +func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) { + probeCtx, probeCancelFn := context.WithTimeout(ctx, 5*time.Second) + defer probeCancelFn() - client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type)) + healthy, err := client.PluginProbe(probeCtx) if err != nil { - return false, fmt.Errorf("failed to create csi client: %v", err) - } - defer client.Close() - - healthy, err := client.PluginProbe(ctx) - if err != nil { - return false, fmt.Errorf("failed to probe plugin: %v", err) + return false, err } return healthy, nil @@ -387,6 +388,21 @@ func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskSt return nil } +func (h *csiPluginSupervisorHook) kill(ctx context.Context, reason error) { + h.logger.Error("killing task because plugin failed", "error", reason) + event := structs.NewTaskEvent(structs.TaskPluginUnhealthy) + event.SetMessage(fmt.Sprintf("Error: %v", reason.Error())) + h.eventEmitter.EmitEvent(event) + + if err := h.lifecycle.Kill(ctx, + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage("CSI plugin did not become healthy before timeout"), + ); err != nil { + h.logger.Error("failed to kill task", "kill_reason", reason, "error", err) + } +} + func ensureMountpointInserted(mounts []*drivers.MountConfig, mount *drivers.MountConfig) []*drivers.MountConfig { for _, mnt := range mounts { if mnt.IsEqual(mount) { diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index b3f44a8b9855..62ff26c4b625 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -76,6 +76,7 @@ func (tr *TaskRunner) initHooks() { clientStateDirPath: tr.clientConfig.StateDir, events: tr, runner: tr, + lifecycle: tr, capabilities: tr.driverCapabilities, logger: hookLogger, })) diff --git a/client/client.go b/client/client.go index 34428464aeaf..dd21b66c0c29 100644 --- a/client/client.go +++ b/client/client.go @@ -390,11 +390,11 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie c.dynamicRegistry = dynamicplugins.NewRegistry(c.stateDB, map[string]dynamicplugins.PluginDispenser{ dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) { - return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")) + return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")), nil }, dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) { - return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")) - }, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up + return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")), nil + }, }) // Setup the clients RPC server diff --git a/client/pluginmanager/csimanager/instance.go b/client/pluginmanager/csimanager/instance.go index 062b73972783..3839113dcd4e 100644 --- a/client/pluginmanager/csimanager/instance.go +++ b/client/pluginmanager/csimanager/instance.go @@ -73,12 +73,7 @@ func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater U } func (i *instanceManager) run() { - c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger) - if err != nil { - i.logger.Error("failed to setup instance manager client", "error", err) - close(i.shutdownCh) - return - } + c := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger) i.client = c i.fp.client = c diff --git a/plugins/csi/client.go b/plugins/csi/client.go index 89237dbbf5a3..6d34d8f55aa2 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "net" + "os" "time" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" @@ -88,6 +89,7 @@ type CSINodeClient interface { } type client struct { + addr string conn *grpc.ClientConn identityClient csipbv1.IdentityClient controllerClient CSIControllerClient @@ -102,30 +104,59 @@ func (c *client) Close() error { return nil } -func NewClient(addr string, logger hclog.Logger) (CSIPlugin, error) { - if addr == "" { - return nil, fmt.Errorf("address is empty") +func NewClient(addr string, logger hclog.Logger) CSIPlugin { + return &client{ + addr: addr, + logger: logger, } +} - conn, err := newGrpcConn(addr, logger) - if err != nil { - return nil, err +func (c *client) ensureConnected(ctx context.Context) error { + if c == nil { + return fmt.Errorf("client not initialized") + } + if c.conn != nil { + return nil + } + if c.addr == "" { + return fmt.Errorf("address is empty") + } + var conn *grpc.ClientConn + var err error + t := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout while connecting to gRPC socket: %v", err) + case <-t.C: + _, err = os.Stat(c.addr) + if err != nil { + err = fmt.Errorf("failed to stat socket: %v", err) + t.Reset(5 * time.Second) + continue + } + conn, err = newGrpcConn(c.addr, c.logger) + if err != nil { + err = fmt.Errorf("failed to create gRPC connection: %v", err) + t.Reset(time.Second * 5) + continue + } + c.conn = conn + c.identityClient = csipbv1.NewIdentityClient(conn) + c.controllerClient = csipbv1.NewControllerClient(conn) + c.nodeClient = csipbv1.NewNodeClient(conn) + return nil + } } - - return &client{ - conn: conn, - identityClient: csipbv1.NewIdentityClient(conn), - controllerClient: csipbv1.NewControllerClient(conn), - nodeClient: csipbv1.NewNodeClient(conn), - logger: logger, - }, nil } func newGrpcConn(addr string, logger hclog.Logger) (*grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + // after DialContext returns w/ initial connection, closing this + // context is a no-op + connectCtx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() conn, err := grpc.DialContext( - ctx, + connectCtx, addr, grpc.WithBlock(), grpc.WithInsecure(), @@ -146,10 +177,14 @@ func newGrpcConn(addr string, logger hclog.Logger) (*grpc.ClientConn, error) { // PluginInfo describes the type and version of a plugin as required by the nomad // base.BasePlugin interface. func (c *client) PluginInfo() (*base.PluginInfoResponse, error) { - // note: no grpc retries needed here, as this is called in - // fingerprinting and will get retried by the caller. ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + + // note: no grpc retries needed here, as this is called in + // fingerprinting and will get retried by the caller. name, version, err := c.PluginGetInfo(ctx) if err != nil { return nil, err @@ -176,6 +211,10 @@ func (c *client) SetConfig(_ *base.Config) error { } func (c *client) PluginProbe(ctx context.Context) (bool, error) { + if err := c.ensureConnected(ctx); err != nil { + return false, err + } + // note: no grpc retries should be done here req, err := c.identityClient.Probe(ctx, &csipbv1.ProbeRequest{}) if err != nil { @@ -198,11 +237,8 @@ func (c *client) PluginProbe(ctx context.Context) (bool, error) { } func (c *client) PluginGetInfo(ctx context.Context) (string, string, error) { - if c == nil { - return "", "", fmt.Errorf("Client not initialized") - } - if c.identityClient == nil { - return "", "", fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return "", "", err } resp, err := c.identityClient.GetPluginInfo(ctx, &csipbv1.GetPluginInfoRequest{}) @@ -220,11 +256,8 @@ func (c *client) PluginGetInfo(ctx context.Context) (string, string, error) { } func (c *client) PluginGetCapabilities(ctx context.Context) (*PluginCapabilitySet, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.identityClient == nil { - return nil, fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } // note: no grpc retries needed here, as this is called in @@ -243,11 +276,8 @@ func (c *client) PluginGetCapabilities(ctx context.Context) (*PluginCapabilitySe // func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapabilitySet, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.controllerClient == nil { - return nil, fmt.Errorf("controllerClient not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } // note: no grpc retries needed here, as this is called in @@ -262,11 +292,8 @@ func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapa } func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.controllerClient == nil { - return nil, fmt.Errorf("controllerClient not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } err := req.Validate() @@ -304,11 +331,8 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub } func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.controllerClient == nil { - return nil, fmt.Errorf("controllerClient not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } err := req.Validate() if err != nil { @@ -337,13 +361,9 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU } func (c *client) ControllerValidateCapabilities(ctx context.Context, req *ControllerValidateVolumeRequest, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") - } - if c.controllerClient == nil { - return fmt.Errorf("controllerClient not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } - if req.ExternalID == "" { return fmt.Errorf("missing volume ID") } @@ -390,6 +410,10 @@ func (c *client) ControllerValidateCapabilities(ctx context.Context, req *Contro } func (c *client) ControllerCreateVolume(ctx context.Context, req *ControllerCreateVolumeRequest, opts ...grpc.CallOption) (*ControllerCreateVolumeResponse, error) { + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + err := req.Validate() if err != nil { return nil, err @@ -433,6 +457,10 @@ func (c *client) ControllerCreateVolume(ctx context.Context, req *ControllerCrea } func (c *client) ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error) { + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + err := req.Validate() if err != nil { return nil, err @@ -455,6 +483,10 @@ func (c *client) ControllerListVolumes(ctx context.Context, req *ControllerListV } func (c *client) ControllerDeleteVolume(ctx context.Context, req *ControllerDeleteVolumeRequest, opts ...grpc.CallOption) error { + if err := c.ensureConnected(ctx); err != nil { + return err + } + err := req.Validate() if err != nil { return err @@ -552,6 +584,10 @@ NEXT_CAP: } func (c *client) ControllerCreateSnapshot(ctx context.Context, req *ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*ControllerCreateSnapshotResponse, error) { + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + err := req.Validate() if err != nil { return nil, err @@ -596,6 +632,10 @@ func (c *client) ControllerCreateSnapshot(ctx context.Context, req *ControllerCr } func (c *client) ControllerDeleteSnapshot(ctx context.Context, req *ControllerDeleteSnapshotRequest, opts ...grpc.CallOption) error { + if err := c.ensureConnected(ctx); err != nil { + return err + } + err := req.Validate() if err != nil { return err @@ -626,6 +666,10 @@ func (c *client) ControllerDeleteSnapshot(ctx context.Context, req *ControllerDe } func (c *client) ControllerListSnapshots(ctx context.Context, req *ControllerListSnapshotsRequest, opts ...grpc.CallOption) (*ControllerListSnapshotsResponse, error) { + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + err := req.Validate() if err != nil { return nil, err @@ -657,11 +701,8 @@ func (c *client) ControllerListSnapshots(ctx context.Context, req *ControllerLis // func (c *client) NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return nil, fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } // note: no grpc retries needed here, as this is called in @@ -675,11 +716,8 @@ func (c *client) NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, e } func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return nil, fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } result := &NodeGetInfoResponse{} @@ -706,11 +744,8 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) } func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } err := req.Validate() if err != nil { @@ -741,11 +776,8 @@ func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeReques } func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, stagingTargetPath string, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } // These errors should not be returned during production use but exist as aids // during Nomad development @@ -779,13 +811,9 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging } func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } - if err := req.Validate(); err != nil { return fmt.Errorf("validation error: %v", err) } @@ -813,13 +841,9 @@ func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRe } func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } - // These errors should not be returned during production use but exist as aids // during Nomad development if volumeID == "" { diff --git a/plugins/csi/client_test.go b/plugins/csi/client_test.go index 40da479b7dc3..1c951b2bac96 100644 --- a/plugins/csi/client_test.go +++ b/plugins/csi/client_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "path/filepath" "testing" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" @@ -11,15 +12,26 @@ import ( "github.com/hashicorp/nomad/nomad/structs" fake "github.com/hashicorp/nomad/plugins/csi/testing" "github.com/stretchr/testify/require" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -func newTestClient() (*fake.IdentityClient, *fake.ControllerClient, *fake.NodeClient, CSIPlugin) { +func newTestClient(t *testing.T) (*fake.IdentityClient, *fake.ControllerClient, *fake.NodeClient, CSIPlugin) { ic := fake.NewIdentityClient() cc := fake.NewControllerClient() nc := fake.NewNodeClient() + + // we've set this as non-blocking so it won't connect to the + // socket unless a RPC is invoked + conn, err := grpc.DialContext(context.Background(), + filepath.Join(t.TempDir(), "csi.sock"), grpc.WithInsecure()) + if err != nil { + t.Errorf("failed: %v", err) + } + client := &client{ + conn: conn, identityClient: ic, controllerClient: cc, nodeClient: nc, @@ -69,7 +81,7 @@ func TestClient_RPC_PluginProbe(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - ic, _, _, client := newTestClient() + ic, _, _, client := newTestClient(t) defer client.Close() ic.NextErr = tc.ResponseErr @@ -121,7 +133,7 @@ func TestClient_RPC_PluginInfo(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - ic, _, _, client := newTestClient() + ic, _, _, client := newTestClient(t) defer client.Close() ic.NextErr = tc.ResponseErr @@ -186,7 +198,7 @@ func TestClient_RPC_PluginGetCapabilities(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - ic, _, _, client := newTestClient() + ic, _, _, client := newTestClient(t) defer client.Close() ic.NextErr = tc.ResponseErr @@ -284,7 +296,7 @@ func TestClient_RPC_ControllerGetCapabilities(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -342,7 +354,7 @@ func TestClient_RPC_NodeGetCapabilities(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr @@ -407,7 +419,7 @@ func TestClient_RPC_ControllerPublishVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -453,7 +465,7 @@ func TestClient_RPC_ControllerUnpublishVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -661,7 +673,7 @@ func TestClient_RPC_ControllerValidateVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() requestedCaps := []*VolumeCapability{{ @@ -758,7 +770,7 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() req := &ControllerCreateVolumeRequest{ @@ -828,7 +840,7 @@ func TestClient_RPC_ControllerDeleteVolume(t *testing.T) { } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -871,7 +883,7 @@ func TestClient_RPC_ControllerListVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -979,7 +991,7 @@ func TestClient_RPC_ControllerCreateSnapshot(t *testing.T) { } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -1025,7 +1037,7 @@ func TestClient_RPC_ControllerDeleteSnapshot(t *testing.T) { } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -1068,7 +1080,7 @@ func TestClient_RPC_ControllerListSnapshots(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -1124,7 +1136,7 @@ func TestClient_RPC_NodeStageVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr @@ -1165,7 +1177,7 @@ func TestClient_RPC_NodeUnstageVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr @@ -1221,7 +1233,7 @@ func TestClient_RPC_NodePublishVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr @@ -1274,7 +1286,7 @@ func TestClient_RPC_NodeUnpublishVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr