Skip to content

Commit

Permalink
core: always set scheduler config with job
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Jun 20, 2023
1 parent 2908d54 commit de31ec2
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,19 +281,7 @@ func (s *GenericScheduler) process() (bool, error) {
// Construct the placement stack
s.stack = NewGenericStack(s.batch, s.ctx)
if !s.job.Stopped() {
s.stack.SetJob(s.job)

// Fetch node pool and global scheduler configuration to determine how
// to configure the scheduler.
pool, err := s.state.NodePoolByName(ws, s.job.NodePool)
if err != nil {
return false, fmt.Errorf("failed to get job node pool '%s': %v", s.job.NodePool, err)
}
_, schedConfig, err := s.state.SchedulerConfig()
if err != nil {
return false, fmt.Errorf("failed to get scheduler configuration: %v", err)
}
s.stack.SetSchedulerConfiguration(schedConfig.WithNodePool(pool))
s.setJob(s.job)
}

// Compute the target job allocations
Expand Down Expand Up @@ -577,7 +565,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Use downgraded job in scheduling stack to honor
// old job resources and constraints
if downgradedJob != nil {
s.stack.SetJob(downgradedJob)
s.setJob(downgradedJob)
}

// Find the preferred node
Expand Down Expand Up @@ -610,7 +598,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul

// Restore stack job now that placement is done, to use plan job version
if downgradedJob != nil {
s.stack.SetJob(s.job)
s.setJob(s.job)
}

// Set fields based on if we found an allocation option
Expand Down Expand Up @@ -702,6 +690,26 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
return nil
}

// setJob updates the stack with the given job and job's node pool scheduler
// configuration.
func (s *GenericScheduler) setJob(job *structs.Job) error {
// Fetch node pool and global scheduler configuration to determine how to
// configure the scheduler.
pool, err := s.state.NodePoolByName(nil, s.job.NodePool)
if err != nil {
return fmt.Errorf("failed to get job node pool %q: %v", s.job.NodePool, err)
}

_, schedConfig, err := s.state.SchedulerConfig()
if err != nil {
return fmt.Errorf("failed to get scheduler configuration: %v", err)
}

s.stack.SetJob(s.job)
s.stack.SetSchedulerConfiguration(schedConfig.WithNodePool(pool))
return nil
}

// propagateTaskState copies task handles from previous allocations to
// replacement allocations when the previous allocation is being drained or was
// lost. Remote task drivers rely on this to reconnect to remote tasks when the
Expand Down

0 comments on commit de31ec2

Please sign in to comment.