backport of commit 4fb2347
tgross committed Feb 24, 2022
1 parent 1094935 commit b899313
Showing 3 changed files with 203 additions and 31 deletions.
7 changes: 7 additions & 0 deletions .changelog/12113.txt
@@ -0,0 +1,7 @@
```release-note:bug
csi: Fixed a bug where allocations with volume claims would fail their first placement after a reschedule
```

```release-note:bug
csi: Fixed a bug where allocations with volume claims would fail to restore after a client restart
```
80 changes: 76 additions & 4 deletions client/allocrunner/csi_hook.go
@@ -3,6 +3,7 @@ package allocrunner
import (
"context"
"fmt"
"strings"
"sync"
"time"

@@ -28,6 +29,7 @@ type csiHook struct {
nodeSecret string

volumeRequests map[string]*volumeAndRequest
minBackoffInterval time.Duration
maxBackoffInterval time.Duration
maxBackoffDuration time.Duration
}
@@ -47,6 +49,7 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
updater: updater,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
minBackoffInterval: time.Second,
maxBackoffInterval: time.Minute,
maxBackoffDuration: time.Hour * 24,
}
@@ -213,11 +216,10 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
},
}

var resp structs.CSIVolumeClaimResponse
if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil {
resp, err := c.claimWithRetry(req)
if err != nil {
return nil, fmt.Errorf("could not claim volume %s: %w", req.VolumeID, err)
}

if resp.Volume == nil {
return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
}
@@ -230,6 +232,74 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
return result, nil
}

// claimWithRetry tries to claim the volume on the server, retrying
// with exponential backoff capped to a maximum interval
func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.CSIVolumeClaimResponse, error) {

// note: allocrunner hooks don't have access to the client's
// shutdown context, just the allocrunner's shutdown; if we make
// it available in the future we should thread it through here so
// that retry can exit gracefully instead of dropping the
// in-flight goroutine
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
defer cancel()

var resp structs.CSIVolumeClaimResponse
var err error
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
for {
select {
case <-ctx.Done():
return nil, err
case <-t.C:
}

err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp)
if err == nil {
break
}

if !isRetryableClaimRPCError(err) {
break
}

// double the backoff interval, but never past the configured cap
if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug(
"volume could not be claimed because it is in use, retrying", "backoff", backoff)
t.Reset(backoff)
}
return &resp, err
}

// isRetryableClaimRPCError looks for errors where we need to retry
// with backoff because we expect them to be eventually resolved.
func isRetryableClaimRPCError(err error) bool {

// note: because these errors are returned via RPC which breaks error
// wrapping, we can't check with errors.Is and need to read the string
errMsg := err.Error()
if strings.Contains(errMsg, structs.ErrCSIVolumeMaxClaims.Error()) {
return true
}
if strings.Contains(errMsg, structs.ErrCSIClientRPCRetryable.Error()) {
return true
}
if strings.Contains(errMsg, "no servers") {
return true
}
if strings.Contains(errMsg, structs.ErrNoLeader.Error()) {
return true
}
return false
}

func (c *csiHook) shouldRun() bool {
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
for _, vol := range tg.Volumes {
@@ -286,7 +356,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
defer cancel()
var err error
backoff := time.Second
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
for {
@@ -307,6 +377,8 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug(
"volume could not be unmounted, retrying", "backoff", backoff)
t.Reset(backoff)
}
return nil
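Both retry loops in this file (claim and unmount) share the same capped exponential backoff shape: start at minBackoffInterval, double the wait after each failed attempt up to maxBackoffInterval, and give up once the context created from maxBackoffDuration expires. Below is a minimal standalone sketch of that pattern; the helper name and intervals are illustrative only, and the stdlib timer stands in for Nomad's helper.NewSafeTimer:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff calls fn until it succeeds, doubling the wait between
// attempts and capping it at maxInterval; ctx bounds the total retry budget.
func retryWithBackoff(ctx context.Context, minInterval, maxInterval time.Duration, fn func() error) error {
	backoff := minInterval
	t := time.NewTimer(0) // fire immediately so the first attempt is not delayed
	defer t.Stop()

	var err error
	for {
		select {
		case <-ctx.Done():
			if err == nil {
				err = ctx.Err()
			}
			return err // out of time: surface the last attempt's error
		case <-t.C:
		}

		if err = fn(); err == nil {
			return nil
		}

		// double the interval, but never beyond the cap
		if backoff < maxInterval {
			backoff *= 2
			if backoff > maxInterval {
				backoff = maxInterval
			}
		}
		t.Reset(backoff)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Millisecond)
	defer cancel()

	attempts := 0
	err := retryWithBackoff(ctx, time.Millisecond, 8*time.Millisecond, func() error {
		attempts++
		return errors.New("volume still claimed")
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}
```

With the defaults set in newCSIHook (one-second minimum, one-minute cap, 24-hour budget), a volume that stays claimed is re-polled roughly once a minute after the first six doublings.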
147 changes: 120 additions & 27 deletions client/allocrunner/csi_hook_test.go
@@ -7,12 +7,14 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/pluginmanager"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -34,6 +36,9 @@ func TestCSIHook(t *testing.T) {
testcases := []struct {
name string
volumeRequests map[string]*structs.VolumeRequest
startsUnschedulable bool
startsWithClaims bool
expectedClaimErr error
expectedMounts map[string]*csimanager.MountInfo
expectedMountCalls int
expectedUnmountCalls int
@@ -89,6 +94,58 @@
expectedUnpublishCalls: 1,
},

{
name: "fatal error on claim",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{},
PerAlloc: false,
},
},
startsUnschedulable: true,
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 0,
expectedUnmountCalls: 0,
expectedClaimCalls: 1,
expectedUnpublishCalls: 0,
expectedClaimErr: errors.New(
"claim volumes: could not claim volume testvolume0: volume is currently unschedulable"),
},

{
name: "retryable error on claim",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{},
PerAlloc: false,
},
},
startsWithClaims: true,
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 1,
expectedClaimCalls: 2,
expectedUnpublishCalls: 1,
},

// TODO: this won't actually work on the client.
// https://github.com/hashicorp/nomad/issues/11798
//
@@ -136,7 +193,12 @@ func TestCSIHook(t *testing.T) {

callCounts := map[string]int{}
mgr := mockPluginManager{mounter: mockVolumeMounter{callCounts: callCounts}}
rpcer := mockRPCer{alloc: alloc, callCounts: callCounts}
rpcer := mockRPCer{
alloc: alloc,
callCounts: callCounts,
hasExistingClaim: helper.BoolToPtr(tc.startsWithClaims),
schedulable: helper.BoolToPtr(!tc.startsUnschedulable),
}
ar := mockAllocRunner{
res: &cstructs.AllocHookResources{},
caps: &drivers.Capabilities{
@@ -145,17 +207,24 @@
},
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook.maxBackoffInterval = 100 * time.Millisecond
hook.maxBackoffDuration = 2 * time.Second
hook.minBackoffInterval = 1 * time.Millisecond
hook.maxBackoffInterval = 10 * time.Millisecond
hook.maxBackoffDuration = 500 * time.Millisecond

require.NotNil(t, hook)

require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.NotNil(t, mounts)
require.Equal(t, tc.expectedMounts, mounts)
if tc.expectedClaimErr != nil {
require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.Nil(t, mounts)
} else {
require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.NotNil(t, mounts)
require.Equal(t, tc.expectedMounts, mounts)
require.NoError(t, hook.Postrun())
}

require.NoError(t, hook.Postrun())
require.Equal(t, tc.expectedMountCalls, callCounts["mount"])
require.Equal(t, tc.expectedUnmountCalls, callCounts["unmount"])
require.Equal(t, tc.expectedClaimCalls, callCounts["claim"])
@@ -168,25 +237,11 @@

// HELPERS AND MOCKS

func testVolume(id string) *structs.CSIVolume {
vol := structs.NewCSIVolume(id, 0)
vol.Schedulable = true
vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
},
}
return vol
}

type mockRPCer struct {
alloc *structs.Allocation
callCounts map[string]int
alloc *structs.Allocation
callCounts map[string]int
hasExistingClaim *bool
schedulable *bool
}

// RPC mocks the server RPCs, acting as though any request succeeds
@@ -195,7 +250,7 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
case "CSIVolume.Claim":
r.callCounts["claim"]++
req := args.(*structs.CSIVolumeClaimRequest)
vol := testVolume(req.VolumeID)
vol := r.testVolume(req.VolumeID)
err := vol.Claim(req.ToClaim(), r.alloc)
if err != nil {
return err
@@ -215,6 +270,44 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
return nil
}

// testVolume is a helper that optionally starts as unschedulable /
// claimed until after the first claim RPC is made, so that we can
// test retryable vs non-retryable failures
func (r mockRPCer) testVolume(id string) *structs.CSIVolume {
vol := structs.NewCSIVolume(id, 0)
vol.Schedulable = *r.schedulable
vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
},
}

// simulate another allocation already holding a claim, so that the
// first claim RPC fails with a retryable "max claims" error
if *r.hasExistingClaim {
vol.AccessMode = structs.CSIVolumeAccessModeSingleNodeReader
vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
vol.ReadClaims["another-alloc-id"] = &structs.CSIVolumeClaim{
AllocationID: "another-alloc-id",
NodeID: "another-node-id",
Mode: structs.CSIVolumeClaimRead,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
}
}

// the claim counter was incremented before this helper was called, so
// from the first claim RPC onward the volume reverts to a healthy state
if r.callCounts["claim"] >= 1 {
*r.hasExistingClaim = false
*r.schedulable = true
}

return vol
}

type mockVolumeMounter struct {
callCounts map[string]int
}
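The new retry behavior is exercised through the hook-level test cases above rather than by a direct unit test of the error classifier. A hypothetical companion test for isRetryableClaimRPCError could look like the following; it is not part of this commit, though it uses only symbols the diff already references and would sit in the same allocrunner test package:

```go
func TestIsRetryableClaimRPCError(t *testing.T) {
	// transient conditions the cluster is expected to resolve: retry
	require.True(t, isRetryableClaimRPCError(errors.New(structs.ErrCSIVolumeMaxClaims.Error())))
	require.True(t, isRetryableClaimRPCError(errors.New(structs.ErrCSIClientRPCRetryable.Error())))
	require.True(t, isRetryableClaimRPCError(errors.New("rpc error: no servers")))
	require.True(t, isRetryableClaimRPCError(errors.New(structs.ErrNoLeader.Error())))

	// anything else should fail the claim immediately
	require.False(t, isRetryableClaimRPCError(errors.New("volume not found")))
}
```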
