From 6ed6295a95b11e105c5d0b33733427bed23564bd Mon Sep 17 00:00:00 2001 From: hunjixin <1084400399@qq.com> Date: Fri, 4 Nov 2022 10:55:46 +0800 Subject: [PATCH] feat: add deal expired check --- storageprovider/deal_tracker.go | 58 +++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/storageprovider/deal_tracker.go b/storageprovider/deal_tracker.go index 318e9301..3fd68005 100644 --- a/storageprovider/deal_tracker.go +++ b/storageprovider/deal_tracker.go @@ -3,6 +3,7 @@ package storageprovider import ( "context" "errors" + "fmt" "strings" "time" @@ -44,6 +45,8 @@ func NewDealTracker(lc fx.Lifecycle, r repo.Repo, minerMgr minermgr.IAddrMgr, fu func (dealTracker *DealTracker) Start(ctx metrics.MetricsCtx) { dealTracker.scanDeal(ctx) ticker := time.NewTicker(dealTracker.period) + defer ticker.Stop() + for { select { case <-ticker.C: @@ -66,42 +69,56 @@ func (dealTracker *DealTracker) scanDeal(ctx metrics.MetricsCtx) { } for _, addr := range addrs { - dealTracker.checkSlash(ctx, addr, head.Key()) - dealTracker.checkPreCommitAndCommit(ctx, addr, head.Key()) - // todo check expire + err = dealTracker.checkSlash(ctx, addr, head) + if err != nil { + log.Errorf("fail to check slash %w", err) + } + + err = dealTracker.checkPreCommitAndCommit(ctx, addr, head) + if err != nil { + log.Errorf("fail to check precommit/commit/expired %w", err) + } } } -func (dealTracker *DealTracker) checkPreCommitAndCommit(ctx metrics.MetricsCtx, addr address.Address, tsk vTypes.TipSetKey) { +func (dealTracker *DealTracker) checkPreCommitAndCommit(ctx metrics.MetricsCtx, addr address.Address, ts *vTypes.TipSet) error { deals, err := dealTracker.storageRepo.GetDealByAddrAndStatus(ctx, addr, storagemarket.StorageDealAwaitingPreCommit, storagemarket.StorageDealSealing) if err != nil && !errors.Is(err, repo.ErrNotFound) { - log.Errorf("get miner %s storage deals for check StorageDealAwaitingPreCommit %w", addr, err) + return fmt.Errorf("get miner %s storage deals for check StorageDealAwaitingPreCommit %w", addr, err) } + curHeight := ts.Height() + for _, deal := range deals { + if deal.Proposal.StartEpoch > curHeight { + err = dealTracker.storageRepo.UpdateDealStatus(ctx, deal.ProposalCid, storagemarket.StorageDealExpired, "") + if err != nil { + return fmt.Errorf("update deal %d status to of miner %s expired %w", deal.DealID, addr, err) + } + log.Info("update deal %d status to of miner %s expired", deal.DealID, addr) + } + // not check market piece status , maybe skip Packing and update to proving status directly - dealProposal, err := dealTracker.fullNode.StateMarketStorageDeal(ctx, deal.DealID, tsk) + dealProposal, err := dealTracker.fullNode.StateMarketStorageDeal(ctx, deal.DealID, ts.Key()) if err != nil { // todo if deal not found maybe need to market storage deal as error - log.Errorf("get market deal for sector %d of miner %s %s", deal.SectorNumber, addr, err.Error()) - continue + return fmt.Errorf("get market deal for sector %d of miner %s %w", deal.SectorNumber, addr, err) } if dealProposal.State.SectorStartEpoch > -1 { // include in sector err = dealTracker.storageRepo.UpdateDealStatus(ctx, deal.ProposalCid, storagemarket.StorageDealActive, market.Proving) if err != nil { - log.Errorf("update deal status to active for sector %d of miner %s %w", deal.SectorNumber, addr, err) + return fmt.Errorf("update deal status to active for sector %d of miner %s %w", deal.SectorNumber, addr, err) } continue } if deal.State == storagemarket.StorageDealAwaitingPreCommit && deal.PieceStatus == market.Assigned { - preInfo, err := dealTracker.fullNode.StateSectorPreCommitInfo(ctx, addr, deal.SectorNumber, tsk) + preInfo, err := dealTracker.fullNode.StateSectorPreCommitInfo(ctx, addr, deal.SectorNumber, ts.Key()) if err != nil { if strings.Contains(err.Error(), "not found") { // todo remove this check after nv17 update continue } - log.Debugf("get precommit info for sector %d of miner %s: %s", deal.SectorNumber, addr, err) - continue + return fmt.Errorf("get precommit info for sector %d of miner %s: %w", deal.SectorNumber, addr, err) } if preInfo == nil { // precommit maybe not submitted @@ -122,31 +139,30 @@ func (dealTracker *DealTracker) checkPreCommitAndCommit(ctx metrics.MetricsCtx, err = dealTracker.storageRepo.UpdateDealStatus(ctx, deal.ProposalCid, storagemarket.StorageDealSealing, market.Packing) if err != nil { - log.Errorf("update deal status to sealing for sector %d of miner %s %w", deal.SectorNumber, addr, err) + return fmt.Errorf("update deal status to sealing for sector %d of miner %s %w", deal.SectorNumber, addr, err) } } - - // todo may skip storage dealsealing, and run into active } + return nil } -func (dealTracker *DealTracker) checkSlash(ctx metrics.MetricsCtx, addr address.Address, tsk vTypes.TipSetKey) { +func (dealTracker *DealTracker) checkSlash(ctx metrics.MetricsCtx, addr address.Address, ts *vTypes.TipSet) error { deals, err := dealTracker.storageRepo.GetDealByAddrAndStatus(ctx, addr, storagemarket.StorageDealActive) if err != nil && !errors.Is(err, repo.ErrNotFound) { - log.Errorf("get miner %s storage deals for check StorageDealActive %w", addr, err) + return fmt.Errorf("get miner %s storage deals for check StorageDealActive %w", addr, err) } for _, deal := range deals { - dealProposal, err := dealTracker.fullNode.StateMarketStorageDeal(ctx, deal.DealID, tsk) + dealProposal, err := dealTracker.fullNode.StateMarketStorageDeal(ctx, deal.DealID, ts.Key()) if err != nil { - log.Errorf("get market deal info for sector %d of miner %s %w", deal.SectorNumber, addr, err) - continue + return fmt.Errorf("get market deal info for sector %d of miner %s %w", deal.SectorNumber, addr, err) } if dealProposal.State.SlashEpoch > -1 { // include in sector err = dealTracker.storageRepo.UpdateDealStatus(ctx, deal.ProposalCid, storagemarket.StorageDealSlashed, "") if err != nil { - log.Errorf("update deal status to slash for sector %d of miner %s %w", deal.SectorNumber, addr, err) + return fmt.Errorf("update deal status to slash for sector %d of miner %s %w", deal.SectorNumber, addr, err) } } } + return nil }