Skip to content

Commit

Permalink
Rollback is supported when the allocate callback function fails to be…
Browse files Browse the repository at this point in the history
… executed, for example, in the GPU allocation scenario

Signed-off-by: wangyang0616 <wangyang8126@gmail.com>
  • Loading branch information
wangyang0616 committed Oct 16, 2024
1 parent 682c39a commit 2d1bc13
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/scheduler/framework/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// Event structure
type Event struct {
Task *api.TaskInfo
Err error
}

// EventHandler structure
Expand Down
39 changes: 37 additions & 2 deletions pkg/scheduler/framework/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,15 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
return fmt.Errorf("failed to find job %s", task.Job)
}

defer func() {
if err != nil {
if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
klog.Errorf("Revert: Failed to update task <%v/%v> status to %v when allocating in Session <%v>: %v",
task.Namespace, task.Name, api.Pending, s.ssn.UID, err)
}
}
}()

task.NodeName = hostname
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
Expand All @@ -271,12 +280,38 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
return fmt.Errorf("failed to find node %s", hostname)
}

defer func() {
if err != nil {
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.RemoveTask(task); err != nil {
klog.Errorf("Revert: failed to remove task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
}
klog.V(3).Infof("Revert: After remove allocate Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
}
}
}()

// Callbacks
for _, eh := range s.ssn.eventHandlers {
if eh.AllocateFunc != nil {
eh.AllocateFunc(&Event{
eventInfo := &Event{
Task: task,
})
}
eh.AllocateFunc(eventInfo)
if eventInfo.Err != nil {
klog.Errorf("Failed to exec allocate callback functions for task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, eventInfo.Err)
for _, reh := range s.ssn.eventHandlers {
if reh.DeallocateFunc != nil {
reh.DeallocateFunc(&Event{
Task: task,
})
}
}
return fmt.Errorf("Failed to exec allocate callback functions")
}
}
}

Expand Down

0 comments on commit 2d1bc13

Please sign in to comment.