diff --git a/api/csi.go b/api/csi.go index fbc984e511a0..6a2bcfe633dc 100644 --- a/api/csi.go +++ b/api/csi.go @@ -60,6 +60,11 @@ func (v *CSIVolumes) Deregister(id string, force bool, w *WriteOptions) error { return err } +func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error { + _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v?detach=true&node=%v", url.PathEscape(volID), nodeID), nil, w) + return err +} + // CSIVolumeAttachmentMode duplicated in nomad/structs/csi.go type CSIVolumeAttachmentMode string diff --git a/command/agent/csi_endpoint.go b/command/agent/csi_endpoint.go index babf13e77f63..6800fead2c92 100644 --- a/command/agent/csi_endpoint.go +++ b/command/agent/csi_endpoint.go @@ -123,7 +123,17 @@ func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *h return nil, CodedError(405, ErrInvalidMethod) } - raw := req.URL.Query().Get("force") + raw := req.URL.Query().Get("detach") + var detach bool + if raw != "" { + var err error + detach, err = strconv.ParseBool(raw) + if err != nil { + return nil, CodedError(400, "invalid detach value") + } + } + + raw = req.URL.Query().Get("force") var force bool if raw != "" { var err error @@ -133,6 +143,30 @@ func (s *HTTPServer) csiVolumeDelete(id string, resp http.ResponseWriter, req *h } } + if detach { + nodeID := req.URL.Query().Get("node") + if nodeID == "" { + return nil, CodedError(400, "detach requires node ID") + } + + args := structs.CSIVolumeUnpublishRequest{ + VolumeID: id, + Claim: &structs.CSIVolumeClaim{ + NodeID: nodeID, + Mode: structs.CSIVolumeClaimRelease, + }, + } + s.parseWriteRequest(req, &args.WriteRequest) + + var out structs.CSIVolumeUnpublishResponse + if err := s.agent.RPC("CSIVolume.Unpublish", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + return nil, nil + } + args := structs.CSIVolumeDeregisterRequest{ VolumeIDs: []string{id}, Force: force, diff --git a/command/commands.go b/command/commands.go index 12a18abe86cb..dc0166bc8836 100644 --- a/command/commands.go +++ b/command/commands.go @@ -723,6 +723,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "volume detach": func() (cli.Command, error) { + return &VolumeDetachCommand{ + Meta: meta, + }, nil + }, } deprecated := map[string]cli.CommandFactory{ diff --git a/command/volume.go b/command/volume.go index 83eb44093b26..6260b0bfc6b5 100644 --- a/command/volume.go +++ b/command/volume.go @@ -28,6 +28,10 @@ Usage: nomad volume [options] $ nomad volume deregister + Detach an unused volume: + + $ nomad volume detach + Please see the individual subcommand help for detailed usage information. ` return strings.TrimSpace(helpText) diff --git a/command/volume_detach.go b/command/volume_detach.go new file mode 100644 index 000000000000..1810d4d4d5a1 --- /dev/null +++ b/command/volume_detach.go @@ -0,0 +1,97 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/hashicorp/nomad/api/contexts" + "github.com/posener/complete" +) + +type VolumeDetachCommand struct { + Meta +} + +func (c *VolumeDetachCommand) Help() string { + helpText := ` +Usage: nomad volume detach [options] + + Detach a volume from a Nomad client. + +General Options: + + ` + generalOptionsUsage() + ` + +` + return strings.TrimSpace(helpText) +} + +func (c *VolumeDetachCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{}) +} + +func (c *VolumeDetachCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Volumes, nil) + if err != nil { + return []string{} + } + matches := resp.Matches[contexts.Volumes] + + resp, _, err = client.Search().PrefixSearch(a.Last, contexts.Nodes, nil) + if err != nil { + return []string{} + } + for _, match := range resp.Matches[contexts.Nodes] { + matches = append(matches, match) + } + return matches + }) +} + +func (c *VolumeDetachCommand) Synopsis() string { + return "Detach a volume" +} + +func (c *VolumeDetachCommand) Name() string { return "volume detach" } + +func (c *VolumeDetachCommand) Run(args []string) int { + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + + if err := flags.Parse(args); err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err)) + return 1 + } + + // Check that we get exactly two arguments + args = flags.Args() + if l := len(args); l != 2 { + c.Ui.Error("This command takes two arguments: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + volID := args[0] + nodeID := args[1] + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + err = client.CSIVolumes().Detach(volID, nodeID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error detaching volume: %s", err)) + return 1 + } + + return 0 +} diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 290574cd9f18..ed95e994f300 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -589,6 +589,66 @@ RELEASE_CLAIM: } func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { + if claim.AllocationID != "" { + err := v.nodeUnpublishVolumeImpl(vol, claim) + if err != nil { + return err + } + claim.State = structs.CSIVolumeClaimStateNodeDetached + return v.checkpointClaim(vol, claim) + } + + // The RPC sent from the 'nomad node detach' command won't have an + // allocation ID set so we try to unpublish every terminal or invalid + // alloc on the node + allocIDs := []string{} + state := v.srv.fsm.State() + vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol) + if err != nil { + return err + } + for allocID, alloc := range vol.ReadAllocs { + if alloc == nil { + rclaim, ok := vol.ReadClaims[allocID] + if ok && rclaim.NodeID == claim.NodeID { + allocIDs = append(allocIDs, allocID) + } + } else { + if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() { + allocIDs = append(allocIDs, allocID) + } + } + } + for allocID, alloc := range vol.WriteAllocs { + if alloc == nil { + wclaim, ok := vol.WriteClaims[allocID] + if ok && wclaim.NodeID == claim.NodeID { + allocIDs = append(allocIDs, allocID) + } + } else { + if alloc.NodeID == claim.NodeID && alloc.TerminalStatus() { + allocIDs = append(allocIDs, allocID) + } + } + } + var merr multierror.Error + for _, allocID := range allocIDs { + claim.AllocationID = allocID + err := v.nodeUnpublishVolumeImpl(vol, claim) + if err != nil { + merr.Errors = append(merr.Errors, err) + } + } + err = merr.ErrorOrNil() + if err != nil { + return err + } + + claim.State = structs.CSIVolumeClaimStateNodeDetached + return v.checkpointClaim(vol, claim) +} + +func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { req := &cstructs.ClientCSINodeDetachVolumeRequest{ PluginID: vol.PluginID, VolumeID: vol.ID, @@ -609,8 +669,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C return fmt.Errorf("could not detach from node: %w", err) } } - claim.State = structs.CSIVolumeClaimStateNodeDetached - return v.checkpointClaim(vol, claim) + return nil } func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index d372e080df22..900ca1be3505 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -425,48 +425,51 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { codec := rpcClient(t, srv) + // setup: create a client node with a controller and node plugin + node := mock.Node() + node.Attributes["nomad.version"] = "0.11.0" + node.CSINodePlugins = map[string]*structs.CSIInfo{ + "minnie": {PluginID: "minnie", + Healthy: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + node.CSIControllerPlugins = map[string]*structs.CSIInfo{ + "minnie": {PluginID: "minnie", + Healthy: true, + ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true}, + RequiresControllerPlugin: true, + }, + } + index++ + require.NoError(t, state.UpsertNode(index, node)) + type tc struct { name string startingState structs.CSIVolumeClaimState - hasController bool expectedErrMsg string } - testCases := []tc{ { - name: "no path to node plugin", - startingState: structs.CSIVolumeClaimStateTaken, - hasController: true, - expectedErrMsg: "could not detach from node: Unknown node ", + name: "success", + startingState: structs.CSIVolumeClaimStateControllerDetached, }, { - name: "no registered controller plugin", + name: "unpublish previously detached node", startingState: structs.CSIVolumeClaimStateNodeDetached, - hasController: true, - expectedErrMsg: "could not detach from controller: controller detach volume: plugin missing: minnie", + expectedErrMsg: "could not detach from controller: No path to node", }, { - name: "success", - startingState: structs.CSIVolumeClaimStateControllerDetached, - hasController: true, + name: "first unpublish", + startingState: structs.CSIVolumeClaimStateTaken, + expectedErrMsg: "could not detach from node: No path to node", }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - + // setup: register a volume volID := uuid.Generate() - nodeID := uuid.Generate() - allocID := uuid.Generate() - - claim := &structs.CSIVolumeClaim{ - AllocationID: allocID, - NodeID: nodeID, - ExternalNodeID: "i-example", - Mode: structs.CSIVolumeClaimRead, - State: tc.startingState, - } - vol := &structs.CSIVolume{ ID: volID, Namespace: ns, @@ -474,13 +477,36 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, PluginID: "minnie", Secrets: structs.CSISecrets{"mysecret": "secretvalue"}, - ControllerRequired: tc.hasController, + ControllerRequired: true, } index++ err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) require.NoError(t, err) + // setup: create an alloc that will claim our volume + alloc := mock.BatchAlloc() + alloc.NodeID = node.ID + alloc.ClientStatus = structs.AllocClientStatusFailed + + index++ + require.NoError(t, state.UpsertAllocs(index, []*structs.Allocation{alloc})) + + // setup: claim the volume for our alloc + claim := &structs.CSIVolumeClaim{ + AllocationID: alloc.ID, + NodeID: node.ID, + ExternalNodeID: "i-example", + Mode: structs.CSIVolumeClaimRead, + } + + index++ + claim.State = structs.CSIVolumeClaimStateTaken + err = state.CSIVolumeClaim(index, ns, volID, claim) + require.NoError(t, err) + + // test: unpublish and check the results + claim.State = tc.startingState req := &structs.CSIVolumeUnpublishRequest{ VolumeID: volID, Claim: claim, @@ -497,6 +523,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) { if tc.expectedErrMsg == "" { require.NoError(t, err) } else { + require.Error(t, err) require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg), "error message %q did not contain %q", err.Error(), tc.expectedErrMsg) } diff --git a/vendor/github.com/hashicorp/nomad/api/csi.go b/vendor/github.com/hashicorp/nomad/api/csi.go index fbc984e511a0..6a2bcfe633dc 100644 --- a/vendor/github.com/hashicorp/nomad/api/csi.go +++ b/vendor/github.com/hashicorp/nomad/api/csi.go @@ -60,6 +60,11 @@ func (v *CSIVolumes) Deregister(id string, force bool, w *WriteOptions) error { return err } +func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error { + _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v?detach=true&node=%v", url.PathEscape(volID), nodeID), nil, w) + return err +} + // CSIVolumeAttachmentMode duplicated in nomad/structs/csi.go type CSIVolumeAttachmentMode string