Skip to content

Commit

Permalink
csi: nomad volume detach command
Browse files Browse the repository at this point in the history
The soundness guarantees of the CSI specification leave a little to be desired
in our ability to provide a 100% reliable automated solution for managing
volumes. This changeset bridges that gap with a new command that gives the
operator the ability to intervene.

The command doesn't take an allocation ID so that the operator doesn't have to
keep track of alloc IDs that may have been GC'd. Handle this case in the
unpublish RPC by sending the client RPC for all the terminal/nil allocs on the
selected node.
  • Loading branch information
tgross committed Aug 7, 2020
1 parent 079f60c commit ef6fc9f
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 28 deletions.
5 changes: 5 additions & 0 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (v *CSIVolumes) Deregister(id string, force bool, w *WriteOptions) error {
return err
}

// Detach asks the server to detach the CSI volume volID from the client node
// nodeID by issuing a DELETE on the volume endpoint with detach=true.
//
// volID is escaped as a path segment and nodeID as a query-string value, so
// IDs containing reserved characters (such as '/', '&', '#', '%' or spaces)
// are transmitted intact instead of altering the request URL.
//
// It returns whatever error the underlying delete request reports.
func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error {
	endpoint := fmt.Sprintf("/v1/volume/csi/%v?detach=true&node=%v",
		url.PathEscape(volID), url.QueryEscape(nodeID))
	_, err := v.client.delete(endpoint, nil, w)
	return err
}

// CSIVolumeAttachmentMode duplicated in nomad/structs/csi.go
type CSIVolumeAttachmentMode string

Expand Down
36 changes: 35 additions & 1 deletion command/agent/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,17 @@ func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *h
return nil, CodedError(405, ErrInvalidMethod)
}

raw := req.URL.Query().Get("force")
raw := req.URL.Query().Get("detach")
var detach bool
if raw != "" {
var err error
detach, err = strconv.ParseBool(raw)
if err != nil {
return nil, CodedError(400, "invalid detach value")
}
}

raw = req.URL.Query().Get("force")
var force bool
if raw != "" {
var err error
Expand All @@ -133,6 +143,30 @@ func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *h
}
}

if detach {
nodeID := req.URL.Query().Get("node")
if nodeID == "" {
return nil, CodedError(400, "detach requires node ID")
}

args := structs.CSIVolumeUnpublishRequest{
VolumeID: id,
Claim: &structs.CSIVolumeClaim{
NodeID: nodeID,
Mode: structs.CSIVolumeClaimRelease,
},
}
s.parseWriteRequest(req, &args.WriteRequest)

var out structs.CSIVolumeUnpublishResponse
if err := s.agent.RPC("CSIVolume.Unpublish", &args, &out); err != nil {
return nil, err
}

setMeta(resp, &out.QueryMeta)
return nil, nil
}

args := structs.CSIVolumeDeregisterRequest{
VolumeIDs: []string{id},
Force: force,
Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"volume detach": func() (cli.Command, error) {
return &VolumeDetachCommand{
Meta: meta,
}, nil
},
}

