Skip to content

Commit

Permalink
csi: terminal plugin allocs should clean themselves up on update
Browse files Browse the repository at this point in the history
When an allocation that implements a CSI plugin becomes terminal the
client fingerprint can't tell if the plugin is unhealthy intentionally
(for the case of updates or job stop). Allocations that are
server-terminal should delete themselves from the plugin and trigger a
plugin self-GC, the same as an unused node.
  • Loading branch information
tgross committed May 4, 2020
1 parent b09fafb commit 81c0702
Show file tree
Hide file tree
Showing 3 changed files with 351 additions and 2 deletions.
44 changes: 44 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2816,6 +2816,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
return err
}

if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil {
return err
}

// Update the allocation
if err := txn.Insert("allocs", copyAlloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
Expand Down Expand Up @@ -2922,6 +2926,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return err
}

if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil {
return err
}

if err := txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
Expand Down Expand Up @@ -4579,6 +4587,42 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return nil
}

// updatePluginWithAlloc updates the CSI plugins for an alloc when the
// allocation is updated or inserted with a terminal server status.
//
// A server-terminal alloc no longer provides its plugin, so it is removed
// from the plugin's alloc list and the plugin is updated or garbage
// collected (via updateOrGCPlugin) within the same transaction.
func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation,
	txn *memdb.Txn) error {
	// Non-terminal allocs are still providing their plugins; nothing to do.
	if !alloc.ServerTerminalStatus() {
		return nil
	}

	ws := memdb.NewWatchSet()
	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	for _, t := range tg.Tasks {
		if t.CSIPluginConfig == nil {
			continue
		}
		pluginID := t.CSIPluginConfig.ID
		plug, err := s.CSIPluginByID(ws, pluginID)
		if err != nil {
			return err
		}
		if plug == nil {
			// plugin may not have been created because it never
			// became healthy; check the group's remaining tasks
			// rather than bailing out of the whole update
			continue
		}
		if err := plug.DeleteAlloc(alloc.ID, alloc.NodeID); err != nil {
			return err
		}
		if err := updateOrGCPlugin(index, txn, plug); err != nil {
			return err
		}
	}

	return nil
}

// UpsertACLPolicies is used to create or update a set of ACL policies
func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error {
txn := s.db.Txn(true)
Expand Down
301 changes: 301 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3214,6 +3214,307 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
require.False(t, vol.Schedulable)
}

// TestStateStore_CSIPluginAllocUpdates tests the ordering
// interactions for CSI plugins between Nomad client node updates and
// allocation updates.
func TestStateStore_CSIPluginAllocUpdates(t *testing.T) {
t.Parallel()
index := uint64(999)
state := testStateStore(t)
ws := memdb.NewWatchSet()

n := mock.Node()
index++
err := state.UpsertNode(index, n)
require.NoError(t, err)

// (1) unhealthy fingerprint, then terminal alloc, then healthy node update
plugID0 := "foo0"

alloc0 := mock.Alloc()
alloc0.NodeID = n.ID
alloc0.DesiredStatus = "run"
alloc0.ClientStatus = "running"
alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID0}
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc0})
require.NoError(t, err)

n, _ = state.NodeByID(ws, n.ID)
n.CSINodePlugins = map[string]*structs.CSIInfo{
plugID0: {
PluginID: plugID0,
AllocID: alloc0.ID,
Healthy: false,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)

plug, err := state.CSIPluginByID(ws, plugID0)
require.NoError(t, err)
require.Nil(t, plug, "no plugin should exist: not yet healthy")

alloc0.DesiredStatus = "stopped"
alloc0.ClientStatus = "complete"
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc0})
require.NoError(t, err)

plug, err = state.CSIPluginByID(ws, plugID0)
require.NoError(t, err)
require.Nil(t, plug, "no plugin should exist: allocs never healthy")

n, _ = state.NodeByID(ws, n.ID)
n.CSINodePlugins[plugID0].Healthy = true
n.CSINodePlugins[plugID0].UpdateTime = time.Now()
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)

plug, err = state.CSIPluginByID(ws, plugID0)
require.NoError(t, err)
require.NotNil(t, plug, "plugin should exist")

// (2) healthy fingerprint, then terminal alloc update
plugID1 := "foo1"

alloc1 := mock.Alloc()
n, _ = state.NodeByID(ws, n.ID)
n.CSINodePlugins = map[string]*structs.CSIInfo{
plugID1: {
PluginID: plugID1,
AllocID: alloc1.ID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)

plug, err = state.CSIPluginByID(ws, plugID1)
require.NoError(t, err)
require.NotNil(t, plug, "plugin should exist")

alloc1.NodeID = n.ID
alloc1.DesiredStatus = "stop"
alloc1.ClientStatus = "complete"
alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID1}
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc1})
require.NoError(t, err)

