Skip to content

Commit

Permalink
volumewatcher: prevent panic on nil volume (#15101)
Browse files Browse the repository at this point in the history
If a GC claim is written and then the volume is deleted before the `volumewatcher`
enters its run loop, we panic on a nil-pointer access. Simply doing a
nil-check at the top of the loop reveals a race condition around shutting down
the loop just as a new update is coming in.

Have the parent `volumeswatcher` send an initial update on the channel before
returning, so that the update is sent while we're still holding the lock. Update
the watcher's `Stop` method to set the running state, which lets us avoid having
a second context and makes stopping synchronous. This reduces the cases we have
to handle in the run loop.

Updated the tests now that we'll safely return from the goroutine and stop the
runner in a larger set of cases. Ran the tests with the `-race` detection flag
and fixed the problems it reported as well.
  • Loading branch information
tgross committed Nov 1, 2022
1 parent 6afb2b5 commit 9f65c24
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 85 deletions.
3 changes: 3 additions & 0 deletions .changelog/15101.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a race condition that can cause a panic when a volume is garbage collected
```
40 changes: 8 additions & 32 deletions nomad/volumewatcher/volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func (vw *volumeWatcher) Notify(v *structs.CSIVolume) {
select {
case vw.updateCh <- v:
case <-vw.shutdownCtx.Done(): // prevent deadlock if we stopped
case <-vw.ctx.Done(): // prevent deadlock if we stopped
}
}

Expand All @@ -83,17 +82,14 @@ func (vw *volumeWatcher) Start() {
vw.wLock.Lock()
defer vw.wLock.Unlock()
vw.running = true
ctx, exitFn := context.WithCancel(vw.shutdownCtx)
vw.ctx = ctx
vw.exitFn = exitFn
go vw.watch()
}

// Stop stops watching the volume. This should be called whenever a
// volume's claims are fully reaped or the watcher is no longer needed.
func (vw *volumeWatcher) Stop() {
vw.logger.Trace("no more claims")
vw.exitFn()
vw.wLock.Lock()
defer vw.wLock.Unlock()
vw.running = false
}

func (vw *volumeWatcher) isRunning() bool {
Expand All @@ -102,8 +98,6 @@ func (vw *volumeWatcher) isRunning() bool {
select {
case <-vw.shutdownCtx.Done():
return false
case <-vw.ctx.Done():
return false
default:
return vw.running
}
Expand All @@ -113,12 +107,8 @@ func (vw *volumeWatcher) isRunning() bool {
// Each pass steps the volume's claims through the various states of reaping
// until the volume has no more claims eligible to be reaped.
func (vw *volumeWatcher) watch() {
// always denormalize the volume and call reap when we first start
// the watcher so that we ensure we don't drop events that
// happened during leadership transitions and didn't get completed
// by the prior leader
vol := vw.getVolume(vw.v)
vw.volumeReap(vol)
defer vw.deleteFn()
defer vw.Stop()

timer, stop := helper.NewSafeTimer(vw.quiescentTimeout)
defer stop()
Expand All @@ -129,31 +119,17 @@ func (vw *volumeWatcher) watch() {
// context, so we can't stop the long-running RPCs gracefully
case <-vw.shutdownCtx.Done():
return
case <-vw.ctx.Done():
return
case vol := <-vw.updateCh:
vol = vw.getVolume(vol)
if vol == nil {
// We stop the goroutine whenever we have no more
// work, but only delete the watcher when the volume
// is gone to avoid racing the blocking query
vw.deleteFn()
vw.Stop()
return
}
vw.volumeReap(vol)
timer.Reset(vw.quiescentTimeout)
case <-timer.C:
// Wait until the volume has "settled" before stopping
// this goroutine so that the race between shutdown and
// the parent goroutine sending on <-updateCh is pushed to
// after the window we most care about quick freeing of
// claims (and the GC job will clean up anything we miss)
vol = vw.getVolume(vol)
if vol == nil {
vw.deleteFn()
}
vw.Stop()
// Wait until the volume has "settled" before stopping this
// goroutine so that we can handle the burst of updates around
// freeing claims without having to spin it back up
return
}
}
Expand Down
8 changes: 8 additions & 0 deletions nomad/volumewatcher/volumes_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) {

watcher := newVolumeWatcher(w, v)
w.watchers[v.ID+v.Namespace] = watcher

// Sending the first volume update here before we return ensures we've hit
// the run loop in the goroutine before freeing the lock. This prevents a
// race between shutting down the watcher and the blocking query.
//
// It also ensures that we don't drop events that happened during leadership
// transitions and didn't get completed by the prior leader
watcher.updateCh <- v
return watcher, nil
}

Expand Down
Loading

0 comments on commit 9f65c24

Please sign in to comment.