Skip to content

Commit

Permalink
Merge pull request #11513 from filecoin-project/lotus-provider-backports
Browse files Browse the repository at this point in the history
backports: lotus-provider: fixes caught in rc1-testing
  • Loading branch information
snadrus authored Dec 18, 2023
2 parents eb0c5cb + 970ce52 commit bc5a2a6
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 25 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ docsgen-md-bin: api-gen actors-gen
docsgen-openrpc-bin: api-gen actors-gen
$(GOCC) build $(GOFLAGS) -o docgen-openrpc ./api/docgen-openrpc/cmd

docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker
docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker docsgen-md-provider

docsgen-md-full: docsgen-md-bin
./docgen-md "api/api_full.go" "FullNode" "api" "./api" > documentation/en/api-v1-unstable-methods.md
Expand All @@ -365,6 +365,8 @@ docsgen-md-storage: docsgen-md-bin
./docgen-md "api/api_storage.go" "StorageMiner" "api" "./api" > documentation/en/api-v0-methods-miner.md
docsgen-md-worker: docsgen-md-bin
./docgen-md "api/api_worker.go" "Worker" "api" "./api" > documentation/en/api-v0-methods-worker.md
docsgen-md-provider: docsgen-md-bin
./docgen-md "api/api_lp.go" "Provider" "api" "./api" > documentation/en/api-v0-methods-provider.md

docsgen-openrpc: docsgen-openrpc-full docsgen-openrpc-storage docsgen-openrpc-worker docsgen-openrpc-gateway

Expand Down
4 changes: 4 additions & 0 deletions api/docgen/docgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,10 @@ func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []r
i = &api.GatewayStruct{}
t = reflect.TypeOf(new(struct{ api.Gateway })).Elem()
permStruct = append(permStruct, reflect.TypeOf(api.GatewayStruct{}.Internal))
case "Provider":
i = &api.LotusProviderStruct{}
t = reflect.TypeOf(new(struct{ api.LotusProvider })).Elem()
permStruct = append(permStruct, reflect.TypeOf(api.LotusProviderStruct{}.Internal))
default:
panic("unknown type")
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/lotus-provider/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,17 @@ func fromMiner(cctx *cli.Context) (err error) {
dbSettings += ` --db-name="` + smCfg.HarmonyDB.Database + `"`
}

var layerMaybe string
if name != "base" {
layerMaybe = "--layer=" + name
}