deprecated := map[string]cli.CommandFactory{
Expand Down
4 changes: 4 additions & 0 deletions command/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ Usage: nomad volume <subcommand> [options]
$ nomad volume deregister <id>
Detach an unused volume:
$ nomad volume detach <vol id> <node id>
Please see the individual subcommand help for detailed usage information.
`
return strings.TrimSpace(helpText)
Expand Down
97 changes: 97 additions & 0 deletions command/volume_detach.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package command

import (
"fmt"
"strings"

"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)

// VolumeDetachCommand implements the "volume detach" CLI subcommand. It
// embeds Meta, which supplies the flag set, UI and API client used by the
// command's methods.
type VolumeDetachCommand struct {
Meta
}

// Help returns the usage text for the command: a fixed usage header followed
// by the output of generalOptionsUsage(), with surrounding whitespace
// trimmed.
func (c *VolumeDetachCommand) Help() string {
helpText := `
Usage: nomad volume detach [options] <vol id> <node id>
Detach a volume from a Nomad client.
General Options:
` + generalOptionsUsage() + `
`
return strings.TrimSpace(helpText)
}

// AutocompleteFlags returns the flags offered for shell completion. The
// command defines no flags of its own, so the client flag set from Meta is
// merged with an empty set.
func (c *VolumeDetachCommand) AutocompleteFlags() complete.Flags {
	clientFlags := c.Meta.AutocompleteFlags(FlagSetClient)
	return mergeAutocompleteFlags(clientFlags, complete.Flags{})
}

// AutocompleteArgs returns a predictor for the positional arguments. It
// prefix-searches both volumes and nodes using the last argument typed and
// returns the volume matches followed by the node matches.
//
// The predictor returns nil when the API client cannot be created and an
// empty slice when either search fails.
func (c *VolumeDetachCommand) AutocompleteArgs() complete.Predictor {
	predict := func(a complete.Args) []string {
		client, err := c.Meta.Client()
		if err != nil {
			return nil
		}

		volResp, _, err := client.Search().PrefixSearch(a.Last, contexts.Volumes, nil)
		if err != nil {
			return []string{}
		}

		nodeResp, _, err := client.Search().PrefixSearch(a.Last, contexts.Nodes, nil)
		if err != nil {
			return []string{}
		}

		// Volume matches come first, then every node match in order.
		return append(volResp.Matches[contexts.Volumes], nodeResp.Matches[contexts.Nodes]...)
	}
	return complete.PredictFunc(predict)
}

// Synopsis returns the one-line description of the command.
func (c *VolumeDetachCommand) Synopsis() string { return "Detach a volume" }

// Name returns the subcommand name, which Run also uses to label its flag set.
func (c *VolumeDetachCommand) Name() string {
	const name = "volume detach"
	return name
}

// Run executes the command. It parses the client flags, requires exactly two
// positional arguments (<vol id> <node id>), and asks the API to detach the
// volume from the node. It returns 0 on success and 1 on any error, after
// reporting the error through the UI.
func (c *VolumeDetachCommand) Run(args []string) int {
	fs := c.Meta.FlagSet(c.Name(), FlagSetClient)
	fs.Usage = func() { c.Ui.Output(c.Help()) }

	if parseErr := fs.Parse(args); parseErr != nil {
		c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", parseErr))
		return 1
	}

	// Exactly two positional arguments must remain after flag parsing.
	positional := fs.Args()
	if len(positional) != 2 {
		c.Ui.Error("This command takes two arguments: <vol id> <node id>")
		c.Ui.Error(commandErrorText(c))
		return 1
	}
	volID, nodeID := positional[0], positional[1]

	// Build the HTTP API client from the shared meta options.
	client, clientErr := c.Meta.Client()
	if clientErr != nil {
		c.Ui.Error(fmt.Sprintf("Error initializing client: %s", clientErr))
		return 1
	}

	if detachErr := client.CSIVolumes().Detach(volID, nodeID, nil); detachErr != nil {
		c.Ui.Error(fmt.Sprintf("Error detaching volume: %s", detachErr))
		return 1
	}

	return 0
}
63 changes: 61 additions & 2 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,66 @@ RELEASE_CLAIM:
}

// nodeUnpublishVolume detaches the volume from the claim's node and, on
// success, marks the claim as node-detached and checkpoints it.
//
// When the claim carries an allocation ID, only that allocation is
// unpublished. Otherwise the volume is denormalized and every allocation on
// claim.NodeID that is either terminal, or nil but still has a claim recorded
// for that node, is unpublished in turn. Errors from the individual
// unpublish calls are collected and returned together; in that case the
// claim is not checkpointed.
func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
	if claim.AllocationID != "" {
		if err := v.nodeUnpublishVolumeImpl(vol, claim); err != nil {
			return err
		}
		claim.State = structs.CSIVolumeClaimStateNodeDetached
		return v.checkpointClaim(vol, claim)
	}

	// The RPC sent from the 'nomad volume detach' command won't have an
	// allocation ID set so we try to unpublish every terminal or invalid
	// alloc on the node
	state := v.srv.fsm.State()
	vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
	if err != nil {
		return err
	}

	var allocIDs []string

	// collect appends the IDs of allocations on the claim's node that are
	// safe to unpublish: terminal allocations, and nil allocations that
	// still have a claim recorded for this node.
	collect := func(allocs map[string]*structs.Allocation, claims map[string]*structs.CSIVolumeClaim) {
		for allocID, alloc := range allocs {
			if alloc != nil {
				if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() {
					allocIDs = append(allocIDs, allocID)
				}
				continue
			}
			if past, ok := claims[allocID]; ok && past.NodeID == claim.NodeID {
				allocIDs = append(allocIDs, allocID)
			}
		}
	}
	collect(vol.ReadAllocs, vol.ReadClaims)
	collect(vol.WriteAllocs, vol.WriteClaims)

	var merr multierror.Error
	for _, allocID := range allocIDs {
		// NOTE(review): the caller's claim is mutated here, so the claim
		// checkpointed below carries the last allocation ID visited (map
		// iteration order is unspecified) — confirm this is intended.
		claim.AllocationID = allocID
		if err := v.nodeUnpublishVolumeImpl(vol, claim); err != nil {
			merr.Errors = append(merr.Errors, err)
		}
	}
	if err := merr.ErrorOrNil(); err != nil {
		return err
	}

	claim.State = structs.CSIVolumeClaimStateNodeDetached
	return v.checkpointClaim(vol, claim)
}

func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
req := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: vol.PluginID,
VolumeID: vol.ID,
Expand All @@ -609,8 +669,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
return fmt.Errorf("could not detach from node: %w", err)
}
}
claim.State = structs.CSIVolumeClaimStateNodeDetached
return v.checkpointClaim(vol, claim)
return nil
}

func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
Expand Down
77 changes: 52 additions & 25 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,62 +425,88 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {

codec := rpcClient(t, srv)

// setup: create a client node with a controller and node plugin
node := mock.Node()
node.Attributes["nomad.version"] = "0.11.0"
node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true},
RequiresControllerPlugin: true,
},
}
index++
require.NoError(t, state.UpsertNode(index, node))

type tc struct {
name string
startingState structs.CSIVolumeClaimState
hasController bool
expectedErrMsg string
}

testCases := []tc{
{
name: "no path to node plugin",
startingState: structs.CSIVolumeClaimStateTaken,
hasController: true,
expectedErrMsg: "could not detach from node: Unknown node ",
name: "success",
startingState: structs.CSIVolumeClaimStateControllerDetached,
},
{
name: "no registered controller plugin",
name: "unpublish previously detached node",
startingState: structs.CSIVolumeClaimStateNodeDetached,
hasController: true,
expectedErrMsg: "could not detach from controller: controller detach volume: plugin missing: minnie",
expectedErrMsg: "could not detach from controller: No path to node",
},
{
name: "success",
startingState: structs.CSIVolumeClaimStateControllerDetached,
hasController: true,
name: "first unpublish",
startingState: structs.CSIVolumeClaimStateTaken,
expectedErrMsg: "could not detach from node: No path to node",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

// setup: register a volume
volID := uuid.Generate()
nodeID := uuid.Generate()
allocID := uuid.Generate()

claim := &structs.CSIVolumeClaim{
AllocationID: allocID,
NodeID: nodeID,
ExternalNodeID: "i-example",
Mode: structs.CSIVolumeClaimRead,
State: tc.startingState,
}

vol := &structs.CSIVolume{
ID: volID,
Namespace: ns,
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
PluginID: "minnie",
Secrets: structs.CSISecrets{"mysecret": "secretvalue"},
ControllerRequired: tc.hasController,
ControllerRequired: true,
}

index++
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(t, err)

// setup: create an alloc that will claim our volume
alloc := mock.BatchAlloc()
alloc.NodeID = node.ID
alloc.ClientStatus = structs.AllocClientStatusFailed

index++
require.NoError(t, state.UpsertAllocs(index, []*structs.Allocation{alloc}))

// setup: claim the volume for our alloc
claim := &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
NodeID: node.ID,
ExternalNodeID: "i-example",
Mode: structs.CSIVolumeClaimRead,
}

index++
claim.State = structs.CSIVolumeClaimStateTaken
err = state.CSIVolumeClaim(index, ns, volID, claim)
require.NoError(t, err)

// test: unpublish and check the results
claim.State = tc.startingState
req := &structs.CSIVolumeUnpublishRequest{
VolumeID: volID,
Claim: claim,
Expand All @@ -497,6 +523,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
if tc.expectedErrMsg == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),
"error message %q did not contain %q", err.Error(), tc.expectedErrMsg)
}
Expand Down
5 changes: 5 additions & 0 deletions vendor/github.com/hashicorp/nomad/api/csi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ef6fc9f

Please sign in to comment.