From 2f7f862077f9a27980e05f2cfbd51cfd85eec96a Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Thu, 16 Jun 2022 16:55:56 -0400
Subject: [PATCH] fix deadlock in plan_apply

The plan applier has to get a snapshot with a minimum index for the
plan it's working on in order to ensure consistency. Under heavy raft
loads, we can exceed the timeout. When this happens, we hit a bug where
the plan applier blocks waiting on the `indexCh` forever, and all
schedulers will block in `Plan.Submit`.

Closing the `indexCh` when `asyncPlanWait` is done with it will prevent
the deadlock without impacting correctness of the previous snapshot
index. A minimal standalone sketch of this channel-close pattern
follows the patch.

This changeset includes a proof-of-concept failing test that works by
injecting a large timeout into the state store. We need to turn this
into a test we can run normally without breaking the state store before
we can merge this PR.
---
 nomad/plan_apply.go         |  7 ++--
 nomad/plan_endpoint_test.go | 68 +++++++++++++++++++++++++++++++++++++
 nomad/state/state_store.go  | 10 ++++++
 3 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 7f1fffc54f8c..8a1cd7caa304 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -153,12 +153,15 @@ func (p *planner) planApply() {
 		// Ensure any parallel apply is complete before starting the next one.
 		// This also limits how out of date our snapshot can be.
 		if planIndexCh != nil {
+			fmt.Println("waiting for idx...") // DEBUG
 			idx := <-planIndexCh
+			fmt.Println("got index", idx) // DEBUG
 			prevPlanResultIndex = max(prevPlanResultIndex, idx)
 			snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
 			if err != nil {
 				p.logger.Error("failed to update snapshot state", "error", err)
 				pending.respond(nil, err)
+				planIndexCh = nil
 				continue
 			}
 		}
@@ -368,14 +371,12 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
 func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
 	result *structs.PlanResult, pending *pendingPlan) {
 	defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
+	defer close(indexCh)
 
 	// Wait for the plan to apply
 	if err := future.Error(); err != nil {
 		p.logger.Error("failed to apply plan", "error", err)
 		pending.respond(nil, err)
-
-		// Close indexCh on error
-		close(indexCh)
 		return
 	}
 
diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go
index 8c02c2ba9a9f..963be8972ca4 100644
--- a/nomad/plan_endpoint_test.go
+++ b/nomad/plan_endpoint_test.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"sync"
 	"testing"
 	"time"
 
@@ -9,6 +10,7 @@
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -128,3 +130,69 @@ func TestPlanEndpoint_Submit_Bad(t *testing.T) {
 	// Ensure no plans were enqueued
 	require.Zero(t, s1.planner.planQueue.Stats().Depth)
 }
+
+func TestPlanEndpoint_ApplyDeadlock(t *testing.T) {
+	t.Parallel()
+
+	s1, cleanupS1 := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+	})
+	defer cleanupS1()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	plans := []*structs.Plan{}
+
+	for i := 0; i < 5; i++ {
+
+		// Create a node to place on
+		node := mock.Node()
+		store := s1.fsm.State()
+		require.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, node))
+
+		// Create the eval
+		eval1 := mock.Eval()
+		s1.evalBroker.Enqueue(eval1)
+		require.NoError(t, store.UpsertEvals(
+			structs.MsgTypeTestSetup, 150, []*structs.Evaluation{eval1}))
+
+		evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
+		require.NoError(t, err)
+		require.Equal(t, eval1, evalOut)
+
+		// Submit a plan
+		plan := mock.Plan()
+		plan.EvalID = eval1.ID
+		plan.EvalToken = token
+		plan.Job = mock.Job()
+
+		alloc := mock.Alloc()
+		alloc.JobID = plan.Job.ID
+		alloc.Job = plan.Job
+
+		plan.NodeAllocation = map[string][]*structs.Allocation{
+			node.ID: []*structs.Allocation{alloc}}
+
+		plans = append(plans, plan)
+	}
+
+	var wg sync.WaitGroup
+
+	for _, plan := range plans {
+		plan := plan
+		wg.Add(1)
+		go func() {
+
+			req := &structs.PlanRequest{
+				Plan:         plan,
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+			var resp structs.PlanResponse
+			err := s1.RPC("Plan.Submit", req, &resp)
+			assert.NoError(t, err)
+			assert.NotNil(t, resp.Result, "missing result")
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+}
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 6c5fabdda054..bc48130d7801 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -204,6 +204,9 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
 	return snap, nil
 }
 
+// DEBUG: this is to introduce a one-time timeout
+var stop = true
+
 // SnapshotMinIndex is used to create a state snapshot where the index is
 // guaranteed to be greater than or equal to the index parameter.
 //
@@ -222,6 +225,13 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State
 	var retries uint
 	var retryTimer *time.Timer
 
+	// DEBUG: this is to introduce a one-time timeout
+	if index == 7 && stop {
+		stop = false
+		time.Sleep(6000 * time.Millisecond)
+		return nil, ctx.Err()
+	}
+
 	// XXX: Potential optimization is to set up a watch on the state
 	// store's index table and only unblock via a trigger rather than
 	// polling.
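
Reviewer note: below is a minimal, standalone sketch of the channel-close
pattern the fix relies on. It is not Nomad code: asyncWait, its parameters,
the main() harness, and the index values are illustrative stand-ins for
asyncPlanWait and planIndexCh. The point it shows is that the goroutine that
owns the send side always closes the channel via defer, so the consumer's
receive can never block forever; a receive from the closed channel yields
zero, which the max() in planApply (here, a plain comparison) ignores, so
the previous snapshot index is preserved.

    package main

    import (
        "errors"
        "fmt"
    )

    // asyncWait stands in for planner.asyncPlanWait: it reports the applied
    // index on indexCh when the apply succeeds and returns early on error.
    // The deferred close runs on both paths, so a receiver is always unblocked.
    func asyncWait(indexCh chan<- uint64, applied uint64, applyErr error) {
        defer close(indexCh)

        if applyErr != nil {
            // Error path: return without sending. The deferred close still
            // unblocks the receiver, which then sees the zero value.
            return
        }
        indexCh <- applied
    }

    func main() {
        prevResultIndex := uint64(7)

        for _, applyErr := range []error{nil, errors.New("raft apply timed out")} {
            indexCh := make(chan uint64, 1)
            go asyncWait(indexCh, 9, applyErr)

            // Consumer side, standing in for the top of planApply's loop:
            // the receive returns the applied index, or 0 once the channel
            // is closed on the error path; keeping the larger value
            // preserves the previous snapshot index either way.
            if idx := <-indexCh; idx > prevResultIndex {
                prevResultIndex = idx
            }
            fmt.Println("prevResultIndex:", prevResultIndex)
        }
    }

The design choice mirrored from the patch is that close() lives with the
goroutine that owns the send side, so every exit path (success, raft apply
error, or snapshot timeout) releases the consumer waiting in planApply.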