Skip to content

Commit

Permalink
csi: update leader's ACL in volumewatcher
Browse files Browse the repository at this point in the history
The volumewatcher that runs on the leader needs to make RPC calls
rather than writing to raft (as we do in the deploymentwatcher)
because the unpublish workflow needs to make RPC calls to the
clients. This requires that the volumewatcher has access to the
leader's ACL token.

But when leadership transitions, the new leader creates a new leader
ACL token. This ACL token needs to be passed into the volumewatcher
when we enable it; otherwise, the volumewatcher can find itself with a
stale token.
  • Loading branch information
tgross committed Jan 24, 2022
1 parent 1471de4 commit 3fc9835
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .changelog/11891.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where releasing volume claims would fail with ACL errors after leadership transitions.
```
4 changes: 2 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
s.nodeDrainer.SetEnabled(true, s.State())

// Enable the volume watcher, since we are now the leader
s.volumeWatcher.SetEnabled(true, s.State())
s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl())

// Restore the eval broker state
if err := s.restoreEvals(); err != nil {
Expand Down Expand Up @@ -1074,7 +1074,7 @@ func (s *Server) revokeLeadership() error {
s.nodeDrainer.SetEnabled(false, nil)

// Disable the volume watcher
s.volumeWatcher.SetEnabled(false, nil)
s.volumeWatcher.SetEnabled(false, nil, "")

// Disable any enterprise systems required.
if err := s.revokeEnterpriseLeadership(); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions nomad/volumewatcher/volumes_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W

// SetEnabled is used to control if the watcher is enabled. The
// watcher should only be enabled on the active leader. When being
// enabled the state is passed in as it is no longer valid once a
// leader election has taken place.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) {
// enabled the state and leader's ACL are passed in as they are no longer
// valid once a leader election has taken place.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore, leaderAcl string) {
w.wlock.Lock()
defer w.wlock.Unlock()

wasEnabled := w.enabled
w.enabled = enabled
w.leaderAcl = leaderAcl

if state != nil {
w.state = state
Expand Down
14 changes: 7 additions & 7 deletions nomad/volumewatcher/volumes_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
index := uint64(100)

watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")

plugin := mock.CSIPlugin()
node := testNode(plugin, srv.State())
Expand All @@ -48,7 +48,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)

watcher.SetEnabled(false, nil)
watcher.SetEnabled(false, nil, "")
require.Equal(0, len(watcher.watchers))
}

Expand All @@ -70,7 +70,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
alloc.ClientStatus = structs.AllocClientStatusComplete
vol := testVolume(plugin, alloc, node.ID)

watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")

index++
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
Expand All @@ -94,7 +94,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// watches for that change will fire on the new watcher

// step-down (this is sync)
watcher.SetEnabled(false, nil)
watcher.SetEnabled(false, nil, "")
require.Equal(0, len(watcher.watchers))

// allocation is now invalid
Expand All @@ -116,7 +116,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// create a new watcher and enable it to simulate the leadership
// transition
watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")

require.Eventually(func() bool {
watcher.wlock.RLock()
Expand All @@ -142,7 +142,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")

watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")
require.Equal(0, len(watcher.watchers))

plugin := mock.CSIPlugin()
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) {

watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")

watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")
require.Equal(0, len(watcher.watchers))

plugin := mock.CSIPlugin()
Expand Down

0 comments on commit 3fc9835

Please sign in to comment.