diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 0b1d7e62420f..08f65e5bf823 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -8,9 +8,7 @@ import (
 
     log "github.com/hashicorp/go-hclog"
     memdb "github.com/hashicorp/go-memdb"
-    multierror "github.com/hashicorp/go-multierror"
     version "github.com/hashicorp/go-version"
-    cstructs "github.com/hashicorp/nomad/client/structs"
     "github.com/hashicorp/nomad/nomad/state"
     "github.com/hashicorp/nomad/nomad/structs"
     "github.com/hashicorp/nomad/scheduler"
@@ -711,212 +709,30 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
     return timeDiff > interval.Nanoseconds()
 }
 
+// TODO: we need a periodic trigger to iterate over all the volumes and split
+// them up into separate work items, same as we do for jobs.
+
 // csiVolumeClaimGC is used to garbage collect CSI volume claims
 func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
-    c.logger.Trace("garbage collecting unclaimed CSI volume claims")
+    c.logger.Trace("garbage collecting unclaimed CSI volume claims", "eval.JobID", eval.JobID)
 
     // Volume ID smuggled in with the eval's own JobID
     evalVolID := strings.Split(eval.JobID, ":")
-    if len(evalVolID) != 3 {
+
+    // COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2
+    if len(evalVolID) < 2 {
         c.logger.Error("volume gc called without volID")
         return nil
     }
 
     volID := evalVolID[1]
-    runningAllocs := evalVolID[2] == "purge"
-    return volumeClaimReap(c.srv, volID, eval.Namespace,
-        c.srv.config.Region, eval.LeaderACL, runningAllocs)
-}
-
-func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error {
-
-    ws := memdb.NewWatchSet()
-
-    vol, err := srv.State().CSIVolumeByID(ws, namespace, volID)
-    if err != nil {
-        return err
-    }
-    if vol == nil {
-        return nil
-    }
-    vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
-    if err != nil {
-        return err
-    }
-
-    plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
-    if err != nil {
-        return err
-    }
-
-    nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)
-
-    var result *multierror.Error
-    for _, claim := range vol.PastClaims {
-        nodeClaims, err = volumeClaimReapImpl(srv,
-            &volumeClaimReapArgs{
-                vol:        vol,
-                plug:       plug,
-                claim:      claim,
-                namespace:  namespace,
-                region:     region,
-                leaderACL:  leaderACL,
-                nodeClaims: nodeClaims,
-            },
-        )
-        if err != nil {
-            result = multierror.Append(result, err)
-            continue
-        }
+    req := &structs.CSIVolumeClaimRequest{
+        VolumeID: volID,
+        Claim:    structs.CSIVolumeClaimRelease,
     }
-    return result.ErrorOrNil()
-
-}
+    req.Namespace = eval.Namespace
+    req.Region = c.srv.config.Region
 
