Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webui: Show task fails and allow bulk restart on PoRep page #170

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions web/api/webrpc/pipeline.go → web/api/webrpc/pipeline_porep.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
Expand Down Expand Up @@ -61,6 +62,10 @@ type PipelineTask struct {
FailedReason string `db:"failed_reason"`
}

func (pt PipelineTask) sectorID() abi.SectorID {
return abi.SectorID{Miner: abi.ActorID(pt.SpID), Number: abi.SectorNumber(pt.SectorNumber)}
}

type sectorListEntry struct {
PipelineTask

Expand All @@ -69,6 +74,9 @@ type sectorListEntry struct {
AfterSeed bool

ChainAlloc, ChainSector, ChainActive, ChainUnproven, ChainFaulty bool

MissingTasks []int64
AllTasks []int64
}

type minerBitfields struct {
Expand Down Expand Up @@ -99,6 +107,16 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
return nil, xerrors.Errorf("failed to fetch pipeline tasks: %w", err)
}

missingTasks, err := a.pipelinePorepMissingTasks(ctx)
if err != nil {
return nil, xerrors.Errorf("failed to fetch missing tasks: %w", err)
}

missingTasksMap := make(map[abi.SectorID]porepMissingTask)
for _, mt := range missingTasks {
missingTasksMap[mt.sectorID()] = mt
}

head, err := a.deps.Chain.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("failed to fetch chain head: %w", err)
Expand Down Expand Up @@ -129,6 +147,12 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e

afterSeed := task.SeedEpoch != nil && *task.SeedEpoch <= int64(epoch)

var missingTasks, allTasks []int64
if mt, ok := missingTasksMap[task.sectorID()]; ok {
missingTasks = mt.MissingTaskIDs
allTasks = mt.AllTaskIDs
}

sectorList = append(sectorList, sectorListEntry{
PipelineTask: task,
Address: addr,
Expand All @@ -140,6 +164,9 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
ChainActive: must.One(mbf.active.IsSet(uint64(task.SectorNumber))),
ChainUnproven: must.One(mbf.unproven.IsSet(uint64(task.SectorNumber))),
ChainFaulty: must.One(mbf.faulty.IsSet(uint64(task.SectorNumber))),

MissingTasks: missingTasks,
AllTasks: allTasks,
})
}

Expand Down Expand Up @@ -249,3 +276,87 @@ func (a *WebRPC) PorepPipelineSummary(ctx context.Context) ([]PorepPipelineSumma
}
return summaries, nil
}

func (a *WebRPC) PipelinePorepRestartAll(ctx context.Context) error {
missing, err := a.pipelinePorepMissingTasks(ctx)
if err != nil {
return err
}

for _, mt := range missing {
if len(mt.AllTaskIDs) != len(mt.MissingTaskIDs) || len(mt.MissingTaskIDs) == 0 {
continue
}

log.Infow("Restarting sector", "sector", mt.sectorID(), "missing_tasks", mt.MissingTasksCount)

if err := a.SectorResume(ctx, mt.SpID, mt.SectorNumber); err != nil {
return err
}
}
return nil
}