msg += `
To work with the config:
` + cliCommandColor(`lotus-provider `+dbSettings+` config help `)
msg += `
To run Lotus Provider: in its own machine or cgroup without other files, use the command:
` + cliCommandColor(`lotus-provider `+dbSettings+` run --layers="`+name+`"`)
` + cliCommandColor(`lotus-provider `+dbSettings+` run `+layerMaybe)
fmt.Println(msg)
return nil
}
2 changes: 1 addition & 1 deletion cmd/lotus-provider/proving.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ var wdPostTaskCmd = &cli.Command{
}
fmt.Print(".")
}
log.Infof("Result:", result.String)
log.Infof("Result: %s", result.String)
return nil
},
}
Expand Down
10 changes: 9 additions & 1 deletion cmd/lotus-provider/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ var runCmd = &cli.Command{
}
}
log.Infow("This lotus_provider instance handles",
"miner_addresses", maddrs,
"miner_addresses", minerAddressesToStrings(maddrs),
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))

taskEngine, err := harmonytask.New(db, activeTasks, deps.listenAddr)
Expand Down Expand Up @@ -457,3 +457,11 @@ func (p *ProviderAPI) Shutdown(context.Context) error {
close(p.ShutdownChan)
return nil
}

func minerAddressesToStrings(maddrs []dtypes.MinerAddress) []string {
strs := make([]string, len(maddrs))
for i, addr := range maddrs {
strs[i] = address.Address(addr).String()
}
return strs
}
25 changes: 25 additions & 0 deletions documentation/en/api-v0-methods-provider.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Groups
* [](#)
* [Shutdown](#Shutdown)
* [Version](#Version)
##


### Shutdown


Perms: admin

Inputs: `null`

Response: `{}`

### Version


Perms: admin

Inputs: `null`

Response: `131840`

9 changes: 7 additions & 2 deletions lib/harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func New(
TaskTypeDetails: c.TypeDetails(),
TaskEngine: e,
}

if len(h.Name) > 16 {
return nil, fmt.Errorf("task name too long: %s, max 16 characters", h.Name)
}

e.handlers = append(e.handlers, &h)
e.taskMap[h.TaskTypeDetails.Name] = &h
}
Expand All @@ -171,7 +176,7 @@ func New(
continue // not really fatal, but not great
}
}
if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) {
if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) {
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
}
}
Expand Down Expand Up @@ -280,7 +285,7 @@ func (e *TaskEngine) pollerTryAllWork() {
continue
}
if len(unownedTasks) > 0 {
accepted := v.considerWork("poller", unownedTasks)
accepted := v.considerWork(workSourcePoller, unownedTasks)
if accepted {
return // accept new work slowly and in priority order
}
Expand Down
36 changes: 22 additions & 14 deletions lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
}
}

const (
workSourcePoller = "poller"
workSourceRecover = "recovered"
)

// considerWork is called to attempt to start work on a task-id of this task type.
// It presumes single-threaded calling, so there should not be a multi-threaded re-entry.
// The only caller should be the one work poller thread. This does spin off other threads,
Expand Down Expand Up @@ -87,22 +92,25 @@ top:
return false
}

// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
log.Error(err)
return false
}
if ct == 0 {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
var tryAgain = make([]TaskID, 0, len(ids)-1)
for _, id := range ids {
if id != *tID {
tryAgain = append(tryAgain, id)
// if recovering we don't need to try to claim anything because those tasks are already claimed by us
if from != workSourceRecover {
// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
log.Error(err)
return false
}
if ct == 0 {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
var tryAgain = make([]TaskID, 0, len(ids)-1)
for _, id := range ids {
if id != *tID {
tryAgain = append(tryAgain, id)
}
}
ids = tryAgain
goto top
}
ids = tryAgain
goto top
}

h.Count.Add(1)
Expand Down
20 changes: 16 additions & 4 deletions provider/lpmessage/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,23 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return cid.Undef, xerrors.Errorf("marshaling message: %w", err)
}

var sendTaskID *harmonytask.TaskID
taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
_, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`,
msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id)
if err != nil {
return false, xerrors.Errorf("inserting message into db: %w", err)
}

sendTaskID = &id

return true, nil
})

if sendTaskID == nil {
return cid.Undef, xerrors.Errorf("failed to add task")
}

// wait for exec
var (
pollInterval = 50 * time.Millisecond
Expand All @@ -347,10 +354,10 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS

for {
var err error
var sigCidStr, sendError string
var sigCidStr, sendError *string
var sendSuccess *bool

err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError)
err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError)
if err != nil {
return cid.Undef, xerrors.Errorf("getting cid for task: %w", err)
}
Expand All @@ -366,10 +373,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
continue
}

if sigCidStr == nil || sendError == nil {
// should never happen because sendSuccess is already not null here
return cid.Undef, xerrors.Errorf("got null values for sigCidStr or sendError, this should never happen")
}

if !*sendSuccess {
sendErr = xerrors.Errorf("send error: %s", sendError)
sendErr = xerrors.Errorf("send error: %s", *sendError)
} else {
sigCid, err = cid.Parse(sigCidStr)
sigCid, err = cid.Parse(*sigCidStr)
if err != nil {
return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion provider/lpwindow/recover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *h
func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: 128,
Name: "WdPostRecoverDeclare",
Name: "WdPostRecover",
Cost: resources.Resources{
Cpu: 1,
Gpu: 0,
Expand Down

0 comments on commit bc5a2a6

Please sign in to comment.