-func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) map[string]int {
-    nodeClaims := map[string]int{} // node IDs -> count
-
-    collectFunc := func(allocs map[string]*structs.Allocation,
-        claims map[string]*structs.CSIVolumeClaim) {
-
-        for allocID, alloc := range allocs {
-            claim, ok := claims[allocID]
-            if !ok {
-                // COMPAT(1.0): the CSIVolumeClaim fields were added
-                // after 0.11.1, so claims made before that may be
-                // missing this value. note that we'll have non-nil
-                // allocs here because we called denormalize on the
-                // value.
-                claim = &structs.CSIVolumeClaim{
-                    AllocationID: allocID,
-                    NodeID:       alloc.NodeID,
-                    State:        structs.CSIVolumeClaimStateTaken,
-                }
-            }
-            nodeClaims[claim.NodeID]++
-            if runningAllocs || alloc.Terminated() {
-                // only overwrite the PastClaim if this is new,
-                // so that we can track state between subsequent calls
-                if _, exists := vol.PastClaims[claim.AllocationID]; !exists {
-                    claim.State = structs.CSIVolumeClaimStateTaken
-                    vol.PastClaims[claim.AllocationID] = claim
-                }
-            }
-        }
-    }
-
-    collectFunc(vol.WriteAllocs, vol.WriteClaims)
-    collectFunc(vol.ReadAllocs, vol.ReadClaims)
-    return nodeClaims
-}
-
-type volumeClaimReapArgs struct {
-    vol        *structs.CSIVolume
-    plug       *structs.CSIPlugin
-    claim      *structs.CSIVolumeClaim
-    region     string
-    namespace  string
-    leaderACL  string
-    nodeClaims map[string]int // node IDs -> count
-}
-
-func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]int, error) {
-    vol := args.vol
-    claim := args.claim
-
-    var err error
-    var nReq *cstructs.ClientCSINodeDetachVolumeRequest
-
-    checkpoint := func(claimState structs.CSIVolumeClaimState) error {
-        req := &structs.CSIVolumeClaimRequest{
-            VolumeID:     vol.ID,
-            AllocationID: claim.AllocationID,
-            Claim:        structs.CSIVolumeClaimRelease,
-            WriteRequest: structs.WriteRequest{
-                Region:    args.region,
-                Namespace: args.namespace,
-                AuthToken: args.leaderACL,
-            },
-        }
-        return srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
-    }
-
-    // previous checkpoints may have set the past claim state already.
-    // in practice we should never see CSIVolumeClaimStateControllerDetached
-    // but having an option for the state makes it easy to add a checkpoint
-    // in a backwards compatible way if we need one later
-    switch claim.State {
-    case structs.CSIVolumeClaimStateNodeDetached:
-        goto NODE_DETACHED
-    case structs.CSIVolumeClaimStateControllerDetached:
-        goto RELEASE_CLAIM
-    case structs.CSIVolumeClaimStateReadyToFree:
-        goto RELEASE_CLAIM
-    }
-
-    // (1) NodePublish / NodeUnstage must be completed before controller
-    // operations or releasing the claim.
-    nReq = &cstructs.ClientCSINodeDetachVolumeRequest{
-        PluginID:       args.plug.ID,
-        VolumeID:       vol.ID,
-        ExternalID:     vol.RemoteID(),
-        AllocID:        claim.AllocationID,
-        NodeID:         claim.NodeID,
-        AttachmentMode: vol.AttachmentMode,
-        AccessMode:     vol.AccessMode,
-        ReadOnly:       claim.Mode == structs.CSIVolumeClaimRead,
-    }
-    err = srv.RPC("ClientCSI.NodeDetachVolume", nReq,
-        &cstructs.ClientCSINodeDetachVolumeResponse{})
-    if err != nil {
-        return args.nodeClaims, err
-    }
-    err = checkpoint(structs.CSIVolumeClaimStateNodeDetached)
-    if err != nil {
-        return args.nodeClaims, err
-    }
-
-NODE_DETACHED:
-    args.nodeClaims[claim.NodeID]--
-
-    // (2) we only emit the controller unpublish if no other allocs
-    // on the node need it, but we also only want to make this
-    // call at most once per node
-    if vol.ControllerRequired && args.nodeClaims[claim.NodeID] < 1 {
-
-        // we need to get the CSI Node ID, which is not the same as
-        // the Nomad Node ID
-        ws := memdb.NewWatchSet()
-        targetNode, err := srv.State().NodeByID(ws, claim.NodeID)
-        if err != nil {
-            return args.nodeClaims, err
-        }
-        if targetNode == nil {
-            return args.nodeClaims, fmt.Errorf("%s: %s",
-                structs.ErrUnknownNodePrefix, claim.NodeID)
-        }
-        targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
-        if !ok {
-            return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
-        }
-
-        cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
-            VolumeID:        vol.RemoteID(),
-            ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
-        }
-        cReq.PluginID = args.plug.ID
-        err = srv.RPC("ClientCSI.ControllerDetachVolume", cReq,
-            &cstructs.ClientCSIControllerDetachVolumeResponse{})
-        if err != nil {
-            return args.nodeClaims, err
-        }
-    }
-
-RELEASE_CLAIM:
-    // (3) release the claim from the state store, allowing it to be rescheduled
-    err = checkpoint(structs.CSIVolumeClaimStateReadyToFree)
-    if err != nil {
-        return args.nodeClaims, err
-    }
-    return args.nodeClaims, nil
+    err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
+    return err
 }
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 819e0908ddbc..70b500a82bcf 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -6,10 +6,8 @@ import (
     "time"
 
     memdb "github.com/hashicorp/go-memdb"
-    cstructs "github.com/hashicorp/nomad/client/structs"
     "github.com/hashicorp/nomad/helper/uuid"
     "github.com/hashicorp/nomad/nomad/mock"
-    "github.com/hashicorp/nomad/nomad/state"
     "github.com/hashicorp/nomad/nomad/structs"
     "github.com/hashicorp/nomad/testutil"
     "github.com/stretchr/testify/assert"
@@ -2195,268 +2193,3 @@ func TestAllocation_GCEligible(t *testing.T) {
     alloc.ClientStatus = structs.AllocClientStatusComplete
     require.True(allocGCEligible(alloc, nil, time.Now(), 1000))
 }
-
-func TestCSI_GCVolumeClaims_Collection(t *testing.T) {
-    t.Parallel()
-    srv, shutdownSrv := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
-    defer shutdownSrv()
-    testutil.WaitForLeader(t, srv.RPC)
-
-    state := srv.fsm.State()
-    ws := memdb.NewWatchSet()
-    index := uint64(100)
-
-    // Create a client node, plugin, and volume
-    node := mock.Node()
-    node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early version
-    node.CSINodePlugins = map[string]*structs.CSIInfo{
-        "csi-plugin-example": {
-            PluginID:                 "csi-plugin-example",
-            Healthy:                  true,
-            RequiresControllerPlugin: true,
-            NodeInfo:                 &structs.CSINodeInfo{},
-        },
-    }
-    node.CSIControllerPlugins = map[string]*structs.CSIInfo{
-        "csi-plugin-example": {
-            PluginID:                 "csi-plugin-example",
-            Healthy:                  true,
-            RequiresControllerPlugin: true,
-            ControllerInfo: &structs.CSIControllerInfo{
-                SupportsReadOnlyAttach:           true,
-                SupportsAttachDetach:             true,
-                SupportsListVolumes:              true,
-                SupportsListVolumesAttachedNodes: false,
-            },
-        },
-    }
-    err := state.UpsertNode(99, node)
-    require.NoError(t, err)
-    volId0 := uuid.Generate()
-    ns := structs.DefaultNamespace
-    vols := []*structs.CSIVolume{{
-        ID:             volId0,
-        Namespace:      ns,
-        PluginID:       "csi-plugin-example",
-        AccessMode:     structs.CSIVolumeAccessModeMultiNodeSingleWriter,
-        AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
-    }}
-
-    err = state.CSIVolumeRegister(index, vols)
-    index++
-    require.NoError(t, err)
-    vol, err := state.CSIVolumeByID(ws, ns, volId0)
-
-    require.NoError(t, err)
-    require.True(t, vol.ControllerRequired)
-    require.Len(t, vol.ReadAllocs, 0)
-    require.Len(t, vol.WriteAllocs, 0)
-
-    // Create a job with 2 allocations
-    job := mock.Job()
-    job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
-        "_": {
-            Name:     "someVolume",
-            Type:     structs.VolumeTypeCSI,
-            Source:   volId0,
-            ReadOnly: false,
-        },
-    }
-    err = state.UpsertJob(index, job)
-    index++
-    require.NoError(t, err)
-
-    alloc1 := mock.Alloc()
-    alloc1.JobID = job.ID
-    alloc1.NodeID = node.ID
-    err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID))
-    index++
-    require.NoError(t, err)
-    alloc1.TaskGroup = job.TaskGroups[0].Name
-
-    alloc2 := mock.Alloc()
-    alloc2.JobID = job.ID
-    alloc2.NodeID = node.ID
-    err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID))
-    index++
-    require.NoError(t, err)
-    alloc2.TaskGroup = job.TaskGroups[0].Name
-
-    err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2})
-    require.NoError(t, err)
-
-    // Claim the volumes and verify the claims were set
-    err = state.CSIVolumeClaim(index, ns, volId0, &structs.CSIVolumeClaim{
-        AllocationID: alloc1.ID,
-        NodeID:       alloc1.NodeID,
-        Mode:         structs.CSIVolumeClaimWrite,
-    })
-    index++
-    require.NoError(t, err)
-
-    err = state.CSIVolumeClaim(index, ns, volId0, &structs.CSIVolumeClaim{
-        AllocationID: alloc2.ID,
-        NodeID:       alloc2.NodeID,
-        Mode:         structs.CSIVolumeClaimRead,
-    })
-    index++
-    require.NoError(t, err)
-
-    vol, err = state.CSIVolumeByID(ws, ns, volId0)
-    require.NoError(t, err)
-    require.Len(t, vol.ReadAllocs, 1)
-    require.Len(t, vol.WriteAllocs, 1)
-
-    // Update both allocs as failed/terminated
-    alloc1.ClientStatus = structs.AllocClientStatusFailed
-    alloc2.ClientStatus = structs.AllocClientStatusFailed
-    err = state.UpdateAllocsFromClient(index, []*structs.Allocation{alloc1, alloc2})
-    require.NoError(t, err)
-
-    vol, err = state.CSIVolumeDenormalize(ws, vol)
-    require.NoError(t, err)
-
-    nodeClaims := collectClaimsToGCImpl(vol, false)
-    require.Equal(t, nodeClaims[node.ID], 2)
-    require.Len(t, vol.PastClaims, 2)
-}
-
-func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
-    t.Parallel()
-    require := require.New(t)
-
-    s, shutdownSrv := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
-    defer shutdownSrv()
-    testutil.WaitForLeader(t, s.RPC)
-
-    node := mock.Node()
-    plugin := mock.CSIPlugin()
-    vol := mock.CSIVolume(plugin)
-    alloc := mock.Alloc()
-
-    cases := []struct {
-        Name                                string
-        ClaimsCount                         map[string]int
-        ControllerRequired                  bool
-        ExpectedErr                         string
-        ExpectedCount                       int
-        ExpectedClaimsCount                 int
-        ExpectedNodeDetachVolumeCount       int
-        ExpectedControllerDetachVolumeCount int
-        ExpectedVolumeClaimCount            int
-        srv                                 *MockRPCServer
-    }{
-        {
-            Name:                          "NodeDetachVolume fails",
-            ClaimsCount:                   map[string]int{node.ID: 1},
-            ControllerRequired:            true,
-            ExpectedErr:                   "node plugin missing",
-            ExpectedClaimsCount:           1,
-            ExpectedNodeDetachVolumeCount: 1,
-            srv: &MockRPCServer{
-                state:                        s.State(),
-                nextCSINodeDetachVolumeError: fmt.Errorf("node plugin missing"),
-            },
-        },
-        {
-            Name:                                "ControllerDetachVolume no controllers",
-            ClaimsCount:                         map[string]int{node.ID: 1},
-            ControllerRequired:                  true,
-            ExpectedErr:                         fmt.Sprintf("Unknown node: %s", node.ID),
-            ExpectedClaimsCount:                 0,
-            ExpectedNodeDetachVolumeCount:       1,
-            ExpectedControllerDetachVolumeCount: 0,
-            ExpectedVolumeClaimCount:            1,
-            srv: &MockRPCServer{
-                state: s.State(),
-            },
-        },
-        {
-            Name:                                "ControllerDetachVolume node-only",
-            ClaimsCount:                         map[string]int{node.ID: 1},
-            ControllerRequired:                  false,
-            ExpectedClaimsCount:                 0,
-            ExpectedNodeDetachVolumeCount:       1,
-            ExpectedControllerDetachVolumeCount: 0,
-            ExpectedVolumeClaimCount:            2,
-            srv: &MockRPCServer{
-                state: s.State(),
-            },
-        },
-    }
-
-    for _, tc := range cases {
-        t.Run(tc.Name, func(t *testing.T) {
-            vol.ControllerRequired = tc.ControllerRequired
-            claim := &structs.CSIVolumeClaim{
-                AllocationID: alloc.ID,
-                NodeID:       node.ID,
-                State:        structs.CSIVolumeClaimStateTaken,
-                Mode:         structs.CSIVolumeClaimRead,
-            }
-            nodeClaims, err := volumeClaimReapImpl(tc.srv, &volumeClaimReapArgs{
-                vol:        vol,
-                plug:       plugin,
-                claim:      claim,
-                region:     "global",
-                namespace:  "default",
-                leaderACL:  "not-in-use",
-                nodeClaims: tc.ClaimsCount,
-            })
-            if tc.ExpectedErr != "" {
-                require.EqualError(err, tc.ExpectedErr)
-            } else {
-                require.NoError(err)
-            }
-            require.Equal(tc.ExpectedClaimsCount,
-                nodeClaims[claim.NodeID], "expected claims remaining")
-            require.Equal(tc.ExpectedNodeDetachVolumeCount,
-                tc.srv.countCSINodeDetachVolume, "node detach RPC count")
-            require.Equal(tc.ExpectedControllerDetachVolumeCount,
-                tc.srv.countCSIControllerDetachVolume, "controller detach RPC count")
-            require.Equal(tc.ExpectedVolumeClaimCount,
-                tc.srv.countCSIVolumeClaim, "volume claim RPC count")
-        })
-    }
-}
-
-type MockRPCServer struct {
-    state *state.StateStore
-
-    // mock responses for ClientCSI.NodeDetachVolume
-    nextCSINodeDetachVolumeResponse *cstructs.ClientCSINodeDetachVolumeResponse
-    nextCSINodeDetachVolumeError    error
-    countCSINodeDetachVolume        int
-
-    // mock responses for ClientCSI.ControllerDetachVolume
-    nextCSIControllerDetachVolumeResponse *cstructs.ClientCSIControllerDetachVolumeResponse
-    nextCSIControllerDetachVolumeError    error
-    countCSIControllerDetachVolume        int
-
-    // mock responses for CSI.VolumeClaim
-    nextCSIVolumeClaimResponse *structs.CSIVolumeClaimResponse
-    nextCSIVolumeClaimError    error
-    countCSIVolumeClaim        int
-}
-
-func (srv *MockRPCServer) RPC(method string, args interface{}, reply interface{}) error {
-    switch method {
-    case "ClientCSI.NodeDetachVolume":
-        reply = srv.nextCSINodeDetachVolumeResponse
-        srv.countCSINodeDetachVolume++
-        return srv.nextCSINodeDetachVolumeError
-    case "ClientCSI.ControllerDetachVolume":
-        reply = srv.nextCSIControllerDetachVolumeResponse
-        srv.countCSIControllerDetachVolume++
-        return srv.nextCSIControllerDetachVolumeError
-    case "CSIVolume.Claim":
-        reply = srv.nextCSIVolumeClaimResponse
-        srv.countCSIVolumeClaim++
-        return srv.nextCSIVolumeClaimError
-    default:
-        return fmt.Errorf("unexpected method %q passed to mock", method)
-    }
-
-}
-
-func (srv *MockRPCServer) State() *state.StateStore { return srv.state }
diff --git a/nomad/interfaces.go b/nomad/interfaces.go
deleted file mode 100644
index 4dc266d8b808..000000000000
--- a/nomad/interfaces.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package nomad
-
-import "github.com/hashicorp/nomad/nomad/state"
-
-// RPCServer is a minimal interface of the Server, intended as
-// an aid for testing logic surrounding server-to-server or
-// server-to-client RPC calls
-type RPCServer interface {
-    RPC(method string, args interface{}, reply interface{}) error
-    State() *state.StateStore
-}
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index a451817fb588..3cbd3f204db7 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -737,19 +737,13 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
     for _, vol := range volumesToGC {
         // we have to build this eval by hand rather than calling srv.CoreJob
         // here because we need to use the volume's namespace
-
-        runningAllocs := ":ok"
-        if args.Purge {
-            runningAllocs = ":purge"
-        }
-
         eval := &structs.Evaluation{
             ID:          uuid.Generate(),
             Namespace:   job.Namespace,
             Priority:    structs.CoreJobPriority,
             Type:        structs.JobTypeCore,
             TriggeredBy: structs.EvalTriggerAllocStop,
-            JobID:       structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source + runningAllocs,
+            JobID:       structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source,
             LeaderACL:   j.srv.getLeaderAcl(),
             Status:      structs.EvalStatusPending,
             CreateTime:  now,
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index fcfbcfcc2326..7308c00aa63c 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -1149,7 +1149,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
             Priority:    structs.CoreJobPriority,
             Type:        structs.JobTypeCore,
             TriggeredBy: structs.EvalTriggerAllocStop,
-            JobID:       structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0] + ":no",
+            JobID:       structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0],
             LeaderACL:   n.srv.getLeaderAcl(),
             Status:      structs.EvalStatusPending,
             CreateTime:  now.UTC().UnixNano(),
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index c1d54ebc8491..e687614409a6 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -2414,7 +2414,7 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
 
     // Verify the eval for the claim GC was emitted
     // Lookup the evaluations
-    eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0+":no")
+    eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0)
     require.NotNil(t, eval)
     require.Nil(t, err)
 }