Skip to content

Commit

Permalink
test for periodic volume claim gc scan
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed May 7, 2020
1 parent eb7f26d commit cf20687
Showing 1 changed file with 178 additions and 1 deletion.
179 changes: 178 additions & 1 deletion nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

memdb "github.com/hashicorp/go-memdb"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
Expand Down Expand Up @@ -2212,7 +2213,7 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) {
// Update the time tables to make this work
tt := srv.fsm.TimeTable()
index := uint64(2000)
tt.Witness(index, time.Now().UTC().Add(-1*srv.config.DeploymentGCThreshold))
tt.Witness(index, time.Now().UTC().Add(-1*srv.config.CSIPluginGCThreshold))

// Create a core scheduler
snap, err := state.Snapshot()
Expand Down Expand Up @@ -2248,3 +2249,179 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) {
require.Nil(plug)
require.NoError(err)
}

// TestCoreScheduler_CSIVolumeClaimGC exercises the core scheduler's
// csiVolumeClaimGC job against a volume with a live write claim.
//
// The test proceeds in these steps:
//  1. register a node with a CSI node plugin and a volume via RPC
//  2. create a job with a running alloc (alloc1) and a complete alloc (alloc2)
//  3. write-claim the volume for alloc1 and send an already-detached release
//     claim for alloc2, then wait until no past claims remain
//  4. delete the job, eval, and allocs, then run csiVolumeClaimGC
//  5. assert the write claim is still present after the GC pass
//  6. release alloc1's claim via RPC and wait for all claims to be gone
func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	srv, shutdown := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})

	defer shutdown()
	testutil.WaitForLeader(t, srv.RPC)
	codec := rpcClient(t, srv)

	volID := uuid.Generate()
	ns := structs.DefaultNamespace
	pluginID := "foo"

	state := srv.fsm.State()
	ws := memdb.NewWatchSet()

	// Start from whatever index the server has already reached so that
	// the direct state store writes below use increasing indexes.
	index, err := state.LatestIndex()
	require.NoError(err)

	// lookupVol returns the current copy of the volume, or nil if the
	// lookup fails or the volume is missing, so that the polling
	// closures below can fail the condition instead of panicking.
	lookupVol := func() *structs.CSIVolume {
		vol, err := state.CSIVolumeByID(ws, ns, volID)
		if err != nil {
			return nil
		}
		return vol
	}

	// Create client node and plugin
	node := mock.Node()
	node.Attributes["nomad.version"] = "0.11.0" // needs client RPCs
	node.CSINodePlugins = map[string]*structs.CSIInfo{
		pluginID: {
			PluginID: pluginID,
			Healthy:  true,
			NodeInfo: &structs.CSINodeInfo{},
		},
	}
	index++
	err = state.UpsertNode(index, node)
	require.NoError(err)

	// Note that for volume writes in this test we need to use the
	// RPCs rather than StateStore methods directly so that the GC
	// job's RPC call updates a later index. Otherwise the
	// volumewatcher won't trigger for the final GC.

	// Register a volume
	vols := []*structs.CSIVolume{{
		ID:             volID,
		Namespace:      ns,
		PluginID:       pluginID,
		AccessMode:     structs.CSIVolumeAccessModeMultiNodeSingleWriter,
		AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
		Topologies:     []*structs.CSITopology{},
	}}
	volReq := &structs.CSIVolumeRegisterRequest{Volumes: vols}
	volReq.Namespace = ns
	volReq.Region = srv.config.Region

	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Register",
		volReq, &structs.CSIVolumeRegisterResponse{})
	require.NoError(err)

	// Create a job with two allocations that claim the volume.
	// We use two allocs here, one of which is not running, so
	// that we can assert that the volumewatcher has made one
	// complete pass (and removed the 2nd alloc) before running
	// the GC.
	eval := mock.Eval()
	eval.Status = structs.EvalStatusFailed
	index++
	require.NoError(state.UpsertJobSummary(index, mock.JobSummary(eval.JobID)))
	index++
	err = state.UpsertEvals(index, []*structs.Evaluation{eval})
	require.NoError(err)

	job := mock.Job()
	job.ID = eval.JobID
	job.Status = structs.JobStatusRunning
	index++
	err = state.UpsertJob(index, job)
	require.NoError(err)

	alloc1, alloc2 := mock.Alloc(), mock.Alloc()
	alloc1.NodeID = node.ID
	alloc1.ClientStatus = structs.AllocClientStatusRunning
	alloc1.Job = job
	alloc1.JobID = job.ID
	alloc1.EvalID = eval.ID

	alloc2.NodeID = node.ID
	alloc2.ClientStatus = structs.AllocClientStatusComplete
	alloc2.Job = job
	alloc2.JobID = job.ID
	alloc2.EvalID = eval.ID

	summary := mock.JobSummary(alloc1.JobID)
	index++
	require.NoError(state.UpsertJobSummary(index, summary))
	summary = mock.JobSummary(alloc2.JobID)
	index++
	require.NoError(state.UpsertJobSummary(index, summary))
	index++
	require.NoError(state.UpsertAllocs(index,
		[]*structs.Allocation{alloc1, alloc2}))

	// Write-claim the volume for the running alloc
	req := &structs.CSIVolumeClaimRequest{
		AllocationID: alloc1.ID,
		NodeID:       node.ID,
		VolumeID:     volID,
		Claim:        structs.CSIVolumeClaimWrite,
	}
	req.Namespace = ns
	req.Region = srv.config.Region
	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
		req, &structs.CSIVolumeClaimResponse{})
	require.NoError(err)

	// ready-to-free claim; once it's gone we know the volumewatcher
	// has run once and stopped
	req.AllocationID = alloc2.ID
	req.Claim = structs.CSIVolumeClaimRelease
	req.State = structs.CSIVolumeClaimStateControllerDetached
	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
		req, &structs.CSIVolumeClaimResponse{})
	require.NoError(err)

	// wait for volumewatcher
	require.Eventually(func() bool {
		vol := lookupVol()
		return vol != nil &&
			len(vol.ReadAllocs) == 0 &&
			len(vol.ReadClaims) == 0 &&
			len(vol.PastClaims) == 0
	}, time.Second*1, 10*time.Millisecond, "stale claim was not released")

	// Delete allocation and job
	index++
	err = state.DeleteJob(index, ns, job.ID)
	require.NoError(err)
	index++
	err = state.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID, alloc2.ID})
	require.NoError(err)

	// Create a core scheduler and attempt the volume claim GC
	snap, err := state.Snapshot()
	require.NoError(err)
	core := NewCoreScheduler(srv, snap)

	index++
	gc := srv.coreJobEval(structs.CoreJobForceGC, index)
	c := core.(*CoreScheduler)
	require.NoError(c.csiVolumeClaimGC(gc))

	// the volumewatcher will hit an error here because there's no
	// path to the node. but we can't update the claim to bypass the
	// client RPCs without triggering the volumewatcher's normal code
	// path.
	require.Eventually(func() bool {
		vol := lookupVol()
		return vol != nil &&
			len(vol.WriteClaims) == 1 &&
			len(vol.WriteAllocs) == 1 &&
			len(vol.PastClaims) == 0
	}, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly")

	// req still carries the release claim mode and the
	// controller-detached state set above; only the alloc changes.
	req.AllocationID = alloc1.ID
	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
		req, &structs.CSIVolumeClaimResponse{})
	require.NoError(err)

	// wait for volumewatcher
	require.Eventually(func() bool {
		vol := lookupVol()
		return vol != nil &&
			len(vol.WriteClaims) == 0 &&
			len(vol.WriteAllocs) == 0 &&
			len(vol.PastClaims) == 0
	}, time.Second*1, 10*time.Millisecond, "claims were not released")

}

0 comments on commit cf20687

Please sign in to comment.