Skip to content

Commit

Permalink
Erigon 3.0: Smarter full and minimal nodes (#11220)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Jul 19, 2024
1 parent 3bbd8aa commit af2a4a0
Show file tree
Hide file tree
Showing 20 changed files with 217 additions and 510 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ Supported networks: all (except Mumbai).
- datadir/chaindata is small now - to prevent it's grow: we recommend set `--batchSize <= 2G`. And it's fine
to `rm -rf chaindata`
- can symlink/mount latest state to fast drive and history to cheap drive
- ArchiveNode is default. FullNode same as in E2: --prune=hrtc
- Archive Node is default. Full Node: `--prune.mode=full`, Minimal Node: `--prune.mode=minimal`

### Known Problems of E3:

Expand Down
14 changes: 14 additions & 0 deletions cl/phase1/stages/clstages.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,25 @@ func ConsensusClStages(ctx context.Context,
})

for i, block := range blocks {
blockRoot, err := block.Block.HashSSZ()
if err != nil {
logger.Warn("failed to hash block", "err", err)
blocks = blocks[i:]
break
}

if err := processBlock(cfg.indiciesDB, block, false, true, false); err != nil {
log.Warn("bad blocks segment received", "err", err)
blocks = blocks[i:]
break
}

st, err := cfg.forkChoice.GetStateAtBlockRoot(blockRoot, false)
if err == nil && block.Block.Slot%(cfg.beaconCfg.SlotsPerEpoch*2) == 0 {
if err := cfg.forkChoice.DumpBeaconStateOnDisk(st); err != nil {
logger.Warn("failed to dump state", "err", err)
}
}
if shouldInsert && block.Version() >= clparams.BellatrixVersion {
if err := cfg.blockCollector.AddBlock(block.Block); err != nil {
logger.Warn("failed to add block to collector", "err", err)
Expand Down
9 changes: 6 additions & 3 deletions cl/sentinel/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ func (s *Sentinel) listenForPeers() {
continue
}

if err := s.ConnectWithPeer(s.ctx, *peerInfo); err != nil {
log.Trace("[Sentinel] Could not connect with peer", "err", err)
}
go func() {
if err := s.ConnectWithPeer(s.ctx, *peerInfo); err != nil {
log.Trace("[Sentinel] Could not connect with peer", "err", err)
}
}()

}
}

Expand Down
12 changes: 1 addition & 11 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,9 +1017,6 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
chainConfig, pm := fromdb.ChainConfig(db), fromdb.PruneMode(db)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
pm.Receipts = prune.Distance(s.BlockNumber - pruneTo)
pm.CallTraces = prune.Distance(s.BlockNumber - pruneTo)
pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
}

syncCfg := ethconfig.Defaults.Sync
Expand Down Expand Up @@ -1123,9 +1120,6 @@ func stageCustomTrace(db kv.RwDB, ctx context.Context, logger log.Logger) error
chainConfig, pm := fromdb.ChainConfig(db), fromdb.PruneMode(db)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
pm.Receipts = prune.Distance(s.BlockNumber - pruneTo)
pm.CallTraces = prune.Distance(s.BlockNumber - pruneTo)
pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
}

syncCfg := ethconfig.Defaults.Sync
Expand Down Expand Up @@ -1244,9 +1238,6 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error {
s := stage(sync, tx, nil, stages.TxLookup)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
pm.Receipts = prune.Distance(s.BlockNumber - pruneTo)
pm.CallTraces = prune.Distance(s.BlockNumber - pruneTo)
pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
}
logger.Info("Stage", "name", s.ID, "progress", s.BlockNumber)

Expand Down Expand Up @@ -1547,8 +1538,7 @@ func stage(st *stagedsync.Sync, tx kv.Tx, db kv.RoDB, stage stages.SyncStage) *s

