Skip to content

Commit

Permalink
csi: wire-up volumewatcher to leader
Browse files Browse the repository at this point in the history
Enable the volume watcher on leader step-up and disable it on leader
step-down.
  • Loading branch information
tgross committed Apr 28, 2020
1 parent 902f8a2 commit d185d30
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
6 changes: 6 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Enable the NodeDrainer
s.nodeDrainer.SetEnabled(true, s.State())

// Enable the volume watcher, since we are now the leader
s.volumeWatcher.SetEnabled(true, s.State())

// Restore the eval broker state
if err := s.restoreEvals(); err != nil {
return err
Expand Down Expand Up @@ -870,6 +873,9 @@ func (s *Server) revokeLeadership() error {
// Disable the node drainer
s.nodeDrainer.SetEnabled(false, nil)

// Disable the volume watcher
s.volumeWatcher.SetEnabled(false, nil)

// Disable any enterprise systems required.
if err := s.revokeEnterpriseLeadership(); err != nil {
return err
Expand Down
31 changes: 31 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/nomad/volumewatcher"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
Expand Down Expand Up @@ -186,6 +187,9 @@ type Server struct {
// nodeDrainer is used to drain allocations from nodes.
nodeDrainer *drainer.NodeDrainer

// volumeWatcher is used to release volume claims
volumeWatcher *volumewatcher.Watcher

// evalBroker is used to manage the in-progress evaluations
// that are waiting to be brokered to a sub-scheduler
evalBroker *EvalBroker
Expand Down Expand Up @@ -399,6 +403,12 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulACLs consu
return nil, fmt.Errorf("failed to create deployment watcher: %v", err)
}

// Setup the volume watcher
if err := s.setupVolumeWatcher(); err != nil {
s.logger.Error("failed to create volume watcher", "error", err)
return nil, fmt.Errorf("failed to create volume watcher: %v", err)
}

// Setup the node drainer.
s.setupNodeDrainer()

Expand Down Expand Up @@ -993,6 +1003,27 @@ func (s *Server) setupDeploymentWatcher() error {
return nil
}

// setupVolumeWatcher creates a volume watcher that consumes the RPC
// endpoints for state information and makes transitions via Raft through a
// shim that provides the appropriate methods.
func (s *Server) setupVolumeWatcher() error {

// Create the raft shim type to restrict the set of raft methods that can be
// made
raftShim := &volumeWatcherRaftShim{
apply: s.raftApply,
}

// Create the volume watcher
s.volumeWatcher = volumewatcher.NewVolumesWatcher(
s.logger, raftShim,
s.staticEndpoints.ClientCSI,
volumewatcher.LimitStateQueriesPerSecond,
volumewatcher.CrossVolumeUpdateBatchDuration)

return nil
}

// setupNodeDrainer creates a node drainer which will be enabled when a server
// becomes a leader.
func (s *Server) setupNodeDrainer() {
Expand Down

0 comments on commit d185d30

Please sign in to comment.