type porepMissingTask struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`

AllTaskIDs []int64 `db:"all_task_ids"`
MissingTaskIDs []int64 `db:"missing_task_ids"`
TotalTasks int `db:"total_tasks"`
MissingTasksCount int `db:"missing_tasks_count"`
RestartStatus string `db:"restart_status"`
}

func (pmt porepMissingTask) sectorID() abi.SectorID {
return abi.SectorID{Miner: abi.ActorID(pmt.SpID), Number: abi.SectorNumber(pmt.SectorNumber)}
}

func (a *WebRPC) pipelinePorepMissingTasks(ctx context.Context) ([]porepMissingTask, error) {
var tasks []porepMissingTask
err := a.deps.DB.Select(ctx, &tasks, `
WITH sector_tasks AS (
SELECT
sp.sp_id,
sp.sector_number,
get_sdr_pipeline_tasks(sp.sp_id, sp.sector_number) AS task_ids
FROM
sectors_sdr_pipeline sp
),
missing_tasks AS (
SELECT
st.sp_id,
st.sector_number,
st.task_ids,
array_agg(CASE WHEN ht.id IS NULL THEN task_id ELSE NULL END) AS missing_task_ids
FROM
sector_tasks st
CROSS JOIN UNNEST(st.task_ids) WITH ORDINALITY AS t(task_id, task_order)
LEFT JOIN harmony_task ht ON ht.id = task_id
GROUP BY
st.sp_id, st.sector_number, st.task_ids
)
SELECT
mt.sp_id,
mt.sector_number,
mt.task_ids AS all_task_ids,
mt.missing_task_ids,
array_length(mt.task_ids, 1) AS total_tasks,
array_length(mt.missing_task_ids, 1) AS missing_tasks_count,
CASE
WHEN array_length(mt.task_ids, 1) = array_length(mt.missing_task_ids, 1) THEN 'All tasks missing'
ELSE 'Some tasks missing'
END AS restart_status
FROM
missing_tasks mt
WHERE
array_length(mt.task_ids, 1) > 0 -- Has at least one task
AND array_length(array_remove(mt.missing_task_ids, NULL), 1) > 0 -- At least one task is missing
ORDER BY
mt.sp_id, mt.sector_number;`)
if err != nil {
return nil, xerrors.Errorf("failed to fetch missing tasks: %w", err)
}

return tasks, nil
}
4 changes: 2 additions & 2 deletions web/api/webrpc/sector.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,15 +410,15 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto
}, nil
}

func (a *WebRPC) SectorResume(ctx context.Context, spid, id int) error {
func (a *WebRPC) SectorResume(ctx context.Context, spid, id int64) error {
_, err := a.deps.DB.Exec(ctx, `SELECT unset_task_id($1, $2)`, spid, id)
if err != nil {
return xerrors.Errorf("failed to resume sector: %w", err)
}
return nil
}

func (a *WebRPC) SectorRemove(ctx context.Context, spid, id int) error {
func (a *WebRPC) SectorRemove(ctx context.Context, spid, id int64) error {
_, err := a.deps.DB.Exec(ctx, `DELETE FROM batch_sector_refs WHERE sp_id = $1 AND sector_number = $2`, spid, id)
if err != nil {
return xerrors.Errorf("failed to remove sector batch refs: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion web/static/pages/node_info/node-info.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ customElements.define('node-info',class NodeInfoElement extends LitElement {
<td>${task.ID}</td>
<td>${task.Task}</td>
<td>${task.Posted}</td>
<td>${task.PoRepSector ? html`<a href="/pages/pipeline_porep/">f0${task.PoRepSectorSP}:${task.PoRepSector}</a>` : ''}</td>
<td>${task.PoRepSector ? html`<a href="/pages/sector/?sp=f0${task.PoRepSectorSP}&id=${task.PoRepSector}">f0${task.PoRepSectorSP}:${task.PoRepSector}</a>` : ''}</td>
magik6k marked this conversation as resolved.
Show resolved Hide resolved
</tr>
`)}
</table>
Expand Down
8 changes: 8 additions & 0 deletions web/static/pages/pipeline_porep/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<script type="module" src="/ux/curio-ux.mjs"></script>
<script type="module" src="/chain-connectivity.mjs"></script>
<script type="module" src="pipeline-porep-sectors.mjs"></script>
<script type="module" src="restart-all-button.mjs"></script>
<link rel="stylesheet" href="/ux/main.css">
</head>
<body style="visibility: hidden">
Expand All @@ -16,6 +17,13 @@ <h1>Curio PoRep Pipeline</h1>
</div>
<hr/>
<div class="page">
<div class="row">
<div class="row-md-auto" style="width: 50%">
<div class="info-block">
<restart-all-button></restart-all-button>
</div>
</div>
</div>
<div class="row">
<div class="row-md-auto" style="width: 50%">
<div class="info-block">
Expand Down
41 changes: 27 additions & 14 deletions web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,18 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
<table class="porep-state">
<tbody>
<tr>
${this.renderSectorState('SDR', 1, sector.TaskSDR, sector.AfterSDR)}
${this.renderSectorState('TreeC', 1, sector.TaskTreeC, sector.AfterTreeC)}
${this.renderSectorState('Synthetic', 2, sector.TaskSynthetic, sector.AfterSynthetic)}
${this.renderSectorState('PComm Msg', 2, sector.TaskPrecommitMsg, sector.AfterPrecommitMsg)}
${this.renderSectorState('SDR', 1, sector, sector.TaskSDR, sector.AfterSDR)}
${this.renderSectorState('TreeC', 1, sector, sector.TaskTreeC, sector.AfterTreeC)}
${this.renderSectorState('Synthetic', 2, sector, sector.TaskSynthetic, sector.AfterSynthetic)}
${this.renderSectorState('PComm Msg', 2, sector, sector.TaskPrecommitMsg, sector.AfterPrecommitMsg)}
${this.renderSectorStateNoTask('PComm Wait', 2, sector.AfterPrecommitMsg, sector.AfterPrecommitMsgSuccess)}
<td rowspan=2 class="${sector.AfterPrecommitMsgSuccess?'pipeline-active':''} ${sector.AfterSeed?'pipeline-success':''}">
<div>Wait Seed</div>
<div>${sector.AfterSeed?'done':sector.SeedEpoch}</div>
</td>
${this.renderSectorState('PoRep', 2, sector.TaskPoRep, sector.AfterPoRep)}
${this.renderSectorState('Clear Cache', 1, sector.TaskFinalize, sector.AfterFinalize)}
${this.renderSectorState('Move Storage', 1, sector.TaskMoveStorage, sector.AfterMoveStorage)}
${this.renderSectorState('PoRep', 2, sector, sector.TaskPoRep, sector.AfterPoRep)}
${this.renderSectorState('Clear Cache', 1, sector, sector.TaskFinalize, sector.AfterFinalize)}
${this.renderSectorState('Move Storage', 1, sector, sector.TaskMoveStorage, sector.AfterMoveStorage)}
<td class="${sector.ChainSector ? 'pipeline-success' : (sector.ChainAlloc ? 'pipeline-active' : 'pipeline-failed')}">
<div>On Chain</div>
<div>${sector.ChainSector ? 'yes' : (sector.ChainAlloc ? 'allocated' : 'no')}</div>
Expand All @@ -127,10 +127,10 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
</td>
</tr>
<tr>
${this.renderSectorState('TreeD', 1, sector.TaskTreeD, sector.AfterTreeD)}
${this.renderSectorState('TreeR', 1, sector.TaskTreeR, sector.AfterTreeR)}
${this.renderSectorState('TreeD', 1, sector, sector.TaskTreeD, sector.AfterTreeD)}
${this.renderSectorState('TreeR', 1, sector, sector.TaskTreeR, sector.AfterTreeR)}
<!-- PC-S, PC-W, WS, PoRep -->
${this.renderSectorState('Commit Msg', 1, sector.TaskCommitMsg, sector.AfterCommitMsg)}
${this.renderSectorState('Commit Msg', 1, sector, sector.TaskCommitMsg, sector.AfterCommitMsg)}
${this.renderSectorStateNoTask('Commit Wait', 1, sector.AfterCommitMsg, sector.AfterCommitMsgSuccess)}
<td class="${sector.ChainActive ? 'pipeline-success' : 'pipeline-failed'}">
<div>Active</div>
Expand All @@ -149,13 +149,26 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
</td>
`;
}
renderSectorState(name, rowspan, task, after) {
renderSectorState(name, rowspan, sector, task, after) {
if(task) {
// sector.MissingTasks is a list of tasks
// sector.MissingTasks.includes(task) is true if task is missing
let missing = sector.MissingTasks && sector.MissingTasks.includes(task);

return html`
<td rowspan="${rowspan}" class="${missing ? 'pipeline-failed' : 'pipeline-active'}">
<div>${name}</div>
<div>T:${task}</div>
${missing ? html`<div><b>FAILED</b></div>` : ''}
</td>
`;
}

return html`
<td rowspan="${rowspan}" class="${task?'pipeline-active':''} ${after?'pipeline-success':''}">
<td rowspan="${rowspan}" class="${after?'pipeline-success':''}">
<div>${name}</div>
<div>${after?'done':task?'T:'+task:'--'}</div>
<div>${after?'done':'--'}</div>
</td>
`;
}

} );
40 changes: 40 additions & 0 deletions web/static/pages/pipeline_porep/restart-all-button.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { LitElement, html } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/all/lit-all.min.js';
import RPCCall from '/lib/jsonrpc.mjs';

class RestartAllButton extends LitElement {
static properties = {
isProcessing: { type: Boolean },
};

constructor() {
super();
this.isProcessing = false;
}

render() {
return html`
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-1BmE4kWBq78iYhFldvKuhfTAU6auU8tT94WrHftjDbrCEXSU1oBoqyl2QvZ6jIW3" crossorigin="anonymous">
<button
@click="${this.handleClick}"
class="btn ${this.isProcessing ? 'btn-secondary' : 'btn-primary'}"
?disabled="${this.isProcessing}"
>
${this.isProcessing ? 'Processing...' : 'Restart All'}
</button>
`;
}

async handleClick() {
this.isProcessing = true;
try {
await RPCCall('PipelinePorepRestartAll', []);
console.log('Restart All operation completed successfully');
} catch (error) {
console.error('Error during Restart All operation:', error);
} finally {
this.isProcessing = false;
}
}
magik6k marked this conversation as resolved.
Show resolved Hide resolved
}

customElements.define('restart-all-button', RestartAllButton);