Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

volumewatcher: prevent panic on nil volume #15101

Merged
merged 1 commit into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/15101.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a race condition that could cause a panic when a volume is garbage collected
```
40 changes: 8 additions & 32 deletions nomad/volumewatcher/volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func (vw *volumeWatcher) Notify(v *structs.CSIVolume) {
select {
case vw.updateCh <- v:
case <-vw.shutdownCtx.Done(): // prevent deadlock if we stopped
case <-vw.ctx.Done(): // prevent deadlock if we stopped
}
}

Expand All @@ -83,17 +82,14 @@ func (vw *volumeWatcher) Start() {
vw.wLock.Lock()
defer vw.wLock.Unlock()
vw.running = true
ctx, exitFn := context.WithCancel(vw.shutdownCtx)
vw.ctx = ctx
vw.exitFn = exitFn
go vw.watch()
}

// Stop stops watching the volume. This should be called whenever a
// volume's claims are fully reaped or the watcher is no longer needed.
func (vw *volumeWatcher) Stop() {
vw.logger.Trace("no more claims")
vw.exitFn()
vw.wLock.Lock()
defer vw.wLock.Unlock()
vw.running = false
}

func (vw *volumeWatcher) isRunning() bool {
Expand All @@ -102,8 +98,6 @@ func (vw *volumeWatcher) isRunning() bool {
select {
case <-vw.shutdownCtx.Done():
return false
case <-vw.ctx.Done():
return false
default:
return vw.running
}
Expand All @@ -113,12 +107,8 @@ func (vw *volumeWatcher) isRunning() bool {
// Each pass steps the volume's claims through the various states of reaping
// until the volume has no more claims eligible to be reaped.
func (vw *volumeWatcher) watch() {
// always denormalize the volume and call reap when we first start
// the watcher so that we ensure we don't drop events that
// happened during leadership transitions and didn't get completed
// by the prior leader
vol := vw.getVolume(vw.v)
vw.volumeReap(vol)
defer vw.deleteFn()
defer vw.Stop()

timer, stop := helper.NewSafeTimer(vw.quiescentTimeout)
defer stop()
Expand All @@ -129,31 +119,17 @@ func (vw *volumeWatcher) watch() {
// context, so we can't stop the long-running RPCs gracefully
case <-vw.shutdownCtx.Done():
return
case <-vw.ctx.Done():
return
case vol := <-vw.updateCh:
vol = vw.getVolume(vol)
if vol == nil {
// We stop the goroutine whenever we have no more
// work, but only delete the watcher when the volume
// is gone to avoid racing the blocking query
vw.deleteFn()
vw.Stop()
return
}
vw.volumeReap(vol)
timer.Reset(vw.quiescentTimeout)
case <-timer.C:
// Wait until the volume has "settled" before stopping
// this goroutine so that the race between shutdown and
// the parent goroutine sending on <-updateCh is pushed to
// after the window we most care about quick freeing of
// claims (and the GC job will clean up anything we miss)
vol = vw.getVolume(vol)
if vol == nil {
vw.deleteFn()
}
vw.Stop()
// Wait until the volume has "settled" before stopping this
// goroutine so that we can handle the burst of updates around
// freeing claims without having to spin it back up
return
}
}
Expand Down
8 changes: 8 additions & 0 deletions nomad/volumewatcher/volumes_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) {

watcher := newVolumeWatcher(w, v)
w.watchers[v.ID+v.Namespace] = watcher

// Sending the first volume update here before we return ensures we've hit
// the run loop in the goroutine before freeing the lock. This prevents a
// race between shutting down the watcher and the blocking query.
//
// It also ensures that we don't drop events that happened during leadership
// transitions and didn't get completed by the prior leader.
watcher.updateCh <- v
return watcher, nil
}

Expand Down
81 changes: 63 additions & 18 deletions nomad/volumewatcher/volumes_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -53,6 +54,8 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
}, time.Second, 10*time.Millisecond)

watcher.SetEnabled(false, nil, "")
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
require.Equal(t, 0, len(watcher.watchers))
}

Expand Down Expand Up @@ -104,7 +107,9 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {

// step-down (this is sync)
watcher.SetEnabled(false, nil, "")
watcher.wlock.RLock()
require.Equal(t, 0, len(watcher.watchers))
watcher.wlock.RUnlock()

// allocation is now invalid
index++
Expand All @@ -131,8 +136,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
return 0 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)

vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
Expand Down Expand Up @@ -168,7 +172,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
err = srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2})
require.NoError(t, err)

// register a volume
// register a volume and an unused volume
vol := testVolume(plugin, alloc1, node.ID)
index++
err = srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
Expand All @@ -178,8 +182,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
return 0 == len(watcher.watchers)
}, time.Second*2, 10*time.Millisecond)

// claim the volume for both allocs
Expand Down Expand Up @@ -212,6 +215,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
require.Equal(t, 2, len(vol.ReadAllocs))

// alloc becomes terminal
alloc1 = alloc1.Copy()
alloc1.ClientStatus = structs.AllocClientStatusComplete
index++
err = srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1})
Expand All @@ -221,18 +225,65 @@ func TestVolumeWatch_StartStop(t *testing.T) {
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(t, err)

// 1 claim has been released and watcher stops
// watcher stops and 1 claim has been released
require.Eventually(t, func() bool {
ws := memdb.NewWatchSet()
vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
return len(vol.ReadAllocs) == 1 && len(vol.PastClaims) == 0
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 0 == len(watcher.watchers)
}, time.Second*5, 10*time.Millisecond)

vol, _ = srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
must.Eq(t, 1, len(vol.ReadAllocs))
must.Eq(t, 0, len(vol.PastClaims))
}

// TestVolumeWatch_Delete tests the stop of the watcher when it receives
// notifications around a deleted volume
func TestVolumeWatch_Delete(t *testing.T) {
ci.Parallel(t)

srv := &MockStatefulRPCServer{}
srv.state = state.TestStateStore(t)
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.quiescentTimeout = 100 * time.Millisecond

watcher.SetEnabled(true, srv.State(), "")
must.Eq(t, 0, len(watcher.watchers))

// register an unused volume
plugin := mock.CSIPlugin()
vol := mock.CSIVolume(plugin)
index++
must.NoError(t, srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol}))

// assert we get a watcher; there are no claims so it should immediately stop
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 0 == len(watcher.watchers)
}, time.Second*2, 10*time.Millisecond)

// write a GC claim to the volume and then immediately delete, to
// potentially hit the race condition between updates and deletes
index++
must.NoError(t, srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID,
&structs.CSIVolumeClaim{
Mode: structs.CSIVolumeClaimGC,
State: structs.CSIVolumeClaimStateReadyToFree,
}))

index++
must.NoError(t, srv.State().CSIVolumeDeregister(
index, vol.Namespace, []string{vol.ID}, false))

// the watcher should not be running
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
return 0 == len(watcher.watchers)
}, time.Second*5, 10*time.Millisecond)

}

// TestVolumeWatch_RegisterDeregister tests the start and stop of
Expand Down Expand Up @@ -261,16 +312,10 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) {
err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
require.NoError(t, err)

// watcher should be started but immediately stopped
// watcher should stop
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 1 == len(watcher.watchers)
return 0 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)

require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, 1*time.Second, 10*time.Millisecond)
}