Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mongo UI fixes #79

Merged
merged 6 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/curio/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/filecoin-project/curio/api/client"
"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/market"
"github.com/filecoin-project/curio/web"
Expand Down Expand Up @@ -244,7 +245,7 @@ func (p *CurioAPI) LogSetLevel(ctx context.Context, subsystem, level string) err
return logging.SetLogLevel(subsystem, level)
}

func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan chan struct{}) error {
func ListenAndServe(ctx context.Context, dependencies *deps.Deps, activeTasks []harmonytask.TaskInterface, shutdownChan chan struct{}) error {
fh := &paths.FetchHandler{Local: dependencies.LocalStore, PfHandler: &paths.DefaultPartialFileHandler{}}
remoteHandler := func(w http.ResponseWriter, r *http.Request) {
if !auth.HasPerm(r.Context(), nil, api.PermAdmin) {
Expand Down Expand Up @@ -291,7 +292,7 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c
eg.Go(srv.ListenAndServe)

if dependencies.Cfg.Subsystems.EnableWebGui {
web, err := web.GetSrv(ctx, dependencies)
web, err := web.GetSrv(ctx, dependencies, activeTasks)
if err != nil {
return err
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/curio/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"
Expand Down Expand Up @@ -99,6 +100,13 @@ var runCmd = &cli.Command{
log.Errorf("ensuring tempdir exists: %s", err)
}

if os.Getenv("GOLOG_FILE") != "" {
err := os.MkdirAll(filepath.Dir(os.Getenv("GOLOG_FILE")), 0755)
if err != nil {
return xerrors.Errorf("ensuring log file parent exists: %w", err)
}
}

ctx := lcli.DaemonContext(cctx)
shutdownChan := make(chan struct{})
{
Expand Down Expand Up @@ -134,7 +142,7 @@ var runCmd = &cli.Command{

go ffiSelfTest() // Panics on failure

taskEngine, err := tasks.StartTasks(ctx, dependencies)
taskEngine, activeTasks, err := tasks.StartTasks(ctx, dependencies)

if err != nil {
return nil
Expand All @@ -145,7 +153,7 @@ var runCmd = &cli.Command{
return xerrors.Errorf("starting market RPCs: %w", err)
}

err = rpc.ListenAndServe(ctx, dependencies, shutdownChan) // Monitor for shutdown.
err = rpc.ListenAndServe(ctx, dependencies, activeTasks, shutdownChan) // Monitor for shutdown.
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func WindowPostScheduler(ctx context.Context, fc config.CurioFees, pc config.Cur
return computeTask, submitTask, recoverTask, nil
}

func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.TaskEngine, error) {
func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.TaskEngine, []harmonytask.TaskInterface, error) {
cfg := dependencies.Cfg
db := dependencies.DB
full := dependencies.Full
Expand Down Expand Up @@ -132,7 +132,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)

if err != nil {
return nil, err
return nil, nil, err
}
activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask)
}
Expand All @@ -154,7 +154,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
if cfg.Subsystems.EnableParkPiece {
parkPieceTask, err := piece2.NewParkPieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.ParkPieceMaxTasks)
if err != nil {
return nil, err
return nil, nil, err
}
cleanupPieceTask := piece2.NewCleanupPieceTask(db, must.One(slrLazy.Val()), 0)
activeTasks = append(activeTasks, parkPieceTask, cleanupPieceTask)
Expand All @@ -170,7 +170,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
if hasAnySealingTask {
sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore)
if err != nil {
return nil, err
return nil, nil, err
}
activeTasks = append(activeTasks, sealingTasks...)
}
Expand All @@ -193,14 +193,14 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task

ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr)
if err != nil {
return nil, err
return nil, nil, err
}
go machineDetails(dependencies, activeTasks, ht.ResourcesAvailable().MachineID, dependencies.Name)

if hasAnySealingTask {
watcher, err := message.NewMessageWatcher(db, ht, chainSched, full)
if err != nil {
return nil, err
return nil, nil, err
}
_ = watcher
}
Expand All @@ -209,7 +209,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
go chainSched.Run(ctx)
}

return ht, nil
return ht, activeTasks, nil
}

func addSealingTasks(
Expand Down
111 changes: 59 additions & 52 deletions cmd/curio/proving.go → cmd/curio/test-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"time"

"github.com/samber/lo"
"github.com/urfave/cli/v2"
"github.com/yugabyte/pgx/v5"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -51,7 +52,8 @@ var wdPostCmd = &cli.Command{
var wdPostTaskCmd = &cli.Command{
Name: "task",
Aliases: []string{"scheduled", "schedule", "async", "asynchronous"},
Usage: "Test the windowpost scheduler by running it on the next available curio. ",
Usage: `Test the windowpost scheduler by running it on the next available curio.
If tasks fail all retries, you will need to ctrl+c to exit.`,
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "deadline",
Expand All @@ -77,92 +79,97 @@ var wdPostTaskCmd = &cli.Command{
}
ht := ts.Height()

// It's not important to be super-accurate as it's only for basic testing.
addr, err := address.NewFromString(deps.Cfg.Addresses[0].MinerAddresses[0])
if err != nil {
return xerrors.Errorf("cannot get miner address %w", err)
}
maddr, err := address.IDFromAddress(addr)
if err != nil {
return xerrors.Errorf("cannot get miner id %w", err)
}
var taskId int64

_, err = deps.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&taskId)
var taskIDs []int64
for addr := range deps.Maddrs {
maddr, err := address.IDFromAddress(address.Address(addr))
if err != nil {
log.Error("inserting harmony_task: ", err)
return false, xerrors.Errorf("inserting harmony_task: %w", err)
return xerrors.Errorf("cannot get miner id %w", err)
}
_, err = tx.Exec(`INSERT INTO wdpost_partition_tasks
var taskId int64

_, err = deps.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&taskId)
if err != nil {
log.Error("inserting harmony_task: ", err)
return false, xerrors.Errorf("inserting harmony_task: %w", err)
}
_, err = tx.Exec(`INSERT INTO wdpost_partition_tasks
(task_id, sp_id, proving_period_start, deadline_index, partition_index) VALUES ($1, $2, $3, $4, $5)`,
taskId, maddr, ht, cctx.Uint64("deadline"), 0)
if err != nil {
log.Error("inserting wdpost_partition_tasks: ", err)
return false, xerrors.Errorf("inserting wdpost_partition_tasks: %w", err)
}
_, err = tx.Exec("INSERT INTO harmony_test (task_id) VALUES ($1)", taskId)
taskId, maddr, ht, cctx.Uint64("deadline"), 0)
if err != nil {
log.Error("inserting wdpost_partition_tasks: ", err)
return false, xerrors.Errorf("inserting wdpost_partition_tasks: %w", err)
}
_, err = tx.Exec("INSERT INTO harmony_test (task_id) VALUES ($1)", taskId)
if err != nil {
return false, xerrors.Errorf("inserting into harmony_tests: %w", err)
}
return true, nil
}, harmonydb.OptionRetry())
if err != nil {
return false, xerrors.Errorf("inserting into harmony_tests: %w", err)
return xerrors.Errorf("writing SQL transaction: %w", err)
}
return true, nil
}, harmonydb.OptionRetry())
if err != nil {
return xerrors.Errorf("writing SQL transaction: %w", err)

fmt.Printf("Inserted task %v for miner ID %v. Waiting for success ", taskId, addr)
taskIDs = append(taskIDs, taskId)
}
fmt.Printf("Inserted task %v. Waiting for success ", taskId)

var taskID int64
var result sql.NullString
var lastHistID *int64
prevFound := true
var historyIDs []int64

for {
for len(taskIDs) > 0 {
time.Sleep(time.Second)
err = deps.DB.QueryRow(ctx, `SELECT result FROM harmony_test WHERE task_id=$1`, taskId).Scan(&result)
err = deps.DB.QueryRow(ctx, `SELECT task_id, result FROM harmony_test WHERE task_id IN $1`, taskIDs).Scan(&taskID, &result)
if err != nil {
return xerrors.Errorf("reading result from harmony_test: %w", err)
}
if result.Valid {
break
log.Infof("Result for task %v: %s", taskID, result.String)
// remove task from list
taskIDs = lo.Filter(taskIDs, func(v int64, i int) bool {
return v != taskID
})
continue
}
fmt.Print(".")

{
// look at history
var histID *int64
var errmsg sql.NullString
err = deps.DB.QueryRow(ctx, `SELECT id, result, err FROM harmony_task_history WHERE task_id=$1 ORDER BY work_end DESC LIMIT 1`, taskId).Scan(&histID, &result, &errmsg)
var hist []struct {
histID int64
taskID int64
result sql.NullString
errmsg sql.NullString
}
err = deps.DB.Select(ctx, &hist, `SELECT id, task_id, result, err FROM harmony_task_history WHERE task_id IN $1 ORDER BY work_end DESC`, taskIDs)
if err != nil && err != pgx.ErrNoRows {
return xerrors.Errorf("reading result from harmony_task_history: %w", err)
}

if err == nil && histID != nil && (lastHistID == nil || *histID != *lastHistID) {
fmt.Println()
var errstr string
if errmsg.Valid {
errstr = errmsg.String
for _, h := range hist {
if !lo.Contains(historyIDs, h.histID) {
historyIDs = append(historyIDs, h.histID)
var errstr string
if h.errmsg.Valid {
errstr = h.errmsg.String
}
fmt.Printf("History for task %v historyID %d: %s\n", taskID, h.histID, errstr)
}
fmt.Printf("History %d: %s\n", *histID, errstr)
lastHistID = histID
}
}

{
// look for fails
var found bool
err = deps.DB.QueryRow(ctx, `SELECT true FROM harmony_task WHERE id=$1`, taskId).Scan(&found)
var found []int64
err = deps.DB.Select(ctx, found, `SELECT task_id FROM harmony_task WHERE id IN $1`, taskIDs)
if err != nil && err != pgx.ErrNoRows {
return xerrors.Errorf("reading result from harmony_task: %w", err)
}

if !found && !prevFound {
return xerrors.Errorf("task %d was not found in harmony_task, likely out of retries", taskId)
}
prevFound = found
log.Infof("Tasks found in harmony_task: %v", found)
}
}
fmt.Println()
log.Infof("Result: %s", result.String)
return nil
},
}
Expand Down
2 changes: 2 additions & 0 deletions documentation/en/curio-cli/curio.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ USAGE:
COMMANDS:
here, cli Compute WindowPoSt for performance and configuration testing.
task, scheduled, schedule, async, asynchronous Test the windowpost scheduler by running it on the next available curio.
If tasks fail all retries, you will need to ctrl+c to exit.
help, h Shows a list of commands or help for one command

OPTIONS:
Expand Down Expand Up @@ -447,6 +448,7 @@ OPTIONS:
```
NAME:
curio test window-post task - Test the windowpost scheduler by running it on the next available curio.
If tasks fail all retries, you will need to ctrl+c to exit.

USAGE:
curio test window-post task [command options] [arguments...]
Expand Down
10 changes: 5 additions & 5 deletions itests/curio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func TestCurioNewActor(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

full, miner, esemble := kit.EnsembleMinimal(t,
full, miner, ensemble := kit.EnsembleMinimal(t,
kit.LatestActorsAt(-1),
kit.MockProofs(),
kit.WithSectorIndexDB(),
)

esemble.Start()
ensemble.Start()
blockTime := 100 * time.Millisecond
esemble.BeginMiningMustPost(blockTime)
ensemble.BeginMiningMustPost(blockTime)

addr := miner.OwnerKey.Address
sectorSizeInt, err := units.RAMInBytes("8MiB")
Expand Down Expand Up @@ -348,15 +348,15 @@ func ConstructCurioTest(ctx context.Context, t *testing.T, dir string, db *harmo
err = dependencies.PopulateRemainingDeps(ctx, cctx, false)
require.NoError(t, err)

taskEngine, err := tasks.StartTasks(ctx, dependencies)
taskEngine, activeTasks, err := tasks.StartTasks(ctx, dependencies)
require.NoError(t, err)

dependencies.Cfg.Subsystems.BoostAdapters = []string{fmt.Sprintf("%s:127.0.0.1:32000", maddr)}
err = lmrpc.ServeCurioMarketRPCFromConfig(dependencies.DB, dependencies.Full, dependencies.Cfg)
require.NoError(t, err)

go func() {
err = rpc.ListenAndServe(ctx, dependencies, shutdownChan) // Monitor for shutdown.
err = rpc.ListenAndServe(ctx, dependencies, activeTasks, shutdownChan) // Monitor for shutdown.
require.NoError(t, err)
}()

Expand Down
11 changes: 11 additions & 0 deletions tasks/seal/task_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ func NewFinalizeTask(max int, sp *SealPoller, sc *ffi.SealCalls, db *harmonydb.D
}
}

func (f *FinalizeTask) GetSpid(taskID int64) string {
var spid string
err := f.db.QueryRow(context.Background(), `SELECT sp_id FROM sectors_sdr_pipeline WHERE task_id_finalize = $1`, taskID).Scan(&spid)
if err != nil {
log.Errorf("getting spid: %v", err)
return ""
}
return spid

}

func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var tasks []struct {
SpID int64 `db:"sp_id"`
Expand Down
10 changes: 10 additions & 0 deletions tasks/seal/task_movestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ func (m *MoveStorageTask) TypeDetails() harmonytask.TaskTypeDetails {
}
}

func (m *MoveStorageTask) GetSpid(taskID int64) string {
var spid string
err := m.db.QueryRow(context.Background(), `SELECT sp_id FROM sectors_sdr_pipeline WHERE task_id_move_storage = $1`, taskID).Scan(&spid)
if err != nil {
log.Errorf("getting spid: %s", err)
return ""
}
return spid
}

func (m *MoveStorageTask) taskToSector(id harmonytask.TaskID) (ffi2.SectorRef, error) {
var refs []ffi2.SectorRef

Expand Down
10 changes: 10 additions & 0 deletions tasks/seal/task_porep.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ func (p *PoRepTask) TypeDetails() harmonytask.TaskTypeDetails {
return res
}

func (p *PoRepTask) GetSpid(taskID int64) string {
var spid string
err := p.db.QueryRow(context.Background(), `SELECT sp_id FROM sectors_sdr_pipeline WHERE task_id_porep = $1`, taskID).Scan(&spid)
if err != nil {
log.Errorf("getting spid: %s", err)
return ""
}
return spid
}

func (p *PoRepTask) Adder(taskFunc harmonytask.AddTaskFunc) {
p.sp.pollers[pollerPoRep].Set(taskFunc)
}
Expand Down
Loading