Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: update leader's ACL in volumewatcher #11891

Merged
merged 1 commit into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/11891.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where releasing volume claims would fail with ACL errors after leadership transitions.
```
4 changes: 2 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
s.nodeDrainer.SetEnabled(true, s.State())

// Enable the volume watcher, since we are now the leader
s.volumeWatcher.SetEnabled(true, s.State())
s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl())

// Restore the eval broker state
if err := s.restoreEvals(); err != nil {
Expand Down Expand Up @@ -1074,7 +1074,7 @@ func (s *Server) revokeLeadership() error {
s.nodeDrainer.SetEnabled(false, nil)

// Disable the volume watcher
s.volumeWatcher.SetEnabled(false, nil)
s.volumeWatcher.SetEnabled(false, nil, "")

// Disable any enterprise systems required.
if err := s.revokeEnterpriseLeadership(); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions nomad/volumewatcher/volumes_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W

// SetEnabled is used to control if the watcher is enabled. The
// watcher should only be enabled on the active leader. When being
// enabled the state is passed in as it is no longer valid once a
// leader election has taken place.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) {
// enabled the state and leader's ACL is passed in as it is no longer
// valid once a leader election has taken place.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore, leaderAcl string) {
w.wlock.Lock()
defer w.wlock.Unlock()

wasEnabled := w.enabled
w.enabled = enabled
w.leaderAcl = leaderAcl

if state != nil {
w.state = state
Expand Down
14 changes: 7 additions & 7 deletions nomad/volumewatcher/volumes_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
index := uint64(100)

watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")

plugin := mock.CSIPlugin()
node := testNode(plugin, srv.State())
Expand All @@ -48,7 +48,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)

watcher.SetEnabled(false, nil)
watcher.SetEnabled(false, nil, "")
require.Equal(0, len(watcher.watchers))
}

Expand All @@ -70,7 +70,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
alloc.ClientStatus = structs.AllocClientStatusComplete
vol := testVolume(plugin, alloc, node.ID)

watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")

index++
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
Expand All @@ -94,7 +94,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// watches for that change will fire on the new watcher

// step-down (this is sync)
watcher.SetEnabled(false, nil)
watcher.SetEnabled(false, nil, "")
require.Equal(0, len(watcher.watchers))

// allocation is now invalid
Expand All @@ -116,7 +116,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// create a new watcher and enable it to simulate the leadership
// transition
watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")

require.Eventually(func() bool {
watcher.wlock.RLock()
Expand All @@ -142,7 +142,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")

watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")
require.Equal(0, len(watcher.watchers))

plugin := mock.CSIPlugin()
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) {

watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")

watcher.SetEnabled(true, srv.State())
watcher.SetEnabled(true, srv.State(), "")
require.Equal(0, len(watcher.watchers))

plugin := mock.CSIPlugin()
Expand Down