Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports rollback when allocate callback function fails #3780

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
if err := stmt.Allocate(task, bestNode); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, bestNode.Name, ssn.UID, err)
if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil {
klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, rollbackErr)
}
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
Expand All @@ -266,6 +270,10 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
if err := stmt.Pipeline(task, bestNode.Name); err != nil {
klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, err)
if rollbackErr := stmt.UnPipeline(task); rollbackErr != nil {
klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, rollbackErr)
}
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ func preempt(
if err := stmt.Pipeline(preemptor, node.Name); err != nil {
klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, node.Name)
if rollbackErr := stmt.UnPipeline(preemptor); rollbackErr != nil {
klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
preemptor.UID, node.Name, ssn.UID, rollbackErr)
}
}

// Ignore pipeline error, will be corrected in next scheduling loop.
Expand Down
130 changes: 67 additions & 63 deletions pkg/scheduler/framework/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,19 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo) error {

// Pipeline the task for the node
func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
errInfos := make([]error, 0)
job, found := s.ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil {
klog.Errorf("Failed to update task <%v/%v> status to %v when pipeline in Session <%v>: %v",
task.Namespace, task.Name, api.Pipelined, s.ssn.UID, err)
errInfos = append(errInfos, err)
}
} else {
klog.Errorf("Failed to find Job <%s> in Session <%s> index when pipeline.",
err := fmt.Errorf("Failed to find Job <%s> in Session <%s> index when pipeline.",
task.Job, s.ssn.UID)
klog.Errorf("%v", err)
errInfos = append(errInfos, err)
}

task.NodeName = hostname
Expand All @@ -160,26 +164,40 @@ func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
if err := node.AddTask(task); err != nil {
klog.Errorf("Failed to add task <%v/%v> to node <%v> when pipeline in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
errInfos = append(errInfos, err)
}
klog.V(3).Infof("After pipelined Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
} else {
klog.Errorf("Failed to find Node <%s> in Session <%s> index when pipeline.",
err := fmt.Errorf("Failed to find Node <%s> in Session <%s> index when pipeline.",
hostname, s.ssn.UID)
klog.Errorf("%v", err)
errInfos = append(errInfos, err)
}

for _, eh := range s.ssn.eventHandlers {
if eh.AllocateFunc != nil {
eh.AllocateFunc(&Event{
eventInfo := &Event{
Task: task,
})
}
eh.AllocateFunc(eventInfo)
if eventInfo.Err != nil {
klog.Errorf("Failed to exec allocate callback functions for task <%v/%v> to node <%v> when pipeline in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, eventInfo.Err)
errInfos = append(errInfos, eventInfo.Err)
}
}
}

s.operations = append(s.operations, operation{
name: Pipeline,
task: task,
})
if len(errInfos) != 0 {
return fmt.Errorf("Task(%s/%s) pipeline to node(%s) error and errInfos num is %d, UnPipeline will be called later to roll back the resources and status of the task.",
task.Namespace, task.Name, hostname, len(errInfos))
} else {
s.operations = append(s.operations, operation{
name: Pipeline,
task: task,
})
}

return nil
}
Expand Down Expand Up @@ -212,9 +230,14 @@ func (s *Statement) UnPipeline(task *api.TaskInfo) error {

for _, eh := range s.ssn.eventHandlers {
if eh.DeallocateFunc != nil {
eh.DeallocateFunc(&Event{
eventInfo := &Event{
Task: task,
})
}
eh.DeallocateFunc(eventInfo)
if eventInfo.Err != nil {
klog.Errorf("Failed to exec deallocate callback functions for task <%v/%v> to node <%v> when pipeline in Session <%v>: %v",
task.Namespace, task.Name, task.NodeName, s.ssn.UID, eventInfo.Err)
}
}
}
task.NodeName = ""
Expand All @@ -224,20 +247,21 @@ func (s *Statement) UnPipeline(task *api.TaskInfo) error {

// Allocate the task to node
func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err error) {
errInfos := make([]error, 0)
hostname := nodeInfo.Name

podVolumes, err := s.ssn.cache.GetPodVolumes(task, nodeInfo.Node)
if err != nil {
return err
klog.Errorf("Failed to get pod volume for task %v/%v on node %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
errInfos = append(errInfos, err)
}

hostname := nodeInfo.Name
if err := s.ssn.cache.AllocateVolumes(task, hostname, podVolumes); err != nil {
return err
klog.Errorf("Failed to allocate volume for task %v/%v on node %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
errInfos = append(errInfos, err)
}
defer func() {
if err != nil {
s.ssn.cache.RevertVolumes(task, podVolumes)
}
}()

task.Pod.Spec.NodeName = hostname
task.PodVolumes = podVolumes
Expand All @@ -248,52 +272,31 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
klog.Errorf("Failed to update task <%v/%v> status to %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, api.Allocated, s.ssn.UID, err)
return err
errInfos = append(errInfos, err)
}
} else {
klog.Errorf("Failed to find Job <%s> in Session <%s> index when allocating.",
err := fmt.Errorf("Failed to find Job <%s> in Session <%s> index when allocating.",
task.Job, s.ssn.UID)
return fmt.Errorf("failed to find job %s", task.Job)
klog.Errorf("%v", err)
errInfos = append(errInfos, err)
}

defer func() {
if err != nil {
if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
klog.Errorf("Revert: Failed to update task <%v/%v> status to %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, api.Pending, s.ssn.UID, err)
}
}
}()

task.NodeName = hostname
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
klog.Errorf("Failed to add task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
return err
errInfos = append(errInfos, err)
}
klog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
} else {
klog.Errorf("Failed to find Node <%s> in Session <%s> index when allocating.",
err := fmt.Errorf("Failed to find Node <%s> in Session <%s> index when allocating.",
hostname, s.ssn.UID)
return fmt.Errorf("failed to find node %s", hostname)
klog.Errorf("%v", err)
errInfos = append(errInfos, err)
}

defer func() {
if err != nil {
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.RemoveTask(task); err != nil {
klog.Errorf("Revert: failed to remove task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
}
klog.V(3).Infof("Revert: After remove allocate Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
}
}
}()

successEventHandlers := make([]*EventHandler, 0)
// Callbacks
for _, eh := range s.ssn.eventHandlers {
if eh.AllocateFunc != nil {
Expand All @@ -304,30 +307,31 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
if eventInfo.Err != nil {
klog.Errorf("Failed to exec allocate callback functions for task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, eventInfo.Err)
for _, reh := range successEventHandlers {
if reh.DeallocateFunc != nil {
reh.DeallocateFunc(&Event{
Task: task,
})
}
}
return fmt.Errorf("Failed to exec allocate callback functions")
} else {
successEventHandlers = append(successEventHandlers, eh)
errInfos = append(errInfos, eventInfo.Err)
}
}
}

// Update status in session
klog.V(3).Info("Allocating operations ...")
s.operations = append(s.operations, operation{
name: Allocate,
task: task,
})
if len(errInfos) != 0 {
return fmt.Errorf("Task %s/%s allocate to node %s error and errInfos num is %d, UnAllocate will be called later to roll back the resources and status of the task.",
task.Namespace, task.Name, hostname, len(errInfos))
} else {
// Update status in session
klog.V(3).Info("Allocating operations ...")
s.operations = append(s.operations, operation{
name: Allocate,
task: task,
})
}

return nil
}

// UnAllocate the pod for task
func (s *Statement) UnAllocate(task *api.TaskInfo) error {
return s.unallocate(task)
}

func (s *Statement) allocate(task *api.TaskInfo) error {
if err := s.ssn.cache.AddBindTask(task); err != nil {
return err
Expand Down
Loading