Merge 3474c27 into backport/b-panic-volumewatcher/virtually-unified-octopus
hc-github-team-nomad-core committed Nov 1, 2022
2 parents 2391034 + 3474c27 commit a5c749c
Showing 4 changed files with 82 additions and 50 deletions.
3 changes: 3 additions & 0 deletions .changelog/15101.txt
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed race condition that can cause a panic when volume is garbage collected
```
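The hunks below remove the watcher's private context (`vw.ctx` / `vw.exitFn`) so that everything keys off the single long-lived `vw.shutdownCtx`. As a purely illustrative sketch of the kind of panic the release note describes (the diff does not pin down the exact failure path), a context field that one goroutine assigns while another reads it without synchronization can be observed as nil, and selecting on a nil context panics:

```go
package main

import "context"

// watcher is a hypothetical stand-in, not Nomad's type: ctx is assigned
// later by a Start method, so it is nil until that happens.
type watcher struct {
	ctx context.Context
}

// Notify selects on the context field. If ctx is still nil (or was torn
// down and never replaced), calling Done() on the nil interface panics.
func (w *watcher) Notify() {
	select {
	case <-w.ctx.Done():
	default:
	}
}

func main() {
	w := &watcher{}
	w.Notify() // panic: invalid memory address or nil pointer dereference
}
```

Relying only on the shared shutdown context, as the change below does, sidesteps this class of lifecycle problem.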
40 changes: 8 additions & 32 deletions nomad/volumewatcher/volume_watcher.go
@@ -74,7 +74,6 @@ func (vw *volumeWatcher) Notify(v *structs.CSIVolume) {
select {
case vw.updateCh <- v:
case <-vw.shutdownCtx.Done(): // prevent deadlock if we stopped
case <-vw.ctx.Done(): // prevent deadlock if we stopped
}
}

@@ -83,17 +82,14 @@ func (vw *volumeWatcher) Start() {
vw.wLock.Lock()
defer vw.wLock.Unlock()
vw.running = true
ctx, exitFn := context.WithCancel(vw.shutdownCtx)
vw.ctx = ctx
vw.exitFn = exitFn
go vw.watch()
}

// Stop stops watching the volume. This should be called whenever a
// volume's claims are fully reaped or the watcher is no longer needed.
func (vw *volumeWatcher) Stop() {
vw.logger.Trace("no more claims")
vw.exitFn()
vw.wLock.Lock()
defer vw.wLock.Unlock()
vw.running = false
}

func (vw *volumeWatcher) isRunning() bool {
@@ -102,8 +98,6 @@ func (vw *volumeWatcher) isRunning() bool {
select {
case <-vw.shutdownCtx.Done():
return false
case <-vw.ctx.Done():
return false
default:
return vw.running
}
@@ -113,12 +107,8 @@ func (vw *volumeWatcher) isRunning() bool {
// Each pass steps the volume's claims through the various states of reaping
// until the volume has no more claims eligible to be reaped.
func (vw *volumeWatcher) watch() {
// always denormalize the volume and call reap when we first start
// the watcher so that we ensure we don't drop events that
// happened during leadership transitions and didn't get completed
// by the prior leader
vol := vw.getVolume(vw.v)
vw.volumeReap(vol)
defer vw.deleteFn()
defer vw.Stop()

timer, stop := helper.NewSafeTimer(vw.quiescentTimeout)
defer stop()
@@ -129,31 +119,17 @@
// context, so we can't stop the long-running RPCs gracefully
case <-vw.shutdownCtx.Done():
return
case <-vw.ctx.Done():
return
case vol := <-vw.updateCh:
vol = vw.getVolume(vol)
if vol == nil {
// We stop the goroutine whenever we have no more
// work, but only delete the watcher when the volume
// is gone to avoid racing the blocking query
vw.deleteFn()
vw.Stop()
return
}
vw.volumeReap(vol)
timer.Reset(vw.quiescentTimeout)
case <-timer.C:
// Wait until the volume has "settled" before stopping
// this goroutine so that the race between shutdown and
// the parent goroutine sending on <-updateCh is pushed to
// after the window we most care about quick freeing of
// claims (and the GC job will clean up anything we miss)
vol = vw.getVolume(vol)
if vol == nil {
vw.deleteFn()
}
vw.Stop()
// Wait until the volume has "settled" before stopping this
// goroutine so that we can handle the burst of updates around
// freeing claims without having to spin it back up
return
}
}
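Since the rendered diff interleaves removed and added lines without markers, here is a condensed, self-contained imitation of the control flow the rewritten `watch` loop appears to settle on: one shared shutdown context, an update channel, and a quiescent timer that lets the goroutine clean itself up once the volume has settled. Names and types are simplified stand-ins rather than Nomad's API; see the hunks above for the authoritative version.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watch exits on shutdown, when the volume disappears, or after a quiescent
// window with no updates. onExit stands in for the deferred deleteFn()/Stop()
// pair in the real watcher.
func watch(shutdownCtx context.Context, updateCh <-chan string, quiescent time.Duration, onExit func()) {
	defer onExit()

	timer := time.NewTimer(quiescent)
	defer timer.Stop()

	for {
		select {
		case <-shutdownCtx.Done(): // server shutting down or losing leadership
			return
		case vol := <-updateCh:
			if vol == "" { // stand-in for "volume no longer exists"
				return
			}
			fmt.Println("reaping claims for", vol) // stand-in for volumeReap
			timer.Reset(quiescent)
		case <-timer.C: // no updates inside the quiescent window: settled
			return
		}
	}
}

func main() {
	updates := make(chan string)
	done := make(chan struct{})
	go func() {
		defer close(done)
		watch(context.Background(), updates, 100*time.Millisecond,
			func() { fmt.Println("watcher exited") })
	}()
	updates <- "vol-1" // parent hands off the first update synchronously, as addLocked does below
	<-done             // quiescent timer fires and the goroutine cleans up after itself
}
```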
8 changes: 8 additions & 0 deletions nomad/volumewatcher/volumes_watcher.go
@@ -188,6 +188,14 @@ func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) {

watcher := newVolumeWatcher(w, v)
w.watchers[v.ID+v.Namespace] = watcher

// Sending the first volume update here before we return ensures we've hit
// the run loop in the goroutine before freeing the lock. This prevents a
// race between shutting down the watcher and the blocking query.
//
// It also ensures that we don't drop events that happened during leadership
// transitions and didn't get completed by the prior leader
watcher.updateCh <- v
return watcher, nil
}

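The comment above relies on the first send to `updateCh` acting as a rendezvous: by the time the send completes, the watcher goroutine must have reached its receive loop. That guarantee holds when the channel is unbuffered; the channel's construction is not visible in this diff, so treat the following as a sketch under that assumption:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	updateCh := make(chan string) // unbuffered: a send completes only when a receiver takes the value
	done := make(chan struct{})

	mu.Lock()
	go func() {
		defer close(done)
		for msg := range updateCh {
			fmt.Println("watcher got:", msg)
		}
	}()

	// This send cannot complete until the goroutine above is parked at its
	// receive, so the run loop is guaranteed to be active before we unlock.
	updateCh <- "first update"
	mu.Unlock()

	close(updateCh)
	<-done
}
```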
81 changes: 63 additions & 18 deletions nomad/volumewatcher/volumes_watcher_test.go
@@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

@@ -53,6 +54,8 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
}, time.Second, 10*time.Millisecond)

watcher.SetEnabled(false, nil, "")
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
require.Equal(t, 0, len(watcher.watchers))
}

@@ -104,7 +107,9 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {

// step-down (this is sync)
watcher.SetEnabled(false, nil, "")
watcher.wlock.RLock()
require.Equal(t, 0, len(watcher.watchers))
watcher.wlock.RUnlock()

// allocation is now invalid
index++
@@ -131,8 +136,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
return 0 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)

vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
@@ -168,7 +172,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
err = srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2})
require.NoError(t, err)

// register a volume
// register a volume and an unused volume
vol := testVolume(plugin, alloc1, node.ID)
index++
err = srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
@@ -178,8 +182,7 @@
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
return 0 == len(watcher.watchers)
}, time.Second*2, 10*time.Millisecond)

// claim the volume for both allocs
@@ -212,6 +215,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
require.Equal(t, 2, len(vol.ReadAllocs))

// alloc becomes terminal
alloc1 = alloc1.Copy()
alloc1.ClientStatus = structs.AllocClientStatusComplete
index++
err = srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1})
@@ -221,18 +225,65 @@
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(t, err)

// 1 claim has been released and watcher stops
// watcher stops and 1 claim has been released
require.Eventually(t, func() bool {
ws := memdb.NewWatchSet()
vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
return len(vol.ReadAllocs) == 1 && len(vol.PastClaims) == 0
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 0 == len(watcher.watchers)
}, time.Second*5, 10*time.Millisecond)

vol, _ = srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
must.Eq(t, 1, len(vol.ReadAllocs))
must.Eq(t, 0, len(vol.PastClaims))
}

// TestVolumeWatch_Delete tests the stop of the watcher when it receives
// notifications around a deleted volume
func TestVolumeWatch_Delete(t *testing.T) {
ci.Parallel(t)

srv := &MockStatefulRPCServer{}
srv.state = state.TestStateStore(t)
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.quiescentTimeout = 100 * time.Millisecond

watcher.SetEnabled(true, srv.State(), "")
must.Eq(t, 0, len(watcher.watchers))

// register an unused volume
plugin := mock.CSIPlugin()
vol := mock.CSIVolume(plugin)
index++
must.NoError(t, srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol}))

// assert we get a watcher; there are no claims so it should immediately stop
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 0 == len(watcher.watchers)
}, time.Second*2, 10*time.Millisecond)

// write a GC claim to the volume and then immediately delete, to
// potentially hit the race condition between updates and deletes
index++
must.NoError(t, srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID,
&structs.CSIVolumeClaim{
Mode: structs.CSIVolumeClaimGC,
State: structs.CSIVolumeClaimStateReadyToFree,
}))

index++
must.NoError(t, srv.State().CSIVolumeDeregister(
index, vol.Namespace, []string{vol.ID}, false))

// the watcher should not be running
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
return 0 == len(watcher.watchers)
}, time.Second*5, 10*time.Millisecond)

}

// TestVolumeWatch_RegisterDeregister tests the start and stop of
@@ -261,16 +312,10 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) {
err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
require.NoError(t, err)

// watcher should be started but immediately stopped
// watcher should stop
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 1 == len(watcher.watchers)
return 0 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)

require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, 1*time.Second, 10*time.Millisecond)
}
