Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: lotus-provider: Wait for the correct taskID #11493

Merged
merged 3 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func New(
TaskTypeDetails: c.TypeDetails(),
TaskEngine: e,
}

if len(h.Name) > 16 {
return nil, fmt.Errorf("task name too long: %s, max 16 characters", h.Name)
}

e.handlers = append(e.handlers, &h)
e.taskMap[h.TaskTypeDetails.Name] = &h
}
Expand Down
20 changes: 16 additions & 4 deletions provider/lpmessage/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,23 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return cid.Undef, xerrors.Errorf("marshaling message: %w", err)
}

var sendTaskID *harmonytask.TaskID
taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
_, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`,
msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id)
if err != nil {
return false, xerrors.Errorf("inserting message into db: %w", err)
}

sendTaskID = &id

return true, nil
})

if sendTaskID == nil {
return cid.Undef, xerrors.Errorf("failed to add task")
}

// wait for exec
var (
pollInterval = 50 * time.Millisecond
Expand All @@ -347,10 +354,10 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS

for {
var err error
var sigCidStr, sendError string
var sigCidStr, sendError *string
var sendSuccess *bool

err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError)
err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError)
if err != nil {
return cid.Undef, xerrors.Errorf("getting cid for task: %w", err)
}
Expand All @@ -366,10 +373,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
continue
}

if sigCidStr == nil || sendError == nil {
// should never happen because sendSuccess is already not null here
return cid.Undef, xerrors.Errorf("got null values for sigCidStr or sendError, this should never happen")
}

if !*sendSuccess {
sendErr = xerrors.Errorf("send error: %s", sendError)
sendErr = xerrors.Errorf("send error: %s", *sendError)
} else {
sigCid, err = cid.Parse(sigCidStr)
sigCid, err = cid.Parse(*sigCidStr)
if err != nil {
return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion provider/lpwindow/recover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *h
func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: 128,
Name: "WdPostRecoverDeclare",
Name: "WdPostRecover",
Cost: resources.Resources{
Cpu: 1,
Gpu: 0,
Expand Down