Skip to content

Commit

Permalink
CSI: resolve invalid claim states (#11890)
Browse files Browse the repository at this point in the history
* csi: resolve invalid claim states on read

It's currently possible for CSI volumes to be claimed by allocations
that no longer exist. This changeset asserts a reasonable state at
the state store level by registering these nil allocations as "past
claims" on any read. This will cause any pass through the periodic GC
or volumewatcher to trigger the unpublishing workflow for those claims.

* csi: make feasibility check errors more understandable

When the feasibility checker finds we have no free write claims, it
checks to see if any of those claims are for the job we're currently
scheduling (so that earlier versions of a job can't block claims for
new versions) and reports a conflict if the volume can't be scheduled
so that the user can fix their claims. But when the checker hits a
claim that has a GCd allocation, the state is recoverable by the
server once claim reaping completes and no user intervention is
required; the blocked eval should complete. Differentiate the
scheduler error produced by these two conditions.
  • Loading branch information
tgross committed Jan 28, 2022
1 parent 73740b2 commit 9154033
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 21 deletions.
3 changes: 3 additions & 0 deletions .changelog/11890.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where garbage collected allocations could block new claims on a volume
```
4 changes: 4 additions & 0 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,10 @@ NEXT_VOLUME:
continue
}

// TODO(tgross): consider moving the TerminalStatus check into
// the denormalize volume logic so that we can just check the
// volume for past claims

// we only call the claim release RPC if the volume has claims
// that no longer have valid allocations. otherwise we'd send
// out a lot of do-nothing RPCs.
Expand Down
55 changes: 50 additions & 5 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2381,19 +2381,64 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
c := core.(*CoreScheduler)
require.NoError(c.csiVolumeClaimGC(gc))

// the volumewatcher will hit an error here because there's no
// path to the node. but we can't update the claim to bypass the
// client RPCs without triggering the volumewatcher's normal code
// path.
// TODO(tgross): the condition below means this test doesn't tell
// us much; ideally we should be intercepting the claim request
// and verifying that we send the expected claims but we don't
// have test infra in place to do that for server RPCs

// sending the GC claim will trigger the volumewatcher's normal
// code path. but the volumewatcher will hit an error here
// because there's no path to the node, so we shouldn't see
// the WriteClaims removed
require.Eventually(func() bool {
vol, _ := state.CSIVolumeByID(ws, ns, volID)
return len(vol.WriteClaims) == 1 &&
len(vol.WriteAllocs) == 1 &&
len(vol.PastClaims) == 0
len(vol.PastClaims) == 1
}, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly")

}

func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) {
t.Parallel()
require := require.New(t)

srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})

defer shutdown()
testutil.WaitForLeader(t, srv.RPC)

err := state.TestBadCSIState(t, srv.State())
require.NoError(err)

snap, err := srv.State().Snapshot()
require.NoError(err)
core := NewCoreScheduler(srv, snap)

index, _ := srv.State().LatestIndex()
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
require.NoError(c.csiVolumeClaimGC(gc))

require.Eventually(func() bool {
vol, _ := srv.State().CSIVolumeByID(nil,
structs.DefaultNamespace, "csi-volume-nfs0")
if len(vol.PastClaims) != 2 {
return false
}
for _, claim := range vol.PastClaims {
if claim.State != structs.CSIVolumeClaimStateUnpublishing {
return false
}
}
return true
}, time.Second*1, 10*time.Millisecond, "invalid claims should be marked for GC")

}

func TestCoreScheduler_FailLoop(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
26 changes: 26 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2461,6 +2461,18 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st
State: structs.CSIVolumeClaimStateTaken,
}
}
} else if _, ok := vol.PastClaims[id]; !ok {
// ensure that any allocs that have been GC'd since
// our last read are marked as past claims
vol.PastClaims[id] = &structs.CSIVolumeClaim{
AllocationID: id,
Mode: structs.CSIVolumeClaimRead,
State: structs.CSIVolumeClaimStateUnpublishing,
}
readClaim := vol.ReadClaims[id]
if readClaim != nil {
vol.PastClaims[id].NodeID = readClaim.NodeID
}
}
}

Expand All @@ -2479,6 +2491,20 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st
State: structs.CSIVolumeClaimStateTaken,
}
}
} else if _, ok := vol.PastClaims[id]; !ok {
// ensure that any allocs that have been GC'd since
// our last read are marked as past claims

vol.PastClaims[id] = &structs.CSIVolumeClaim{
AllocationID: id,
Mode: structs.CSIVolumeClaimWrite,
State: structs.CSIVolumeClaimStateUnpublishing,
}
writeClaim := vol.WriteClaims[id]
if writeClaim != nil {
vol.PastClaims[id].NodeID = writeClaim.NodeID
}

}
}

Expand Down
148 changes: 148 additions & 0 deletions nomad/state/testing.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package state

import (
"math"
"testing"

"github.com/hashicorp/nomad/helper/testlog"
Expand Down Expand Up @@ -122,3 +123,150 @@ func createTestCSIPlugin(s *StateStore, id string, requiresController bool) func
s.DeleteNode(structs.MsgTypeTestSetup, index, ids)
}
}

func TestBadCSIState(t testing.TB, store *StateStore) error {

pluginID := "org.democratic-csi.nfs"

controllerInfo := func(isHealthy bool) map[string]*structs.CSIInfo {
desc := "healthy"
if !isHealthy {
desc = "failed fingerprinting with error"
}
return map[string]*structs.CSIInfo{
pluginID: {
PluginID: pluginID,
AllocID: uuid.Generate(),
Healthy: isHealthy,
HealthDescription: desc,
RequiresControllerPlugin: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsAttachDetach: true,
},
},
}
}

nodeInfo := func(nodeName string, isHealthy bool) map[string]*structs.CSIInfo {
desc := "healthy"
if !isHealthy {
desc = "failed fingerprinting with error"
}
return map[string]*structs.CSIInfo{
pluginID: {
PluginID: pluginID,
AllocID: uuid.Generate(),
Healthy: isHealthy,
HealthDescription: desc,
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{
ID: nodeName,
MaxVolumes: math.MaxInt64,
RequiresNodeStageVolume: true,
},
},
}
}

nodes := make([]*structs.Node, 3)
for i := range nodes {
n := mock.Node()
n.Attributes["nomad.version"] = "1.2.4"
nodes[i] = n
}

nodes[0].CSIControllerPlugins = controllerInfo(true)
nodes[0].CSINodePlugins = nodeInfo("nomad-client0", true)

// drained node
nodes[1].CSIControllerPlugins = controllerInfo(false)
nodes[1].CSINodePlugins = nodeInfo("nomad-client1", false)
nodes[1].SchedulingEligibility = structs.NodeSchedulingIneligible

// previously drained but now eligible
nodes[2].CSIControllerPlugins = controllerInfo(true)
nodes[2].CSINodePlugins = nodeInfo("nomad-client2", true)
nodes[2].SchedulingEligibility = structs.NodeSchedulingEligible

// Insert nodes into the state store
index := uint64(999)
for _, n := range nodes {
index++
err := store.UpsertNode(structs.MsgTypeTestSetup, index, n)
if err != nil {
return err
}
}

allocID0 := uuid.Generate() // nil alloc
allocID2 := uuid.Generate() // nil alloc

alloc1 := mock.Alloc()
alloc1.ClientStatus = "complete"
alloc1.DesiredStatus = "stop"

// Insert allocs into the state store
err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1})
if err != nil {
return err
}

vol := &structs.CSIVolume{
ID: "csi-volume-nfs0",
Name: "csi-volume-nfs0",
ExternalID: "csi-volume-nfs0",
Namespace: "default",
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{
MountFlags: []string{"noatime"},
},
Context: map[string]string{
"node_attach_driver": "nfs",
"provisioner_driver": "nfs-client",
"server": "192.168.56.69",
},
WriteAllocs: map[string]*structs.Allocation{
allocID0: nil,
alloc1.ID: nil,
allocID2: nil,
},
WriteClaims: map[string]*structs.CSIVolumeClaim{
allocID0: {
AllocationID: allocID0,
NodeID: nodes[0].ID,
Mode: structs.CSIVolumeClaimWrite,
State: structs.CSIVolumeClaimStateTaken,
},
alloc1.ID: {
AllocationID: alloc1.ID,
NodeID: nodes[1].ID,
Mode: structs.CSIVolumeClaimWrite,
State: structs.CSIVolumeClaimStateTaken,
},
allocID2: {
AllocationID: allocID2,
NodeID: nodes[2].ID,
Mode: structs.CSIVolumeClaimWrite,
State: structs.CSIVolumeClaimStateTaken,
},
},
Schedulable: true,
PluginID: pluginID,
Provider: pluginID,
ProviderVersion: "1.4.3",
ControllerRequired: true,
ControllersHealthy: 2,
ControllersExpected: 2,
NodesHealthy: 2,
NodesExpected: 0,
}

err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
if err != nil {
return err
}

return nil
}
29 changes: 29 additions & 0 deletions nomad/volumewatcher/volume_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,32 @@ func TestVolumeWatch_Reap(t *testing.T) {
require.NoError(err)
require.Len(vol.PastClaims, 2) // alloc claim + GC claim
}

func TestVolumeReapBadState(t *testing.T) {

store := state.TestStateStore(t)
err := state.TestBadCSIState(t, store)
require.NoError(t, err)
srv := &MockRPCServer{
state: store,
}

vol, err := srv.state.CSIVolumeByID(nil,
structs.DefaultNamespace, "csi-volume-nfs0")
require.NoError(t, err)
srv.state.CSIVolumeDenormalize(nil, vol)

ctx, exitFn := context.WithCancel(context.Background())
w := &volumeWatcher{
v: vol,
rpc: srv,
state: srv.State(),
ctx: ctx,
exitFn: exitFn,
logger: testlog.HCLogger(t),
}

err = w.volumeReapImpl(vol)
require.NoError(t, err)
require.Equal(t, 2, srv.countCSIUnpublish)
}
7 changes: 6 additions & 1 deletion nomad/volumewatcher/volumes_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,15 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
alloc.ClientStatus = structs.AllocClientStatusComplete
vol := testVolume(plugin, alloc, node.ID)

index++
err := srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index,
[]*structs.Allocation{alloc})
require.NoError(err)

watcher.SetEnabled(true, srv.State(), "")

index++
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(err)

// we should get or start up a watcher when we get an update for
Expand Down
38 changes: 24 additions & 14 deletions scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ import (
)

const (
FilterConstraintHostVolumes = "missing compatible host volumes"
FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s"
FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s"
FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s"
FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed"
FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s"
FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims"
FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only"
FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims" //
FilterConstraintDrivers = "missing drivers"
FilterConstraintDevices = "missing devices"
FilterConstraintHostVolumes = "missing compatible host volumes"
FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s"
FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s"
FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s"
FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed"
FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s"
FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims"
FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only"
FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims"
FilterConstraintCSIVolumeGCdAllocationTemplate = "CSI volume %s has exhausted its available writer claims and is claimed by a garbage collected allocation %s; waiting for claim to be released"
FilterConstraintDrivers = "missing drivers"
FilterConstraintDevices = "missing devices"
)

var (
Expand Down Expand Up @@ -313,11 +314,20 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID)
}
if !vol.WriteFreeClaims() {
// Check the blocking allocations to see if they belong to this job
for id := range vol.WriteAllocs {
a, err := c.ctx.State().AllocByID(ws, id)
if err != nil || a == nil ||
a.Namespace != c.namespace || a.JobID != c.jobID {
// the alloc for this blocking claim has been
// garbage collected but the volumewatcher hasn't
// finished releasing the claim (and possibly
// detaching the volume), so we need to block
// until it can be scheduled
if err != nil || a == nil {
return false, fmt.Sprintf(
FilterConstraintCSIVolumeGCdAllocationTemplate, vol.ID, id)
} else if a.Namespace != c.namespace || a.JobID != c.jobID {
// the blocking claim is for another live job
// so it's legitimately blocking more write
// claims
return false, fmt.Sprintf(
FilterConstraintCSIVolumeInUseTemplate, vol.ID)
}
Expand Down
2 changes: 1 addition & 1 deletion testutil/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func WaitForLeader(t testing.TB, rpc rpcFn) {
}

// WaitForClient blocks until the client can be found
func WaitForClient(t testing.T, rpc rpcFn, nodeID string, region string) {
func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) {
t.Helper()

if region == "" {
Expand Down

0 comments on commit 9154033

Please sign in to comment.