diff --git a/scheduler/benchmarks/benchmarks_test.go b/scheduler/benchmarks/benchmarks_test.go
new file mode 100644
index 000000000000..d57e385a1bf5
--- /dev/null
+++ b/scheduler/benchmarks/benchmarks_test.go
@@ -0,0 +1,252 @@
+package benchmarks
+
+import (
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/scheduler"
+)
+
+// TestProfileSchedulerExample is an example of how to write a one-off
+// cpu profile for the Nomad scheduler. The pprof profile will be
+// written to the path in the env var NOMAD_BENCHMARK_PROFILE_PATH.
+// The starting state for your implementation will depend on the
+// following environment variables:
+//
+// - NOMAD_BENCHMARK_DATADIR: path to data directory
+// - NOMAD_BENCHMARK_SNAPSHOT: path to raft snapshot
+// - neither: empty starting state
+//
+// Note that h.Process doesn't return errors for most states that
+// result in blocked plans, so you'll want to make assertions about
+// h.Plans so that you're sure you're profiling a successful run and
+// not a failed plan.
+func TestProfileSchedulerExample(t *testing.T) {
+
+	h := NewBenchmarkingHarness(t)
+	var eval *structs.Evaluation
+
+	// (implement me!) this is your setup for the state and the eval
+	// you're going to process, all of which happens before profiling
+	// starts. If you're profiling a real world datadir or snapshot,
+	// you should assert your assumptions about the contents here.
+	{
+		upsertNodes(h, 5000, 100)
+
+		iter, err := h.State.Nodes(nil)
+		require.NoError(t, err)
+		nodes := 0
+		for {
+			raw := iter.Next()
+			if raw == nil {
+				break
+			}
+			nodes++
+		}
+		require.Equal(t, 5000, nodes)
+		job := generateJob(true, 600)
+		eval = upsertJob(h, job)
+	}
+
+	doneFn := runCPUProfile(t, os.Getenv("NOMAD_BENCHMARK_PROFILE_PATH"))
+	defer doneFn()
+
+	start := time.Now()
+	err := h.Process(scheduler.NewServiceScheduler, eval)
+	require.NoError(t, err)
+	require.LessOrEqual(t, time.Since(start), time.Duration(60*time.Second),
+		"time to evaluate exceeded EvalNackTimeout")
+
+	// (implement me!) this is your test assertion for the profiled plan
+	{
+		require.Len(t, h.Plans, 1)
+		require.False(t, h.Plans[0].IsNoOp())
+	}
+
+}
+
+// BenchmarkSchedulerExample is an example of how to write a one-off
+// benchmark for the Nomad scheduler. The starting state for your
+// implementation will depend on the following environment variables:
+//
+// - NOMAD_BENCHMARK_DATADIR: path to data directory
+// - NOMAD_BENCHMARK_SNAPSHOT: path to raft snapshot
+// - neither: empty starting state
+//
+// Note that h.Process doesn't return errors for most states that
+// result in blocked plans, so it's recommended you try your job out
+// with TestProfileSchedulerExample (see above) first so that you're
+// sure you're benchmarking a successful run and not a failed plan.
+func BenchmarkSchedulerExample(b *testing.B) {
+
+	h := NewBenchmarkingHarness(b)
+	var eval *structs.Evaluation
+
+	// (implement me!) this is your setup for the state and the eval
+	// you're going to process, all of which happens before benchmarking
+	// starts. If you're benchmarking a real world datadir or snapshot,
+	// you should assert your assumptions about the contents here.
+	{
+		upsertNodes(h, 5000, 100)
+
+		iter, err := h.State.Nodes(nil)
+		require.NoError(b, err)
+		nodes := 0
+		for {
+			raw := iter.Next()
+			if raw == nil {
+				break
+			}
+			nodes++
+		}
+		require.Equal(b, 5000, nodes)
+		job := generateJob(true, 600)
+		eval = upsertJob(h, job)
+	}
+
+	for i := 0; i < b.N; i++ {
+		err := h.Process(scheduler.NewServiceScheduler, eval)
+		require.NoError(b, err)
+	}
+}
+
+// BenchmarkServiceScheduler exercises the service scheduler at a
+// variety of cluster sizes, with both spread and non-spread jobs
+func BenchmarkServiceScheduler(b *testing.B) {
+
+	clusterSizes := []int{1000, 5000, 10000}
+	rackSets := []int{10, 25, 50, 75}
+	jobSizes := []int{300, 600, 900, 1200}
+
+	type benchmark struct {
+		name        string
+		clusterSize int
+		racks       int
+		jobSize     int
+		withSpread  bool
+	}
+
+	benchmarks := []benchmark{}
+	for _, clusterSize := range clusterSizes {
+		for _, racks := range rackSets {
+			for _, jobSize := range jobSizes {
+				benchmarks = append(benchmarks,
+					benchmark{
+						name: fmt.Sprintf("%d nodes %d racks %d allocs spread",
+							clusterSize, racks, jobSize,
+						),
+						clusterSize: clusterSize, racks: racks, jobSize: jobSize,
+						withSpread: true,
+					},
+				)
+				benchmarks = append(benchmarks,
+					benchmark{
+						name: fmt.Sprintf("%d nodes %d racks %d allocs no spread",
+							clusterSize, racks, jobSize,
+						),
+						clusterSize: clusterSize, racks: racks, jobSize: jobSize,
+						withSpread: false,
+					},
+				)
+			}
+		}
+	}
+
+	for _, bm := range benchmarks {
+		b.Run(bm.name, func(b *testing.B) {
+			h := scheduler.NewHarness(b)
+			upsertNodes(h, bm.clusterSize, bm.racks)
+			job := generateJob(bm.withSpread, bm.jobSize)
+			eval := upsertJob(h, job)
+			for i := 0; i < b.N; i++ {
+				err := h.Process(scheduler.NewServiceScheduler, eval)
+				require.NoError(b, err)
+			}
+		})
+	}
+}
+
+func upsertJob(h *scheduler.Harness, job *structs.Job) *structs.Evaluation {
+	err := h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)
+	if err != nil {
+		panic(err)
+	}
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	err = h.State.UpsertEvals(structs.MsgTypeTestSetup,
+		h.NextIndex(), []*structs.Evaluation{eval})
+	if err != nil {
+		panic(err)
+	}
+	return eval
+}
+
+func generateJob(withSpread bool, jobSize int) *structs.Job {
+	job := mock.Job()
+	job.Datacenters = []string{"dc-1", "dc-2"}
+	if withSpread {
+		job.Spreads = []*structs.Spread{{Attribute: "${meta.rack}"}}
+	}
+	job.Constraints = []*structs.Constraint{}
+	job.TaskGroups[0].Count = jobSize
+	job.TaskGroups[0].Networks = nil
+	job.TaskGroups[0].Services = []*structs.Service{}
+	job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
+		CPU:      6000,
+		MemoryMB: 6000,
+	}
+	return job
+}
+
+func upsertNodes(h *scheduler.Harness, count, racks int) {
+
+	datacenters := []string{"dc-1", "dc-2"}
+
+	for i := 0; i < count; i++ {
+		node := mock.Node()
+		node.Datacenter = datacenters[i%2]
+		node.Meta = map[string]string{}
+		node.Meta["rack"] = fmt.Sprintf("r%d", i%racks)
+		cpuShares := 14000
+		memoryMB := 32000
+		diskMB := 100 * 1024
+
+		node.NodeResources = &structs.NodeResources{
+			Cpu: structs.NodeCpuResources{
+				CpuShares: int64(cpuShares),
+			},
+			Memory: structs.NodeMemoryResources{
+				MemoryMB: int64(memoryMB),
+			},
+			Disk: structs.NodeDiskResources{
+				DiskMB: int64(diskMB),
+			},
+			Networks: []*structs.NetworkResource{
+				{
+					Mode:   "host",
+					Device: "eth0",
+					CIDR:   "192.168.0.100/32",
+					MBits:  1000,
+				},
+			},
+		}
+
+		err := h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)
+		if err != nil {
+			panic(err)
+		}
+	}
+}
diff --git a/scheduler/benchmarks/helpers_test.go b/scheduler/benchmarks/helpers_test.go
new file mode 100644
index 000000000000..be4092668ef9
--- /dev/null
+++ b/scheduler/benchmarks/helpers_test.go
@@ -0,0 +1,98 @@
+package benchmarks
+
+// Test helper functions for running scheduling tests and benchmarks
+// against real world state snapshots or data directories. These live
+// here and not in the parent scheduler package because it would
+// create circular imports between the scheduler and raftutil package
+// (via the nomad package)
+
+import (
+	"errors"
+	"os"
+	"runtime/pprof"
+	"testing"
+
+	"github.com/hashicorp/nomad/helper/raftutil"
+	"github.com/hashicorp/nomad/scheduler"
+	"github.com/stretchr/testify/require"
+)
+
+// NewBenchmarkingHarness creates a starting test harness with state
+// store. The starting contents of the state store depend on which
+// env var is set:
+// - NOMAD_BENCHMARK_DATADIR: path to data directory
+// - NOMAD_BENCHMARK_SNAPSHOT: path to raft snapshot
+// - neither: empty starting state
+func NewBenchmarkingHarness(t testing.TB) *scheduler.Harness {
+	// create the Harness and starting state.
+	datadir := os.Getenv("NOMAD_BENCHMARK_DATADIR")
+	if datadir != "" {
+		h, err := NewHarnessFromDataDir(t, datadir)
+		require.NoError(t, err)
+		return h
+	} else {
+		snapshotPath := os.Getenv("NOMAD_BENCHMARK_SNAPSHOT")
+		if snapshotPath != "" {
+			h, err := NewHarnessFromSnapshot(t, snapshotPath)
+			require.NoError(t, err)
+			return h
+		}
+	}
+	return scheduler.NewHarness(t)
+}
+
+// NewHarnessFromDataDir creates a new scheduler test harness with
+// state loaded from an existing datadir.
+func NewHarnessFromDataDir(t testing.TB, datadirPath string) (*scheduler.Harness, error) {
+	if datadirPath == "" {
+		return nil, errors.New("datadir path was not set")
+	}
+	fsm, err := raftutil.NewFSM(datadirPath)
+	if err != nil {
+		return nil, err
+	}
+	_, _, err = fsm.ApplyAll()
+	if err != nil {
+		return nil, err
+	}
+
+	return scheduler.NewHarnessWithState(t, fsm.State()), nil
+}
+
+// NewHarnessFromSnapshot creates a new harness with state loaded
+// from an existing raft snapshot.
+func NewHarnessFromSnapshot(t testing.TB, snapshotPath string) (*scheduler.Harness, error) {
+	if snapshotPath == "" {
+		return nil, errors.New("snapshot path was not set")
+	}
+	f, err := os.Open(snapshotPath)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	state, _, err := raftutil.RestoreFromArchive(f)
+	if err != nil {
+		return nil, err
+	}
+
+	return scheduler.NewHarnessWithState(t, state), nil
+}
+
+// runCPUProfile is a helper function that sets up a CPU profile and
+// returns a "done" function which should be deferred by the caller.
+func runCPUProfile(t *testing.T, cpuProfilePath string) func() {
+	if cpuProfilePath != "" {
+		profile, err := os.Create(cpuProfilePath)
+		require.NoError(t, err, "could not create file for CPU profile")
+
+		err = pprof.StartCPUProfile(profile)
+		require.NoError(t, err, "could not start CPU profile")
+
+		return func() {
+			pprof.StopCPUProfile()
+			profile.Close()
+		}
+	}
+	return func() {}
+}
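
Usage note (not part of the patch): assuming a raft snapshot copied from a real cluster, a one-off profiling run of TestProfileSchedulerExample might be invoked roughly as below; the snapshot and profile paths are placeholders, not paths the patch defines:

    NOMAD_BENCHMARK_SNAPSHOT=/path/to/state.snap \
    NOMAD_BENCHMARK_PROFILE_PATH=/tmp/scheduler.cpuprofile \
        go test -v -run TestProfileSchedulerExample ./scheduler/benchmarks/
    go tool pprof /tmp/scheduler.cpuprofile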
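
Similarly, a sketch of how the BenchmarkServiceScheduler matrix might be run (it builds its own synthetic cluster, so no env vars are needed); the -benchtime value here is just an illustrative choice:

    go test -run '^$' -bench BenchmarkServiceScheduler -benchtime 10x ./scheduler/benchmarks/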