Skip to content

Commit

Permalink
Merge pull request #3780 from wangyang0616/fix_statement_rollback
Browse files Browse the repository at this point in the history
Supports rollback when allocate callback function fails
  • Loading branch information
volcano-sh-bot authored Oct 17, 2024
2 parents 112dafd + b2ccad1 commit 0096a4c
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 63 deletions.
8 changes: 8 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
if err := stmt.Allocate(task, bestNode); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, bestNode.Name, ssn.UID, err)
if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil {
klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, rollbackErr)
}
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
Expand All @@ -266,6 +270,10 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
if err := stmt.Pipeline(task, bestNode.Name); err != nil {
klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, err)
if rollbackErr := stmt.UnPipeline(task); rollbackErr != nil {
klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, rollbackErr)
}
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ func preempt(
if err := stmt.Pipeline(preemptor, node.Name); err != nil {
klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, node.Name)
if rollbackErr := stmt.UnPipeline(preemptor); rollbackErr != nil {
klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
preemptor.UID, node.Name, ssn.UID, rollbackErr)
}
}

// Ignore the pipeline error; it will be corrected in the next scheduling loop.
Expand Down
130 changes: 67 additions & 63 deletions pkg/scheduler/framework/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,19 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo) error {

// Pipeline the task for the node
func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
errInfos := make([]error, 0)
job, found := s.ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil {
klog.Errorf("Failed to update task <%v/%v> status to %v when pipeline in Session <%v>: %v",
task.Namespace, task.Name, api.Pipelined, s.ssn.UID, err)
errInfos = append(errInfos, err)
}
} else {
klog.Errorf("Failed to find Job <%s> in Session <%s> index when pipeline.",
err := fmt.Errorf("Failed to find Job <%s> in Session <%s> index when pipeline.",
task.Job, s.ssn.UID)
klog.Errorf("%v", err)
errInfos = append(errInfos, err)
}

task.NodeName = hostname
Expand All @@ -160,26 +164,40 @@ func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
if err := node.AddTask(task); err != nil {
klog.Errorf("Failed to add task <%v/%v> to node <%v> when pipeline in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
errInfos = append(errInfos, err)
}
klog.V(3).Infof("After pipelined Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
} else {
klog.Errorf("Failed to find Node <%s> in Session <%s> index when pipeline.",
err := fmt.Errorf("Failed to find Node <%s> in Session <%s> index when pipeline.",
hostname, s.ssn.UID)
klog.Errorf("%v", err)
errInfos = append(errInfos, err)
}

for _, eh := range s.ssn.eventHandlers {
if eh.AllocateFunc != nil {
eh.AllocateFunc(&Event{
eventInfo := &Event{
Task: task,
})
}
eh.AllocateFunc(eventInfo)
if eventInfo.Err != nil {
klog.Errorf("Failed to exec allocate callback functions for task <%v/%v> to node <%v> when pipeline in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, eventInfo.Err)
errInfos = append(errInfos, eventInfo.Err)
}
}
}

s.operations = append(s.operations, operation{
name: Pipeline,
task: task,
})
if len(errInfos) != 0 {
return fmt.Errorf("Task(%s/%s) pipeline to node(%s) error and errInfos num is %d, UnPipeline will be called later to roll back the resources and status of the task.",
task.Namespace, task.Name, hostname, len(errInfos))
} else {
s.operations = append(s.operations, operation{
name: Pipeline,
task: task,
})
}

return nil
}
Expand Down Expand Up @@ -212,9 +230,14 @@ func (s *Statement) UnPipeline(task *api.TaskInfo) error {

for _, eh := range s.ssn.eventHandlers {
if eh.DeallocateFunc != nil {
eh.DeallocateFunc(&Event{
eventInfo := &Event{
Task: task,
})
}
eh.DeallocateFunc(eventInfo)
if eventInfo.Err != nil {
klog.Errorf("Failed to exec deallocate callback functions for task <%v/%v> to node <%v> when pipeline in Session <%v>: %v",
task.Namespace, task.Name, task.NodeName, s.ssn.UID, eventInfo.Err)
}
}
}
task.NodeName = ""
Expand All @@ -224,20 +247,21 @@ func (s *Statement) UnPipeline(task *api.TaskInfo) error {

// Allocate the task to node
func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err error) {
errInfos := make([]error, 0)
hostname := nodeInfo.Name

podVolumes, err := s.ssn.cache.GetPodVolumes(task, nodeInfo.Node)
if err != nil {
return err
klog.Errorf("Failed to get pod volume for task %v/%v on node %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
errInfos = append(errInfos, err)
}

hostname := nodeInfo.Name
if err := s.ssn.cache.AllocateVolumes(task, hostname, podVolumes); err != nil {
return err
klog.Errorf("Failed to allocate volume for task %v/%v on node %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
errInfos = append(errInfos, err)
}
defer func() {
if err != nil {
s.ssn.cache.RevertVolumes(task, podVolumes)
}
}()

task.Pod.Spec.NodeName = hostname
task.PodVolumes = podVolumes
Expand All @@ -248,52 +272,31 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
klog.Errorf("Failed to update task <%v/%v> status to %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, api.Allocated, s.ssn.UID, err)
return err
errInfos = append(errInfos, err)
}
} else {
klog.Errorf("Failed to find Job <%s> in Session <%s> index when allocating.",
err := fmt.Errorf("Failed to find Job <%s> in Session <%s> index when allocating.",
task.Job, s.ssn.UID)
return fmt.Errorf("failed to find job %s", task.Job)
klog.Errorf("%v", err)
errInfos = append(errInfos, err)
}

defer func() {
if err != nil {
if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
klog.Errorf("Revert: Failed to update task <%v/%v> status to %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, api.Pending, s.ssn.UID, err)
}
}
}()

task.NodeName = hostname
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
klog.Errorf("Failed to add task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
return err
errInfos = append(errInfos, err)
}
klog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
} else {
klog.Errorf("Failed to find Node <%s> in Session <%s> index when allocating.",
err := fmt.Errorf("Failed to find Node <%s> in Session <%s> index when allocating.",
hostname, s.ssn.UID)
return fmt.Errorf("failed to find node %s", hostname)
klog.Errorf("%v", err)
errInfos = append(errInfos, err)
}

defer func() {
if err != nil {
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.RemoveTask(task); err != nil {
klog.Errorf("Revert: failed to remove task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
}
klog.V(3).Infof("Revert: After remove allocate Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
}
}
}()

successEventHandlers := make([]*EventHandler, 0)
// Callbacks
for _, eh := range s.ssn.eventHandlers {
if eh.AllocateFunc != nil {
Expand All @@ -304,30 +307,31 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
if eventInfo.Err != nil {
klog.Errorf("Failed to exec allocate callback functions for task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, eventInfo.Err)
for _, reh := range successEventHandlers {
if reh.DeallocateFunc != nil {
reh.DeallocateFunc(&Event{
Task: task,
})
}
}
return fmt.Errorf("Failed to exec allocate callback functions")
} else {
successEventHandlers = append(successEventHandlers, eh)
errInfos = append(errInfos, eventInfo.Err)
}
}
}

// Update status in session
klog.V(3).Info("Allocating operations ...")
s.operations = append(s.operations, operation{
name: Allocate,
task: task,
})
if len(errInfos) != 0 {
return fmt.Errorf("Task %s/%s allocate to node %s error and errInfos num is %d, UnAllocate will be called later to roll back the resources and status of the task.",
task.Namespace, task.Name, hostname, len(errInfos))
} else {
// Update status in session
klog.V(3).Info("Allocating operations ...")
s.operations = append(s.operations, operation{
name: Allocate,
task: task,
})
}

return nil
}

// UnAllocate is the exported rollback counterpart of Allocate for a single
// task. Callers invoke it after Allocate has returned an error so that the
// partially applied allocation can be undone.
//
// It delegates directly to the unexported unallocate and returns that
// method's error unchanged.
// NOTE(review): unallocate is not shown here; presumably it restores the
// task status and releases the node resources — confirm against its body.
func (s *Statement) UnAllocate(task *api.TaskInfo) error {
return s.unallocate(task)
}

func (s *Statement) allocate(task *api.TaskInfo) error {
if err := s.ssn.cache.AddBindTask(task); err != nil {
return err
Expand Down

0 comments on commit 0096a4c

Please sign in to comment.