diff --git a/market/deal_ingest_snap.go b/market/deal_ingest_snap.go index 6d3b4b0c6..619aee99a 100644 --- a/market/deal_ingest_snap.go +++ b/market/deal_ingest_snap.go @@ -300,13 +300,16 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add // minExpiration = piece.DealSchedule.EndEpoch // ideal expiration = minExpiration + 2 days err = tx.Select(&candidates, ` - SELECT sector_num, expiration_epoch - FROM sectors_meta - WHERE is_cc = true AND sp_id = $4 - AND expiration_epoch IS NOT NULL - AND expiration_epoch > $1 - AND ($2 = 0 OR expiration_epoch < $2) - ORDER BY ABS(expiration_epoch - ($1 + $3)) + SELECT sm.sector_num, sm.expiration_epoch + FROM sectors_meta sm + LEFT JOIN sectors_snap_pipeline ssp on sm.sp_id = ssp.sp_id and sm.sector_num = ssp.sector_number + LEFT JOIN open_sector_pieces osp on sm.sp_id = osp.sp_id and sm.sector_num = osp.sector_number and osp.piece_index = 0 + WHERE sm.is_cc = true AND ssp.start_time IS NULL AND osp.created_at IS NULL + AND sm.sp_id = $4 + AND sm.expiration_epoch IS NOT NULL + AND sm.expiration_epoch > $1 + AND ($2 = 0 OR sm.expiration_epoch < $2) + ORDER BY ABS(sm.expiration_epoch - ($1 + $3)) LIMIT 10 `, int64(piece.DealSchedule.EndEpoch), maxExpiration, IdealEndEpochBuffer, p.mid) if err != nil { diff --git a/tasks/snap/finalize_pieces.go b/tasks/snap/finalize_pieces.go new file mode 100644 index 000000000..18c020409 --- /dev/null +++ b/tasks/snap/finalize_pieces.go @@ -0,0 +1,49 @@ +package snap + +import ( + "context" + "net/url" + "strconv" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/curio/harmony/harmonydb" +) + +func DropSectorPieceRefsSnap(ctx context.Context, db *harmonydb.DB, sid abi.SectorID) error { + var PieceURL []struct { + URL string `db:"data_url"` + } + + err := db.Select(ctx, &PieceURL, `SELECT data_url FROM sectors_snap_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sid.Miner, sid.Number) + if err != nil { + return xerrors.Errorf("getting piece url: %w", err) + } + + for _, pu := range PieceURL { + gourl, err := url.Parse(pu.URL) + if err != nil { + log.Errorw("failed to parse piece url", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number) + continue + } + + if gourl.Scheme == "pieceref" { + refID, err := strconv.ParseInt(gourl.Opaque, 10, 64) + if err != nil { + log.Errorw("failed to parse piece ref id", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number) + continue + } + + n, err := db.Exec(ctx, `DELETE FROM parked_piece_refs WHERE ref_id = $1`, refID) + if err != nil { + log.Errorw("failed to delete piece ref", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number) + } + + log.Debugw("deleted piece ref", "url", pu.URL, "miner", sid.Miner, "sector", sid.Number, "rows", n) + } + } + + return err +} diff --git a/tasks/snap/task_encode.go b/tasks/snap/task_encode.go index 1c932bae8..2fc327fd7 100644 --- a/tasks/snap/task_encode.go +++ b/tasks/snap/task_encode.go @@ -106,6 +106,10 @@ func (e *EncodeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("updating sector pipeline: %w", err) } + if err := DropSectorPieceRefsSnap(ctx, e.db, sref.ID); err != nil { + return true, xerrors.Errorf("dropping piece refs: %w", err) + } + return true, nil } diff --git a/web/api/webrpc/upgrade.go b/web/api/webrpc/upgrade.go index 109d2ae76..59e09087f 100644 --- a/web/api/webrpc/upgrade.go +++ b/web/api/webrpc/upgrade.go @@ -1,6 +1,12 @@ package webrpc -import "context" +import ( + "context" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/curio/tasks/snap" +) type UpgradeSector struct { SpID uint64 `db:"sp_id"` @@ -38,3 +44,13 @@ func (a *WebRPC) UpgradeResetTaskIDs(ctx context.Context, spid, sectorNum uint64 _, err := a.deps.DB.Exec(ctx, `SELECT unset_task_id_snap($1, $2)`, spid, sectorNum) return err } + +func (a *WebRPC) UpgradeDelete(ctx context.Context, spid, sectorNum uint64) error { + if err := snap.DropSectorPieceRefsSnap(ctx, a.deps.DB, abi.SectorID{Miner: abi.ActorID(spid), Number: abi.SectorNumber(sectorNum)}); err != nil { + // bad, but still do best we can and continue + log.Errorw("failed to drop sector piece refs", "error", err) + } + + _, err := a.deps.DB.Exec(ctx, `DELETE FROM sectors_snap_pipeline WHERE sp_id = $1 AND sector_number = $2`, spid, sectorNum) + return err +} diff --git a/web/static/snap/upgrade-sectors.mjs b/web/static/snap/upgrade-sectors.mjs index 4f88ba4ae..8a38398cb 100644 --- a/web/static/snap/upgrade-sectors.mjs +++ b/web/static/snap/upgrade-sectors.mjs @@ -2,6 +2,16 @@ import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/al import RPCCall from '/lib/jsonrpc.mjs'; class UpgradeSectors extends LitElement { + static styles = css` + .btn-delete { + background-color: red; + color: white; + font-size: 0.8em; + padding: 2px 5px; + margin-left: 5px; + } + `; + constructor() { super(); this.data = []; @@ -55,6 +65,9 @@ class UpgradeSectors extends LitElement { ${ '' /*todo: this button is a massive footgun, it should get some more safety*/ } + ${entry.Failed ? html` + + ` : ''} `)} @@ -63,4 +76,4 @@ class UpgradeSectors extends LitElement { `; } } -customElements.define('upgrade-sectors', UpgradeSectors); \ No newline at end of file +customElements.define('upgrade-sectors', UpgradeSectors);