Skip to content

Commit

Permalink
Merge pull request #11498 from filecoin-project/fix/harmony-reclaim
Browse files Browse the repository at this point in the history
harmony: Fix task reclaim on restart
  • Loading branch information
snadrus authored Dec 7, 2023
2 parents d32b8be + efb4a09 commit cf8fed9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
4 changes: 2 additions & 2 deletions lib/harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func New(
continue // not really fatal, but not great
}
}
if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) {
if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) {
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
}
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func (e *TaskEngine) pollerTryAllWork() {
continue
}
if len(unownedTasks) > 0 {
accepted := v.considerWork("poller", unownedTasks)
accepted := v.considerWork(workSourcePoller, unownedTasks)
if accepted {
return // accept new work slowly and in priority order
}
Expand Down
36 changes: 22 additions & 14 deletions lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
}
}

const (
workSourcePoller = "poller"
workSourceRecover = "recovered"
)

// considerWork is called to attempt to start work on a task-id of this task type.
// It presumes single-threaded calling, so there should not be a multi-threaded re-entry.
// The only caller should be the one work poller thread. This does spin off other threads,
Expand Down Expand Up @@ -87,22 +92,25 @@ top:
return false
}

// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
log.Error(err)
return false
}
if ct == 0 {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
var tryAgain = make([]TaskID, 0, len(ids)-1)
for _, id := range ids {
if id != *tID {
tryAgain = append(tryAgain, id)
// if recovering we don't need to try to claim anything because those tasks are already claimed by us
if from != workSourceRecover {
// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
log.Error(err)
return false
}
if ct == 0 {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
var tryAgain = make([]TaskID, 0, len(ids)-1)
for _, id := range ids {
if id != *tID {
tryAgain = append(tryAgain, id)
}
}
ids = tryAgain
goto top
}
ids = tryAgain
goto top
}

h.Count.Add(1)
Expand Down

0 comments on commit cf8fed9

Please sign in to comment.