plug, err = state.CSIPluginByID(ws, plugID1)
require.NoError(t, err)
require.Nil(t, plug, "no plugin should exist: alloc became terminal")

// (3) terminal alloc update, then unhealthy fingerprint
plugID2 := "foo2"

alloc2 := mock.Alloc()
alloc2.NodeID = n.ID
alloc2.DesiredStatus = "stop"
alloc2.ClientStatus = "complete"
alloc2.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID2}
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc2})
require.NoError(t, err)

plug, err = state.CSIPluginByID(ws, plugID2)
require.NoError(t, err)
require.Nil(t, plug, "no plugin should exist: alloc became terminal")

n, _ = state.NodeByID(ws, n.ID)
n.CSINodePlugins = map[string]*structs.CSIInfo{
plugID2: {
PluginID: plugID2,
AllocID: alloc2.ID,
Healthy: false,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)

plug, err = state.CSIPluginByID(ws, plugID2)
require.NoError(t, err)
require.Nil(t, plug, "plugin should not exist: never became healthy")

}

// TestStateStore_CSIPluginMultiNodeUpdates tests the ordering
// interactions for CSI plugins between Nomad client node updates and
// allocation updates when multiple nodes are involved
func TestStateStore_CSIPluginMultiNodeUpdates(t *testing.T) {
t.Parallel()
index := uint64(999)
state := testStateStore(t)
ws := memdb.NewWatchSet()

var err error

// Create Nomad client Nodes
ns := []*structs.Node{mock.Node(), mock.Node()}
for _, n := range ns {
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)
}

plugID := "foo"
plugCfg := &structs.TaskCSIPluginConfig{ID: plugID}

// Fingerprint two running node plugins and their allocs; we'll
// leave these in place for the test to ensure we don't GC the
// plugin
for _, n := range ns[:] {
nAlloc := mock.Alloc()
n, _ := state.NodeByID(ws, n.ID)
n.CSINodePlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
AllocID: nAlloc.ID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
RequiresTopologies: false,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err = state.UpsertNode(index, n)
require.NoError(t, err)

nAlloc.NodeID = n.ID
nAlloc.DesiredStatus = "run"
nAlloc.ClientStatus = "running"
nAlloc.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg

index++
err = state.UpsertAllocs(index, []*structs.Allocation{nAlloc})
require.NoError(t, err)
}

// Fingerprint a running controller plugin
alloc0 := mock.Alloc()
n0, _ := state.NodeByID(ws, ns[0].ID)
n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
AllocID: alloc0.ID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
RequiresTopologies: false,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsListVolumes: true,
},
},
}
index++
err = state.UpsertNode(index, n0)
require.NoError(t, err)

plug, err := state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")

n1, _ := state.NodeByID(ws, ns[1].ID)

alloc0.NodeID = n0.ID
alloc0.DesiredStatus = "stop"
alloc0.ClientStatus = "complete"
alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg

index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc0})
require.NoError(t, err)

plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 0, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")

alloc1 := mock.Alloc()
alloc1.NodeID = n1.ID
alloc1.DesiredStatus = "run"
alloc1.ClientStatus = "running"
alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg

index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc1})
require.NoError(t, err)

plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 0, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")

n0, _ = state.NodeByID(ws, ns[0].ID)
n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
AllocID: alloc0.ID,
Healthy: false,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
RequiresTopologies: false,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsListVolumes: true,
},
},
}
index++
err = state.UpsertNode(index, n0)
require.NoError(t, err)

n1.CSIControllerPlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
AllocID: alloc1.ID,
Healthy: true,
UpdateTime: time.Now(),
RequiresControllerPlugin: true,
RequiresTopologies: false,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsListVolumes: true,
},
},
}
index++
err = state.UpsertNode(index, n1)
require.NoError(t, err)

plug, err = state.CSIPluginByID(ws, plugID)
require.NoError(t, err)
require.True(t, plug.ControllerRequired)
require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy")
require.Equal(t, 1, len(plug.Controllers), "controllers expected")
require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
require.Equal(t, 2, len(plug.Nodes), "nodes expected")

}

func TestStateStore_CSIPluginJobs(t *testing.T) {
s := testStateStore(t)
deleteNodes := CreateTestCSIPlugin(s, "foo")
Expand Down
8 changes: 6 additions & 2 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,9 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
p.ControllersHealthy -= 1
}
}
p.Controllers[nodeID] = info
if prev != nil || prev == nil && info.Healthy {
p.Controllers[nodeID] = info
}
if info.Healthy {
p.ControllersHealthy += 1
}
Expand All @@ -729,7 +731,9 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
p.NodesHealthy -= 1
}
}
p.Nodes[nodeID] = info
if prev != nil || prev == nil && info.Healthy {
p.Nodes[nodeID] = info
}
if info.Healthy {
p.NodesHealthy += 1
}
Expand Down

0 comments on commit 81c0702

Please sign in to comment.