Skip to content

Commit

Permalink
Merge pull request #984 from ipfs-force-community/feat/use-PreCommitS…
Browse files Browse the repository at this point in the history
…ectorBatch2

Feat/use pre commit sector batch2
  • Loading branch information
0x5459 authored Oct 12, 2023
2 parents a9db4c1 + 7aa6e0c commit b1270f0
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 69 deletions.
2 changes: 1 addition & 1 deletion damocles-manager/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ type AggregateInput struct {

type PreCommitEntry struct {
Deposit abi.TokenAmount
Pcsp *miner.PreCommitSectorParams
Pcsp *miner.SectorPreCommitInfo
}

type MessageInfo struct {
Expand Down
2 changes: 1 addition & 1 deletion damocles-manager/core/types_specs_actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type (
ExpirationExtension2 = miner9.ExpirationExtension2
ExtendSectorExpirationParams = miner9.ExtendSectorExpirationParams
ExtendSectorExpiration2Params = miner9.ExtendSectorExpiration2Params
PreCommitSectorBatchParams = miner9.PreCommitSectorBatchParams
PreCommitSectorBatchParams = miner9.PreCommitSectorBatchParams2
TerminationDeclaration = miner9.TerminationDeclaration
TerminateSectorsParams = miner9.TerminateSectorsParams
WithdrawBalanceParams = miner9.WithdrawBalanceParams
Expand Down
19 changes: 6 additions & 13 deletions damocles-manager/modules/impl/commitmgr/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/ipfs-force-community/damocles/damocles-manager/core"
)

func (p PreCommitProcessor) preCommitParams(ctx context.Context, sector core.SectorState) (*miner.PreCommitSectorParams, big.Int, core.TipSetToken, error) {
func (p PreCommitProcessor) preCommitInfo(ctx context.Context, sector core.SectorState) (*miner.SectorPreCommitInfo, big.Int, core.TipSetToken, error) {
stateMgr := p.api
tok, _, err := stateMgr.ChainHead(ctx)
if err != nil {
Expand Down Expand Up @@ -67,14 +67,18 @@ func (p PreCommitProcessor) preCommitParams(ctx context.Context, sector core.Sec
DealIDs: sector.DealIDs(),
}

if len(sector.Pieces) > 0 {
params.UnsealedCid = &sector.Pre.CommD
}

// TODO: upgrade sector

deposit, err := stateMgr.StateMinerPreCommitDepositForPower(ctx, maddr, *params, tok)
if err != nil {
return nil, big.Zero(), nil, fmt.Errorf("getting initial pledge collateral: %w", err)
}

return infoToPreCommitSectorParams(params), deposit, tok, nil
return params, deposit, tok, nil
}

func getSectorCollateral(ctx context.Context, stateMgr SealingAPI, mid abi.ActorID, sn abi.SectorNumber, tok core.TipSetToken) (abi.TokenAmount, error) {
Expand Down Expand Up @@ -103,14 +107,3 @@ func getSectorCollateral(ctx context.Context, stateMgr SealingAPI, mid abi.Actor

return collateral, nil
}

func infoToPreCommitSectorParams(info *miner.SectorPreCommitInfo) *miner.PreCommitSectorParams {
return &miner.PreCommitSectorParams{
SealProof: info.SealProof,
SectorNumber: info.SectorNumber,
SealedCID: info.SealedCID,
SealRandEpoch: info.SealRandEpoch,
DealIDs: info.DealIDs,
Expiration: info.Expiration,
}
}
58 changes: 5 additions & 53 deletions damocles-manager/modules/impl/commitmgr/precommit_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"sync"
"time"

"github.com/filecoin-project/go-address"
Expand All @@ -16,7 +15,6 @@ import (

"github.com/ipfs-force-community/damocles/damocles-manager/core"
"github.com/ipfs-force-community/damocles/damocles-manager/modules"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/logging"
"github.com/ipfs-force-community/damocles/damocles-manager/pkg/messager"
)

Expand All @@ -31,50 +29,6 @@ type PreCommitProcessor struct {
config *modules.SafeConfig
}

func (p PreCommitProcessor) processIndividually(ctx context.Context, sectors []core.SectorState, from address.Address, mid abi.ActorID, l *logging.ZapLogger) {
mcfg, err := p.config.MinerConfig(mid)
if err != nil {
l.Errorf("get miner config for %d: %s", mid, err)
return
}

wg := sync.WaitGroup{}
wg.Add(len(sectors))
for i := range sectors {
go func(idx int) {
slog := l.With("sector", sectors[idx].ID.Number)

defer wg.Done()

params, deposit, _, err := p.preCommitParams(ctx, sectors[idx])
if err != nil {
slog.Error("get pre-commit params failed: ", err)
return
}

enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
slog.Error("serialize pre-commit sector parameters failed: ", err)
return
}

if !mcfg.Commitment.Pre.SendFund {
deposit = big.Zero()
}

mcid, err := pushMessage(ctx, from, mid, deposit, stbuiltin.MethodsMiner.PreCommitSector, p.msgClient, &mcfg.Commitment.Pre.FeeConfig, enc.Bytes(), slog)
if err != nil {
slog.Error("push pre-commit single failed: ", err)
return
}

sectors[idx].MessageInfo.PreCommitCid = &mcid
slog.Info("push pre-commit success, cid: ", mcid)
}(i)
}
wg.Wait()
}

func (p PreCommitProcessor) Process(ctx context.Context, sectors []core.SectorState, mid abi.ActorID, ctrlAddr address.Address) error {
// Notice: If a sector in sectors has been sent, it's cid failed should be changed already.
plog := log.With("proc", "pre", "miner", mid, "ctrl", ctrlAddr.String(), "len", len(sectors))
Expand All @@ -83,10 +37,7 @@ func (p PreCommitProcessor) Process(ctx context.Context, sectors []core.SectorSt
defer plog.Infof("finished process, elapsed %s", time.Since(start))
defer updateSector(ctx, p.smgr, sectors, plog)

if !p.EnableBatch(mid) {
p.processIndividually(ctx, sectors, ctrlAddr, mid, plog)
return nil
}
// For precommits the only method to precommit sectors after nv21 is to use the new precommit_batch2 method

mcfg, err := p.config.MinerConfig(mid)
if err != nil {
Expand All @@ -96,7 +47,7 @@ func (p PreCommitProcessor) Process(ctx context.Context, sectors []core.SectorSt
infos := []core.PreCommitEntry{}
failed := map[abi.SectorID]struct{}{}
for _, s := range sectors {
params, deposit, _, err := p.preCommitParams(ctx, s)
params, deposit, _, err := p.preCommitInfo(ctx, s)
if err != nil {
plog.Errorf("get precommit params for %d failed: %s\n", s.ID.Number, err)
failed[s.ID] = struct{}{}
Expand Down Expand Up @@ -129,7 +80,7 @@ func (p PreCommitProcessor) Process(ctx context.Context, sectors []core.SectorSt
return fmt.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err)
}

ccid, err := pushMessage(ctx, ctrlAddr, mid, deposit, stbuiltin.MethodsMiner.PreCommitSectorBatch,
ccid, err := pushMessage(ctx, ctrlAddr, mid, deposit, stbuiltin.MethodsMiner.PreCommitSectorBatch2,
p.msgClient, &mcfg.Commitment.Pre.Batch.FeeConfig, enc.Bytes(), plog)
if err != nil {
return fmt.Errorf("push batch precommit message failed: %w", err)
Expand Down Expand Up @@ -169,7 +120,8 @@ func (p PreCommitProcessor) Threshold(mid abi.ActorID) int {
}

func (p PreCommitProcessor) EnableBatch(mid abi.ActorID) bool {
return p.config.MustMinerConfig(mid).Commitment.Pre.Batch.Enabled
// always batch after nv21
return true
}

var _ Processor = (*PreCommitProcessor)(nil)

0 comments on commit b1270f0

Please sign in to comment.