Skip to content

Commit

Permalink
scheduler: seed random shuffle nodes with eval ID
Browse files Browse the repository at this point in the history
Processing an evaluation is nearly a pure function over the state
snapshot, but we randomly shuffle the nodes. This means that
developers can't take a given state snapshot and pass an evaluation
through it and be guaranteed the same plan results.

But the evaluation ID is already random, so if we use this as the seed
for shuffling the nodes we can greatly reduce the sources of
non-determinism. Unfortunately golang map iteration uses a global
source of randomness and not a goroutine-local one, but arguably
if the scheduler behavior is impacted by this, that's a bug in the
iteration.
  • Loading branch information
tgross committed Feb 8, 2022
1 parent 21f7d01 commit 20b3270
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .changelog/12008.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
scheduler: Seed node shuffling with the evaluation ID to make the order reproducible
```
1 change: 1 addition & 0 deletions scheduler/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
func testContext(t testing.TB) (*state.StateStore, *EvalContext) {
state := state.TestStateStore(t)
plan := &structs.Plan{
EvalID: uuid.Generate(),
NodeUpdate: make(map[string][]*structs.Allocation),
NodeAllocation: make(map[string][]*structs.Allocation),
NodePreemptions: make(map[string][]*structs.Allocation),
Expand Down
3 changes: 2 additions & 1 deletion scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func (iter *StaticIterator) SetNodes(nodes []*structs.Node) {
// is applied in-place
func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// shuffle with the Fisher-Yates algorithm
shuffleNodes(nodes)
idx, _ := ctx.State().LatestIndex()
shuffleNodes(ctx.Plan(), idx, nodes)

// Create a static iterator
return NewStaticIterator(ctx, nodes)
Expand Down
3 changes: 3 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type State interface {

// CSIVolumeByID fetch CSI volumes, containing controller jobs
CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error)

// LatestIndex returns the greatest index value for all indexes.
LatestIndex() (uint64, error)
}

// Planner interface is used to submit a task allocation plan.
Expand Down
3 changes: 2 additions & 1 deletion scheduler/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type GenericStack struct {

func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
// Shuffle base nodes
shuffleNodes(baseNodes)
idx, _ := s.ctx.State().LatestIndex()
shuffleNodes(s.ctx.Plan(), idx, baseNodes)

// Update the set of base nodes
s.source.SetNodes(baseNodes)
Expand Down
21 changes: 18 additions & 3 deletions scheduler/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduler

import (
"encoding/binary"
"fmt"
"math/rand"
"reflect"
Expand Down Expand Up @@ -376,11 +377,25 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct
return out, nil
}

// shuffleNodes randomizes the slice order with the Fisher-Yates algorithm
func shuffleNodes(nodes []*structs.Node) {
// shuffleNodes randomizes the slice order with the Fisher-Yates
// algorithm. We seed the random source with the eval ID (which is
// random) to aid in postmortem debugging of specific evaluations and
// state snapshots.
func shuffleNodes(plan *structs.Plan, index uint64, nodes []*structs.Node) {

// use the last 4 bytes because those are the random bits
// if we have sortable IDs
buf := []byte(plan.EvalID)
seed := binary.BigEndian.Uint64(buf[len(buf)-8:])

// for retried plans the index is the plan result's RefreshIndex
// so that we don't retry with the exact same shuffle
seed |= index
r := rand.New(rand.NewSource(int64(seed >> 2)))

n := len(nodes)
for i := n - 1; i > 0; i-- {
j := rand.Intn(i + 1)
j := r.Intn(i + 1)
nodes[i], nodes[j] = nodes[j], nodes[i]
}
}
Expand Down
11 changes: 10 additions & 1 deletion scheduler/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,17 @@ func TestShuffleNodes(t *testing.T) {
}
orig := make([]*structs.Node, len(nodes))
copy(orig, nodes)
shuffleNodes(nodes)
eval := mock.Eval() // will have random EvalID
plan := eval.MakePlan(mock.Job())
shuffleNodes(plan, 1000, nodes)
require.False(t, reflect.DeepEqual(nodes, orig))

nodes2 := make([]*structs.Node, len(nodes))
copy(nodes2, orig)
shuffleNodes(plan, 1000, nodes2)

require.True(t, reflect.DeepEqual(nodes, nodes2))

}

func TestTaskUpdatedAffinity(t *testing.T) {
Expand Down

0 comments on commit 20b3270

Please sign in to comment.