Skip to content

Commit

Permalink
feat: add deal expired check
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Nov 4, 2022
1 parent 4a24cee commit 6ed6295
Showing 1 changed file with 37 additions and 21 deletions.
58 changes: 37 additions & 21 deletions storageprovider/deal_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storageprovider
import (
"context"
"errors"
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -44,6 +45,8 @@ func NewDealTracker(lc fx.Lifecycle, r repo.Repo, minerMgr minermgr.IAddrMgr, fu
func (dealTracker *DealTracker) Start(ctx metrics.MetricsCtx) {
dealTracker.scanDeal(ctx)
ticker := time.NewTicker(dealTracker.period)
defer ticker.Stop()

for {
select {
case <-ticker.C:
Expand All @@ -66,42 +69,56 @@ func (dealTracker *DealTracker) scanDeal(ctx metrics.MetricsCtx) {
}

for _, addr := range addrs {
dealTracker.checkSlash(ctx, addr, head.Key())
dealTracker.checkPreCommitAndCommit(ctx, addr, head.Key())
// todo check expire
err = dealTracker.checkSlash(ctx, addr, head)
if err != nil {
log.Errorf("fail to check slash %w", err)
}

err = dealTracker.checkPreCommitAndCommit(ctx, addr, head)
if err != nil {
log.Errorf("fail to check precommit/commit/expired %w", err)
}
}
}

func (dealTracker *DealTracker) checkPreCommitAndCommit(ctx metrics.MetricsCtx, addr address.Address, tsk vTypes.TipSetKey) {
func (dealTracker *DealTracker) checkPreCommitAndCommit(ctx metrics.MetricsCtx, addr address.Address, ts *vTypes.TipSet) error {
deals, err := dealTracker.storageRepo.GetDealByAddrAndStatus(ctx, addr, storagemarket.StorageDealAwaitingPreCommit, storagemarket.StorageDealSealing)
if err != nil && !errors.Is(err, repo.ErrNotFound) {
log.Errorf("get miner %s storage deals for check StorageDealAwaitingPreCommit %w", addr, err)
return fmt.Errorf("get miner %s storage deals for check StorageDealAwaitingPreCommit %w", addr, err)
}

curHeight := ts.Height()

for _, deal := range deals {
if deal.Proposal.StartEpoch > curHeight {
err = dealTracker.storageRepo.UpdateDealStatus(ctx, deal.ProposalCid, storagemarket.StorageDealExpired, "")
if err != nil {
return fmt.Errorf("update deal %d status to of miner %s expired %w", deal.DealID, addr, err)
}
log.Info("update deal %d status to of miner %s expired", deal.DealID, addr)
}

// not check market piece status , maybe skip Packing and update to proving status directly
dealProposal, err := dealTracker.fullNode.StateMarketStorageDeal(ctx, deal.DealID, tsk)
dealProposal, err := dealTracker.fullNode.StateMarketStorageDeal(ctx, deal.DealID, ts.Key())
if err != nil {
// todo if deal not found maybe need to market storage deal as error
log.Errorf("get market deal for sector %d of miner %s %s", deal.SectorNumber, addr, err.Error())
continue
return fmt.Errorf("get market deal for sector %d of miner %s %w", deal.SectorNumber, addr, err)
}
if dealProposal.State.SectorStartEpoch > -1 { // include in sector
err = dealTracker.storageRepo.UpdateDealStatus(ctx, deal.ProposalCid, storagemarket.StorageDealActive, market.Proving)
if err != nil {
log.Errorf("update deal status to active for sector %d of miner %s %w", deal.SectorNumber, addr, err)
return fmt.Errorf("update deal status to active for sector %d of miner %s %w", deal.SectorNumber, addr, err)
}
continue
}

if deal.State == storagemarket.StorageDealAwaitingPreCommit && deal.PieceStatus == market.Assigned {
preInfo, err := dealTracker.fullNode.StateSectorPreCommitInfo(ctx, addr, deal.SectorNumber, tsk)
preInfo, err := dealTracker.fullNode.StateSectorPreCommitInfo(ctx, addr, deal.SectorNumber, ts.Key())
if err != nil {
if strings.Contains(err.Error(), "not found") { // todo remove this check after nv17 update
continue
}
log.Debugf("get precommit info for sector %d of miner %s: %s", deal.SectorNumber, addr, err)
continue
return fmt.Errorf("get precommit info for sector %d of miner %s: %w", deal.SectorNumber, addr, err)
}

if preInfo == nil { // precommit maybe not submitted
Expand All @@ -122,31 +139,30 @@ func (dealTracker *DealTracker) checkPreCommitAndCommit(ctx metrics.MetricsCtx,

err = dealTracker.storageRepo.UpdateDealStatus(ctx, deal.ProposalCid, storagemarket.StorageDealSealing, market.Packing)
if err != nil {
log.Errorf("update deal status to sealing for sector %d of miner %s %w", deal.SectorNumber, addr, err)
return fmt.Errorf("update deal status to sealing for sector %d of miner %s %w", deal.SectorNumber, addr, err)
}
}

// todo may skip storage dealsealing, and run into active
}
return nil
}

func (dealTracker *DealTracker) checkSlash(ctx metrics.MetricsCtx, addr address.Address, tsk vTypes.TipSetKey) {
func (dealTracker *DealTracker) checkSlash(ctx metrics.MetricsCtx, addr address.Address, ts *vTypes.TipSet) error {
deals, err := dealTracker.storageRepo.GetDealByAddrAndStatus(ctx, addr, storagemarket.StorageDealActive)
if err != nil && !errors.Is(err, repo.ErrNotFound) {
log.Errorf("get miner %s storage deals for check StorageDealActive %w", addr, err)
return fmt.Errorf("get miner %s storage deals for check StorageDealActive %w", addr, err)
}

for _, deal := range deals {
dealProposal, err := dealTracker.fullNode.StateMarketStorageDeal(ctx, deal.DealID, tsk)
dealProposal, err := dealTracker.fullNode.StateMarketStorageDeal(ctx, deal.DealID, ts.Key())
if err != nil {
log.Errorf("get market deal info for sector %d of miner %s %w", deal.SectorNumber, addr, err)
continue
return fmt.Errorf("get market deal info for sector %d of miner %s %w", deal.SectorNumber, addr, err)
}
if dealProposal.State.SlashEpoch > -1 { // include in sector
err = dealTracker.storageRepo.UpdateDealStatus(ctx, deal.ProposalCid, storagemarket.StorageDealSlashed, "")
if err != nil {
log.Errorf("update deal status to slash for sector %d of miner %s %w", deal.SectorNumber, addr, err)
return fmt.Errorf("update deal status to slash for sector %d of miner %s %w", deal.SectorNumber, addr, err)
}
}
}
return nil
}

0 comments on commit 6ed6295

Please sign in to comment.