Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports rollback when allocate callback function fails #3776

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e_spark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ jobs:
kubectl describe node
- name: Upload Spark on K8S integration tests log files
if: failure()
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: spark-on-kubernetes-it-log
path: |
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/framework/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// Event structure
type Event struct {
Task *api.TaskInfo
Err error
}

// EventHandler structure
Expand Down
42 changes: 40 additions & 2 deletions pkg/scheduler/framework/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,15 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
return fmt.Errorf("failed to find job %s", task.Job)
}

defer func() {
if err != nil {
if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
klog.Errorf("Revert: Failed to update task <%v/%v> status to %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, api.Pending, s.ssn.UID, err)
}
}
}()

task.NodeName = hostname
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
Expand All @@ -271,12 +280,41 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
return fmt.Errorf("failed to find node %s", hostname)
}

defer func() {
if err != nil {
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.RemoveTask(task); err != nil {
klog.Errorf("Revert: failed to remove task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
}
klog.V(3).Infof("Revert: After remove allocate Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
}
}
}()

successEventHandlers := make([]*EventHandler, 0)
// Callbacks
for _, eh := range s.ssn.eventHandlers {
if eh.AllocateFunc != nil {
eh.AllocateFunc(&Event{
eventInfo := &Event{
Task: task,
})
}
eh.AllocateFunc(eventInfo)
if eventInfo.Err != nil {
klog.Errorf("Failed to exec allocate callback functions for task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, eventInfo.Err)
for _, reh := range successEventHandlers {
if reh.DeallocateFunc != nil {
reh.DeallocateFunc(&Event{
Task: task,
})
}
}
return fmt.Errorf("Failed to exec allocate callback functions")
} else {
successEventHandlers = append(successEventHandlers, eh)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about aggregate errs and call unallocate func when there is err?

Copy link
Member Author

@wangyang0616 wangyang0616 Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When each pod allocation is successful, but the overall vcjob does not meet expectations, unallocate is used to roll back the previous resource scheduling information.

For direct resource errors and exceptions during the scheduling process, I understand that it would be more appropriate to roll back directly based on the error.

}

Expand Down
1 change: 1 addition & 0 deletions test/e2e/schedulingaction/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ var _ = Describe("Reclaim E2E Test", func() {
})

It("Reclaim Case 3: New queue with job created no reclaim when job.PodGroup.Status.Phase pending", func() {
Skip("Occasionally Failed E2E Test Cases for Claim, See issue: #3562")
q1 := e2eutil.DefaultQueue
q2 := "reclaim-q2"
j1 := "reclaim-j1"
Expand Down
Loading