diff --git a/.circleci/config.yml b/.circleci/config.yml index 50edafef296..d4220e05598 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -945,6 +945,7 @@ workflows: - build suite: itest-sector_pledge target: "./itests/sector_pledge_test.go" + resource_class: 2xlarge get-params: true - test: diff --git a/.circleci/template.yml b/.circleci/template.yml index 9011f1a867e..6e2f2764ec2 100644 --- a/.circleci/template.yml +++ b/.circleci/template.yml @@ -551,7 +551,7 @@ workflows: - build suite: itest-[[ $name ]] target: "./itests/[[ $file ]]" - [[- if or (eq $name "worker") (eq $name "deals_concurrent") (eq $name "wdpost_worker_config")]] + [[- if or (eq $name "worker") (eq $name "deals_concurrent") (eq $name "wdpost_worker_config") (eq $name "sector_pledge")]] resource_class: 2xlarge [[- end]] [[- if or (eq $name "wdpost") (eq $name "sector_pledge")]] diff --git a/itests/kit/ensemble_opts.go b/itests/kit/ensemble_opts.go index d264da2bb25..95bdd8da872 100644 --- a/itests/kit/ensemble_opts.go +++ b/itests/kit/ensemble_opts.go @@ -35,7 +35,13 @@ var DefaultEnsembleOpts = ensembleOpts{ } // MockProofs activates mock proofs for the entire ensemble. -func MockProofs() EnsembleOpt { +func MockProofs(e ...bool) EnsembleOpt { + if len(e) > 0 && !e[0] { + return func(opts *ensembleOpts) error { + return nil + } + } + return func(opts *ensembleOpts) error { opts.mockProofs = true // since we're using mock proofs, we don't need to download diff --git a/itests/sector_pledge_test.go b/itests/sector_pledge_test.go index b4e5c11337c..eb93cfe9a1c 100644 --- a/itests/sector_pledge_test.go +++ b/itests/sector_pledge_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" "github.com/filecoin-project/lotus/api" @@ -39,7 +40,7 @@ func TestPledgeSectors(t *testing.T) { defer cancel() _, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(blockTime) + ens.InterconnectAll().BeginMiningMustPost(blockTime) miner.PledgeSectors(ctx, nSectors, 0, nil) } @@ -65,12 +66,18 @@ func TestPledgeBatching(t *testing.T) { //stm: @SECTOR_PRE_COMMIT_FLUSH_001, @SECTOR_COMMIT_FLUSH_001 blockTime := 50 * time.Millisecond - runTest := func(t *testing.T, nSectors int) { + runTest := func(t *testing.T, nSectors int, aggregate bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(blockTime) + kit.QuietMiningLogs() + + client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(!aggregate), kit.MutateSealingConfig(func(sc *config.SealingConfig) { + if aggregate { + sc.AggregateAboveBaseFee = types.FIL(big.Zero()) + } + })) + ens.InterconnectAll().BeginMiningMustPost(blockTime) client.WaitTillChain(ctx, kit.HeightAtLeast(10)) @@ -114,7 +121,10 @@ func TestPledgeBatching(t *testing.T) { } t.Run("100", func(t *testing.T) { - runTest(t, 100) + runTest(t, 100, false) + }) + t.Run("10-agg", func(t *testing.T) { + runTest(t, 10, true) }) } diff --git a/node/builder_chain.go b/node/builder_chain.go index 0e465ffe1d9..2d9c0ea2e54 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -225,7 +225,7 @@ func ConfigFullNode(c interface{}) Option { // If the Eth JSON-RPC is enabled, enable storing events at the ChainStore. // This is the case even if real-time and historic filtering are disabled, // as it enables us to serve logs in eth_getTransactionReceipt. - If(cfg.Fevm.EnableEthRPC, Override(StoreEventsKey, modules.EnableStoringEvents)), + If(cfg.Fevm.EnableEthRPC || cfg.Events.EnableActorEventsAPI, Override(StoreEventsKey, modules.EnableStoringEvents)), Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr), diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index 096f27e4c95..d702d307884 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -209,7 +209,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, return nil, xerrors.Errorf("getting config: %w", err) } - if notif && total < cfg.MaxCommitBatch { + if notif && total < cfg.MaxCommitBatch && cfg.AggregateCommits { return nil, nil } @@ -233,7 +233,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, return false } - individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut() + individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut() || !cfg.AggregateCommits if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) { if ts.MinTicketBlock().ParentBaseFee.LessThan(cfg.AggregateAboveBaseFee) { @@ -331,6 +331,9 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto return nil, err } + // sort sectors by number + sort.Slice(sectors, func(i, j int) bool { return sectors[i] < sectors[j] }) + total := len(sectors) res := sealiface.CommitBatchRes{ @@ -371,10 +374,6 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto return nil, nil } - sort.Slice(infos, func(i, j int) bool { - return infos[i].Number < infos[j].Number - }) - proofs := make([][]byte, 0, total) for _, info := range infos { proofs = append(proofs, b.todo[info.Number].Proof) @@ -444,13 +443,13 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors2Params: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors3Params: %w", err) } _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSectors3, needFunds, maxFee, enc.Bytes()) if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) { - log.Errorf("simulating CommitBatch message failed: %s", err) + log.Errorf("simulating CommitBatch message failed (%x): %s", enc.Bytes(), err) res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err) } @@ -474,7 +473,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto res.Msg = &mcid - log.Infow("Sent ProveCommitSectors2 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos)) + log.Infow("Sent ProveCommitSectors3 message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos)) return []sealiface.CommitBatchRes{res}, nil } @@ -591,7 +590,7 @@ func (b *CommitBatcher) processBatchV1(cfg sealiface.Config, sectors []abi.Secto _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) { - log.Errorf("simulating CommitBatch message failed: %s", err) + log.Errorf("simulating CommitBatch message failed (%x): %s", enc.Bytes(), err) res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err) } diff --git a/storage/pipeline/fsm.go b/storage/pipeline/fsm.go index ac3dafa86ec..ced6867d198 100644 --- a/storage/pipeline/fsm.go +++ b/storage/pipeline/fsm.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "reflect" + "runtime" "time" "golang.org/x/xerrors" @@ -39,8 +40,27 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface return nil, processed, nil } - return func(ctx statemachine.Context, si SectorInfo) error { - err := next(ctx, si) + return func(ctx statemachine.Context, si SectorInfo) (err error) { + // handle panics + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 1<<16) + n := runtime.Stack(buf, false) + buf = buf[:n] + + l := Log{ + Timestamp: uint64(time.Now().Unix()), + Message: fmt.Sprintf("panic: %v\n%s", r, buf), + Kind: "panic", + } + si.logAppend(l) + + err = fmt.Errorf("panic: %v\n%s", r, buf) + } + }() + + // execute the next state + err = next(ctx, si) if err != nil { log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err) return nil @@ -127,8 +147,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto ), Committing: planCommitting, CommitFinalize: planOne( - on(SectorFinalized{}, SubmitCommit), - on(SectorFinalizedAvailable{}, SubmitCommit), + on(SectorFinalized{}, SubmitCommitAggregate), + on(SectorFinalizedAvailable{}, SubmitCommitAggregate), on(SectorFinalizeFailed{}, CommitFinalizeFailed), ), SubmitCommit: planOne( @@ -674,7 +694,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err } case SectorCommitted: // the normal case e.apply(state) - state.State = SubmitCommit + state.State = SubmitCommitAggregate case SectorProofReady: // early finalize e.apply(state) state.State = CommitFinalize diff --git a/storage/pipeline/fsm_test.go b/storage/pipeline/fsm_test.go index 7d7201953e8..c403fb12993 100644 --- a/storage/pipeline/fsm_test.go +++ b/storage/pipeline/fsm_test.go @@ -70,10 +70,10 @@ func TestHappyPath(t *testing.T) { require.Equal(m.t, m.state.State, Committing) m.planSingle(SectorCommitted{}) - require.Equal(m.t, m.state.State, SubmitCommit) + require.Equal(m.t, m.state.State, SubmitCommitAggregate) - m.planSingle(SectorCommitSubmitted{}) - require.Equal(m.t, m.state.State, CommitWait) + m.planSingle(SectorCommitAggregateSent{}) + require.Equal(m.t, m.state.State, CommitAggregateWait) m.planSingle(SectorProving{}) require.Equal(m.t, m.state.State, FinalizeSector) @@ -81,7 +81,7 @@ func TestHappyPath(t *testing.T) { m.planSingle(SectorFinalized{}) require.Equal(m.t, m.state.State, Proving) - expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector, Proving} + expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving} for i, n := range notif { if n.before.State != expected[i] { t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State) @@ -135,9 +135,6 @@ func TestHappyPathFinalizeEarly(t *testing.T) { require.Equal(m.t, m.state.State, CommitFinalize) m.planSingle(SectorFinalized{}) - require.Equal(m.t, m.state.State, SubmitCommit) - - m.planSingle(SectorSubmitCommitAggregate{}) require.Equal(m.t, m.state.State, SubmitCommitAggregate) m.planSingle(SectorCommitAggregateSent{}) @@ -149,7 +146,7 @@ func TestHappyPathFinalizeEarly(t *testing.T) { m.planSingle(SectorFinalized{}) require.Equal(m.t, m.state.State, Proving) - expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving} + expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving} for i, n := range notif { if n.before.State != expected[i] { t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State) @@ -188,9 +185,9 @@ func TestCommitFinalizeFailed(t *testing.T) { require.Equal(m.t, m.state.State, CommitFinalize) m.planSingle(SectorFinalized{}) - require.Equal(m.t, m.state.State, SubmitCommit) + require.Equal(m.t, m.state.State, SubmitCommitAggregate) - expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommit} + expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommitAggregate} for i, n := range notif { if n.before.State != expected[i] { t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State) @@ -242,10 +239,10 @@ func TestSeedRevert(t *testing.T) { // not changing the seed this time _, _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) require.NoError(t, err) - require.Equal(m.t, m.state.State, SubmitCommit) + require.Equal(m.t, m.state.State, SubmitCommitAggregate) - m.planSingle(SectorCommitSubmitted{}) - require.Equal(m.t, m.state.State, CommitWait) + m.planSingle(SectorCommitAggregateSent{}) + require.Equal(m.t, m.state.State, CommitAggregateWait) m.planSingle(SectorProving{}) require.Equal(m.t, m.state.State, FinalizeSector) diff --git a/storage/pipeline/input.go b/storage/pipeline/input.go index 2bc7c2f81aa..6d41f7e81b3 100644 --- a/storage/pipeline/input.go +++ b/storage/pipeline/input.go @@ -34,12 +34,16 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e for _, piece := range sector.Pieces { used += piece.Piece().Size.Unpadded() + if !piece.HasDealInfo() { + continue + } + endEpoch, err := piece.EndEpoch() if err != nil { return xerrors.Errorf("piece.EndEpoch: %w", err) } - if piece.HasDealInfo() && endEpoch > lastDealEnd { + if endEpoch > lastDealEnd { lastDealEnd = endEpoch } } @@ -953,20 +957,30 @@ func (m *Sealing) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showO return api.SectorInfo{}, err } + nv, err := m.Api.StateNetworkVersion(ctx, types.EmptyTSK) + if err != nil { + return api.SectorInfo{}, xerrors.Errorf("getting network version: %w", err) + } + deals := make([]abi.DealID, len(info.Pieces)) pieces := make([]api.SectorPiece, len(info.Pieces)) for i, piece := range info.Pieces { - // todo make this work with DDO deals in some reasonable way - pieces[i].Piece = piece.Piece() - if !piece.HasDealInfo() || piece.Impl().PublishCid == nil { + + if !piece.HasDealInfo() { + continue + } + + pdi := piece.Impl() + if pdi.Valid(nv) != nil { continue } - pdi := piece.DealInfo().Impl() // copy pieces[i].DealInfo = &pdi - deals[i] = piece.DealInfo().Impl().DealID + if pdi.PublishCid != nil { + deals[i] = pdi.DealID + } } log := make([]api.SectorLog, len(info.Log)) diff --git a/storage/pipeline/sector_state.go b/storage/pipeline/sector_state.go index e1f5bfd69d5..9e7f75171ab 100644 --- a/storage/pipeline/sector_state.go +++ b/storage/pipeline/sector_state.go @@ -94,7 +94,7 @@ const ( CommitFinalizeFailed SectorState = "CommitFinalizeFailed" // single commit - SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain + SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain (deprecated) CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain SubmitCommitAggregate SectorState = "SubmitCommitAggregate" diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index aef394789d1..4f40ac7c7d2 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -18,7 +18,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" actorstypes "github.com/filecoin-project/go-state-types/actors" "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/go-state-types/builtin" miner2 "github.com/filecoin-project/go-state-types/builtin/v13/miner" verifreg13 "github.com/filecoin-project/go-state-types/builtin/v13/verifreg" "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" @@ -740,89 +739,10 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) } func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error { - // TODO: Deprecate this path, always go through batcher, just respect the AggregateCommits config in there - - cfg, err := m.getConfig() - if err != nil { - return xerrors.Errorf("getting config: %w", err) - } - - if cfg.AggregateCommits { - nv, err := m.Api.StateNetworkVersion(ctx.Context(), types.EmptyTSK) - if err != nil { - return xerrors.Errorf("getting network version: %w", err) - } - - if nv >= network.Version13 { - return ctx.Send(SectorSubmitCommitAggregate{}) - } - } - - ts, err := m.Api.ChainHead(ctx.Context()) - if err != nil { - log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err) - return nil - } - - if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) - } - - enc := new(bytes.Buffer) - params := &miner.ProveCommitSectorParams{ - SectorNumber: sector.SectorNumber, - Proof: sector.Proof, - } - - if err := params.MarshalCBOR(enc); err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)}) - } - - mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, ts.Key()) - if err != nil { - log.Errorf("handleCommitting: api error, not proceeding: %+v", err) - return nil - } - - pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) - if err != nil { - return xerrors.Errorf("getting precommit info: %w", err) - } - if pci == nil { - return ctx.Send(SectorCommitFailed{error: xerrors.Errorf("precommit info not found on chain")}) - } - - collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, ts.Key()) - if err != nil { - return xerrors.Errorf("getting initial pledge collateral: %w", err) - } - - collateral = big.Sub(collateral, pci.PreCommitDeposit) - if collateral.LessThan(big.Zero()) { - collateral = big.Zero() - } - - collateral, err = collateralSendAmount(ctx.Context(), m.Api, m.maddr, cfg, collateral) - if err != nil { - return err - } - - goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee)) - - from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.CommitAddr, goodFunds, collateral) - if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("no good address to send commit message from: %w", err)}) - } - - // TODO: check seed / ticket / deals are up to date - mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes()) - if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) - } - - return ctx.Send(SectorCommitSubmitted{ - Message: mcid, - }) + // like precommit this is a deprecated state, but we keep it around for + // existing state machines + // todo: drop after nv21 + return ctx.Send(SectorSubmitCommitAggregate{}) } // processPieces returns either: diff --git a/storage/pipeline/types.go b/storage/pipeline/types.go index 48ae60546be..7b263dd6a98 100644 --- a/storage/pipeline/types.go +++ b/storage/pipeline/types.go @@ -289,10 +289,18 @@ func (sp *SafeSectorPiece) handleDealInfo(params handleDealInfoParams) error { // SectorPiece Proxy func (sp *SafeSectorPiece) Impl() piece.PieceDealInfo { + if !sp.HasDealInfo() { + return piece.PieceDealInfo{} + } + return sp.real.DealInfo.Impl() } func (sp *SafeSectorPiece) String() string { + if !sp.HasDealInfo() { + return "" + } + return sp.real.DealInfo.String() } @@ -305,21 +313,41 @@ func (sp *SafeSectorPiece) Valid(nv network.Version) error { } func (sp *SafeSectorPiece) StartEpoch() (abi.ChainEpoch, error) { + if !sp.HasDealInfo() { + return 0, xerrors.Errorf("no deal info") + } + return sp.real.DealInfo.StartEpoch() } func (sp *SafeSectorPiece) EndEpoch() (abi.ChainEpoch, error) { + if !sp.HasDealInfo() { + return 0, xerrors.Errorf("no deal info") + } + return sp.real.DealInfo.EndEpoch() } func (sp *SafeSectorPiece) PieceCID() cid.Cid { + if !sp.HasDealInfo() { + return sp.real.Piece.PieceCID + } + return sp.real.DealInfo.PieceCID() } func (sp *SafeSectorPiece) KeepUnsealedRequested() bool { + if !sp.HasDealInfo() { + return false + } + return sp.real.DealInfo.KeepUnsealedRequested() } func (sp *SafeSectorPiece) GetAllocation(ctx context.Context, aapi piece.AllocationAPI, tsk types.TipSetKey) (*verifreg.Allocation, error) { + if !sp.HasDealInfo() { + return nil, xerrors.Errorf("no deal info") + } + return sp.real.DealInfo.GetAllocation(ctx, aapi, tsk) }