Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simulator improvements (#167) #3665

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion cmd/simulator/cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"math"
"os"

"github.com/pkg/errors"
Expand All @@ -27,6 +28,9 @@ func RootCmd() *cobra.Command {
cmd.Flags().Bool("showSchedulerLogs", false, "Show scheduler logs.")
cmd.Flags().Int("logInterval", 0, "Log summary statistics every this many events. Disabled if 0.")
cmd.Flags().String("eventsOutputFilePath", "", "Path of file to write events to.")
cmd.Flags().Bool("enableFastForward", false, "Skips schedule events when we're in a steady state")
cmd.Flags().Int("hardTerminationMinutes", math.MaxInt, "Limit the time simulated")
cmd.Flags().Int("schedulerCyclePeriodSeconds", 10, "How often we should trigger schedule events")
return cmd
}

Expand Down Expand Up @@ -57,6 +61,19 @@ func runSimulations(cmd *cobra.Command, args []string) error {
return err
}

enableFastForward, err := cmd.Flags().GetBool("enableFastForward")
if err != nil {
return err
}
hardTerminationMinutes, err := cmd.Flags().GetInt("hardTerminationMinutes")
if err != nil {
return err
}
schedulerCyclePeriodSeconds, err := cmd.Flags().GetInt("schedulerCyclePeriodSeconds")
if err != nil {
return err
}

// Load test specs. and config.
clusterSpecs, err := simulator.ClusterSpecsFromPattern(clusterPattern)
if err != nil {
Expand Down Expand Up @@ -108,7 +125,7 @@ func runSimulations(cmd *cobra.Command, args []string) error {
for _, clusterSpec := range clusterSpecs {
for _, workloadSpec := range workloadSpecs {
for schedulingConfigPath, schedulingConfig := range schedulingConfigsByFilePath {
if s, err := simulator.NewSimulator(clusterSpec, workloadSpec, schedulingConfig); err != nil {
if s, err := simulator.NewSimulator(clusterSpec, workloadSpec, schedulingConfig, enableFastForward, hardTerminationMinutes, schedulerCyclePeriodSeconds); err != nil {
return err
} else {
if !showSchedulerLogs {
Expand Down
30 changes: 23 additions & 7 deletions internal/scheduler/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,20 @@ type Simulator struct {
SuppressSchedulerLogs bool
// For making internaltypes.ResourceList
resourceListFactory *internaltypes.ResourceListFactory
// Skips schedule events when we're in a steady state
enableFastForward bool
// Limit the time simulated
hardTerminationMinutes int
// Determines how often we trigger schedule events
schedulerCyclePeriodSeconds int
}

type StateTransition struct {
Jobs []*jobdb.Job
EventSequence *armadaevents.EventSequence
}

func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, schedulingConfig configuration.SchedulingConfig) (*Simulator, error) {
func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, schedulingConfig configuration.SchedulingConfig, enableFastForward bool, hardTerminationMinutes int, schedulerCyclePeriodSeconds int) (*Simulator, error) {
// TODO: Move clone to caller?
// Copy specs to avoid concurrent mutation.
resourceListFactory, err := internaltypes.MakeResourceListFactory(schedulingConfig.SupportedResourceTypes)
Expand All @@ -114,7 +120,7 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
)
randomSeed := workloadSpec.RandomSeed
if randomSeed == 0 {
// Seed the RNG using the local time if no explic random seed is provided.
// Seed the RNG using the local time if no explicit random seed is provided.
randomSeed = time.Now().Unix()
}
s := &Simulator{
Expand All @@ -134,9 +140,12 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli
rate.Limit(schedulingConfig.MaximumSchedulingRate),
schedulingConfig.MaximumSchedulingBurst,
),
limiterByQueue: make(map[string]*rate.Limiter),
rand: rand.New(rand.NewSource(randomSeed)),
resourceListFactory: resourceListFactory,
limiterByQueue: make(map[string]*rate.Limiter),
rand: rand.New(rand.NewSource(randomSeed)),
resourceListFactory: resourceListFactory,
enableFastForward: enableFastForward,
hardTerminationMinutes: hardTerminationMinutes,
schedulerCyclePeriodSeconds: schedulerCyclePeriodSeconds,
}
jobDb.SetClock(s)
s.limiter.SetBurstAt(s.time, schedulingConfig.MaximumSchedulingBurst)
Expand Down Expand Up @@ -166,6 +175,9 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error {
}()
// Bootstrap the simulator by pushing an event that triggers a scheduler run.
s.pushScheduleEvent(s.time)

simTerminationTime := s.time.Add(time.Minute * time.Duration(s.hardTerminationMinutes))

// Then run the scheduler until all jobs have completed.
for s.eventLog.Len() > 0 {
select {
Expand All @@ -177,6 +189,10 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error {
return err
}
}
if s.time.After(simTerminationTime) {
ctx.Infof("Current simulated time (%s) exceeds runtime deadline (%s). Terminating", s.time, simTerminationTime)
return nil
}
}
return nil
}
Expand Down Expand Up @@ -418,9 +434,9 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
// Schedule the next run of the scheduler, unless there are no more active jobTemplates.
// TODO: Make timeout configurable.
if len(s.activeJobTemplatesById) > 0 {
s.pushScheduleEvent(s.time.Add(10 * time.Second))
s.pushScheduleEvent(s.time.Add(time.Duration(s.schedulerCyclePeriodSeconds) * time.Second))
}
if !s.shouldSchedule {
if !s.shouldSchedule && s.enableFastForward {
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion internal/scheduler/simulator/simulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
)

func TestSimulator(t *testing.T) {
enableFastForward := false
schedulerCyclePeriodSeconds := 10
tests := map[string]struct {
clusterSpec *ClusterSpec
workloadSpec *WorkloadSpec
Expand Down Expand Up @@ -434,7 +436,7 @@ func TestSimulator(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
s, err := NewSimulator(tc.clusterSpec, tc.workloadSpec, tc.schedulingConfig)
s, err := NewSimulator(tc.clusterSpec, tc.workloadSpec, tc.schedulingConfig, enableFastForward, int((tc.simulatedTimeLimit + time.Hour).Minutes()), schedulerCyclePeriodSeconds)
require.NoError(t, err)
mc := NewMetricsCollector(s.StateTransitions())
actualEventSequences := make([]*armadaevents.EventSequence, 0, 128)
Expand Down