From 2f7f862077f9a27980e05f2cfbd51cfd85eec96a Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 16 Jun 2022 16:55:56 -0400 Subject: [PATCH 1/6] fix deadlock in plan_apply The plan applier has to get a snapshot with a minimum index for the plan it's working on in order to ensure consistency. Under heavy raft loads, we can exceed the timeout. When this happens, we hit a bug where the plan applier blocks waiting on the `indexCh` forever, and all schedulers will block in `Plan.Submit`. Closing the `indexCh` when the `asyncPlanWait` is done with it will prevent the deadlock without impacting correctness of the previous snapshot index. This changeset includes the a PoC failing test that works by injecting a large timeout into the state store. We need to turn this into a test we can run normally without breaking the state store before we can merge this PR. --- nomad/plan_apply.go | 7 ++-- nomad/plan_endpoint_test.go | 68 +++++++++++++++++++++++++++++++++++++ nomad/state/state_store.go | 10 ++++++ 3 files changed, 82 insertions(+), 3 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 7f1fffc54f8c..8a1cd7caa304 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -153,12 +153,15 @@ func (p *planner) planApply() { // Ensure any parallel apply is complete before starting the next one. // This also limits how out of date our snapshot can be. if planIndexCh != nil { + fmt.Println("waiting for idx...") // DEBUG idx := <-planIndexCh + fmt.Println("got index", idx) // DEBUG prevPlanResultIndex = max(prevPlanResultIndex, idx) snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex) if err != nil { p.logger.Error("failed to update snapshot state", "error", err) pending.respond(nil, err) + planIndexCh = nil continue } } @@ -368,14 +371,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) { func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture, result *structs.PlanResult, pending *pendingPlan) { defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now()) + defer close(indexCh) // Wait for the plan to apply if err := future.Error(); err != nil { p.logger.Error("failed to apply plan", "error", err) pending.respond(nil, err) - - // Close indexCh on error - close(indexCh) return } diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go index 8c02c2ba9a9f..963be8972ca4 100644 --- a/nomad/plan_endpoint_test.go +++ b/nomad/plan_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "sync" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) { // Ensure no plans were enqueued require.Zero(t, s1.planner.planQueue.Stats().Depth) } + +func TestPlanEndpoint_ApplyDeadlock(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + plans := []*structs.Plan{} + + for i := 0; i < 5; i++ { + + // Create a node to place on + node := mock.Node() + store := s1.fsm.State() + require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node)) + + // Create the eval + eval1 := mock.Eval() + s1.evalBroker.Enqueue(eval1) + require.NoError(t, store.UpsertEvals( + structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1})) + + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) + require.NoError(t, err) + require.Equal(t, eval1, evalOut) + + // Submit a plan + plan := mock.Plan() + plan.EvalID = eval1.ID + plan.EvalToken = token + plan.Job = mock.Job() + + alloc := mock.Alloc() + alloc.JobID = plan.Job.ID + alloc.Job = plan.Job + + plan.NodeAllocation = map[string][]*structs.Allocation{ + node.ID: []*structs.Allocation{alloc}} + + plans = append(plans, plan) + } + + var wg sync.WaitGroup + + for _, plan := range plans { + plan := plan + wg.Add(1) + go func() { + + req := &structs.PlanRequest{ + Plan: plan, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.PlanResponse + err := s1.RPC("Plan.Submit", req, &resp) + assert.NoError(t, err) + assert.NotNil(t, resp.Result, "missing result") + wg.Done() + }() + } + + wg.Wait() +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6c5fabdda054..bc48130d7801 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -204,6 +204,9 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { return snap, nil } +// DEBUG: this is to introduce a one-time timeout +var stop = true + // SnapshotMinIndex is used to create a state snapshot where the index is // guaranteed to be greater than or equal to the index parameter. // @@ -222,6 +225,13 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State var retries uint var retryTimer *time.Timer + // DEBUG: this is to introduce a one-time timeout + if index == 7 && stop { + stop = false + time.Sleep(6000 * time.Millisecond) + return nil, ctx.Err() + } + // XXX: Potential optimization is to set up a watch on the state // store's index table and only unblock via a trigger rather than // polling. From 6fab93703fc70a7440a6e4fbd7dd5d7e5bcb81db Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 17 Jun 2022 14:08:45 -0400 Subject: [PATCH 2/6] remove temporarily broken state store code --- nomad/plan_apply.go | 2 -- nomad/plan_endpoint_test.go | 2 +- nomad/state/state_store.go | 10 ---------- 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 8a1cd7caa304..0e7e998cff58 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -153,9 +153,7 @@ func (p *planner) planApply() { // Ensure any parallel apply is complete before starting the next one. // This also limits how out of date our snapshot can be. if planIndexCh != nil { - fmt.Println("waiting for idx...") // DEBUG idx := <-planIndexCh - fmt.Println("got index", idx) // DEBUG prevPlanResultIndex = max(prevPlanResultIndex, idx) snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex) if err != nil { diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go index 963be8972ca4..c36cb4f1989d 100644 --- a/nomad/plan_endpoint_test.go +++ b/nomad/plan_endpoint_test.go @@ -131,7 +131,7 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) { require.Zero(t, s1.planner.planQueue.Stats().Depth) } -func TestPlanEndpoint_ApplyDeadlock(t *testing.T) { +func TestPlanEndpoint_ApplyConcurrent(t *testing.T) { t.Parallel() s1, cleanupS1 := TestServer(t, func(c *Config) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index bc48130d7801..6c5fabdda054 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -204,9 +204,6 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { return snap, nil } -// DEBUG: this is to introduce a one-time timeout -var stop = true - // SnapshotMinIndex is used to create a state snapshot where the index is // guaranteed to be greater than or equal to the index parameter. // @@ -225,13 +222,6 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State var retries uint var retryTimer *time.Timer - // DEBUG: this is to introduce a one-time timeout - if index == 7 && stop { - stop = false - time.Sleep(6000 * time.Millisecond) - return nil, ctx.Err() - } - // XXX: Potential optimization is to set up a watch on the state // store's index table and only unblock via a trigger rather than // polling. From 5e0964ef1768aa4bc8d2b1f71989da040a69b25a Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 17 Jun 2022 14:28:46 -0400 Subject: [PATCH 3/6] changelog entry --- .changelog/13407.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/13407.txt diff --git a/.changelog/13407.txt b/.changelog/13407.txt new file mode 100644 index 000000000000..8376f89eac42 --- /dev/null +++ b/.changelog/13407.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Fixed a bug where the plan applier could deadlock if raft load prevented a state store snapshot from completing within 5 seconds +``` From 247a03f5e9c666fd64a66e6a8aacd61de73adfad Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 21 Jun 2022 14:31:22 -0400 Subject: [PATCH 4/6] increase `snapshotMinIndex` timeout to 10s This timeout creates backpressure where any concurrent `Plan.Submit` RPCs will block waiting for results. This sheds load across all servers and gives raft some CPU to catch up, because schedulers won't dequeue more work while waiting. Increase it to 10s based on observations of large production clusters. --- nomad/plan_apply.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 0e7e998cff58..f56ddb0402d8 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -178,7 +178,7 @@ func (p *planner) planApply() { } } -// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout +// snapshotMinIndex wraps SnapshotAfter with a 10s timeout and converts timeout // errors to a more descriptive error message. The snapshot is guaranteed to // include both the previous plan and all objects referenced by the plan or // return an error. @@ -189,7 +189,11 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64 // plan result's and current plan's snapshot index. minIndex := max(prevPlanResultIndex, planSnapshotIndex) - const timeout = 5 * time.Second + // This timeout creates backpressure where any concurrent + // Plan.Submit RPCs will block waiting for results. This sheds + // load across all servers and gives raft some CPU to catch up, + // because schedulers won't dequeue more work while waiting. + const timeout = 10 * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex) cancel() From 6fbd2a86b1ee80fbd4112a3432efe7b4e558bda3 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 22 Jun 2022 09:45:59 -0400 Subject: [PATCH 5/6] Apply suggestions from code review Co-authored-by: Michael Schurter --- .changelog/13407.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changelog/13407.txt b/.changelog/13407.txt index 8376f89eac42..7a2fb340bacd 100644 --- a/.changelog/13407.txt +++ b/.changelog/13407.txt @@ -1,3 +1,3 @@ ```release-note:bug -core: Fixed a bug where the plan applier could deadlock if raft load prevented a state store snapshot from completing within 5 seconds +core: Fixed a bug where the plan applier could deadlock if leader's state lagged behind plan's creation index for more than 5 seconds. ``` From 2a358ace8be4aaf02b789f5687de0da653f531bf Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 22 Jun 2022 10:11:29 -0400 Subject: [PATCH 6/6] move up nil of indexCh --- nomad/plan_apply.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f56ddb0402d8..58002b2258aa 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -154,12 +154,12 @@ func (p *planner) planApply() { // This also limits how out of date our snapshot can be. if planIndexCh != nil { idx := <-planIndexCh + planIndexCh = nil prevPlanResultIndex = max(prevPlanResultIndex, idx) snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex) if err != nil { p.logger.Error("failed to update snapshot state", "error", err) pending.respond(nil, err) - planIndexCh = nil continue } }