diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 888c10dc9a..89b6e0f6f1 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -251,6 +251,10 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a if err := stmt.Allocate(task, bestNode); err != nil { klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", task.UID, bestNode.Name, ssn.UID, err) + if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil { + klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.", + task.UID, bestNode.Name, ssn.UID, rollbackErr) + } } else { metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) @@ -266,6 +270,10 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a if err := stmt.Pipeline(task, bestNode.Name); err != nil { klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.", task.UID, bestNode.Name, ssn.UID, err) + if rollbackErr := stmt.UnPipeline(task); rollbackErr != nil { + klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.", + task.UID, bestNode.Name, ssn.UID, rollbackErr) + } } else { metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 504c635dac..25eeb0ac53 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -291,6 +291,10 @@ func preempt( if err := stmt.Pipeline(preemptor, node.Name); err != nil { klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>", preemptor.Namespace, preemptor.Name, node.Name) + if rollbackErr := stmt.UnPipeline(preemptor); rollbackErr != nil { + klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.", + preemptor.UID, node.Name, ssn.UID, rollbackErr) + } } // Ignore pipeline error, will be corrected in next scheduling loop. diff --git a/pkg/scheduler/framework/statement.go b/pkg/scheduler/framework/statement.go index 71da984f92..c4eb4ec494 100644 --- a/pkg/scheduler/framework/statement.go +++ b/pkg/scheduler/framework/statement.go @@ -143,15 +143,19 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo) error { // Pipeline the task for the node func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error { + errInfos := make([]error, 0) job, found := s.ssn.Jobs[task.Job] if found { if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil { klog.Errorf("Failed to update task <%v/%v> status to %v when pipeline in Session <%v>: %v", task.Namespace, task.Name, api.Pipelined, s.ssn.UID, err) + errInfos = append(errInfos, err) } } else { - klog.Errorf("Failed to find Job <%s> in Session <%s> index when pipeline.", + err := fmt.Errorf("Failed to find Job <%s> in Session <%s> index when pipeline.", task.Job, s.ssn.UID) + klog.Errorf("%v", err) + errInfos = append(errInfos, err) } task.NodeName = hostname @@ -160,26 +164,40 @@ func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error { if err := node.AddTask(task); err != nil { klog.Errorf("Failed to add task <%v/%v> to node <%v> when pipeline in Session <%v>: %v", task.Namespace, task.Name, hostname, s.ssn.UID, err) + errInfos = append(errInfos, err) } klog.V(3).Infof("After pipelined Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>", task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing) } else { - klog.Errorf("Failed to find Node <%s> in Session <%s> index when pipeline.", + err := fmt.Errorf("Failed to find Node <%s> in Session <%s> index when pipeline.", hostname, s.ssn.UID) + klog.Errorf("%v", err) + errInfos = append(errInfos, err) } for _, eh := range s.ssn.eventHandlers { if eh.AllocateFunc != nil { - eh.AllocateFunc(&Event{ + eventInfo := &Event{ Task: task, - }) + } + eh.AllocateFunc(eventInfo) + if eventInfo.Err != nil { + klog.Errorf("Failed to exec allocate callback functions for task <%v/%v> to node <%v> when pipeline in Session <%v>: %v", + task.Namespace, task.Name, hostname, s.ssn.UID, eventInfo.Err) + errInfos = append(errInfos, eventInfo.Err) + } } } - s.operations = append(s.operations, operation{ - name: Pipeline, - task: task, - }) + if len(errInfos) != 0 { + return fmt.Errorf("Task(%s/%s) pipeline to node(%s) error and errInfos num is %d, UnPipeline will be called later to roll back the resources and status of the task.", + task.Namespace, task.Name, hostname, len(errInfos)) + } else { + s.operations = append(s.operations, operation{ + name: Pipeline, + task: task, + }) + } return nil } @@ -212,9 +230,14 @@ func (s *Statement) UnPipeline(task *api.TaskInfo) error { for _, eh := range s.ssn.eventHandlers { if eh.DeallocateFunc != nil { - eh.DeallocateFunc(&Event{ + eventInfo := &Event{ Task: task, - }) + } + eh.DeallocateFunc(eventInfo) + if eventInfo.Err != nil { + klog.Errorf("Failed to exec deallocate callback functions for task <%v/%v> to node <%v> when pipeline in Session <%v>: %v", + task.Namespace, task.Name, task.NodeName, s.ssn.UID, eventInfo.Err) + } } } task.NodeName = "" @@ -224,20 +247,21 @@ func (s *Statement) UnPipeline(task *api.TaskInfo) error { // Allocate the task to node func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err error) { + errInfos := make([]error, 0) + hostname := nodeInfo.Name + podVolumes, err := s.ssn.cache.GetPodVolumes(task, nodeInfo.Node) if err != nil { - return err + klog.Errorf("Failed to get pod volume for task %v/%v on node %v when allocating in Session <%v>: %v", + task.Namespace, task.Name, hostname, s.ssn.UID, err) + errInfos = append(errInfos, err) } - hostname := nodeInfo.Name if err := s.ssn.cache.AllocateVolumes(task, hostname, podVolumes); err != nil { - return err + klog.Errorf("Failed to allocate volume for task %v/%v on node %v when allocating in Session <%v>: %v", + task.Namespace, task.Name, hostname, s.ssn.UID, err) + errInfos = append(errInfos, err) } - defer func() { - if err != nil { - s.ssn.cache.RevertVolumes(task, podVolumes) - } - }() task.Pod.Spec.NodeName = hostname task.PodVolumes = podVolumes @@ -248,52 +272,31 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er if err := job.UpdateTaskStatus(task, api.Allocated); err != nil { klog.Errorf("Failed to update task <%v/%v> status to %v when allocating in Session <%v>: %v", task.Namespace, task.Name, api.Allocated, s.ssn.UID, err) - return err + errInfos = append(errInfos, err) } } else { - klog.Errorf("Failed to find Job <%s> in Session <%s> index when allocating.", + err := fmt.Errorf("Failed to find Job <%s> in Session <%s> index when allocating.", task.Job, s.ssn.UID) - return fmt.Errorf("failed to find job %s", task.Job) + klog.Errorf("%v", err) + errInfos = append(errInfos, err) } - defer func() { - if err != nil { - if err := job.UpdateTaskStatus(task, api.Pending); err != nil { - klog.Errorf("Revert: Failed to update task <%v/%v> status to %v when allocating in Session <%v>: %v", - task.Namespace, task.Name, api.Pending, s.ssn.UID, err) - } - } - }() - task.NodeName = hostname if node, found := s.ssn.Nodes[hostname]; found { if err := node.AddTask(task); err != nil { klog.Errorf("Failed to add task <%v/%v> to node <%v> when allocating in Session <%v>: %v", task.Namespace, task.Name, hostname, s.ssn.UID, err) - return err + errInfos = append(errInfos, err) } klog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>", task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing) } else { - klog.Errorf("Failed to find Node <%s> in Session <%s> index when allocating.", + err := fmt.Errorf("Failed to find Node <%s> in Session <%s> index when allocating.", hostname, s.ssn.UID) - return fmt.Errorf("failed to find node %s", hostname) + klog.Errorf("%v", err) + errInfos = append(errInfos, err) } - defer func() { - if err != nil { - if node, found := s.ssn.Nodes[hostname]; found { - if err := node.RemoveTask(task); err != nil { - klog.Errorf("Revert: failed to remove task <%v/%v> to node <%v> when allocating in Session <%v>: %v", - task.Namespace, task.Name, hostname, s.ssn.UID, err) - } - klog.V(3).Infof("Revert: After remove allocate Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>", - task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing) - } - } - }() - - successEventHandlers := make([]*EventHandler, 0) // Callbacks for _, eh := range s.ssn.eventHandlers { if eh.AllocateFunc != nil { @@ -304,30 +307,31 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er if eventInfo.Err != nil { klog.Errorf("Failed to exec allocate callback functions for task <%v/%v> to node <%v> when allocating in Session <%v>: %v", task.Namespace, task.Name, hostname, s.ssn.UID, eventInfo.Err) - for _, reh := range successEventHandlers { - if reh.DeallocateFunc != nil { - reh.DeallocateFunc(&Event{ - Task: task, - }) - } - } - return fmt.Errorf("Failed to exec allocate callback functions") - } else { - successEventHandlers = append(successEventHandlers, eh) + errInfos = append(errInfos, eventInfo.Err) } } } - // Update status in session - klog.V(3).Info("Allocating operations ...") - s.operations = append(s.operations, operation{ - name: Allocate, - task: task, - }) + if len(errInfos) != 0 { + return fmt.Errorf("Task %s/%s allocate to node %s error and errInfos num is %d, UnAllocate will be called later to roll back the resources and status of the task.", + task.Namespace, task.Name, hostname, len(errInfos)) + } else { + // Update status in session + klog.V(3).Info("Allocating operations ...") + s.operations = append(s.operations, operation{ + name: Allocate, + task: task, + }) + } return nil } +// UnAllocate the pod for task +func (s *Statement) UnAllocate(task *api.TaskInfo) error { + return s.unallocate(task) +} + func (s *Statement) allocate(task *api.TaskInfo) error { if err := s.ssn.cache.AddBindTask(task); err != nil { return err