diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go
index e7e7385a3e8a..6fb1b2866abb 100644
--- a/client/allocrunner/csi_hook.go
+++ b/client/allocrunner/csi_hook.go
@@ -3,6 +3,8 @@ package allocrunner
 import (
 	"context"
 	"fmt"
+	"sync"
+	"time"
 
 	hclog "github.com/hashicorp/go-hclog"
 	multierror "github.com/hashicorp/go-multierror"
@@ -24,7 +26,9 @@ type csiHook struct {
 	updater        hookResourceSetter
 	nodeSecret     string
 
-	volumeRequests map[string]*volumeAndRequest
+	volumeRequests     map[string]*volumeAndRequest
+	maxBackoffInterval time.Duration
+	maxBackoffDuration time.Duration
 }
 
 // implemented by allocrunner
@@ -42,6 +46,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
 		updater:        updater,
 		nodeSecret:     nodeSecret,
 		volumeRequests: map[string]*volumeAndRequest{},
+		maxBackoffInterval: time.Minute,
+		maxBackoffDuration: time.Hour * 24,
 	}
 }
 
@@ -103,41 +109,43 @@ func (c *csiHook) Postrun() error {
 		return nil
 	}
 
-	var mErr *multierror.Error
+	var wg sync.WaitGroup
+	errs := make(chan error, len(c.volumeRequests))
 
 	for _, pair := range c.volumeRequests {
+		wg.Add(1)
+
+		// CSI RPCs can potentially fail for a very long time if a
+		// node plugin has failed. split the work into goroutines so
+		// that operators could potentially reuse one of a set of
+		// volumes even if this hook is stuck waiting on the others
+		go func(pair *volumeAndRequest) {
+			defer wg.Done()
+
+			// we can recover an unmount failure if the operator
+			// brings the plugin back up, so retry every few minutes
+			// but eventually give up
+			err := c.unmountWithRetry(pair)
+			if err != nil {
+				errs <- err
+				return
+			}
 
-		mode := structs.CSIVolumeClaimRead
-		if !pair.request.ReadOnly {
-			mode = structs.CSIVolumeClaimWrite
-		}
+			// we can't recover from this RPC error client-side; the
+			// volume claim GC job will have to clean up for us once
+			// the allocation is marked terminal
+			errs <- c.unpublish(pair)
+		}(pair)
+	}
 
-		source := pair.request.Source
-		if pair.request.PerAlloc {
-			// NOTE: PerAlloc can't be set if we have canaries
-			source = source + structs.AllocSuffix(c.alloc.Name)
-		}
+	wg.Wait()
+	close(errs) // so we don't block waiting if there were no errors
 
-		req := &structs.CSIVolumeUnpublishRequest{
-			VolumeID: source,
-			Claim: &structs.CSIVolumeClaim{
-				AllocationID: c.alloc.ID,
-				NodeID:       c.alloc.NodeID,
-				Mode:         mode,
-				State:        structs.CSIVolumeClaimStateUnpublishing,
-			},
-			WriteRequest: structs.WriteRequest{
-				Region:    c.alloc.Job.Region,
-				Namespace: c.alloc.Job.Namespace,
-				AuthToken: c.nodeSecret,
-			},
-		}
-		err := c.rpcClient.RPC("CSIVolume.Unpublish",
-			req, &structs.CSIVolumeUnpublishResponse{})
-		if err != nil {
-			mErr = multierror.Append(mErr, err)
-		}
+	var mErr *multierror.Error
+	for err := range errs {
+		mErr = multierror.Append(mErr, err)
 	}
+
 	return mErr.ErrorOrNil()
 }
 
@@ -231,3 +239,95 @@ func (c *csiHook) shouldRun() bool {
 
 	return false
 }
+
+func (c *csiHook) unpublish(pair *volumeAndRequest) error {
+
+	mode := structs.CSIVolumeClaimRead
+	if !pair.request.ReadOnly {
+		mode = structs.CSIVolumeClaimWrite
+	}
+
+	source := pair.request.Source
+	if pair.request.PerAlloc {
+		// NOTE: PerAlloc can't be set if we have canaries
+		source = source + structs.AllocSuffix(c.alloc.Name)
+	}
+
+	req := &structs.CSIVolumeUnpublishRequest{
+		VolumeID: source,
+		Claim: &structs.CSIVolumeClaim{
+			AllocationID: c.alloc.ID,
+			NodeID:       c.alloc.NodeID,
+			Mode:         mode,
+			State:        structs.CSIVolumeClaimStateUnpublishing,
+		},
+		WriteRequest: structs.WriteRequest{
+			Region:    c.alloc.Job.Region,
+			Namespace: c.alloc.Job.Namespace,
+			AuthToken: c.nodeSecret,
+		},
+	}
+
+	return c.rpcClient.RPC("CSIVolume.Unpublish",
+		req, &structs.CSIVolumeUnpublishResponse{})
+
+}
+
+// unmountWithRetry tries to unmount/unstage the volume, retrying with
+// exponential backoff capped to a maximum interval
+func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
+
+	// note: allocrunner hooks don't have access to the client's
+	// shutdown context, just the allocrunner's shutdown; if we make
+	// it available in the future we should thread it through here so
+	// that retry can exit gracefully instead of dropping the
+	// in-flight goroutine
+	ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
+	defer cancel()
+	var err error
+	backoff := time.Second
+	ticker := time.NewTicker(backoff)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return err
+		case <-ticker.C:
+		}
+
+		err = c.unmountImpl(pair)
+		if err == nil {
+			break
+		}
+
+		if backoff < c.maxBackoffInterval {
+			backoff = backoff * 2
+			if backoff > c.maxBackoffInterval {
+				backoff = c.maxBackoffInterval
+			}
+		}
+		ticker.Reset(backoff)
+	}
+	return nil
+}
+
+// unmountImpl implements the call to the CSI plugin manager to
+// unmount the volume. Each retry will write an "Unmount volume"
+// NodeEvent
+func (c *csiHook) unmountImpl(pair *volumeAndRequest) error {
+
+	mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID)
+	if err != nil {
+		return err
+	}
+
+	usageOpts := &csimanager.UsageOptions{
+		ReadOnly:       pair.request.ReadOnly,
+		AttachmentMode: pair.request.AttachmentMode,
+		AccessMode:     pair.request.AccessMode,
+		MountOptions:   pair.request.MountOptions,
+	}
+
+	return mounter.UnmountVolume(context.TODO(),
+		pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts)
+}
diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go
index 045ef3e0afce..d05d07385c3d 100644
--- a/client/allocrunner/csi_hook_test.go
+++ b/client/allocrunner/csi_hook_test.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 
@@ -59,7 +60,7 @@ func TestCSIHook(t *testing.T) {
 					"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
 			},
 			expectedMountCalls:     1,
-			expectedUnmountCalls:   0, // not until this is done client-side
+			expectedUnmountCalls:   1,
 			expectedClaimCalls:     1,
 			expectedUnpublishCalls: 1,
 		},
@@ -83,7 +84,7 @@ func TestCSIHook(t *testing.T) {
 					"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
 			},
 			expectedMountCalls:     1,
-			expectedUnmountCalls:   0, // not until this is done client-side
+			expectedUnmountCalls:   1,
 			expectedClaimCalls:     1,
 			expectedUnpublishCalls: 1,
 		},
@@ -122,7 +123,7 @@ func TestCSIHook(t *testing.T) {
 		// 			"test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)},
 		// 	},
 		// 	expectedMountCalls:     1,
-		// 	expectedUnmountCalls:   0, // not until this is done client-side
+		// 	expectedUnmountCalls:   1,
 		// 	expectedClaimCalls:     1,
 		// 	expectedUnpublishCalls: 1,
 		// },
@@ -144,6 +145,9 @@ func TestCSIHook(t *testing.T) {
 			},
 		}
 		hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
+		hook.maxBackoffInterval = 100 * time.Millisecond
+		hook.maxBackoffDuration = 2 * time.Second
+
 		require.NotNil(t, hook)
 
 		require.NoError(t, hook.Prerun())
diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go
index 9bca471cf085..4c6bf1d144e4 100644
--- a/client/pluginmanager/csimanager/volume.go
+++ b/client/pluginmanager/csimanager/volume.go
@@ -353,11 +353,16 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
 		}
 	}
 
+	if errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
+		logger.Trace("unmounting volume failed with ignorable error", "error", err)
+		err = nil
+	}
+
 	event := structs.NewNodeEvent().
 		SetSubsystem(structs.NodeEventSubsystemStorage).
 		SetMessage("Unmount volume").
 		AddDetail("volume_id", volID)
-	if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
+	if err == nil {
 		event.AddDetail("success", "true")
 	} else {
 		event.AddDetail("success", "false")
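
Reviewer note (not part of the patch): the retry loop in unmountWithRetry follows a common capped-exponential-backoff shape: wait on a ticker, attempt the operation, double the interval up to maxBackoffInterval, and give up once a context carrying maxBackoffDuration expires, returning the most recent error. The sketch below extracts that shape into a standalone, runnable helper under assumed names; retryWithBackoff and the simulated operation in main are illustrative only and do not exist in Nomad.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff calls op until it succeeds, doubling the wait
// between attempts up to maxInterval, and gives up (returning the
// most recent error) once maxDuration has elapsed.
func retryWithBackoff(ctx context.Context, maxInterval, maxDuration time.Duration, op func() error) error {
	ctx, cancel := context.WithTimeout(ctx, maxDuration)
	defer cancel()

	var err error
	backoff := time.Second
	ticker := time.NewTicker(backoff)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return err // deadline hit; report the last failure seen
		case <-ticker.C:
		}

		if err = op(); err == nil {
			return nil
		}

		// double the interval, but never past the cap
		if backoff < maxInterval {
			backoff *= 2
			if backoff > maxInterval {
				backoff = maxInterval
			}
		}
		ticker.Reset(backoff)
	}
}

func main() {
	attempts := 0
	err := retryWithBackoff(context.Background(), 2*time.Second, 10*time.Second, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("plugin not ready") // simulated transient failure
		}
		return nil
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}

In the patch itself, Postrun runs one such loop per claimed volume in its own goroutine and collects failures over a buffered error channel, so a single stuck node plugin does not block unmounting or unpublishing the other volumes.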