scheduler: stopped-yet-running allocs are still running (#10446)
* scheduler: stopped-yet-running allocs are still running

* scheduler: test new stopped-but-running logic

* test: assert nonoverlapping alloc behavior

Also add a simpler Wait test helper to improve reported line numbers in
test failures and save a few lines of code.

* docs: tried my best to describe #10446

it's not concise... feedback welcome

* scheduler: fix test that allowed overlapping allocs

* devices: only free devices when ClientStatus is terminal

* test: output nicer failure message if err==nil

Co-authored-by: Mahmood Ali <mahmood@hashicorp.com>
Co-authored-by: Michael Schurter <mschurter@hashicorp.com>
3 people committed Sep 13, 2022
1 parent cac17b8 commit 757c3c9
Showing 12 changed files with 474 additions and 19 deletions.
3 changes: 3 additions & 0 deletions .changelog/10446.txt
@@ -0,0 +1,3 @@
```release-note:bug
core: prevent new allocations from overlapping execution with stopping allocations
```
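The fix targets the window where an allocation is server-terminal but still client-running. A schematic fragment of that state, using the status constants and helpers referenced in the diffs below (illustrative sketch only, not code from this commit):

```go
// The "stopped-yet-running" window: the server has told the alloc to stop,
// but the client has not yet reported it stopped, so its CPU, memory, and
// devices are still occupied on the node.
alloc := mock.Alloc() // mock helper as used throughout Nomad's tests
alloc.DesiredStatus = structs.AllocDesiredStatusStop  // server: stop it
alloc.ClientStatus = structs.AllocClientStatusRunning // client: still running

alloc.ServerTerminalStatus() // true  -- the scheduler wants it gone
alloc.ClientTerminalStatus() // false -- its resources are still in use
```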
164 changes: 164 additions & 0 deletions nomad/job_endpoint_test.go
@@ -19,8 +19,10 @@ import (
 	"github.com/hashicorp/nomad/testutil"
 	"github.com/hashicorp/raft"
 	"github.com/kr/pretty"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/exp/slices"
 )

func TestJobEndpoint_Register(t *testing.T) {
@@ -109,6 +111,168 @@ func TestJobEndpoint_Register(t *testing.T) {
}
}

// TestJobEndpoint_Register_NonOverlapping asserts that ClientStatus must be
// terminal, not just DesiredStatus, for the resources used by a job to be
// considered free for subsequent placements to use.
//
// See: https://github.com/hashicorp/nomad/issues/10440
func TestJobEndpoint_Register_NonOverlapping(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {})
defer cleanupS1()
state := s1.fsm.State()

// Create a mock node with easy-to-check resources
node := mock.Node()
node.Resources = nil // Deprecated in 0.9
node.NodeResources.Cpu.CpuShares = 700
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))

codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
job := mock.Job()
job.TaskGroups[0].Count = 1
req := &structs.JobRegisterRequest{
Job: job.Copy(),
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
var resp structs.JobRegisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.NonZero(t, resp.Index)

// Assert placement
jobReq := &structs.JobSpecificRequest{
JobID: job.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var alloc *structs.AllocListStub
testutil.Wait(t, func() (bool, error) {
resp := structs.JobAllocationsResponse{}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Allocations", jobReq, &resp))
if n := len(resp.Allocations); n != 1 {
return false, fmt.Errorf("expected 1 allocation but found %d:\n%v", n, resp.Allocations)
}

alloc = resp.Allocations[0]
return true, nil
})
must.Eq(t, alloc.NodeID, node.ID)
must.Eq(t, alloc.DesiredStatus, structs.AllocDesiredStatusRun)
must.Eq(t, alloc.ClientStatus, structs.AllocClientStatusPending)

// Stop
stopReq := &structs.JobDeregisterRequest{
JobID: job.ID,
Purge: false,
WriteRequest: req.WriteRequest,
}
var stopResp structs.JobDeregisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Deregister", stopReq, &stopResp))

// Assert new register blocked
req.Job = job.Copy()
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.NonZero(t, resp.Index)

blockedEval := ""
testutil.Wait(t, func() (bool, error) {
// Assert no new allocs
allocResp := structs.JobAllocationsResponse{}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Allocations", jobReq, &allocResp))
if n := len(allocResp.Allocations); n != 1 {
return false, fmt.Errorf("expected 1 allocation but found %d:\n%v", n, allocResp.Allocations)
}

if alloc.ID != allocResp.Allocations[0].ID {
return false, fmt.Errorf("unexpected change in alloc: %#v", *allocResp.Allocations[0])
}

eval, err := state.EvalByID(nil, resp.EvalID)
must.NoError(t, err)
if eval == nil {
return false, fmt.Errorf("eval not applied: %s", resp.EvalID)
}
if eval.Status != structs.EvalStatusComplete {
return false, fmt.Errorf("expected eval to be complete but found: %s", eval.Status)
}
if eval.BlockedEval == "" {
return false, fmt.Errorf("expected a blocked eval to be created")
}
blockedEval = eval.BlockedEval
return true, nil
})

// Set ClientStatus=complete like a client would
stoppedAlloc := &structs.Allocation{
ID: alloc.ID,
NodeID: alloc.NodeID,
TaskStates: map[string]*structs.TaskState{
	"web": {
		State: structs.TaskStateDead,
	},
},
ClientStatus: structs.AllocClientStatusComplete,
DeploymentStatus: nil, // should not have an impact
NetworkStatus: nil, // should not have an impact
}
upReq := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{stoppedAlloc},
WriteRequest: req.WriteRequest,
}
var upResp structs.GenericResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", upReq, &upResp))

// Assert newer register's eval unblocked
testutil.Wait(t, func() (bool, error) {
eval, err := state.EvalByID(nil, blockedEval)
must.NoError(t, err)
must.NotNil(t, eval)
if eval.Status != structs.EvalStatusComplete {
return false, fmt.Errorf("expected blocked eval to be complete but found: %s", eval.Status)
}
return true, nil
})

// Assert new alloc placed
testutil.Wait(t, func() (bool, error) {
// Assert exactly two allocs: the stopped original and its replacement
allocResp := structs.JobAllocationsResponse{}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Allocations", jobReq, &allocResp))
if n := len(allocResp.Allocations); n != 2 {
return false, fmt.Errorf("expected 2 allocs but found %d:\n%v", n, allocResp.Allocations)
}

slices.SortFunc(allocResp.Allocations, func(a, b *structs.AllocListStub) bool {
return a.CreateIndex < b.CreateIndex
})

if alloc.ID != allocResp.Allocations[0].ID {
return false, fmt.Errorf("unexpected change in alloc: %#v", *allocResp.Allocations[0])
}

if cs := allocResp.Allocations[0].ClientStatus; cs != structs.AllocClientStatusComplete {
return false, fmt.Errorf("expected old alloc to be complete but found: %s", cs)
}

if cs := allocResp.Allocations[1].ClientStatus; cs != structs.AllocClientStatusPending {
return false, fmt.Errorf("expected new alloc to be pending but found: %s", cs)
}
return true, nil
})
}

func TestJobEndpoint_Register_PreserveCounts(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
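The polling in these tests goes through `testutil.Wait`, the "simpler Wait test helper" from the commit message; its implementation lives in one of the changed files not shown in this view. A minimal sketch of the shape the tests rely on, with an assumed timeout and poll interval:

```go
// Wait polls fn until it returns true, failing the test once an assumed
// deadline passes. Sketch only; the real helper in nomad/testutil may
// differ in timeout, backoff, and error handling.
func Wait(t *testing.T, fn func() (bool, error)) {
	t.Helper()
	deadline := time.Now().Add(10 * time.Second) // assumed timeout
	var err error
	for time.Now().Before(deadline) {
		var ok bool
		if ok, err = fn(); ok {
			return
		}
		time.Sleep(100 * time.Millisecond) // assumed poll interval
	}
	if err == nil {
		// per the commit's "nicer failure message if err==nil" bullet
		err = fmt.Errorf("condition never became true")
	}
	t.Fatalf("timed out waiting on test condition: %v", err)
}
```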
2 changes: 1 addition & 1 deletion nomad/structs/devices.go
@@ -61,7 +61,7 @@ func NewDeviceAccounter(n *Node) *DeviceAccounter {
 func (d *DeviceAccounter) AddAllocs(allocs []*Allocation) (collision bool) {
 	for _, a := range allocs {
 		// Filter any terminal allocation
-		if a.TerminalStatus() {
+		if a.ClientTerminalStatus() {
 			continue
 		}
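The one-word change above is the heart of the fix: it decides which notion of "terminal" releases a device. The two `Allocation` helpers differ roughly as sketched below (bodies approximated from the status constants used elsewhere in this commit; the exact Nomad source may differ):

```go
// TerminalStatus is terminal from either side's view: the server wants the
// alloc stopped (DesiredStatus) or the client reports it stopped.
func (a *Allocation) TerminalStatus() bool {
	switch a.DesiredStatus {
	case AllocDesiredStatusStop, AllocDesiredStatusEvict:
		return true
	}
	return a.ClientTerminalStatus()
}

// ClientTerminalStatus is terminal only once the client has reported the
// alloc complete, failed, or lost -- i.e. it truly no longer runs.
func (a *Allocation) ClientTerminalStatus() bool {
	switch a.ClientStatus {
	case AllocClientStatusComplete, AllocClientStatusFailed, AllocClientStatusLost:
		return true
	}
	return false
}
```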

30 changes: 30 additions & 0 deletions nomad/structs/devices_test.go
@@ -6,6 +6,7 @@ import (
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/helper/uuid"
 	psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/require"
 )

@@ -162,6 +163,35 @@ func TestDeviceAccounter_AddAllocs_Collision(t *testing.T) {
require.True(d.AddAllocs(allocs))
}

// Assert that devices are not freed when an alloc's ServerTerminalStatus is
// true; they are freed only once its ClientTerminalStatus is true.
func TestDeviceAccounter_AddAllocs_TerminalStatus(t *testing.T) {
ci.Parallel(t)

n := devNode()
d := NewDeviceAccounter(n)

// Create two allocations, both claiming the same device. The first is being
// told to stop but has not stopped yet.
a1, a2 := nvidiaAlloc(), nvidiaAlloc()
a1.DesiredStatus = AllocDesiredStatusStop
a1.ClientStatus = AllocClientStatusRunning

nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID
a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID}
a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID}

allocs := []*Allocation{a1, a2}

// Since a1 has not stopped on the client, its device is still in use
must.True(t, d.AddAllocs(allocs))

// Assert that stopping a1 on the client frees the device for use by a2
a1.ClientStatus = AllocClientStatusComplete
d = NewDeviceAccounter(n)
must.False(t, d.AddAllocs(allocs))
}

// Make sure that the device allocator works even if the node has no devices
func TestDeviceAccounter_AddReserved_NoDeviceNode(t *testing.T) {
ci.Parallel(t)
2 changes: 1 addition & 1 deletion nomad/structs/funcs.go
@@ -173,7 +173,7 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
 	// For each alloc, add the resources
 	for _, alloc := range allocs {
 		// Do not consider the resource impact of terminal allocations
-		if alloc.TerminalStatus() {
+		if alloc.ClientTerminalStatus() {
 			continue
 		}
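Together with the identical change in the `DeviceAccounter` above, `AllocsFit` now keeps counting an allocation's CPU, memory, ports, and devices until the client confirms it has actually stopped, rather than releasing them as soon as the server marks it for stopping.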

(Diffs for the remaining 7 changed files are not shown.)
