Skip to content

Commit

Permalink
csi: reap unused volume claims at leadership transitions (#11776)
Browse files Browse the repository at this point in the history
When `volumewatcher.Watcher` starts on the leader, it starts a watch
on every volume and triggers a reap of unused claims on any change to
that volume. But if a reaping is in-flight during leadership
transitions, it will fail and the event that triggered the reap will
be dropped. Perform one reap of unused claims at the start of the
watcher so that leadership transitions don't drop this event.
  • Loading branch information
tgross committed Jan 5, 2022
1 parent aa21628 commit 0ff0fa1
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .changelog/11776.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where volume claim releases that were not fully processed before a leadership transition would be ignored
```
7 changes: 7 additions & 0 deletions nomad/volumewatcher/volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ func (vw *volumeWatcher) isRunning() bool {
// Each pass steps the volume's claims through the various states of reaping
// until the volume has no more claims eligible to be reaped.
func (vw *volumeWatcher) watch() {
// always denormalize the volume and call reap when we first start
// the watcher so that we ensure we don't drop events that
// happened during leadership transitions and didn't get completed
// by the prior leader
vol := vw.getVolume(vw.v)
vw.volumeReap(vol)

for {
select {
// TODO(tgross): currently server->client RPC have no cancellation
Expand Down
43 changes: 38 additions & 5 deletions nomad/volumewatcher/volumes_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
require.Equal(0, len(watcher.watchers))
}

// TestVolumeWatch_Checkpoint tests the checkpointing of progress across
// leader leader step-up/step-down
func TestVolumeWatch_Checkpoint(t *testing.T) {
// TestVolumeWatch_LeadershipTransition tests the correct behavior of
// claim reaping across leader step-up/step-down
func TestVolumeWatch_LeadershipTransition(t *testing.T) {
t.Parallel()
require := require.New(t)

Expand Down Expand Up @@ -84,18 +84,51 @@ func TestVolumeWatch_Checkpoint(t *testing.T) {
return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)

// step-down (this is sync, but step-up is async)
vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
require.Len(vol.PastClaims, 0, "expected to have 0 PastClaims")
require.Equal(srv.countCSIUnpublish, 0, "expected no CSI.Unpublish RPC calls")

// trying to test a dropped watch is racy, so to reliably simulate
// this condition, step-down the watcher first and then perform
// the writes to the volume before starting the new watcher. no
// watches for that change will fire on the new watcher

// step-down (this is sync)
watcher.SetEnabled(false, nil)
require.Equal(0, len(watcher.watchers))

// step-up again
// allocation is now invalid
index++
err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID})
require.NoError(err)

// emit a GC so that we have a volume change that's dropped
claim := &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
NodeID: node.ID,
Mode: structs.CSIVolumeClaimGC,
State: structs.CSIVolumeClaimStateUnpublishing,
}
index++
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)

// create a new watcher and enable it to simulate the leadership
// transition
watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State())

require.Eventually(func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()

return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second, 10*time.Millisecond)

vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
require.Len(vol.PastClaims, 1, "expected to have 1 PastClaim")
require.Equal(srv.countCSIUnpublish, 1, "expected CSI.Unpublish RPC to be called")
}

// TestVolumeWatch_StartStop tests the start and stop of the watcher when
Expand Down

0 comments on commit 0ff0fa1

Please sign in to comment.