diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1e498d1078f5..88d9b1bb8388 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2816,6 +2816,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a return err } + if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil { + return err + } + // Update the allocation if err := txn.Insert("allocs", copyAlloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) @@ -2922,6 +2926,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation return err } + if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil { + return err + } + if err := txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } @@ -4579,6 +4587,42 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat return nil } +// updatePluginWithAlloc updates the CSI plugins for an alloc when the +// allocation is updated or inserted with a terminal server status. +func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation, + txn *memdb.Txn) error { + if !alloc.ServerTerminalStatus() { + return nil + } + + ws := memdb.NewWatchSet() + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + for _, t := range tg.Tasks { + if t.CSIPluginConfig != nil { + pluginID := t.CSIPluginConfig.ID + plug, err := s.CSIPluginByID(ws, pluginID) + if err != nil { + return err + } + if plug == nil { + // plugin may not have been created because it never + // became healthy, just move on + return nil + } + err = plug.DeleteAlloc(alloc.ID, alloc.NodeID) + if err != nil { + return err + } + err = updateOrGCPlugin(index, txn, plug) + if err != nil { + return err + } + } + } + + return nil +} + // UpsertACLPolicies is used to create or update a set of ACL policies func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error { txn := s.db.Txn(true) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index aa5374ce8113..a0b8dc40d49c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3214,6 +3214,277 @@ func TestStateStore_CSIPluginNodes(t *testing.T) { require.False(t, vol.Schedulable) } +// TestStateStore_CSIPluginAllocUpdates tests the ordering +// interactions for CSI plugins between Nomad client node updates and +// allocation updates. +func TestStateStore_CSIPluginAllocUpdates(t *testing.T) { + t.Parallel() + index := uint64(999) + state := testStateStore(t) + ws := memdb.NewWatchSet() + + n := mock.Node() + index++ + err := state.UpsertNode(index, n) + require.NoError(t, err) + + // (1) unhealthy fingerprint, then terminal alloc, then healthy node update + plugID0 := "foo0" + + alloc0 := mock.Alloc() + alloc0.NodeID = n.ID + alloc0.DesiredStatus = "run" + alloc0.ClientStatus = "running" + alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID0} + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc0}) + require.NoError(t, err) + + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID0: { + PluginID: plugID0, + AllocID: alloc0.ID, + Healthy: false, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err := state.CSIPluginByID(ws, plugID0) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: not yet healthy") + + alloc0.DesiredStatus = "stopped" + alloc0.ClientStatus = "complete" + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc0}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID0) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: allocs never healthy") + + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins[plugID0].Healthy = true + n.CSINodePlugins[plugID0].UpdateTime = time.Now() + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID0) + require.NoError(t, err) + require.NotNil(t, plug, "plugin should exist") + + // (2) healthy fingerprint, then terminal alloc update + plugID1 := "foo1" + + alloc1 := mock.Alloc() + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID1: { + PluginID: plugID1, + AllocID: alloc1.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID1) + require.NoError(t, err) + require.NotNil(t, plug, "plugin should exist") + + alloc1.NodeID = n.ID + alloc1.DesiredStatus = "stop" + alloc1.ClientStatus = "complete" + alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID1} + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc1}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID1) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: alloc became terminal") + + // (3) terminal alloc update, then unhealthy fingerprint + plugID2 := "foo2" + + alloc2 := mock.Alloc() + alloc2.NodeID = n.ID + alloc2.DesiredStatus = "stop" + alloc2.ClientStatus = "complete" + alloc2.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID2} + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc2}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID2) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: alloc became terminal") + + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID2: { + PluginID: plugID2, + AllocID: alloc2.ID, + Healthy: false, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID2) + require.NoError(t, err) + require.Nil(t, plug, "plugin should not exist: never became healthy") + +} + +// TestStateStore_CSIPluginMultiNodeUpdates tests the ordering +// interactions for CSI plugins between Nomad client node updates and +// allocation updates when multiple nodes are involved +func TestStateStore_CSIPluginMultiNodeUpdates(t *testing.T) { + t.Parallel() + index := uint64(999) + state := testStateStore(t) + ws := memdb.NewWatchSet() + + var err error + + // Create Nomad client Nodes + ns := []*structs.Node{mock.Node(), mock.Node()} + for _, n := range ns { + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + } + + plugID := "foo" + plugCfg := &structs.TaskCSIPluginConfig{ID: plugID} + + // Fingerprint two running node plugins and their allocs; we'll + // leave these in place for the test to ensure we don't GC the + // plugin + for _, n := range ns[:] { + nAlloc := mock.Alloc() + n, _ := state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: nAlloc.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + nAlloc.NodeID = n.ID + nAlloc.DesiredStatus = "run" + nAlloc.ClientStatus = "running" + nAlloc.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg + + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{nAlloc}) + require.NoError(t, err) + } + + // Fingerprint a running controller plugin + alloc0 := mock.Alloc() + n0, _ := state.NodeByID(ws, ns[0].ID) + n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: alloc0.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsListVolumes: true, + }, + }, + } + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + plug, err := state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + + n1, _ := state.NodeByID(ws, ns[1].ID) + + alloc0.NodeID = n0.ID + alloc0.DesiredStatus = "stop" + alloc0.ClientStatus = "complete" + alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg + + alloc1 := mock.Alloc() + alloc1.NodeID = n1.ID + alloc1.DesiredStatus = "run" + alloc1.ClientStatus = "running" + alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg + + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc0, alloc1}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 0, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + + n1.CSIControllerPlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: alloc1.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsListVolumes: true, + }, + }, + } + index++ + err = state.UpsertNode(index, n1) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.True(t, plug.ControllerRequired) + require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + +} + func TestStateStore_CSIPluginJobs(t *testing.T) { s := testStateStore(t) deleteNodes := CreateTestCSIPlugin(s, "foo")