From dd7016b84725526566d718acc6b5b435cb6a76b2 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Thu, 27 Aug 2020 17:20:00 -0400 Subject: [PATCH] csi: plugins track jobs in addition to allocations, and use job information to set expected counts (#8699) * nomad/structs/csi: add explicit job support * nomad/state/state_store: capture job updates directly * api/nodes: CSIInfo needs the AllocID * command/agent/csi_endpoint: AllocID was missing Co-authored-by: Tim Gross --- api/nodes.go | 1 + command/agent/csi_endpoint.go | 5 +- command/agent/csi_endpoint_test.go | 11 +- nomad/csi_endpoint_test.go | 77 ++++++++ nomad/state/state_store.go | 184 +++++++++++++++--- nomad/state/state_store_test.go | 10 +- nomad/structs/csi.go | 142 +++++++++++++- nomad/structs/csi_test.go | 50 +++++ .../github.com/hashicorp/nomad/api/nodes.go | 1 + 9 files changed, 441 insertions(+), 40 deletions(-) diff --git a/api/nodes.go b/api/nodes.go index 94b660d6db01..3a6aa3b3b59d 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -568,6 +568,7 @@ type CSIControllerInfo struct { // as plugin health changes on the node. 
type CSIInfo struct { PluginID string + AllocID string Healthy bool HealthDescription string UpdateTime time.Time diff --git a/command/agent/csi_endpoint.go b/command/agent/csi_endpoint.go index ac6e69f36554..d25528e7b2e4 100644 --- a/command/agent/csi_endpoint.go +++ b/command/agent/csi_endpoint.go @@ -278,10 +278,10 @@ func structsCSIPluginToApi(plug *structs.CSIPlugin) *api.CSIPlugin { Allocations: make([]*api.AllocationListStub, 0, len(plug.Allocations)), ControllerRequired: plug.ControllerRequired, ControllersHealthy: plug.ControllersHealthy, - ControllersExpected: len(plug.Controllers), + ControllersExpected: plug.ControllersExpected, Controllers: make(map[string]*api.CSIInfo, len(plug.Controllers)), NodesHealthy: plug.NodesHealthy, - NodesExpected: len(plug.Nodes), + NodesExpected: plug.NodesExpected, Nodes: make(map[string]*api.CSIInfo, len(plug.Nodes)), CreateIndex: plug.CreateIndex, ModifyIndex: plug.ModifyIndex, @@ -364,6 +364,7 @@ func structsCSIInfoToApi(info *structs.CSIInfo) *api.CSIInfo { } out := &api.CSIInfo{ PluginID: info.PluginID, + AllocID: info.AllocID, Healthy: info.Healthy, HealthDescription: info.HealthDescription, UpdateTime: info.UpdateTime, diff --git a/command/agent/csi_endpoint_test.go b/command/agent/csi_endpoint_test.go index 5dc8ed5f6b80..3415c9c52e30 100644 --- a/command/agent/csi_endpoint_test.go +++ b/command/agent/csi_endpoint_test.go @@ -10,7 +10,6 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" - "github.com/kr/pretty" "github.com/stretchr/testify/require" ) @@ -33,11 +32,13 @@ func TestHTTP_CSIEndpointPlugin(t *testing.T) { out, ok := obj.(*api.CSIPlugin) require.True(t, ok) - require.Equal(t, 1, out.ControllersExpected) + // ControllersExpected is 0 because this plugin was created without a job, + // which sets expected + require.Equal(t, 0, out.ControllersExpected) require.Equal(t, 1, out.ControllersHealthy) require.Len(t, 
out.Controllers, 1) - require.Equal(t, 2, out.NodesExpected) + require.Equal(t, 0, out.NodesExpected) require.Equal(t, 2, out.NodesHealthy) require.Len(t, out.Nodes, 2) }) @@ -92,11 +93,7 @@ func TestHTTP_CSIEndpointVolume(t *testing.T) { out, ok := raw.(*api.CSIVolume) require.True(t, ok) - pretty.Log(out) - - require.Equal(t, 1, out.ControllersExpected) require.Equal(t, 1, out.ControllersHealthy) - require.Equal(t, 2, out.NodesExpected) require.Equal(t, 2, out.NodesHealthy) }) } diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 601ede7773ca..0763f7418c80 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -689,6 +689,83 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) { require.Nil(t, resp2.Plugin) } +func TestCSIPluginEndpoint_RegisterViaJob(t *testing.T) { + t.Parallel() + srv, shutdown := TestServer(t, nil) + defer shutdown() + testutil.WaitForLeader(t, srv.RPC) + + codec := rpcClient(t, srv) + + // Register a job that creates the plugin + job := mock.Job() + job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ + ID: "foo", + Type: structs.CSIPluginTypeNode, + } + + req1 := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + resp1 := &structs.JobRegisterResponse{} + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req1, resp1) + require.NoError(t, err) + + // Verify that the plugin exists and is unhealthy + req2 := &structs.CSIPluginGetRequest{ + ID: "foo", + QueryOptions: structs.QueryOptions{Region: "global"}, + } + resp2 := &structs.CSIPluginGetResponse{} + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) + require.NoError(t, err) + require.NotNil(t, resp2.Plugin) + require.Zero(t, resp2.Plugin.ControllersHealthy) + require.Zero(t, resp2.Plugin.NodesHealthy) + require.Equal(t, job.ID, resp2.Plugin.NodeJobs[structs.DefaultNamespace][job.ID].ID) + + // Health depends on node fingerprints + 
deleteNodes := state.CreateTestCSIPlugin(srv.fsm.State(), "foo") + defer deleteNodes() + + resp2.Plugin = nil + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) + require.NoError(t, err) + require.NotNil(t, resp2.Plugin) + require.NotZero(t, resp2.Plugin.ControllersHealthy) + require.NotZero(t, resp2.Plugin.NodesHealthy) + require.Equal(t, job.ID, resp2.Plugin.NodeJobs[structs.DefaultNamespace][job.ID].ID) + + // All fingerprints failing makes the plugin unhealthy, but does not delete it + deleteNodes() + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) + require.NoError(t, err) + require.NotNil(t, resp2.Plugin) + require.Zero(t, resp2.Plugin.ControllersHealthy) + require.Zero(t, resp2.Plugin.NodesHealthy) + require.Equal(t, job.ID, resp2.Plugin.NodeJobs[structs.DefaultNamespace][job.ID].ID) + + // Job deregistration is necessary to gc the plugin + req3 := &structs.JobDeregisterRequest{ + JobID: job.ID, + Purge: true, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + resp3 := &structs.JobDeregisterResponse{} + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req3, resp3) + require.NoError(t, err) + + // Plugin has been gc'ed + resp2.Plugin = nil + err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) + require.NoError(t, err) + require.Nil(t, resp2.Plugin) +} + func TestCSIPluginEndpoint_DeleteViaGC(t *testing.T) { t.Parallel() srv, shutdown := TestServer(t, func(c *Config) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 652f7c3fa08a..d8ebbabd9a1a 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1050,10 +1050,14 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro return nil } plug = structs.NewCSIPlugin(info.PluginID, index) - plug.Provider = info.Provider - plug.Version = info.ProviderVersion } + // the plugin may have been created by the job being updated, in 
which case + // this data will not be configured, it's only available to the fingerprint + // system + plug.Provider = info.Provider + plug.Version = info.ProviderVersion + err = plug.AddPlugin(node.ID, info) if err != nil { return err @@ -1203,10 +1207,15 @@ func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) err return nil } -// deleteJobFromPlugin removes the allocations of this job from any plugins the job is +// deleteJobFromPlugins removes the allocations of this job from any plugins the job is // running, possibly deleting the plugin if it's no longer in use. It's called in DeleteJobTxn -func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *structs.Job) error { +func (s *StateStore) deleteJobFromPlugins(index uint64, txn *memdb.Txn, job *structs.Job) error { ws := memdb.NewWatchSet() + summary, err := s.JobSummaryByID(ws, job.Namespace, job.ID) + if err != nil { + return fmt.Errorf("error getting job summary: %v", err) + } + allocs, err := s.AllocsByJob(ws, job.Namespace, job.ID, false) if err != nil { return fmt.Errorf("error getting allocations: %v", err) @@ -1218,20 +1227,41 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru } plugAllocs := []*pair{} - plugins := map[string]*structs.CSIPlugin{} + found := map[string]struct{}{} + // Find plugins for allocs that belong to this job for _, a := range allocs { tg := a.Job.LookupTaskGroup(a.TaskGroup) + found[tg.Name] = struct{}{} for _, t := range tg.Tasks { - if t.CSIPluginConfig != nil { - plugAllocs = append(plugAllocs, &pair{ - pluginID: t.CSIPluginConfig.ID, - alloc: a, - }) + if t.CSIPluginConfig == nil { + continue } + plugAllocs = append(plugAllocs, &pair{ + pluginID: t.CSIPluginConfig.ID, + alloc: a, + }) } } + // Find any plugins that do not yet have allocs for this job + for _, tg := range job.TaskGroups { + if _, ok := found[tg.Name]; ok { + continue + } + + for _, t := range tg.Tasks { + if t.CSIPluginConfig == nil { + 
continue + } + plugAllocs = append(plugAllocs, &pair{ + pluginID: t.CSIPluginConfig.ID, + }) + } + } + + plugins := map[string]*structs.CSIPlugin{} + for _, x := range plugAllocs { plug, ok := plugins[x.pluginID] @@ -1248,6 +1278,9 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru plug = plugins[x.pluginID] } + if x.alloc == nil { + continue + } err := plug.DeleteAlloc(x.alloc.ID, x.alloc.NodeID) if err != nil { return err @@ -1255,6 +1288,7 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru } for _, plug := range plugins { + plug.DeleteJob(job, summary) err = updateOrGCPlugin(index, txn, plug) if err != nil { return err @@ -1354,6 +1388,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b // Check if the job already exists existing, err := txn.First("jobs", "id", job.Namespace, job.ID) + var existingJob *structs.Job if err != nil { return fmt.Errorf("job lookup failed: %v", err) } @@ -1363,7 +1398,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index - existingJob := existing.(*structs.Job) + existingJob = existing.(*structs.Job) // Bump the version unless asked to keep it. This should only be done // when changing an internal field such as Stable. 
A spec change should @@ -1413,6 +1448,10 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b return fmt.Errorf("unable to update job scaling policies: %v", err) } + if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil { + return fmt.Errorf("unable to update job scaling policies: %v", err) + } + // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -1507,6 +1546,12 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn return err } + // Cleanup plugins registered by this job, before we delete the summary + err = s.deleteJobFromPlugins(index, txn, job) + if err != nil { + return fmt.Errorf("deleting job from plugin: %v", err) + } + // Delete the job summary if _, err = txn.DeleteAll("job_summary", "id", namespace, jobID); err != nil { return fmt.Errorf("deleting job summary failed: %v", err) @@ -1528,12 +1573,6 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn return fmt.Errorf("index update failed: %v", err) } - // Cleanup plugins registered by this job - err = s.deleteJobFromPlugin(index, txn, job) - if err != nil { - return fmt.Errorf("deleting job from plugin: %v", err) - } - return nil } @@ -2241,10 +2280,10 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs vol.ControllerRequired = plug.ControllerRequired vol.ControllersHealthy = plug.ControllersHealthy vol.NodesHealthy = plug.NodesHealthy - // This number is incorrect! 
The expected number of node plugins is actually this + - // the number of blocked evaluations for the jobs controlling these plugins - vol.ControllersExpected = len(plug.Controllers) - vol.NodesExpected = len(plug.Nodes) + + // This value may be stale, but stale is ok + vol.ControllersExpected = plug.ControllersExpected + vol.NodesExpected = plug.NodesExpected vol.Schedulable = vol.NodesHealthy > 0 if vol.ControllerRequired { @@ -2346,7 +2385,7 @@ func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPl return plug, nil } -// CSIPluginDenormalize returns a CSIPlugin with allocation details +// CSIPluginDenormalize returns a CSIPlugin with allocation details. Always called on a copy of the plugin. func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPlugin) (*structs.CSIPlugin, error) { if plug == nil { return nil, nil @@ -4517,6 +4556,70 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx return nil } +// updateJobCSIPlugins runs on job update, and indexes the job in the plugin +func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, txn *memdb.Txn) error { + ws := memdb.NewWatchSet() + plugIns := make(map[string]*structs.CSIPlugin) + + loop := func(job *structs.Job, delete bool) error { + for _, tg := range job.TaskGroups { + for _, t := range tg.Tasks { + if t.CSIPluginConfig == nil { + continue + } + + plugIn, ok := plugIns[t.CSIPluginConfig.ID] + if !ok { + p, err := s.CSIPluginByID(ws, t.CSIPluginConfig.ID) + if err != nil { + return err + } + if p == nil { + plugIn = structs.NewCSIPlugin(t.CSIPluginConfig.ID, index) + } else { + plugIn = p.Copy() + plugIn.ModifyIndex = index + } + plugIns[plugIn.ID] = plugIn + } + + if delete { + plugIn.DeleteJob(job, nil) + } else { + plugIn.AddJob(job, nil) + } + } + } + + return nil + } + + if prev != nil { + err := loop(prev, true) + if err != nil { + return err + } + } + + err := loop(job, false) + if err != nil { + 
return err + } + + for _, plugIn := range plugIns { + err = txn.Insert("csi_plugins", plugIn) + if err != nil { + return fmt.Errorf("csi_plugins insert error: %v", err) + } + } + + if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + return nil +} + // updateDeploymentWithAlloc is used to update the deployment state associated // with the given allocation. The passed alloc may be updated if the deployment // status has changed to capture the modify index at which it has changed. @@ -4728,6 +4831,8 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if summaryChanged { jobSummary.ModifyIndex = index + s.updatePluginWithJobSummary(index, jobSummary, alloc, txn) + // Update the indexes table for job summary if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { return fmt.Errorf("index update failed: %v", err) @@ -4778,6 +4883,41 @@ func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocati return nil } +// updatePluginWithJobSummary updates the CSI plugins for a job when the +// job summary is updated by an alloc +func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.JobSummary, alloc *structs.Allocation, + txn *memdb.Txn) error { + + ws := memdb.NewWatchSet() + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg == nil { + return nil + } + + for _, t := range tg.Tasks { + if t.CSIPluginConfig != nil { + pluginID := t.CSIPluginConfig.ID + plug, err := s.CSIPluginByID(ws, pluginID) + if err != nil { + return err + } + if plug == nil { + plug = structs.NewCSIPlugin(pluginID, index) + } else { + plug = plug.Copy() + } + + plug.UpdateExpectedWithJob(alloc.Job, summary, alloc.ServerTerminalStatus()) + err = updateOrGCPlugin(index, txn, plug) + if err != nil { + return err + } + } + } + + return nil +} + // UpsertACLPolicies is used to create or update a set of ACL policies func (s 
*StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error { txn := s.db.Txn(true) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1a4d3e6bfd9d..4458bcaa94f1 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3522,9 +3522,6 @@ func TestStateStore_CSIPluginMultiNodeUpdates(t *testing.T) { func TestStateStore_CSIPluginJobs(t *testing.T) { s := testStateStore(t) - deleteNodes := CreateTestCSIPlugin(s, "foo") - defer deleteNodes() - index := uint64(1001) controllerJob := mock.Job() @@ -3581,6 +3578,11 @@ func TestStateStore_CSIPluginJobs(t *testing.T) { require.NoError(t, err) index++ + // We use the summary to add + err = s.ReconcileJobSummaries(index) + require.NoError(t, err) + index++ + // Delete a job err = s.DeleteJob(index, controllerJob.Namespace, controllerJob.ID) require.NoError(t, err) @@ -3600,7 +3602,7 @@ func TestStateStore_CSIPluginJobs(t *testing.T) { // plugin was collected plug, err = s.CSIPluginByID(ws, "foo") require.NoError(t, err) - require.Nil(t, plug) + require.True(t, plug.IsEmpty()) } func TestStateStore_RestoreCSIPlugin(t *testing.T) { diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index 09862bc7550b..425d3e3cfd8f 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -709,9 +709,15 @@ type CSIPlugin struct { // Allocations are populated by denormalize to show running allocations Allocations []*AllocListStub + // Jobs are populated by job update to support expected counts and the UI + ControllerJobs JobDescriptions + NodeJobs JobDescriptions + // Cache the count of healthy plugins - ControllersHealthy int - NodesHealthy int + ControllersHealthy int + ControllersExpected int + NodesHealthy int + NodesExpected int CreateIndex uint64 ModifyIndex uint64 @@ -732,6 +738,8 @@ func NewCSIPlugin(id string, index uint64) *CSIPlugin { func (p *CSIPlugin) newStructs() { p.Controllers = map[string]*CSIInfo{} p.Nodes = map[string]*CSIInfo{} 
+ p.ControllerJobs = make(JobDescriptions) + p.NodeJobs = make(JobDescriptions) } func (p *CSIPlugin) Copy() *CSIPlugin { @@ -747,6 +755,14 @@ func (p *CSIPlugin) Copy() *CSIPlugin { out.Nodes[k] = v } + for k, v := range p.ControllerJobs { + out.ControllerJobs[k] = v + } + + for k, v := range p.NodeJobs { + out.NodeJobs[k] = v + } + return out } @@ -874,6 +890,118 @@ func (p *CSIPlugin) DeleteAlloc(allocID, nodeID string) error { return nil } +// AddJob adds a job to the plugin and increments expected +func (p *CSIPlugin) AddJob(job *Job, summary *JobSummary) { + p.UpdateExpectedWithJob(job, summary, false) +} + +// DeleteJob removes the job from the plugin and decrements expected +func (p *CSIPlugin) DeleteJob(job *Job, summary *JobSummary) { + p.UpdateExpectedWithJob(job, summary, true) +} + +// UpdateExpectedWithJob maintains the expected instance count +// we use the summary to add non-allocation expected counts +func (p *CSIPlugin) UpdateExpectedWithJob(job *Job, summary *JobSummary, terminal bool) { + var count int + + for _, tg := range job.TaskGroups { + if job.Type == JobTypeSystem { + if summary == nil { + continue + } + + s, ok := summary.Summary[tg.Name] + if !ok { + continue + } + + count = s.Running + s.Queued + s.Starting + } else { + count = tg.Count + } + + for _, t := range tg.Tasks { + if t.CSIPluginConfig == nil || + t.CSIPluginConfig.ID != p.ID { + continue + } + + // Change the correct plugin expected, monolith should change both + if t.CSIPluginConfig.Type == CSIPluginTypeController || + t.CSIPluginConfig.Type == CSIPluginTypeMonolith { + if terminal { + p.ControllerJobs.Delete(job) + } else { + p.ControllerJobs.Add(job, count) + } + } + + if t.CSIPluginConfig.Type == CSIPluginTypeNode || + t.CSIPluginConfig.Type == CSIPluginTypeMonolith { + if terminal { + p.NodeJobs.Delete(job) + } else { + p.NodeJobs.Add(job, count) + } + } + } + } + + p.ControllersExpected = p.ControllerJobs.Count() + p.NodesExpected = p.NodeJobs.Count() +} + +// 
JobDescription records Job identification and the count of expected plugin instances +type JobDescription struct { + Namespace string + ID string + Expected int +} + +// JobNamespacedDescriptions maps Job.ID to JobDescription +type JobNamespacedDescriptions map[string]JobDescription + +// JobDescriptions maps Namespace to a mapping of Job.ID to JobDescription +type JobDescriptions map[string]JobNamespacedDescriptions + +// Add the Job to the JobDescriptions, creating maps as necessary +func (j JobDescriptions) Add(job *Job, expected int) { + if j == nil { + j = make(JobDescriptions) + } + if j[job.Namespace] == nil { + j[job.Namespace] = make(JobNamespacedDescriptions) + } + j[job.Namespace][job.ID] = JobDescription{ + Namespace: job.Namespace, + ID: job.ID, + Expected: expected, + } +} + +// Count the Expected instances for all JobDescriptions +func (j JobDescriptions) Count() int { + if j == nil { + return 0 + } + count := 0 + for _, jnd := range j { + for _, jd := range jnd { + count += jd.Expected + } + } + return count +} + +// Delete the Job from the JobDescriptions +func (j JobDescriptions) Delete(job *Job) { + if j != nil && + j[job.Namespace] != nil { + delete(j[job.Namespace], job.ID) + } +} + type CSIPluginListStub struct { ID string Provider string @@ -892,16 +1020,20 @@ func (p *CSIPlugin) Stub() *CSIPluginListStub { Provider: p.Provider, ControllerRequired: p.ControllerRequired, ControllersHealthy: p.ControllersHealthy, - ControllersExpected: len(p.Controllers), + ControllersExpected: p.ControllersExpected, NodesHealthy: p.NodesHealthy, - NodesExpected: len(p.Nodes), + NodesExpected: p.NodesExpected, CreateIndex: p.CreateIndex, ModifyIndex: p.ModifyIndex, } } func (p *CSIPlugin) IsEmpty() bool { - return len(p.Controllers) == 0 && len(p.Nodes) == 0 + return p == nil || + len(p.Controllers) == 0 && + len(p.Nodes) == 0 && + p.ControllerJobs.Count() == 0 && + p.NodeJobs.Count() == 0 } type CSIPluginListRequest struct { diff --git 
a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index 053055018f5d..e048ec938a28 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -38,6 +38,56 @@ func TestCSIVolumeClaim(t *testing.T) { require.True(t, vol.WriteFreeClaims()) } +func TestCSIPluginJobs(t *testing.T) { + plug := NewCSIPlugin("foo", 1000) + controller := &Job{ + ID: "job", + Type: "service", + TaskGroups: []*TaskGroup{{ + Name: "foo", + Count: 11, + Tasks: []*Task{{ + CSIPluginConfig: &TaskCSIPluginConfig{ + ID: "foo", + Type: CSIPluginTypeController, + }, + }}, + }}, + } + + summary := &JobSummary{} + + plug.AddJob(controller, summary) + require.Equal(t, 11, plug.ControllersExpected) + + // New job id & make it a system node plugin job + node := controller.Copy() + node.ID = "bar" + node.Type = "system" + node.TaskGroups[0].Tasks[0].CSIPluginConfig.Type = CSIPluginTypeNode + + summary = &JobSummary{ + Summary: map[string]TaskGroupSummary{ + "foo": { + Queued: 1, + Running: 1, + Starting: 1, + }, + }, + } + + plug.AddJob(node, summary) + require.Equal(t, 3, plug.NodesExpected) + + plug.DeleteJob(node, summary) + require.Equal(t, 0, plug.NodesExpected) + require.Empty(t, plug.NodeJobs[""]) + + plug.DeleteJob(controller, nil) + require.Equal(t, 0, plug.ControllersExpected) + require.Empty(t, plug.ControllerJobs[""]) +} + func TestCSIPluginCleanup(t *testing.T) { plug := NewCSIPlugin("foo", 1000) plug.AddPlugin("n0", &CSIInfo{ diff --git a/vendor/github.com/hashicorp/nomad/api/nodes.go b/vendor/github.com/hashicorp/nomad/api/nodes.go index 94b660d6db01..3a6aa3b3b59d 100644 --- a/vendor/github.com/hashicorp/nomad/api/nodes.go +++ b/vendor/github.com/hashicorp/nomad/api/nodes.go @@ -568,6 +568,7 @@ type CSIControllerInfo struct { // as plugin health changes on the node. type CSIInfo struct { PluginID string + AllocID string Healthy bool HealthDescription string UpdateTime time.Time