diff --git a/.github/workflows/e2e_spark.yaml b/.github/workflows/e2e_spark.yaml index 126ad633c1..068863c1c2 100644 --- a/.github/workflows/e2e_spark.yaml +++ b/.github/workflows/e2e_spark.yaml @@ -87,7 +87,7 @@ jobs: kubectl describe node - name: Upload Spark on K8S integration tests log files if: failure() - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: spark-on-kubernetes-it-log path: | diff --git a/pkg/scheduler/framework/event.go b/pkg/scheduler/framework/event.go index d21f5e9fdb..dd4c42d3d0 100644 --- a/pkg/scheduler/framework/event.go +++ b/pkg/scheduler/framework/event.go @@ -23,6 +23,7 @@ import ( // Event structure type Event struct { Task *api.TaskInfo + Err error } // EventHandler structure diff --git a/pkg/scheduler/framework/statement.go b/pkg/scheduler/framework/statement.go index 22bd7ebb92..71da984f92 100644 --- a/pkg/scheduler/framework/statement.go +++ b/pkg/scheduler/framework/statement.go @@ -256,6 +256,15 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er return fmt.Errorf("failed to find job %s", task.Job) } + defer func() { + if err != nil { + if err := job.UpdateTaskStatus(task, api.Pending); err != nil { + klog.Errorf("Revert: Failed to update task <%v/%v> status to %v when allocating in Session <%v>: %v", + task.Namespace, task.Name, api.Pending, s.ssn.UID, err) + } + } + }() + task.NodeName = hostname if node, found := s.ssn.Nodes[hostname]; found { if err := node.AddTask(task); err != nil { @@ -271,12 +280,41 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er return fmt.Errorf("failed to find node %s", hostname) } + defer func() { + if err != nil { + if node, found := s.ssn.Nodes[hostname]; found { + if err := node.RemoveTask(task); err != nil { + klog.Errorf("Revert: failed to remove task <%v/%v> to node <%v> when allocating in Session <%v>: %v", + task.Namespace, task.Name, hostname, s.ssn.UID, err) + } + klog.V(3).Infof("Revert: After remove allocate Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>", + task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing) + } + } + }() + + successEventHandlers := make([]*EventHandler, 0) // Callbacks for _, eh := range s.ssn.eventHandlers { if eh.AllocateFunc != nil { - eh.AllocateFunc(&Event{ + eventInfo := &Event{ Task: task, - }) + } + eh.AllocateFunc(eventInfo) + if eventInfo.Err != nil { + klog.Errorf("Failed to exec allocate callback functions for task <%v/%v> to node <%v> when allocating in Session <%v>: %v", + task.Namespace, task.Name, hostname, s.ssn.UID, eventInfo.Err) + for _, reh := range successEventHandlers { + if reh.DeallocateFunc != nil { + reh.DeallocateFunc(&Event{ + Task: task, + }) + } + } + return fmt.Errorf("Failed to exec allocate callback functions") + } else { + successEventHandlers = append(successEventHandlers, eh) + } } } diff --git a/test/e2e/schedulingaction/reclaim.go b/test/e2e/schedulingaction/reclaim.go index d5025c3f20..0c5dd3009c 100644 --- a/test/e2e/schedulingaction/reclaim.go +++ b/test/e2e/schedulingaction/reclaim.go @@ -129,6 +129,7 @@ var _ = Describe("Reclaim E2E Test", func() { }) It("Reclaim Case 3: New queue with job created no reclaim when job.PodGroup.Status.Phase pending", func() { + Skip("Occasionally Failed E2E Test Cases for Claim, See issue: #3562") q1 := e2eutil.DefaultQueue q2 := "reclaim-q2" j1 := "reclaim-j1"