scheduler: stopped-yet-running allocs are still running #10446

Merged
merged 7 commits on Sep 13, 2022
Changes from 5 commits
3 changes: 3 additions & 0 deletions .changelog/10446.txt
@@ -0,0 +1,3 @@
```release-note:bug
core: prevent new allocations from overlapping execution with stopping allocations
```
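
For context on what this release note means: an allocation carries both a server-side DesiredStatus and a client-reported ClientStatus, and the overlap happened in the window where the server had marked an alloc for stopping but the client had not yet reported it terminal. A rough illustration of the two checks involved (a hypothetical snippet, not part of this diff):

```go
// An alloc the server has told to stop, but which the client still reports as running.
alloc := &structs.Allocation{
	DesiredStatus: structs.AllocDesiredStatusStop,   // server-side intent
	ClientStatus:  structs.AllocClientStatusRunning, // client-reported reality
}

// Previously the scheduler's feasibility checks treated this alloc as terminal
// (its DesiredStatus is stop), so its CPU, memory, and ports looked free and a
// replacement could be placed on top of it while it was still executing.
_ = alloc.TerminalStatus() // true

// With this PR, those checks consider only the client-reported state, so the
// resources stay reserved until the alloc is actually complete, failed, or lost.
_ = alloc.ClientTerminalStatus() // false
```
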
164 changes: 164 additions & 0 deletions nomad/job_endpoint_test.go
@@ -19,8 +19,10 @@ import (
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)

func TestJobEndpoint_Register(t *testing.T) {
@@ -109,6 +111,168 @@ func TestJobEndpoint_Register(t *testing.T) {
}
}

// TestJobEndpoint_Register_NonOverlapping asserts that ClientStatus must be
// terminal, not just DesiredStatus, for the resources used by a job to be
// considered free for subsequent placements to use.
//
// See: https://github.com/hashicorp/nomad/issues/10440
func TestJobEndpoint_Register_NonOverlapping(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, func(c *Config) {
})
Comment on lines +122 to +123
Contributor

Suggested change
-	s1, cleanupS1 := TestServer(t, func(c *Config) {
-	})
+	s1, cleanupS1 := TestServer(t, nil)

defer cleanupS1()
state := s1.fsm.State()

// Create a mock node with easy to check resources
node := mock.Node()
node.Resources = nil // Deprecated in 0.9
node.NodeResources.Cpu.CpuShares = 700
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))

codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the register request
job := mock.Job()
job.TaskGroups[0].Count = 1
req := &structs.JobRegisterRequest{
Job: job.Copy(),
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
var resp structs.JobRegisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.NonZero(t, resp.Index)

// Assert placement
jobReq := &structs.JobSpecificRequest{
JobID: job.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var alloc *structs.AllocListStub
testutil.Wait(t, func() (bool, error) {
resp := structs.JobAllocationsResponse{}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Allocations", jobReq, &resp))
if n := len(resp.Allocations); n != 1 {
return false, fmt.Errorf("expected 1 allocation but found %d:\n%v", n, resp.Allocations)
}

alloc = resp.Allocations[0]
return true, nil
})
must.Eq(t, alloc.NodeID, node.ID)
must.Eq(t, alloc.DesiredStatus, structs.AllocDesiredStatusRun)
must.Eq(t, alloc.ClientStatus, structs.AllocClientStatusPending)

// Stop
stopReq := &structs.JobDeregisterRequest{
JobID: job.ID,
Purge: false,
WriteRequest: req.WriteRequest,
}
var stopResp structs.JobDeregisterResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Deregister", stopReq, &stopResp))

// Assert new register blocked
req.Job = job.Copy()
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
must.NonZero(t, resp.Index)

blockedEval := ""
testutil.Wait(t, func() (bool, error) {
// Assert no new allocs
allocResp := structs.JobAllocationsResponse{}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Allocations", jobReq, &allocResp))
if n := len(allocResp.Allocations); n != 1 {
return false, fmt.Errorf("expected 1 allocation but found %d:\n%v", n, allocResp.Allocations)
}

if alloc.ID != allocResp.Allocations[0].ID {
return false, fmt.Errorf("unexpected change in alloc: %#v", *allocResp.Allocations[0])
}

eval, err := state.EvalByID(nil, resp.EvalID)
must.NoError(t, err)
if eval == nil {
return false, fmt.Errorf("eval not applied: %s", resp.EvalID)
}
if eval.Status != structs.EvalStatusComplete {
return false, fmt.Errorf("expected eval to be complete but found: %s", eval.Status)
}
if eval.BlockedEval == "" {
return false, fmt.Errorf("expected a blocked eval to be created")
}
blockedEval = eval.BlockedEval
return true, nil
})

// Set ClientStatus=complete like a client would
stoppedAlloc := &structs.Allocation{
ID: alloc.ID,
NodeID: alloc.NodeID,
TaskStates: map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStateDead,
},
},
ClientStatus: structs.AllocClientStatusComplete,
DeploymentStatus: nil, // should not have an impact
NetworkStatus: nil, // should not have an impact
}
upReq := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{stoppedAlloc},
WriteRequest: req.WriteRequest,
}
var upResp structs.GenericResponse
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", upReq, &upResp))

// Assert newer register's eval unblocked
testutil.Wait(t, func() (bool, error) {
eval, err := state.EvalByID(nil, blockedEval)
must.NoError(t, err)
must.NotNil(t, eval)
if eval.Status != structs.EvalStatusComplete {
return false, fmt.Errorf("expected blocked eval to be complete but found: %s", eval.Status)
}
return true, nil
})

// Assert new alloc placed
testutil.Wait(t, func() (bool, error) {
// Assert no new allocs
allocResp := structs.JobAllocationsResponse{}
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Allocations", jobReq, &allocResp))
if n := len(allocResp.Allocations); n != 2 {
return false, fmt.Errorf("expected 2 allocs but found %d:\n%v", n, allocResp.Allocations)
}

slices.SortFunc(allocResp.Allocations, func(a, b *structs.AllocListStub) bool {
return a.CreateIndex < b.CreateIndex
})

if alloc.ID != allocResp.Allocations[0].ID {
return false, fmt.Errorf("unexpected change in alloc: %#v", *allocResp.Allocations[0])
}

if cs := allocResp.Allocations[0].ClientStatus; cs != structs.AllocClientStatusComplete {
return false, fmt.Errorf("expected old alloc to be complete but found: %s", cs)
}

if cs := allocResp.Allocations[1].ClientStatus; cs != structs.AllocClientStatusPending {
return false, fmt.Errorf("expected new alloc to be pending but found: %s", cs)
}
return true, nil
})
}

func TestJobEndpoint_Register_PreserveCounts(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
2 changes: 1 addition & 1 deletion nomad/structs/funcs.go
@@ -173,7 +173,7 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
// For each alloc, add the resources
for _, alloc := range allocs {
// Do not consider the resource impact of terminal allocations
-		if alloc.TerminalStatus() {
+		if alloc.ClientTerminalStatus() {
continue
}
Comment on lines -176 to 178
Member

I'm not sure this code is ever hit. The 2 places we call AllocsFit both already filter down to just non-terminal allocs as far as I can tell.

The call in the plan applier calls AllocsByNodeTerminal(false) to generate the list of allocs, so all terminal allocs are already removed AFAICT.

The call in the worker is also passed a list of allocs from AllocsByNodeTerminal(false).

AllocsByNodeTerminal itself makes this all pretty tricky to trace because it's difficult to know the provenance of any given []*Allocation and how it's been populated and filtered.

I think leaving this filter logic in place is ok as a defensive measure, but I think we need to ensure the half-terminal allocs aren't filtered out "upstream."

Member

I was wrong. This is hit in the plan applier! Tested against main (c2428e1 - v1.3.2-dev) and rebased this commit on top of it.

I missed that after the call to snap.AllocsByNodeTerminal(ws, nodeID, false) (which returns only non-terminal allocs) we append the allocs from the Plan which include the failed alloc: https://github.com/hashicorp/nomad/blob/v1.3.1/nomad/plan_apply.go#L699

To repro:

nomad agent -dev
nomad job run example.nomad
nomad alloc signal $allocid
# wait for restart
nomad alloc signal $allocid
# wait for restart
nomad alloc signal $allocid

# eval for rescheduling sees old alloc as DesiredStatus=run + ClientStatus=failed
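
For anyone tracing the one-line change in AllocsFit above, the distinction it relies on is between the two terminal-status helpers on Allocation. Roughly (a paraphrased sketch of those methods, not part of this diff):

```go
// TerminalStatus is true as soon as either side considers the alloc finished:
// the server has asked for a stop or evict, or the client has reported a
// terminal state. A stopping-but-still-running alloc therefore counts as terminal.
func (a *Allocation) TerminalStatus() bool {
	switch a.DesiredStatus {
	case AllocDesiredStatusStop, AllocDesiredStatusEvict:
		return true
	}
	return a.ClientTerminalStatus()
}

// ClientTerminalStatus is true only once the client has reported a terminal
// state. AllocsFit now keys off this, so a stopping-but-still-running alloc
// keeps its resources counted as in-use until the client confirms it stopped.
func (a *Allocation) ClientTerminalStatus() bool {
	switch a.ClientStatus {
	case AllocClientStatusComplete, AllocClientStatusFailed, AllocClientStatusLost:
		return true
	default:
		return false
	}
}
```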


172 changes: 172 additions & 0 deletions nomad/structs/funcs_test.go
@@ -267,6 +267,7 @@ func TestAllocsFit_TerminalAlloc_Old(t *testing.T) {
// Should fit second allocation since it is terminal
a2 := a1.Copy()
a2.DesiredStatus = AllocDesiredStatusStop
a2.ClientStatus = AllocClientStatusComplete
fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil, false)
require.NoError(err)
require.True(fit)
@@ -494,13 +495,184 @@ func TestAllocsFit_TerminalAlloc(t *testing.T) {
// Should fit second allocation since it is terminal
a2 := a1.Copy()
a2.DesiredStatus = AllocDesiredStatusStop
a2.ClientStatus = AllocClientStatusComplete
fit, dim, used, err := AllocsFit(n, []*Allocation{a1, a2}, nil, false)
require.NoError(err)
require.True(fit, dim)
require.EqualValues(1000, used.Flattened.Cpu.CpuShares)
require.EqualValues(1024, used.Flattened.Memory.MemoryMB)
}

// TestAllocsFit_ClientTerminalAlloc asserts that allocs which have a terminal
// ClientStatus *do not* have their resources counted as in-use.
func TestAllocsFit_ClientTerminalAlloc(t *testing.T) {
ci.Parallel(t)

n := &Node{
ID: "test-node",
NodeResources: &NodeResources{
Cpu: NodeCpuResources{
CpuShares: 2000,
},
Memory: NodeMemoryResources{
MemoryMB: 2048,
},
Disk: NodeDiskResources{
DiskMB: 10000,
},
Networks: []*NetworkResource{
{
Device: "eth0",
CIDR: "10.0.0.0/8",
IP: "10.0.0.1",
MBits: 100,
},
},
},
ReservedResources: &NodeReservedResources{
Cpu: NodeReservedCpuResources{
CpuShares: 1000,
},
Memory: NodeReservedMemoryResources{
MemoryMB: 1024,
},
Disk: NodeReservedDiskResources{
DiskMB: 5000,
},
Networks: NodeReservedNetworkResources{
ReservedHostPorts: "80",
},
},
}

liveAlloc := &Allocation{
ID: "test-alloc-live",
ClientStatus: AllocClientStatusPending,
DesiredStatus: AllocDesiredStatusRun,
AllocatedResources: &AllocatedResources{
Tasks: map[string]*AllocatedTaskResources{
"web": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
Networks: []*NetworkResource{
{
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 8000, 80, ""}},
},
},
},
},
Shared: AllocatedSharedResources{
DiskMB: 5000,
},
},
}

deadAlloc := liveAlloc.Copy()
deadAlloc.ID = "test-alloc-dead"
deadAlloc.ClientStatus = AllocClientStatusFailed
deadAlloc.DesiredStatus = AllocDesiredStatusRun

// *Should* fit both allocations since deadAlloc is not running on the
// client
fit, _, used, err := AllocsFit(n, []*Allocation{liveAlloc, deadAlloc}, nil, false)
require.NoError(t, err)
require.True(t, fit)
require.EqualValues(t, 1000, used.Flattened.Cpu.CpuShares)
require.EqualValues(t, 1024, used.Flattened.Memory.MemoryMB)
}

// TestAllocsFit_ServerTerminalAlloc asserts that allocs which have a terminal
// DesiredStatus but are still running on clients *do* have their resources
// counted as in-use.
func TestAllocsFit_ServerTerminalAlloc(t *testing.T) {
ci.Parallel(t)

n := &Node{
ID: "test-node",
NodeResources: &NodeResources{
Cpu: NodeCpuResources{
CpuShares: 2000,
},
Memory: NodeMemoryResources{
MemoryMB: 2048,
},
Disk: NodeDiskResources{
DiskMB: 10000,
},
Networks: []*NetworkResource{
{
Device: "eth0",
CIDR: "10.0.0.0/8",
IP: "10.0.0.1",
MBits: 100,
},
},
},
ReservedResources: &NodeReservedResources{
Cpu: NodeReservedCpuResources{
CpuShares: 1000,
},
Memory: NodeReservedMemoryResources{
MemoryMB: 1024,
},
Disk: NodeReservedDiskResources{
DiskMB: 5000,
},
Networks: NodeReservedNetworkResources{
ReservedHostPorts: "80",
},
},
}

liveAlloc := &Allocation{
ID: "test-alloc-live",
ClientStatus: AllocClientStatusPending,
DesiredStatus: AllocDesiredStatusRun,
AllocatedResources: &AllocatedResources{
Tasks: map[string]*AllocatedTaskResources{
"web": {
Cpu: AllocatedCpuResources{
CpuShares: 1000,
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
},
Networks: []*NetworkResource{
{
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 8000, 80, ""}},
},
},
},
},
Shared: AllocatedSharedResources{
DiskMB: 5000,
},
},
}

deadAlloc := liveAlloc.Copy()
deadAlloc.ID = "test-alloc-dead"
deadAlloc.ClientStatus = AllocClientStatusRunning
deadAlloc.DesiredStatus = AllocDesiredStatusStop

// Should *not* fit both allocations since deadAlloc is still running
fit, _, used, err := AllocsFit(n, []*Allocation{liveAlloc, deadAlloc}, nil, false)
require.NoError(t, err)
require.False(t, fit)
require.EqualValues(t, 2000, used.Flattened.Cpu.CpuShares)
require.EqualValues(t, 2048, used.Flattened.Memory.MemoryMB)
}

// Tests that AllocsFit detects device collisions
func TestAllocsFit_Devices(t *testing.T) {
ci.Parallel(t)
2 changes: 1 addition & 1 deletion nomad/structs/network.go
@@ -347,7 +347,7 @@ func (idx *NetworkIndex) SetNode(node *Node) error {
func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool, reason string) {
for _, alloc := range allocs {
// Do not consider the resource impact of terminal allocations
-		if alloc.TerminalStatus() {
+		if alloc.ClientTerminalStatus() {
continue
}
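
The same guard in the network index means reserved ports are not handed back out while a stopping alloc may still be bound to them. A hypothetical companion test (not in this PR) sketches the effect; it assumes the package's NewNetworkIndex constructor and reuses the n and liveAlloc fixtures from TestAllocsFit_ServerTerminalAlloc above:

```go
func TestNetworkIndex_AddAllocs_ServerTerminal_sketch(t *testing.T) {
	// n and liveAlloc are the node and allocation fixtures defined in
	// TestAllocsFit_ServerTerminalAlloc; copy them here to run this standalone.
	idx := NewNetworkIndex()
	require.NoError(t, idx.SetNode(n))

	// The server has asked this alloc to stop, but the client still reports it running.
	stopping := liveAlloc.Copy()
	stopping.ID = "test-alloc-stopping"
	stopping.DesiredStatus = AllocDesiredStatusStop
	stopping.ClientStatus = AllocClientStatusRunning

	// Both allocs reserve 10.0.0.1:8000. Because the stopping alloc is not
	// client-terminal it is no longer skipped, so its port stays reserved and
	// the second alloc collides instead of silently double-binding the port.
	collide, reason := idx.AddAllocs([]*Allocation{stopping, liveAlloc})
	require.True(t, collide, reason)
}
```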
