Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sealing: Split PCA/PCB batches if gas used exceeds block limit #10647

Merged
merged 7 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node/impl/full/gas.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ func (m *GasModule) GasEstimateMessageGas(ctx context.Context, msg *types.Messag
return nil, err
}
msg.GasLimit = int64(float64(gasLimit) * m.Mpool.GetConfig().GasLimitOverestimation)

// Gas overestimation can cause us to exceed the block gas limit, cap it.
if msg.GasLimit > build.BlockGasLimit {
msg.GasLimit = build.BlockGasLimit
Expand Down
55 changes: 41 additions & 14 deletions storage/pipeline/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var aggFeeDen = big.NewInt(100)

type CommitBatcherApi interface {
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
ChainHead(ctx context.Context) (*types.TipSet, error)

Expand Down Expand Up @@ -234,7 +235,11 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
if individual {
res, err = b.processIndividually(cfg)
} else {
res, err = b.processBatch(cfg)
var sectors []abi.SectorNumber
for sn := range b.todo {
sectors = append(sectors, sn)
}
res, err = b.processBatch(cfg, sectors)
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
}

if err != nil {
Expand Down Expand Up @@ -264,13 +269,13 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return res, nil
}

func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
func (b *CommitBatcher) processBatch(cfg sealiface.Config, sectors []abi.SectorNumber) ([]sealiface.CommitBatchRes, error) {
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}

total := len(b.todo)
total := len(sectors)

res := sealiface.CommitBatchRes{
FailedSectors: map[abi.SectorNumber]string{},
Expand All @@ -284,24 +289,19 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
infos := make([]proof.AggregateSealVerifyInfo, 0, total)
collateral := big.Zero()

for id, p := range b.todo {
if len(infos) >= cfg.MaxCommitBatch {
log.Infow("commit batch full")
break
}
for _, sector := range sectors {
res.Sectors = append(res.Sectors, sector)

res.Sectors = append(res.Sectors, id)

sc, err := b.getSectorCollateral(id, ts.Key())
sc, err := b.getSectorCollateral(sector, ts.Key())
if err != nil {
res.FailedSectors[id] = err.Error()
res.FailedSectors[sector] = err.Error()
continue
}

collateral = big.Add(collateral, sc)

params.SectorNumbers.Set(uint64(id))
infos = append(infos, p.Info)
params.SectorNumbers.Set(uint64(sector))
infos = append(infos, b.todo[sector].Info)
}

if len(infos) == 0 {
Expand All @@ -318,17 +318,20 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa

mid, err := address.IDFromAddress(b.maddr)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err)
}

nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key())
if err != nil {
res.Error = err.Error()
log.Errorf("getting network version: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err)
}

arp, err := b.aggregateProofType(nv)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate proof type: %w", err)
}

Expand All @@ -339,23 +342,27 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
Infos: infos,
}, proofs)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err)
}

enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
}

mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
}

maxFee := b.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos))

aggFeeRaw, err := policy.AggregateProveCommitNetworkFee(nv, len(infos), ts.MinTicketBlock().ParentBaseFee)
if err != nil {
res.Error = err.Error()
log.Errorf("getting aggregate commit network fee: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate commit network fee: %s", err)
}
Expand All @@ -365,16 +372,36 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
needFunds := big.Add(collateral, aggFee)
needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, err
}

goodFunds := big.Add(maxFee, needFunds)

from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, needFunds)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}

_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())

if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) {
log.Errorf("simulating CommitBatch message failed: %s", err)
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err)
}

// If we're out of gas, split the batch in half and evaluate again
if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) {
log.Warnf("CommitAggregate message ran out of gas, splitting batch in half and trying again (sectors: %d)", len(sectors))
mid := len(sectors) / 2
ret0, _ := b.processBatch(cfg, sectors[:mid])
ret1, _ := b.processBatch(cfg, sectors[mid:])

return append(ret0, ret1...), nil
}

mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
Expand Down
52 changes: 51 additions & 1 deletion storage/pipeline/commit_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestCommitBatcher(t *testing.T) {

if batch {
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
//s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 100000}, nil)
}

s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool {
Expand All @@ -224,6 +224,48 @@ func TestCommitBatcher(t *testing.T) {
}
}

expectProcessBatch := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool, gasOverLimit bool) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise {
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil)

ti := len(expect)
batch := false
if ti >= minBatch {
batch = true
ti = 1
}

