From debffe2436970ff9ae630b88492c10113ade4db2 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 24 Jan 2022 11:49:50 -0500 Subject: [PATCH] csi: update leader's ACL in volumewatcher (#11891) The volumewatcher that runs on the leader needs to make RPC calls rather than writing to raft (as we do in the deploymentwatcher) because the unpublish workflow needs to make RPC calls to the clients. This requires that the volumewatcher has access to the leader's ACL token. But when leadership transitions, the new leader creates a new leader ACL token. This ACL token needs to be passed into the volumewatcher when we enable it, otherwise the volumewatcher can find itself with a stale token. --- .changelog/11891.txt | 3 +++ nomad/leader.go | 4 ++-- nomad/volumewatcher/volumes_watcher.go | 7 ++++--- nomad/volumewatcher/volumes_watcher_test.go | 14 +++++++------- 4 files changed, 16 insertions(+), 12 deletions(-) create mode 100644 .changelog/11891.txt diff --git a/.changelog/11891.txt b/.changelog/11891.txt new file mode 100644 index 000000000000..f09bf8e13294 --- /dev/null +++ b/.changelog/11891.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed a bug where releasing volume claims would fail with ACL errors after leadership transitions. +``` diff --git a/nomad/leader.go b/nomad/leader.go index c6dca9dc677e..ecffabc85c77 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -263,7 +263,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { s.nodeDrainer.SetEnabled(true, s.State()) // Enable the volume watcher, since we are now the leader - s.volumeWatcher.SetEnabled(true, s.State()) + s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl()) // Restore the eval broker state if err := s.restoreEvals(); err != nil { @@ -1074,7 +1074,7 @@ func (s *Server) revokeLeadership() error { s.nodeDrainer.SetEnabled(false, nil) // Disable the volume watcher - s.volumeWatcher.SetEnabled(false, nil) + s.volumeWatcher.SetEnabled(false, nil, "") // Disable any enterprise systems required. if err := s.revokeEnterpriseLeadership(); err != nil { diff --git a/nomad/volumewatcher/volumes_watcher.go b/nomad/volumewatcher/volumes_watcher.go index 061df061398e..c8c687addea1 100644 --- a/nomad/volumewatcher/volumes_watcher.go +++ b/nomad/volumewatcher/volumes_watcher.go @@ -57,14 +57,15 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W // SetEnabled is used to control if the watcher is enabled. The // watcher should only be enabled on the active leader. When being -// enabled the state is passed in as it is no longer valid once a -// leader election has taken place. -func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) { +// enabled the state and leader's ACL is passed in as it is no longer +// valid once a leader election has taken place. +func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore, leaderAcl string) { w.wlock.Lock() defer w.wlock.Unlock() wasEnabled := w.enabled w.enabled = enabled + w.leaderAcl = leaderAcl if state != nil { w.state = state diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 185a7225e68b..7f0365be3518 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -23,7 +23,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") plugin := mock.CSIPlugin() node := testNode(plugin, srv.State()) @@ -48,7 +48,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { return 1 == len(watcher.watchers) }, time.Second, 10*time.Millisecond) - watcher.SetEnabled(false, nil) + watcher.SetEnabled(false, nil, "") require.Equal(0, len(watcher.watchers)) } @@ -70,7 +70,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusComplete vol := testVolume(plugin, alloc, node.ID) - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") index++ err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) @@ -94,7 +94,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // watches for that change will fire on the new watcher // step-down (this is sync) - watcher.SetEnabled(false, nil) + watcher.SetEnabled(false, nil, "") require.Equal(0, len(watcher.watchers)) // allocation is now invalid @@ -116,7 +116,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // create a new watcher and enable it to simulate the leadership // transition watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") require.Eventually(func() bool { watcher.wlock.RLock() @@ -142,7 +142,7 @@ func TestVolumeWatch_StartStop(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") require.Equal(0, len(watcher.watchers)) plugin := mock.CSIPlugin() @@ -237,7 +237,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) { watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") require.Equal(0, len(watcher.watchers)) plugin := mock.CSIPlugin()