Skip to content

Commit

Permalink
CSI: fix missing ACL tokens for leader-driven RPCs
Browse files Browse the repository at this point in the history
The volumewatcher and GC job in the leader can't make CSI RPCs when ACLs are
enabled without the leader ACL token being passed thru.
  • Loading branch information
tgross committed Aug 7, 2020
1 parent db363cc commit d0cdec1
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 13 deletions.
14 changes: 10 additions & 4 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,9 +724,12 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
req := &structs.CSIVolumeClaimRequest{
VolumeID: volID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Namespace: ns,
Region: c.srv.Region(),
AuthToken: eval.LeaderACL,
},
}
req.Namespace = ns
req.Region = c.srv.config.Region
err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
return err
}
Expand Down Expand Up @@ -850,8 +853,11 @@ func (c *CoreScheduler) csiPluginGC(eval *structs.Evaluation) error {
continue
}

req := &structs.CSIPluginDeleteRequest{ID: plugin.ID}
req.Region = c.srv.Region()
req := &structs.CSIPluginDeleteRequest{ID: plugin.ID,
QueryOptions: structs.QueryOptions{
Region: c.srv.Region(),
AuthToken: eval.LeaderACL,
}}
err := c.srv.RPC("CSIPlugin.Delete", req, &structs.CSIPluginDeleteResponse{})
if err != nil {
if err.Error() == "plugin in use" {
Expand Down
2 changes: 1 addition & 1 deletion nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func (s *Server) setupDeploymentWatcher() error {
// setupVolumeWatcher creates a volume watcher that sends CSI RPCs
func (s *Server) setupVolumeWatcher() error {
s.volumeWatcher = volumewatcher.NewVolumesWatcher(
s.logger, s.staticEndpoints.CSIVolume)
s.logger, s.staticEndpoints.CSIVolume, s.getLeaderAcl())

return nil
}
Expand Down
14 changes: 11 additions & 3 deletions nomad/volumewatcher/volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type volumeWatcher struct {
// server interface for CSI client RPCs
rpc CSIVolumeRPC

// the ACL needed to send RPCs
leaderAcl string

logger log.Logger
shutdownCtx context.Context // parent context
ctx context.Context // own context
Expand All @@ -44,6 +47,7 @@ func newVolumeWatcher(parent *Watcher, vol *structs.CSIVolume) *volumeWatcher {
v: vol,
state: parent.state,
rpc: parent.rpc,
leaderAcl: parent.leaderAcl,
logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace),
shutdownCtx: parent.ctx,
}
Expand Down Expand Up @@ -228,9 +232,13 @@ func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIV

func (vw *volumeWatcher) unpublish(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
req := &structs.CSIVolumeUnpublishRequest{
VolumeID: vol.ID,
Claim: claim,
WriteRequest: structs.WriteRequest{Namespace: vol.Namespace},
VolumeID: vol.ID,
Claim: claim,
WriteRequest: structs.WriteRequest{
Namespace: vol.Namespace,
Region: vw.state.Config().Region,
AuthToken: vw.leaderAcl,
},
}
err := vw.rpc.Unpublish(req, &structs.CSIVolumeUnpublishResponse{})
if err != nil {
Expand Down
14 changes: 9 additions & 5 deletions nomad/volumewatcher/volumes_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type Watcher struct {
// the volumes watcher for RPC
rpc CSIVolumeRPC

// the ACL needed to send RPCs
leaderAcl string

// state is the state that is watched for state changes.
state *state.StateStore

Expand All @@ -36,18 +39,19 @@ type Watcher struct {

// NewVolumesWatcher returns a volumes watcher that is used to watch
// volumes and trigger the scheduler as needed.
func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC) *Watcher {
func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *Watcher {

// the leader step-down calls SetEnabled(false) which is what
// cancels this context, rather than passing in its own shutdown
// context
ctx, exitFn := context.WithCancel(context.Background())

return &Watcher{
rpc: rpc,
logger: logger.Named("volumes_watcher"),
ctx: ctx,
exitFn: exitFn,
rpc: rpc,
logger: logger.Named("volumes_watcher"),
ctx: ctx,
exitFn: exitFn,
leaderAcl: leaderAcl,
}
}

Expand Down

0 comments on commit d0cdec1

Please sign in to comment.