if !aboveBalancer {
batch = false
ti = len(expect)
}

pciC := len(expect)
if failOnePCI {
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), abi.SectorNumber(1), gomock.Any()).Return(nil, nil).Times(1) // not found
pciC = len(expect) - 1
if !batch {
ti--
}
}
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&minertypes.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
}, nil).Times(pciC)
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(pciC)

if batch {
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version18, nil)
if gasOverLimit {
s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, &api.ErrOutOfGas{})
} else {
s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 100000}, nil)
}

}
return nil
}
}

flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise {
_ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb)
Expand Down Expand Up @@ -309,6 +351,14 @@ func TestCommitBatcher(t *testing.T) {
addSectors(getSectors(maxBatch), true),
},
},
"addMax-aboveBalancer-gasAboveLimit": {
actions: []action{
expectProcessBatch(getSectors(maxBatch), true, false, true),
expectSend(getSectors(maxBatch)[:maxBatch/2], true, false),
expectSend(getSectors(maxBatch)[maxBatch/2:], true, false),
addSectors(getSectors(maxBatch), true),
},
},
"addSingle-belowBalancer": {
actions: []action{
addSector(0, false),
Expand Down
15 changes: 15 additions & 0 deletions storage/pipeline/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions storage/pipeline/mocks/mock_commit_batcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions storage/pipeline/mocks/mock_precommit_batcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 34 additions & 11 deletions storage/pipeline/precommit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

type PreCommitBatcherApi interface {
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
Expand Down Expand Up @@ -319,29 +320,26 @@ func (b *PreCommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo,
return mcid, nil
}

func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tsk types.TipSetKey, bf abi.TokenAmount, nv network.Version) ([]sealiface.PreCommitBatchRes, error) {
func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.TokenAmount, entries []*preCommitEntry, nv network.Version) ([]sealiface.PreCommitBatchRes, error) {
params := miner.PreCommitSectorBatchParams{}
deposit := big.Zero()
var res sealiface.PreCommitBatchRes

for _, p := range b.todo {
if len(params.Sectors) >= cfg.MaxPreCommitBatch {
log.Infow("precommit batch full")
break
}

for _, p := range entries {
res.Sectors = append(res.Sectors, p.pci.SectorNumber)
params.Sectors = append(params.Sectors, *infoToPreCommitSectorParams(p.pci))
deposit = big.Add(deposit, p.deposit)
}

enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
res.Error = err.Error()
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err)
}

mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK)
if err != nil {
res.Error = err.Error()
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
}

Expand All @@ -350,6 +348,7 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tsk types.TipSetKe
aggFeeRaw, err := policy.AggregatePreCommitNetworkFee(nv, len(params.Sectors), bf)
if err != nil {
log.Errorf("getting aggregate precommit network fee: %s", err)
res.Error = err.Error()
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("getting aggregate precommit network fee: %s", err)
}

Expand All @@ -368,16 +367,40 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tsk types.TipSetKe
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}

_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, needFunds, maxFee, enc.Bytes())

if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(entries) == 1) {
res.Error = err.Error()
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("simulating PreCommitBatch message failed: %w", err)
}

// If we're out of gas, split the batch in half and evaluate again
if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) {
log.Warnf("PreCommitBatch out of gas, splitting batch in half and trying again")
mid := len(entries) / 2
ret0, _ := b.processPreCommitBatch(cfg, bf, entries[:mid], nv)
ret1, _ := b.processPreCommitBatch(cfg, bf, entries[mid:], nv)

return append(ret0, ret1...), nil
}

// If state call succeeds, we can send the message for real
mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, needFunds, maxFee, enc.Bytes())
if err != nil {
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
res.Error = err.Error()
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("pushing message to mpool: %w", err)
}

res.Msg = &mcid
return []sealiface.PreCommitBatchRes{res}, nil
}

log.Infow("Sent PreCommitSectorBatch message", "cid", mcid, "from", from, "sectors", len(b.todo))
func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tsk types.TipSetKey, bf abi.TokenAmount, nv network.Version) ([]sealiface.PreCommitBatchRes, error) {
var pcEntries []*preCommitEntry
for _, p := range b.todo {
pcEntries = append(pcEntries, p)
}

return []sealiface.PreCommitBatchRes{res}, nil
return b.processPreCommitBatch(cfg, bf, pcEntries, nv)
}

// register PreCommit, wait for batch message, return message CID
Expand Down
Loading