Skip to content

Commit

Permalink
csi: nomad volume detach command (#8584)
Browse files Browse the repository at this point in the history
The soundness guarantees of the CSI specification leave a little to be desired
in our ability to provide a 100% reliable automated solution for managing
volumes. This changeset provides a new command to bridge this gap by providing
the operator the ability to intervene.

The command doesn't take an allocation ID so that the operator doesn't have to
keep track of alloc IDs that may have been GC'd. Handle this case in the
unpublish RPC by sending the client RPC for all the terminal/nil allocs on the
selected node.
  • Loading branch information
tgross committed Aug 11, 2020
1 parent d21ef34 commit fbefdb9
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 42 deletions.
5 changes: 5 additions & 0 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (v *CSIVolumes) Deregister(id string, force bool, w *WriteOptions) error {
return err
}

func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error {
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/detach?node=%v", url.PathEscape(volID), nodeID), nil, w)
return err
}

// CSIVolumeAttachmentMode duplicated in nomad/structs/csi.go
type CSIVolumeAttachmentMode string

Expand Down
62 changes: 52 additions & 10 deletions command/agent/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,35 @@ func (s *HTTPServer) CSIVolumeSpecificRequest(resp http.ResponseWriter, req *htt
// Tokenize the suffix of the path to get the volume id
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/volume/csi/")
tokens := strings.Split(reqSuffix, "/")
if len(tokens) > 2 || len(tokens) < 1 {
if len(tokens) < 1 {
return nil, CodedError(404, resourceNotFoundErr)
}
id := tokens[0]

switch req.Method {
case "GET":
return s.csiVolumeGet(id, resp, req)
case "PUT":
return s.csiVolumePut(id, resp, req)
case "DELETE":
return s.csiVolumeDelete(id, resp, req)
default:
return nil, CodedError(405, ErrInvalidMethod)
if len(tokens) == 1 {
switch req.Method {
case http.MethodGet:
return s.csiVolumeGet(id, resp, req)
case http.MethodPut:
return s.csiVolumePut(id, resp, req)
case http.MethodDelete:
return s.csiVolumeDelete(id, resp, req)
default:
return nil, CodedError(405, ErrInvalidMethod)
}
}

if len(tokens) == 2 {
if tokens[1] != "detach" {
return nil, CodedError(404, resourceNotFoundErr)
}
if req.Method != http.MethodDelete {
return nil, CodedError(405, ErrInvalidMethod)
}
return s.csiVolumeDetach(id, resp, req)
}

return nil, CodedError(404, resourceNotFoundErr)
}

func (s *HTTPServer) csiVolumeGet(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand Down Expand Up @@ -149,6 +163,34 @@ func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *h
return nil, nil
}

func (s *HTTPServer) csiVolumeDetach(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodDelete {
return nil, CodedError(405, ErrInvalidMethod)
}

nodeID := req.URL.Query().Get("node")
if nodeID == "" {
return nil, CodedError(400, "detach requires node ID")
}

args := structs.CSIVolumeUnpublishRequest{
VolumeID: id,
Claim: &structs.CSIVolumeClaim{
NodeID: nodeID,
Mode: structs.CSIVolumeClaimRelease,
},
}
s.parseWriteRequest(req, &args.WriteRequest)

var out structs.CSIVolumeUnpublishResponse
if err := s.agent.RPC("CSIVolume.Unpublish", &args, &out); err != nil {
return nil, err
}

setMeta(resp, &out.QueryMeta)
return nil, nil
}

// CSIPluginsRequest lists CSI plugins
func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"volume detach": func() (cli.Command, error) {
return &VolumeDetachCommand{
Meta: meta,
}, nil
},
}

deprecated := map[string]cli.CommandFactory{
Expand Down
4 changes: 4 additions & 0 deletions command/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ Usage: nomad volume <subcommand> [options]
$ nomad volume deregister <id>
Detach an unused volume:
$ nomad volume detach <vol id> <node id>
Please see the individual subcommand help for detailed usage information.
`
return strings.TrimSpace(helpText)
Expand Down
97 changes: 97 additions & 0 deletions command/volume_detach.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package command

import (
"fmt"
"strings"

"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)

type VolumeDetachCommand struct {
Meta
}

func (c *VolumeDetachCommand) Help() string {
helpText := `
Usage: nomad volume detach [options] <vol id> <node id>
Detach a volume from a Nomad client.
General Options:
` + generalOptionsUsage() + `
`
return strings.TrimSpace(helpText)
}

func (c *VolumeDetachCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{})
}

func (c *VolumeDetachCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}

resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Volumes, nil)
if err != nil {
return []string{}
}
matches := resp.Matches[contexts.Volumes]

resp, _, err = client.Search().PrefixSearch(a.Last, contexts.Nodes, nil)
if err != nil {
return []string{}
}
for _, match := range resp.Matches[contexts.Nodes] {
matches = append(matches, match)
}
return matches
})
}

func (c *VolumeDetachCommand) Synopsis() string {
return "Detach a volume"
}

func (c *VolumeDetachCommand) Name() string { return "volume detach" }

func (c *VolumeDetachCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }

if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err))
return 1
}

// Check that we get exactly two arguments
args = flags.Args()
if l := len(args); l != 2 {
c.Ui.Error("This command takes two arguments: <vol id> <node id>")
c.Ui.Error(commandErrorText(c))
return 1
}
volID := args[0]
nodeID := args[1]

// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}

err = client.CSIVolumes().Detach(volID, nodeID, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error detaching volume: %s", err))
return 1
}

return 0
}
63 changes: 61 additions & 2 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,66 @@ RELEASE_CLAIM:
}

func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
if claim.AllocationID != "" {
err := v.nodeUnpublishVolumeImpl(vol, claim)
if err != nil {
return err
}
claim.State = structs.CSIVolumeClaimStateNodeDetached
return v.checkpointClaim(vol, claim)
}

// The RPC sent from the 'nomad node detach' command won't have an
// allocation ID set so we try to unpublish every terminal or invalid
// alloc on the node
allocIDs := []string{}
state := v.srv.fsm.State()
vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
if err != nil {
return err
}
for allocID, alloc := range vol.ReadAllocs {
if alloc == nil {
rclaim, ok := vol.ReadClaims[allocID]
if ok && rclaim.NodeID == claim.NodeID {
allocIDs = append(allocIDs, allocID)
}
} else {
if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
allocIDs = append(allocIDs, allocID)
}
}
}
for allocID, alloc := range vol.WriteAllocs {
if alloc == nil {
wclaim, ok := vol.WriteClaims[allocID]
if ok && wclaim.NodeID == claim.NodeID {
allocIDs = append(allocIDs, allocID)
}
} else {
if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
allocIDs = append(allocIDs, allocID)
}
}
}
var merr multierror.Error
for _, allocID := range allocIDs {
claim.AllocationID = allocID
err := v.nodeUnpublishVolumeImpl(vol, claim)
if err != nil {
merr.Errors = append(merr.Errors, err)
}
}
err = merr.ErrorOrNil()
if err != nil {
return err
}

claim.State = structs.CSIVolumeClaimStateNodeDetached
return v.checkpointClaim(vol, claim)
}

func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
req := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: vol.PluginID,
VolumeID: vol.ID,
Expand All @@ -609,8 +669,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
return fmt.Errorf("could not detach from node: %w", err)
}
}
claim.State = structs.CSIVolumeClaimStateNodeDetached
return v.checkpointClaim(vol, claim)
return nil
}

func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
Expand Down
Loading

0 comments on commit fbefdb9

Please sign in to comment.