Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backports: lotus-provider: fixes caught in rc1-testing #11513

Merged
merged 6 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ docsgen-md-bin: api-gen actors-gen
docsgen-openrpc-bin: api-gen actors-gen
$(GOCC) build $(GOFLAGS) -o docgen-openrpc ./api/docgen-openrpc/cmd

docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker
docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker docsgen-md-provider

docsgen-md-full: docsgen-md-bin
./docgen-md "api/api_full.go" "FullNode" "api" "./api" > documentation/en/api-v1-unstable-methods.md
Expand All @@ -365,6 +365,8 @@ docsgen-md-storage: docsgen-md-bin
./docgen-md "api/api_storage.go" "StorageMiner" "api" "./api" > documentation/en/api-v0-methods-miner.md
docsgen-md-worker: docsgen-md-bin
./docgen-md "api/api_worker.go" "Worker" "api" "./api" > documentation/en/api-v0-methods-worker.md
docsgen-md-provider: docsgen-md-bin
./docgen-md "api/api_lp.go" "Provider" "api" "./api" > documentation/en/api-v0-methods-provider.md

docsgen-openrpc: docsgen-openrpc-full docsgen-openrpc-storage docsgen-openrpc-worker docsgen-openrpc-gateway

Expand Down
4 changes: 4 additions & 0 deletions api/docgen/docgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,10 @@ func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []r
i = &api.GatewayStruct{}
t = reflect.TypeOf(new(struct{ api.Gateway })).Elem()
permStruct = append(permStruct, reflect.TypeOf(api.GatewayStruct{}.Internal))
case "Provider":
i = &api.LotusProviderStruct{}
t = reflect.TypeOf(new(struct{ api.LotusProvider })).Elem()
permStruct = append(permStruct, reflect.TypeOf(api.LotusProviderStruct{}.Internal))
default:
panic("unknown type")
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/lotus-provider/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,17 @@ func fromMiner(cctx *cli.Context) (err error) {
dbSettings += ` --db-name="` + smCfg.HarmonyDB.Database + `"`
}

var layerMaybe string
if name != "base" {
layerMaybe = "--layer=" + name
}

msg += `
To work with the config:
` + cliCommandColor(`lotus-provider `+dbSettings+` config help `)
msg += `
To run Lotus Provider: in its own machine or cgroup without other files, use the command:
` + cliCommandColor(`lotus-provider `+dbSettings+` run --layers="`+name+`"`)
` + cliCommandColor(`lotus-provider `+dbSettings+` run `+layerMaybe)
fmt.Println(msg)
return nil
}
2 changes: 1 addition & 1 deletion cmd/lotus-provider/proving.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ var wdPostTaskCmd = &cli.Command{
}
fmt.Print(".")
}
log.Infof("Result:", result.String)
log.Infof("Result: %s", result.String)
return nil
},
}
Expand Down
10 changes: 9 additions & 1 deletion cmd/lotus-provider/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ var runCmd = &cli.Command{
}
}
log.Infow("This lotus_provider instance handles",
"miner_addresses", maddrs,
"miner_addresses", minerAddressesToStrings(maddrs),
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))

taskEngine, err := harmonytask.New(db, activeTasks, deps.listenAddr)
Expand Down Expand Up @@ -457,3 +457,11 @@ func (p *ProviderAPI) Shutdown(context.Context) error {
close(p.ShutdownChan)
return nil
}

func minerAddressesToStrings(maddrs []dtypes.MinerAddress) []string {
strs := make([]string, len(maddrs))
for i, addr := range maddrs {
strs[i] = address.Address(addr).String()
}
return strs
}
25 changes: 25 additions & 0 deletions documentation/en/api-v0-methods-provider.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Groups
* [](#)
* [Shutdown](#Shutdown)
* [Version](#Version)
##


### Shutdown


Perms: admin

Inputs: `null`

Response: `{}`

### Version


Perms: admin

Inputs: `null`

Response: `131840`

9 changes: 7 additions & 2 deletions lib/harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func New(
TaskTypeDetails: c.TypeDetails(),
TaskEngine: e,
}

if len(h.Name) > 16 {
return nil, fmt.Errorf("task name too long: %s, max 16 characters", h.Name)
}

e.handlers = append(e.handlers, &h)
e.taskMap[h.TaskTypeDetails.Name] = &h
}
Expand All @@ -171,7 +176,7 @@ func New(
continue // not really fatal, but not great
}
}
if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) {
if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) {
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
}
}
Expand Down Expand Up @@ -280,7 +285,7 @@ func (e *TaskEngine) pollerTryAllWork() {
continue
}
if len(unownedTasks) > 0 {
accepted := v.considerWork("poller", unownedTasks)
accepted := v.considerWork(workSourcePoller, unownedTasks)
if accepted {
return // accept new work slowly and in priority order
}
Expand Down
36 changes: 22 additions & 14 deletions lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
}
}

const (
workSourcePoller = "poller"
workSourceRecover = "recovered"
)

// considerWork is called to attempt to start work on a task-id of this task type.
// It presumes single-threaded calling, so there should not be a multi-threaded re-entry.
// The only caller should be the one work poller thread. This does spin off other threads,
Expand Down Expand Up @@ -87,22 +92,25 @@ top:
return false
}

// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
log.Error(err)
return false
}
if ct == 0 {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
var tryAgain = make([]TaskID, 0, len(ids)-1)
for _, id := range ids {
if id != *tID {
tryAgain = append(tryAgain, id)
// if recovering we don't need to try to claim anything because those tasks are already claimed by us
if from != workSourceRecover {
// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
log.Error(err)
return false
}
if ct == 0 {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
var tryAgain = make([]TaskID, 0, len(ids)-1)
for _, id := range ids {
if id != *tID {
tryAgain = append(tryAgain, id)
}
}
ids = tryAgain
goto top
}
ids = tryAgain
goto top
}

h.Count.Add(1)
Expand Down
20 changes: 16 additions & 4 deletions provider/lpmessage/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,23 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return cid.Undef, xerrors.Errorf("marshaling message: %w", err)
}

var sendTaskID *harmonytask.TaskID
taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
_, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`,
msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id)
if err != nil {
return false, xerrors.Errorf("inserting message into db: %w", err)
}

sendTaskID = &id

return true, nil
})

if sendTaskID == nil {
return cid.Undef, xerrors.Errorf("failed to add task")
}

// wait for exec
var (
pollInterval = 50 * time.Millisecond
Expand All @@ -347,10 +354,10 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS

for {
var err error
var sigCidStr, sendError string
var sigCidStr, sendError *string
var sendSuccess *bool

err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError)
err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError)
if err != nil {
return cid.Undef, xerrors.Errorf("getting cid for task: %w", err)
}
Expand All @@ -366,10 +373,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
continue
}

if sigCidStr == nil || sendError == nil {
// should never happen because sendSuccess is already not null here
return cid.Undef, xerrors.Errorf("got null values for sigCidStr or sendError, this should never happen")
}

if !*sendSuccess {
sendErr = xerrors.Errorf("send error: %s", sendError)
sendErr = xerrors.Errorf("send error: %s", *sendError)
} else {
sigCid, err = cid.Parse(sigCidStr)
sigCid, err = cid.Parse(*sigCidStr)
if err != nil {
return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion provider/lpwindow/recover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *h
func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: 128,
Name: "WdPostRecoverDeclare",
Name: "WdPostRecover",
Cost: resources.Resources{
Cpu: 1,
Gpu: 0,
Expand Down