Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of fix deadlock in plan_apply into release/1.2.x #13470

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/13407.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where the plan applier could deadlock if leader's state lagged behind plan's creation index for more than 5 seconds.
```
13 changes: 8 additions & 5 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (p *planner) planApply() {
// This also limits how out of date our snapshot can be.
if planIndexCh != nil {
idx := <-planIndexCh
planIndexCh = nil
prevPlanResultIndex = max(prevPlanResultIndex, idx)
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
Expand All @@ -177,7 +178,7 @@ func (p *planner) planApply() {
}
}

// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout
// snapshotMinIndex wraps SnapshotAfter with a 10s timeout and converts timeout
// errors to a more descriptive error message. The snapshot is guaranteed to
// include both the previous plan and all objects referenced by the plan or
// return an error.
Expand All @@ -188,7 +189,11 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64
// plan result's and current plan's snapshot index.
minIndex := max(prevPlanResultIndex, planSnapshotIndex)

const timeout = 5 * time.Second
// This timeout creates backpressure where any concurrent
// Plan.Submit RPCs will block waiting for results. This sheds
// load across all servers and gives raft some CPU to catch up,
// because schedulers won't dequeue more work while waiting.
const timeout = 10 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
cancel()
Expand Down Expand Up @@ -368,14 +373,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(indexCh)

// Wait for the plan to apply
if err := future.Error(); err != nil {
p.logger.Error("failed to apply plan", "error", err)
pending.respond(nil, err)

// Close indexCh on error
close(indexCh)
return
}

Expand Down
68 changes: 68 additions & 0 deletions nomad/plan_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"sync"
"testing"
"time"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) {
// Ensure no plans were enqueued
require.Zero(t, s1.planner.planQueue.Stats().Depth)
}

func TestPlanEndpoint_ApplyConcurrent(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

plans := []*structs.Plan{}

for i := 0; i < 5; i++ {

// Create a node to place on
node := mock.Node()
store := s1.fsm.State()
require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))

// Create the eval
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
require.NoError(t, store.UpsertEvals(
structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1}))

evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
require.NoError(t, err)
require.Equal(t, eval1, evalOut)

// Submit a plan
plan := mock.Plan()
plan.EvalID = eval1.ID
plan.EvalToken = token
plan.Job = mock.Job()

alloc := mock.Alloc()
alloc.JobID = plan.Job.ID
alloc.Job = plan.Job

plan.NodeAllocation = map[string][]*structs.Allocation{
node.ID: []*structs.Allocation{alloc}}

plans = append(plans, plan)
}

var wg sync.WaitGroup

for _, plan := range plans {
plan := plan
wg.Add(1)
go func() {

req := &structs.PlanRequest{
Plan: plan,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.PlanResponse
err := s1.RPC("Plan.Submit", req, &resp)
assert.NoError(t, err)
assert.NotNil(t, resp.Result, "missing result")
wg.Done()
}()
}

wg.Wait()
}