scheduler: stopped-yet-running allocs are still running #10446

Merged · 7 commits · Sep 13, 2022
3 changes: 3 additions & 0 deletions .changelog/10446.txt
@@ -0,0 +1,3 @@
```release-note:bug
core: prevent new allocations from overlapping execution with stopping allocations
```
164 changes: 164 additions & 0 deletions nomad/job_endpoint_test.go
@@ -19,8 +19,10 @@ import (
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)

func TestJobEndpoint_Register(t *testing.T) {
@@ -109,6 +111,168 @@ func TestJobEndpoint_Register(t *testing.T) {
	}
}

// TestJobEndpoint_Register_NonOverlapping asserts that ClientStatus must be
// terminal, not just DesiredStatus, for the resources used by a job to be
// considered free for subsequent placements to use.
//
// See: https://github.com/hashicorp/nomad/issues/10440
func TestJobEndpoint_Register_NonOverlapping(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
	})
Comment on lines +122 to +123
Contributor

Suggested change:
-	s1, cleanupS1 := TestServer(t, func(c *Config) {
-	})
+	s1, cleanupS1 := TestServer(t, nil)

	defer cleanupS1()
	state := s1.fsm.State()

	// Create a mock node with easy-to-check resources
	node := mock.Node()
	node.Resources = nil // Deprecated in 0.9
	node.NodeResources.Cpu.CpuShares = 700
	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))

	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Create the register request
	job := mock.Job()
	job.TaskGroups[0].Count = 1
	req := &structs.JobRegisterRequest{
		Job: job.Copy(),
		WriteRequest: structs.WriteRequest{
			Region:    "global",
			Namespace: job.Namespace,
		},
	}

	// Fetch the response
	var resp structs.JobRegisterResponse
	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
	must.NonZero(t, resp.Index)

	// Assert placement
	jobReq := &structs.JobSpecificRequest{
		JobID: job.ID,
		QueryOptions: structs.QueryOptions{
			Region:    "global",
			Namespace: structs.DefaultNamespace,
		},
	}
	var alloc *structs.AllocListStub
	testutil.Wait(t, func() (bool, error) {
		resp := structs.JobAllocationsResponse{}
		must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Allocations", jobReq, &resp))
		if n := len(resp.Allocations); n != 1 {
			return false, fmt.Errorf("expected 1 allocation but found %d:\n%v", n, resp.Allocations)
		}

		alloc = resp.Allocations[0]
		return true, nil
	})
	must.Eq(t, alloc.NodeID, node.ID)
	must.Eq(t, alloc.DesiredStatus, structs.AllocDesiredStatusRun)
	must.Eq(t, alloc.ClientStatus, structs.AllocClientStatusPending)

	// Stop
	stopReq := &structs.JobDeregisterRequest{
		JobID:        job.ID,
		Purge:        false,
		WriteRequest: req.WriteRequest,
	}
	var stopResp structs.JobDeregisterResponse
	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Deregister", stopReq, &stopResp))

	// Assert new register blocked
	req.Job = job.Copy()
	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
	must.NonZero(t, resp.Index)

	blockedEval := ""
	testutil.Wait(t, func() (bool, error) {
		// Assert no new allocs
		allocResp := structs.JobAllocationsResponse{}
		must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Allocations", jobReq, &allocResp))
		if n := len(allocResp.Allocations); n != 1 {
			return false, fmt.Errorf("expected 1 allocation but found %d:\n%v", n, allocResp.Allocations)
		}

		if alloc.ID != allocResp.Allocations[0].ID {
			return false, fmt.Errorf("unexpected change in alloc: %#v", *allocResp.Allocations[0])
		}

		eval, err := state.EvalByID(nil, resp.EvalID)
		must.NoError(t, err)
		if eval == nil {
			return false, fmt.Errorf("eval not applied: %s", resp.EvalID)
		}
		if eval.Status != structs.EvalStatusComplete {
			return false, fmt.Errorf("expected eval to be complete but found: %s", eval.Status)
		}
		if eval.BlockedEval == "" {
			return false, fmt.Errorf("expected a blocked eval to be created")
		}
		blockedEval = eval.BlockedEval
		return true, nil
	})

	// Set ClientStatus=complete like a client would
	stoppedAlloc := &structs.Allocation{
		ID:     alloc.ID,
		NodeID: alloc.NodeID,
		TaskStates: map[string]*structs.TaskState{
			"web": &structs.TaskState{
				State: structs.TaskStateDead,
			},
		},
		ClientStatus:     structs.AllocClientStatusComplete,
		DeploymentStatus: nil, // should not have an impact
		NetworkStatus:    nil, // should not have an impact
	}
	upReq := &structs.AllocUpdateRequest{
		Alloc:        []*structs.Allocation{stoppedAlloc},
		WriteRequest: req.WriteRequest,
	}
	var upResp structs.GenericResponse
	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", upReq, &upResp))

	// Assert newer register's eval unblocked
	testutil.Wait(t, func() (bool, error) {
		eval, err := state.EvalByID(nil, blockedEval)
		must.NoError(t, err)
		must.NotNil(t, eval)
		if eval.Status != structs.EvalStatusComplete {
			return false, fmt.Errorf("expected blocked eval to be complete but found: %s", eval.Status)
		}
		return true, nil
	})

	// Assert new alloc placed
	testutil.Wait(t, func() (bool, error) {
		// Assert both the old and new allocs exist
		allocResp := structs.JobAllocationsResponse{}
		must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Allocations", jobReq, &allocResp))
		if n := len(allocResp.Allocations); n != 2 {
			return false, fmt.Errorf("expected 2 allocs but found %d:\n%v", n, allocResp.Allocations)
		}

		slices.SortFunc(allocResp.Allocations, func(a, b *structs.AllocListStub) bool {
			return a.CreateIndex < b.CreateIndex
		})

		if alloc.ID != allocResp.Allocations[0].ID {
			return false, fmt.Errorf("unexpected change in alloc: %#v", *allocResp.Allocations[0])
		}

		if cs := allocResp.Allocations[0].ClientStatus; cs != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("expected old alloc to be complete but found: %s", cs)
		}

		if cs := allocResp.Allocations[1].ClientStatus; cs != structs.AllocClientStatusPending {
			return false, fmt.Errorf("expected new alloc to be pending but found: %s", cs)
		}
		return true, nil
	})
}

func TestJobEndpoint_Register_PreserveCounts(t *testing.T) {
	ci.Parallel(t)
	require := require.New(t)
2 changes: 1 addition & 1 deletion nomad/structs/devices.go
@@ -61,7 +61,7 @@ func NewDeviceAccounter(n *Node) *DeviceAccounter {
func (d *DeviceAccounter) AddAllocs(allocs []*Allocation) (collision bool) {
	for _, a := range allocs {
		// Filter any terminal allocation
-		if a.TerminalStatus() {
+		if a.ClientTerminalStatus() {
			continue
		}

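The one-line change above hinges on the difference between an allocation that is terminal from the server's point of view (DesiredStatus is stop or evict) and one that is terminal from the client's point of view (ClientStatus is complete, failed, or lost). The sketch below is a self-contained, simplified illustration of that distinction using the status names exercised in this PR; the stand-in Alloc type and helper functions are not the real nomad/structs code, whose method bodies may differ in detail.

```go
package main

import "fmt"

// Alloc is a stand-in for structs.Allocation with just the two status
// fields this PR cares about; the real type has many more.
type Alloc struct {
	ID            string
	DesiredStatus string // "run", "stop", "evict"
	ClientStatus  string // "pending", "running", "complete", "failed", "lost"
}

// serverTerminal approximates ServerTerminalStatus: the scheduler has
// decided the alloc should stop, whether or not the client has acted yet.
func serverTerminal(a Alloc) bool {
	return a.DesiredStatus == "stop" || a.DesiredStatus == "evict"
}

// clientTerminal approximates ClientTerminalStatus: the client has
// reported that the alloc is actually done running.
func clientTerminal(a Alloc) bool {
	switch a.ClientStatus {
	case "complete", "failed", "lost":
		return true
	}
	return false
}

// terminal approximates TerminalStatus: terminal from either the server's
// or the client's point of view. This was the old filter condition.
func terminal(a Alloc) bool {
	return serverTerminal(a) || clientTerminal(a)
}

func main() {
	// A "stopped-yet-running" alloc: told to stop, still running on the client.
	a := Alloc{ID: "old", DesiredStatus: "stop", ClientStatus: "running"}

	fmt.Println(terminal(a))       // true  -> old filter skipped it, freeing its resources too early
	fmt.Println(clientTerminal(a)) // false -> new filter keeps counting it until the client reports terminal
}
```

With the old TerminalStatus-style check, an alloc that had merely been told to stop was already skipped and its devices and CPU were handed out again; with the ClientTerminalStatus check used in the device accounter above (and in AllocsFit below), the resources stay reserved until the client actually reports the alloc as finished.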
30 changes: 30 additions & 0 deletions nomad/structs/devices_test.go
@@ -6,6 +6,7 @@ import (
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/uuid"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

@@ -162,6 +163,35 @@ func TestDeviceAccounter_AddAllocs_Collision(t *testing.T) {
	require.True(d.AddAllocs(allocs))
}

// Assert that devices are not freed merely because an alloc's
// ServerTerminalStatus is true; they are freed only once its
// ClientTerminalStatus is true.
func TestDeviceAccounter_AddAllocs_TerminalStatus(t *testing.T) {
	ci.Parallel(t)

	n := devNode()
	d := NewDeviceAccounter(n)

	// Create two allocations, both with the same device. The first is being
	// told to stop but has not stopped yet.
	a1, a2 := nvidiaAlloc(), nvidiaAlloc()
	a1.DesiredStatus = AllocDesiredStatusStop
	a1.ClientStatus = AllocClientStatusRunning

	nvidiaDev0ID := n.NodeResources.Devices[0].Instances[0].ID
	a1.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID}
	a2.AllocatedResources.Tasks["web"].Devices[0].DeviceIDs = []string{nvidiaDev0ID}

	allocs := []*Allocation{a1, a2}

	// Since a1 has not stopped on the client, its device is still in use
	must.True(t, d.AddAllocs(allocs))

	// Assert that stopping a1 on the client frees the device for use by a2
	a1.ClientStatus = AllocClientStatusComplete
	d = NewDeviceAccounter(n)
	must.False(t, d.AddAllocs(allocs))
}

// Make sure that the device allocator works even if the node has no devices
func TestDeviceAccounter_AddReserved_NoDeviceNode(t *testing.T) {
	ci.Parallel(t)
2 changes: 1 addition & 1 deletion nomad/structs/funcs.go
@@ -173,7 +173,7 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
	// For each alloc, add the resources
	for _, alloc := range allocs {
		// Do not consider the resource impact of terminal allocations
-		if alloc.TerminalStatus() {
+		if alloc.ClientTerminalStatus() {
			continue
		}
Comment on lines -176 to 178
Member

I'm not sure this code is ever hit. The 2 places we call AllocsFit both already filter down to just non-terminal allocs as far as I can tell.

The call in the plan applier uses AllocsByNodeTerminal(false) to generate the list of allocs, so all terminal allocs are already removed AFAICT.

The call in the worker is also passed a list of allocs from AllocsByNodeTerminal(false).

AllocsByNodeTerminal itself makes this all pretty tricky to trace because it's difficult to know the provenance of any given []*Allocation and how it's been populated and filtered.

I think leaving this filter logic in place is ok as a defensive measure, but I think we need to ensure the half-terminal allocs aren't filtered out "upstream."

Member

I was wrong. This is hit in the plan applier! Tested against main (c2428e1 - v1.3.2-dev) and rebased this commit on top of it.

I missed that after the call to snap.AllocsByNodeTerminal(ws, nodeID, false) (which returns only non-terminal allocs) we append the allocs from the Plan which include the failed alloc: https://github.com/hashicorp/nomad/blob/v1.3.1/nomad/plan_apply.go#L699

To repro:

```sh
nomad agent -dev
nomad job run example.nomad
nomad alloc signal $allocid
# wait for restart
nomad alloc signal $allocid
# wait for restart
nomad alloc signal $allocid

# eval for rescheduling sees old alloc as DesiredStatus=run + ClientStatus=failed
```
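To make the flow described in these review comments concrete, here is a rough, self-contained sketch of the order of operations: the state snapshot lookup drops allocs it considers terminal, but the allocs proposed by the plan are appended afterwards, which is how an alloc with DesiredStatus=run and ClientStatus=failed still reaches the fit check. The stand-in names (nonTerminalFromState, countedByAllocsFit, the local Alloc struct) are illustrative only and are not the real snap.AllocsByNodeTerminal or structs.AllocsFit implementations; see the linked plan_apply.go for the actual code.

```go
package main

import "fmt"

// Minimal stand-in for structs.Allocation; only the fields needed to model
// the flow the reviewer describes.
type Alloc struct {
	ID            string
	DesiredStatus string
	ClientStatus  string
}

func clientTerminal(a Alloc) bool {
	return a.ClientStatus == "complete" || a.ClientStatus == "failed" || a.ClientStatus == "lost"
}

// nonTerminalFromState models the snapshot lookup: allocs the state store
// considers terminal never make it into this list.
func nonTerminalFromState(stateAllocs []Alloc) []Alloc {
	var out []Alloc
	for _, a := range stateAllocs {
		if a.DesiredStatus == "stop" || a.DesiredStatus == "evict" || clientTerminal(a) {
			continue
		}
		out = append(out, a)
	}
	return out
}

// countedByAllocsFit models the filter inside AllocsFit after this PR: only
// client-terminal allocs are excluded from resource accounting.
func countedByAllocsFit(allocs []Alloc) []Alloc {
	var counted []Alloc
	for _, a := range allocs {
		if clientTerminal(a) {
			continue
		}
		counted = append(counted, a)
	}
	return counted
}

func main() {
	// State already contains a failed alloc that the scheduler wants to reschedule.
	state := []Alloc{{ID: "failed-alloc", DesiredStatus: "run", ClientStatus: "failed"}}

	// The plan proposes the failed alloc (with its updated status) plus a replacement.
	planned := []Alloc{
		{ID: "failed-alloc", DesiredStatus: "run", ClientStatus: "failed"},
		{ID: "replacement", DesiredStatus: "run", ClientStatus: "pending"},
	}

	// The plan applier starts from the non-terminal allocs in state...
	existing := nonTerminalFromState(state)
	// ...then appends the planned allocs verbatim, so a terminal alloc can
	// reach the fit check after all.
	proposed := append(existing, planned...)

	fmt.Println(countedByAllocsFit(proposed)) // only the replacement is counted
}
```

The repro above hits exactly this path: the failed alloc flows into the fit check via the plan rather than via the state snapshot, so the filter inside AllocsFit is not dead code.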

