diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go
index 0f4897f7ded9..2a2fe2963d13 100644
--- a/client/allocrunner/csi_hook.go
+++ b/client/allocrunner/csi_hook.go
@@ -20,18 +20,23 @@ import (
 //
 // It is a noop for allocs that do not depend on CSI Volumes.
 type csiHook struct {
-	alloc                *structs.Allocation
-	logger               hclog.Logger
-	csimanager           csimanager.Manager
+	alloc      *structs.Allocation
+	logger     hclog.Logger
+	csimanager csimanager.Manager
+
+	// interfaces implemented by the allocRunner
 	rpcClient            RPCer
 	taskCapabilityGetter taskCapabilityGetter
 	updater              hookResourceSetter
-	nodeSecret           string
 
+	nodeSecret         string
 	volumeRequests     map[string]*volumeAndRequest
 	minBackoffInterval time.Duration
 	maxBackoffInterval time.Duration
 	maxBackoffDuration time.Duration
+
+	shutdownCtx      context.Context
+	shutdownCancelFn context.CancelFunc
 }
 
 // implemented by allocrunner
@@ -40,6 +45,9 @@ type taskCapabilityGetter interface {
 }
 
 func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, updater hookResourceSetter, nodeSecret string) *csiHook {
+
+	shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background())
+
 	return &csiHook{
 		alloc:                alloc,
 		logger:               logger.Named("csi_hook"),
@@ -52,6 +60,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
 		minBackoffInterval:   time.Second,
 		maxBackoffInterval:   time.Minute,
 		maxBackoffDuration:   time.Hour * 24,
+		shutdownCtx:          shutdownCtx,
+		shutdownCancelFn:     shutdownCancelFn,
 	}
 }
 
@@ -64,11 +74,6 @@ func (c *csiHook) Prerun() error {
 		return nil
 	}
 
-	// We use this context only to attach hclog to the gRPC context. The
-	// lifetime is the lifetime of the gRPC stream, not specific RPC timeouts,
-	// but we manage the stream lifetime via Close in the pluginmanager.
-	ctx := context.Background()
-
 	volumes, err := c.claimVolumesFromAlloc()
 	if err != nil {
 		return fmt.Errorf("claim volumes: %v", err)
@@ -77,7 +82,12 @@ func (c *csiHook) Prerun() error {
 
 	mounts := make(map[string]*csimanager.MountInfo, len(volumes))
 	for alias, pair := range volumes {
-		mounter, err := c.csimanager.MounterForPlugin(ctx, pair.volume.PluginID)
+
+		// We use this context only to attach hclog to the gRPC
+		// context. The lifetime is the lifetime of the gRPC stream,
+		// not specific RPC timeouts, but we manage the stream
+		// lifetime via Close in the pluginmanager.
+		mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, pair.volume.PluginID)
 		if err != nil {
 			return err
 		}
@@ -89,7 +99,8 @@ func (c *csiHook) Prerun() error {
 			MountOptions:   pair.request.MountOptions,
 		}
 
-		mountInfo, err := mounter.MountVolume(ctx, pair.volume, c.alloc, usageOpts, pair.publishContext)
+		mountInfo, err := mounter.MountVolume(
+			c.shutdownCtx, pair.volume, c.alloc, usageOpts, pair.publishContext)
 		if err != nil {
 			return err
 		}
@@ -118,21 +129,27 @@ func (c *csiHook) Postrun() error {
 
 	for _, pair := range c.volumeRequests {
 		wg.Add(1)
-
-		// CSI RPCs can potentially fail for a very long time if a
-		// node plugin has failed. split the work into goroutines so
-		// that operators could potentially reuse one of a set of
-		// volumes even if this hook is stuck waiting on the others
+		// CSI RPCs can potentially take a long time. Split the work
+		// into goroutines so that operators could potentially reuse
+		// one of a set of volumes
 		go func(pair *volumeAndRequest) {
 			defer wg.Done()
-
-			// we can recover an unmount failure if the operator
-			// brings the plugin back up, so retry every few minutes
-			// but eventually give up
-			err := c.unmountWithRetry(pair)
+			err := c.unmountImpl(pair)
 			if err != nil {
-				errs <- err
-				return
+				// we can recover an unmount failure if the operator
+				// brings the plugin back up, so retry every few minutes
+				// but eventually give up. Don't block shutdown so that
+				// we don't block shutting down the client in -dev mode
+				go func(pair *volumeAndRequest) {
+					err := c.unmountWithRetry(pair)
+					if err != nil {
+						c.logger.Error("volume could not be unmounted")
+					}
+					err = c.unpublish(pair)
+					if err != nil {
+						c.logger.Error("volume could not be unpublished")
+					}
+				}(pair)
 			}
 
 			// we can't recover from this RPC error client-side; the
@@ -236,12 +253,7 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
 // with exponential backoff capped to a maximum interval
 func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.CSIVolumeClaimResponse, error) {
 
-	// note: allocrunner hooks don't have access to the client's
-	// shutdown context, just the allocrunner's shutdown; if we make
-	// it available in the future we should thread it through here so
-	// that retry can exit gracefully instead of dropping the
-	// in-flight goroutine
-	ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
+	ctx, cancel := context.WithTimeout(c.shutdownCtx, c.maxBackoffDuration)
 	defer cancel()
 
 	var resp structs.CSIVolumeClaimResponse
@@ -348,12 +360,7 @@ func (c *csiHook) unpublish(pair *volumeAndRequest) error {
 // exponential backoff capped to a maximum interval
 func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
 
-	// note: allocrunner hooks don't have access to the client's
-	// shutdown context, just the allocrunner's shutdown; if we make
-	// it available in the future we should thread it through here so
-	// that retry can exit gracefully instead of dropping the
-	// in-flight goroutine
-	ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
+	ctx, cancel := context.WithTimeout(c.shutdownCtx, c.maxBackoffDuration)
 	defer cancel()
 	var err error
 	backoff := c.minBackoffInterval
@@ -388,7 +395,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
 // NodeEvent
 func (c *csiHook) unmountImpl(pair *volumeAndRequest) error {
 
-	mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID)
+	mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, pair.volume.PluginID)
 	if err != nil {
 		return err
 	}
@@ -400,6 +407,23 @@ func (c *csiHook) unmountImpl(pair *volumeAndRequest) error {
 		MountOptions:   pair.request.MountOptions,
 	}
 
-	return mounter.UnmountVolume(context.TODO(),
+	return mounter.UnmountVolume(c.shutdownCtx,
 		pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts)
 }
+
+// Shutdown will get called when the client is gracefully
+// stopping. Cancel our shutdown context so that we don't block client
+// shutdown while in the CSI RPC retry loop.
+func (c *csiHook) Shutdown() {
+	c.logger.Trace("shutting down hook")
+	c.shutdownCancelFn()
+}
+
+// Destroy will get called when an allocation gets GC'd on the client
+// or when a -dev mode client is stopped. Cancel our shutdown context
+// so that we don't block client shutdown while in the CSI RPC retry
+// loop.
+func (c *csiHook) Destroy() {
+	c.logger.Trace("destroying hook")
+	c.shutdownCancelFn()
+}
diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go
index 1af7f62a75f4..1d3b04ed36a3 100644
--- a/client/allocrunner/csi_hook_test.go
+++ b/client/allocrunner/csi_hook_test.go
@@ -24,10 +24,6 @@ import (
 var _ interfaces.RunnerPrerunHook = (*csiHook)(nil)
 var _ interfaces.RunnerPostrunHook = (*csiHook)(nil)
 
-// TODO https://github.com/hashicorp/nomad/issues/11786
-// we should implement Update as well
-// var _ interfaces.RunnerUpdateHook = (*csiHook)(nil)
-
 func TestCSIHook(t *testing.T) {
 	ci.Parallel(t)
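For illustration, here is a standalone sketch (not part of the patch above) of the retry pattern that claimWithRetry and unmountWithRetry rely on: deriving the backoff loop's context from a cancellable shutdown context, rather than context.TODO(), lets Shutdown or Destroy unblock the loop immediately instead of leaving the goroutine in flight. The names retryOp, op, minBackoff, maxBackoff, and maxDuration are hypothetical and not taken from the Nomad codebase.

package main

import (
	"context"
	"fmt"
	"time"
)

// retryOp keeps calling op with exponential backoff capped to maxBackoff.
// The loop stops when op succeeds, when maxDuration elapses, or when the
// caller cancels shutdownCtx (e.g. from a Shutdown or Destroy hook).
func retryOp(shutdownCtx context.Context, op func() error,
	minBackoff, maxBackoff, maxDuration time.Duration) error {

	// Cap the total time spent retrying, but inherit cancellation from the
	// shutdown context so cancelling it ends the loop early.
	ctx, cancel := context.WithTimeout(shutdownCtx, maxDuration)
	defer cancel()

	var err error
	backoff := minBackoff
	timer := time.NewTimer(backoff)
	defer timer.Stop()

	for {
		if err = op(); err == nil {
			return nil
		}

		select {
		case <-ctx.Done():
			// shutdown or timeout: give up without blocking
			return fmt.Errorf("giving up after %v: %w", err, ctx.Err())
		case <-timer.C:
		}

		// exponential backoff capped to a maximum interval
		backoff = backoff * 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
		timer.Reset(backoff)
	}
}

func main() {
	shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background())

	// Simulate the client shutting down while the operation keeps failing:
	// cancelling the parent context unblocks the retry loop immediately.
	go func() {
		time.Sleep(3 * time.Second)
		shutdownCancelFn()
	}()

	attempt := 0
	err := retryOp(shutdownCtx, func() error {
		attempt++
		return fmt.Errorf("plugin unavailable (attempt %d)", attempt)
	}, time.Second, time.Minute, 24*time.Hour)

	fmt.Println("result:", err)
}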