func overrideStorageMode(db kv.RwDB, logger log.Logger) error {
chainConfig := fromdb.ChainConfig(db)
pm, err := prune.FromCli(chainConfig.ChainID.Uint64(), pruneFlag, pruneB, pruneH, pruneR, pruneT, pruneC,
pruneHBefore, pruneRBefore, pruneTBefore, pruneCBefore, pruneBBefore, experiments)
pm, err := prune.FromCli(chainConfig.ChainID.Uint64(), pruneB, pruneH, experiments)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/rawdbhelpers/rawdbhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
)

func IdxStepsCountV3(tx kv.Tx) float64 {
fst, _ := kv.FirstKey(tx, kv.TblTracesToKeys)
lst, _ := kv.LastKey(tx, kv.TblTracesToKeys)
fst, _ := kv.FirstKey(tx, kv.TblAccountHistoryKeys)
lst, _ := kv.LastKey(tx, kv.TblAccountHistoryKeys)
if len(fst) > 0 && len(lst) > 0 {
fstTxNum := binary.BigEndian.Uint64(fst)
lstTxNum := binary.BigEndian.Uint64(lst)
Expand Down
12 changes: 12 additions & 0 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,30 @@ func (rs *StateV3) ApplyLogsAndTraces4(txTask *TxTask, domains *libstate.SharedD
if dbg.DiscardHistory() {
return nil
}
shouldPruneNonEssentials := txTask.PruneNonEssentials && txTask.Config != nil

for addr := range txTask.TraceFroms {
if shouldPruneNonEssentials && addr != txTask.Config.DepositContract {
continue
}
if err := domains.IndexAdd(kv.TblTracesFromIdx, addr[:]); err != nil {
return err
}
}

for addr := range txTask.TraceTos {
if shouldPruneNonEssentials && addr != txTask.Config.DepositContract {
continue
}
if err := domains.IndexAdd(kv.TblTracesToIdx, addr[:]); err != nil {
return err
}
}

for _, lg := range txTask.Logs {
if shouldPruneNonEssentials && lg.Address != txTask.Config.DepositContract {
continue
}
if err := domains.IndexAdd(kv.TblLogAddressIdx, lg.Address[:]); err != nil {
return err
}
Expand Down
38 changes: 20 additions & 18 deletions core/state/txtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,25 @@ import (
// which is processed by a single thread that writes into the ReconState1 and
// flushes to the database
type TxTask struct {
TxNum uint64
BlockNum uint64
Rules *chain.Rules
Header *types.Header
Txs types.Transactions
Uncles []*types.Header
Coinbase libcommon.Address
Withdrawals types.Withdrawals
BlockHash libcommon.Hash
Sender *libcommon.Address
SkipAnalysis bool
TxIndex int // -1 for block initialisation
Final bool
Failed bool
Tx types.Transaction
GetHashFn func(n uint64) libcommon.Hash
TxAsMessage types.Message
EvmBlockContext evmtypes.BlockContext
TxNum uint64
BlockNum uint64
Rules *chain.Rules
Header *types.Header
Txs types.Transactions
Uncles []*types.Header
Coinbase libcommon.Address
Withdrawals types.Withdrawals
BlockHash libcommon.Hash
Sender *libcommon.Address
SkipAnalysis bool
PruneNonEssentials bool
TxIndex int // -1 for block initialisation
Final bool
Failed bool
Tx types.Transaction
GetHashFn func(n uint64) libcommon.Hash
TxAsMessage types.Message
EvmBlockContext evmtypes.BlockContext

HistoryExecution bool // use history reader for that txn instead of state reader

Expand All @@ -79,6 +80,7 @@ type TxTask struct {
BlockReceipts types.Receipts

Requests types.Requests
Config *chain.Config
}

func (t *TxTask) CreateReceipt(cumulativeGasUsed uint64) *types.Receipt {
Expand Down
1 change: 1 addition & 0 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func (a *Aggregator) DiscardHistory(name kv.Domain) *Aggregator {
a.d[name].historyDisabled = true
return a
}

func (a *Aggregator) EnableHistory(name kv.Domain) *Aggregator {
a.d[name].historyDisabled = false
return a
Expand Down
9 changes: 4 additions & 5 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1781,11 +1781,8 @@ func (dt *DomainRoTx) Prune(ctx context.Context, rwTx kv.RwTx, step, txFrom, txT
}

var k, v []byte
if prunedKey != nil {
if prunedKey != nil && limit < 100_000 {
k, v, err = valsCursor.Seek(prunedKey)
if err != nil {
return stat, err
}
} else {
k, v, err = valsCursor.First()
}
Expand Down Expand Up @@ -1818,10 +1815,12 @@ func (dt *DomainRoTx) Prune(ctx context.Context, rwTx kv.RwTx, step, txFrom, txT
return stat, nil
}
limit--
stat.Values++
if err := ancientDomainValsCollector.Collect(k, v); err != nil {
return nil, err
}

stat.MinStep = min(stat.MinStep, is)
stat.MaxStep = max(stat.MaxStep, is)
select {
case <-ctx.Done():
// consider ctx exiting as incorrect outcome, error is returned
Expand Down
43 changes: 22 additions & 21 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,13 @@ func ExecV3(ctx context.Context,
if initialCycle {
agg.SetCollateAndBuildWorkers(min(2, estimate.StateV3Collate.Workers()))
agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
//if blockNum < cfg.blockReader.FrozenBlocks() {
//defer agg.DiscardHistory(kv.CommitmentDomain).EnableHistory(kv.CommitmentDomain)
//defer agg.LimitRecentHistoryWithoutFiles(0).LimitRecentHistoryWithoutFiles(agg.StepSize() / 10)
//}
} else {
agg.SetCompressWorkers(1)
agg.SetCollateAndBuildWorkers(1)
}

pruneNonEssentials := cfg.prune.History.Enabled() && cfg.prune.History.PruneTo(execStage.BlockNumber) == execStage.BlockNumber

var err error
inMemExec := txc.Doms != nil
var doms *state2.SharedDomains
Expand Down Expand Up @@ -718,26 +716,28 @@ Loop:
for txIndex := -1; txIndex <= len(txs); txIndex++ {
// Do not oversend, wait for the result heap to go under certain size
txTask := &state.TxTask{
BlockNum: blockNum,
Header: header,
Coinbase: b.Coinbase(),
Uncles: b.Uncles(),
Rules: rules,
Txs: txs,
TxNum: inputTxNum,
TxIndex: txIndex,
BlockHash: b.Hash(),
SkipAnalysis: skipAnalysis,
Final: txIndex == len(txs),
GetHashFn: getHashFn,
EvmBlockContext: blockContext,
Withdrawals: b.Withdrawals(),
Requests: b.Requests(),
BlockNum: blockNum,
Header: header,
Coinbase: b.Coinbase(),
Uncles: b.Uncles(),
Rules: rules,
Txs: txs,
TxNum: inputTxNum,
TxIndex: txIndex,
BlockHash: b.Hash(),
SkipAnalysis: skipAnalysis,
Final: txIndex == len(txs),
GetHashFn: getHashFn,
EvmBlockContext: blockContext,
Withdrawals: b.Withdrawals(),
Requests: b.Requests(),
PruneNonEssentials: pruneNonEssentials,

// use history reader instead of state reader to catch up to the tx where we left off
HistoryExecution: offsetFromBlockBeginning > 0 && txIndex < int(offsetFromBlockBeginning),

BlockReceipts: receipts,
Config: cfg.genesis.Config,
}
if txTask.TxNum <= txNumInDB && txTask.TxNum > 0 {
inputTxNum++
Expand Down Expand Up @@ -884,7 +884,8 @@ Loop:
// }
//}
// If we skip post evaluation, then we should compute root hash ASAP for fail-fast
if !skipPostEvaluation && (rs.SizeEstimate() < commitThreshold || inMemExec) {
aggregatorRo := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx)
if !skipPostEvaluation && (rs.SizeEstimate() < commitThreshold || inMemExec) && !aggregatorRo.CanPrune(applyTx, outputTxNum.Load()) {
break
}
var (
Expand All @@ -903,7 +904,7 @@ Loop:
t1 = time.Since(tt) + ts

tt = time.Now()
if _, err := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneSmallBatches(ctx, 10*time.Hour, applyTx); err != nil {
if _, err := aggregatorRo.PruneSmallBatches(ctx, 10*time.Hour, applyTx); err != nil {
return err
}
t3 = time.Since(tt)
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_call_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,8 @@ func PruneCallTraces(s *PruneState, tx kv.RwTx, cfg CallTracesCfg, ctx context.C
defer tx.Rollback()
}

if cfg.prune.CallTraces.Enabled() {
if err = pruneCallTraces(tx, logPrefix, cfg.prune.CallTraces.PruneTo(s.ForwardProgress), ctx, cfg.tmpdir, logger); err != nil {
if cfg.prune.History.Enabled() {
if err = pruneCallTraces(tx, logPrefix, cfg.prune.History.PruneTo(s.ForwardProgress), ctx, cfg.tmpdir, logger); err != nil {
return err
}
}
Expand Down
6 changes: 3 additions & 3 deletions eth/stagedsync/stage_log_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
}

startBlock := s.BlockNumber
pruneTo := cfg.prune.Receipts.PruneTo(endBlock) //endBlock - prune.r.older
pruneTo := cfg.prune.History.PruneTo(endBlock) //endBlock - prune.r.older
// if startBlock < pruneTo {
// startBlock = pruneTo
// }
Expand Down Expand Up @@ -435,7 +435,7 @@ func pruneOldLogChunks(tx kv.RwTx, bucket string, inMem *etl.Collector, pruneTo

// Call pruneLogIndex with the current sync progresses and commit the data to db
func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Context, logger log.Logger) (err error) {
if !cfg.prune.Receipts.Enabled() {
if !cfg.prune.History.Enabled() {
return nil
}
logPrefix := s.LogPrefix()
Expand All @@ -449,7 +449,7 @@ func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
defer tx.Rollback()
}

pruneTo := cfg.prune.Receipts.PruneTo(s.ForwardProgress)
pruneTo := cfg.prune.History.PruneTo(s.ForwardProgress)
if err = pruneLogIndex(logPrefix, tx, cfg.tmpdir, s.PruneProgress, pruneTo, ctx, logger, cfg.depositContract); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
}
if cfg.blockReader.FreezingCfg().Enabled {
// noop. in this case senders will be deleted by BlockRetire.PruneAncientBlocks after data-freezing.
} else if cfg.prune.TxIndex.Enabled() {
to := cfg.prune.TxIndex.PruneTo(s.ForwardProgress)
} else if cfg.prune.History.Enabled() {
to := cfg.prune.History.PruneTo(s.ForwardProgress)
if err = rawdb.PruneTable(tx, kv.Senders, to, ctx, 100); err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions eth/stagedsync/stage_txlookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c
}

startBlock := s.BlockNumber
if cfg.prune.TxIndex.Enabled() {
pruneTo := cfg.prune.TxIndex.PruneTo(endBlock)
if cfg.prune.History.Enabled() {
pruneTo := cfg.prune.History.PruneTo(endBlock)
if startBlock < pruneTo {
startBlock = pruneTo
if err = s.UpdatePrune(tx, pruneTo); err != nil { // prune func of this stage will use this value to prevent all ancient blocks traversal
Expand Down Expand Up @@ -248,8 +248,8 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte
var pruneBor bool

// Forward stage doesn't write anything before PruneTo point
if cfg.prune.TxIndex.Enabled() {
blockTo = cfg.prune.TxIndex.PruneTo(s.ForwardProgress)
if cfg.prune.History.Enabled() {
blockTo = cfg.prune.History.PruneTo(s.ForwardProgress)
pruneBor = true
} else if cfg.blockReader.FreezingCfg().Enabled {
blockTo = cfg.blockReader.CanPruneTo(s.ForwardProgress)
Expand Down
Loading

0 comments on commit af2a4a0

Please sign in to comment.