Erigon 3.0: Smarter full and minimal nodes #11220

Merged: 49 commits, Jul 19, 2024.
Commits (49, all by Giulio2002):
- Jul 16, 2024, "save": b8d919b, 8ba2cba, 0318a39, cdc753d, a372b4f, d1f1e3d, d0d21f0, 219020c, cdd3392, 3ce0d86, 16df018, 9f6b6b2, 930fc93, 6f311b4, 5cd4e19, 0094f56, ed8e38a, 3ac67d6, c64bc9e, 4e22685, 23d3c45, 302b04b
- Jul 17, 2024, "save": b5ca96c, 5f1fcf4, fe2d671, f442e2e, 06afbd5, 294865b; "Merge remote-tracking branch 'origin/main' into HEAD": ba82795
- Jul 18, 2024, "save": cf37670, 76a664e, 7f65033, c991853, 69c0b0e, 64af438, 8ada3d1, d0735e9, 8f5acd4, 5320d7f, 8523a5c, 1e6030b, 2698ef7, bb81869, 692eb59, b648f74, 4af3bac; "Merge branch 'main' into good-prune": 1553906; "Merge remote-tracking branch 'origin/main' into HEAD": 1651ee6
- Jul 19, 2024, "save": 678416b
2 changes: 1 addition & 1 deletion README.md
@@ -791,7 +791,7 @@ Supported networks: all (except Mumbai).
- datadir/chaindata is small now - to prevent its growth we recommend setting `--batchSize <= 2G`. And it's fine
to `rm -rf chaindata`
- can symlink/mount latest state to fast drive and history to cheap drive
- ArchiveNode is default. FullNode same as in E2: --prune=hrtc
- Archive Node is default. Full Node: `--prune.mode=full`, Minimal Node: `--prune.mode=minimal`

### Known Problems of E3:

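For context on the bullet changed above: the old E2-style `--prune=hrtc` letters are replaced by a single `--prune.mode` flag, with archive behaviour as the default when the flag is omitted. The sketch below is illustrative only; the Go types and strings are invented for explanation and are not Erigon's configuration code (only the `full` and `minimal` values come from the README line itself).

```go
// Illustrative sketch (not Erigon's actual code): one --prune.mode value
// replaces the per-table prune letters of the old --prune=hrtc style.
package main

import "fmt"

func retention(pruneMode string) string {
	switch pruneMode {
	case "full":
		return "prune ancient history/indices; keep what a validating full node needs"
	case "minimal":
		return "keep only what is required to follow and validate the chain tip"
	default: // flag omitted => archive behaviour, per the README bullet above
		return "archive: retain full state history and all indices"
	}
}

func main() {
	for _, m := range []string{"", "full", "minimal"} {
		fmt.Printf("--prune.mode=%q => %s\n", m, retention(m))
	}
}
```

In practice a minimal node is started with just `--prune.mode=minimal`, with no per-table prune letters to combine.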
14 changes: 14 additions & 0 deletions cl/phase1/stages/clstages.go
@@ -302,11 +302,25 @@ func ConsensusClStages(ctx context.Context,
})

for i, block := range blocks {
blockRoot, err := block.Block.HashSSZ()
if err != nil {
logger.Warn("failed to hash block", "err", err)
blocks = blocks[i:]
break
}

if err := processBlock(cfg.indiciesDB, block, false, true, false); err != nil {
log.Warn("bad blocks segment received", "err", err)
blocks = blocks[i:]
break
}

st, err := cfg.forkChoice.GetStateAtBlockRoot(blockRoot, false)
if err == nil && block.Block.Slot%(cfg.beaconCfg.SlotsPerEpoch*2) == 0 {
if err := cfg.forkChoice.DumpBeaconStateOnDisk(st); err != nil {
logger.Warn("failed to dump state", "err", err)
}
}
if shouldInsert && block.Version() >= clparams.BellatrixVersion {
if err := cfg.blockCollector.AddBlock(block.Block); err != nil {
logger.Warn("failed to add block to collector", "err", err)
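The added lines above hash each block root and, once every two epochs (when `block.Block.Slot % (cfg.beaconCfg.SlotsPerEpoch*2) == 0`), dump the corresponding beacon state to disk, so a restarted node can resume from a recent on-disk state instead of replaying from much further back. A minimal sketch of that cadence check; `slotsPerEpoch = 32` matches mainnet, everything else is invented for illustration:

```go
// Sketch of the "checkpoint every two epochs" cadence used above.
package main

import "fmt"

const slotsPerEpoch = 32 // Ethereum mainnet value

// shouldDumpState reports whether the post-state of a block at this slot
// should be persisted: once every two epochs.
func shouldDumpState(slot uint64) bool {
	return slot%(slotsPerEpoch*2) == 0
}

func main() {
	for _, slot := range []uint64{63, 64, 100, 128} {
		fmt.Printf("slot %d: dump=%v\n", slot, shouldDumpState(slot))
	}
}
```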
9 changes: 6 additions & 3 deletions cl/sentinel/discovery.go
@@ -122,9 +122,12 @@ func (s *Sentinel) listenForPeers() {
continue
}

if err := s.ConnectWithPeer(s.ctx, *peerInfo); err != nil {
log.Trace("[Sentinel] Could not connect with peer", "err", err)
}
go func() {
if err := s.ConnectWithPeer(s.ctx, *peerInfo); err != nil {
log.Trace("[Sentinel] Could not connect with peer", "err", err)
}
}()

}
}

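Dialing a discovered peer can block for seconds, so doing it inline stalled `listenForPeers` and delayed every later discovery result; the change above moves each `ConnectWithPeer` call into its own goroutine. A common refinement, not part of this diff, is to bound how many dials run at once; a sketch with invented names:

```go
// Sketch: dial peers off the discovery loop, with a bounded number of
// concurrent attempts. connectWithPeer stands in for the real dial.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func connectWithPeer(ctx context.Context, peer string) error {
	select {
	case <-time.After(100 * time.Millisecond): // pretend the dial takes a while
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx := context.Background()
	peers := []string{"peerA", "peerB", "peerC", "peerD"}

	sem := make(chan struct{}, 2) // at most 2 dials in flight
	var wg sync.WaitGroup
	for _, p := range peers {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot before spawning
		go func(p string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			if err := connectWithPeer(ctx, p); err != nil {
				fmt.Println("could not connect with peer", p, err)
			}
		}(p)
	}
	wg.Wait()
}
```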
12 changes: 1 addition & 11 deletions cmd/integration/commands/stages.go
@@ -1017,9 +1017,6 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
chainConfig, pm := fromdb.ChainConfig(db), fromdb.PruneMode(db)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
pm.Receipts = prune.Distance(s.BlockNumber - pruneTo)
pm.CallTraces = prune.Distance(s.BlockNumber - pruneTo)
pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
}

syncCfg := ethconfig.Defaults.Sync
@@ -1123,9 +1120,6 @@ func stageCustomTrace(db kv.RwDB, ctx context.Context, logger log.Logger) error
chainConfig, pm := fromdb.ChainConfig(db), fromdb.PruneMode(db)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
pm.Receipts = prune.Distance(s.BlockNumber - pruneTo)
pm.CallTraces = prune.Distance(s.BlockNumber - pruneTo)
pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
}

syncCfg := ethconfig.Defaults.Sync
@@ -1244,9 +1238,6 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error {
s := stage(sync, tx, nil, stages.TxLookup)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
pm.Receipts = prune.Distance(s.BlockNumber - pruneTo)
pm.CallTraces = prune.Distance(s.BlockNumber - pruneTo)
pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
}
logger.Info("Stage", "name", s.ID, "progress", s.BlockNumber)

@@ -1547,8 +1538,7 @@ func stage(st *stagedsync.Sync, tx kv.Tx, db kv.RoDB, stage stages.SyncStage) *s

func overrideStorageMode(db kv.RwDB, logger log.Logger) error {
chainConfig := fromdb.ChainConfig(db)
pm, err := prune.FromCli(chainConfig.ChainID.Uint64(), pruneFlag, pruneB, pruneH, pruneR, pruneT, pruneC,
pruneHBefore, pruneRBefore, pruneTBefore, pruneCBefore, pruneBBefore, experiments)
pm, err := prune.FromCli(chainConfig.ChainID.Uint64(), pruneB, pruneH, experiments)
if err != nil {
return err
}
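The hunks above drop the separate Receipts, CallTraces and TxIndex prune distances (only the History distance remains), and `prune.FromCli` shrinks to match. A sketch of the reduced shape such a configuration converges to; field and method names are assumed for illustration and are not the real `prune` package API:

```go
// Sketch of a prune configuration reduced to two distances: how much recent
// history to keep and how many block bodies to keep. Names are illustrative.
package main

import "fmt"

// Distance means "keep everything within this many blocks of the chain head".
type Distance uint64

// PruneTo returns the highest block that may be pruned for a given head.
func (d Distance) PruneTo(head uint64) uint64 {
	if uint64(d) > head {
		return 0
	}
	return head - uint64(d)
}

// Mode is the whole prune configuration: one distance now covers history,
// receipts, call traces and the tx index; another covers block bodies.
type Mode struct {
	History Distance
	Blocks  Distance
}

func main() {
	pm := Mode{History: 2048, Blocks: 90000}
	head := uint64(20_000_000)
	fmt.Println("prune history up to block", pm.History.PruneTo(head))
	fmt.Println("prune block bodies up to block", pm.Blocks.PruneTo(head))
}
```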
4 changes: 2 additions & 2 deletions core/rawdb/rawdbhelpers/rawdbhelpers.go
@@ -24,8 +24,8 @@ import (
)

func IdxStepsCountV3(tx kv.Tx) float64 {
fst, _ := kv.FirstKey(tx, kv.TblTracesToKeys)
lst, _ := kv.LastKey(tx, kv.TblTracesToKeys)
fst, _ := kv.FirstKey(tx, kv.TblAccountHistoryKeys)
lst, _ := kv.LastKey(tx, kv.TblAccountHistoryKeys)
if len(fst) > 0 && len(lst) > 0 {
fstTxNum := binary.BigEndian.Uint64(fst)
lstTxNum := binary.BigEndian.Uint64(lst)
12 changes: 12 additions & 0 deletions core/state/rw_v3.go
@@ -213,18 +213,30 @@ func (rs *StateV3) ApplyLogsAndTraces4(txTask *TxTask, domains *libstate.SharedD
if dbg.DiscardHistory() {
return nil
}
shouldPruneNonEssentials := txTask.PruneNonEssentials && txTask.Config != nil

for addr := range txTask.TraceFroms {
if shouldPruneNonEssentials && addr != txTask.Config.DepositContract {
continue
}
if err := domains.IndexAdd(kv.TblTracesFromIdx, addr[:]); err != nil {
return err
}
}

for addr := range txTask.TraceTos {
if shouldPruneNonEssentials && addr != txTask.Config.DepositContract {
continue
}
if err := domains.IndexAdd(kv.TblTracesToIdx, addr[:]); err != nil {
return err
}
}

for _, lg := range txTask.Logs {
if shouldPruneNonEssentials && lg.Address != txTask.Config.DepositContract {
continue
}
if err := domains.IndexAdd(kv.TblLogAddressIdx, lg.Address[:]); err != nil {
return err
}
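With `PruneNonEssentials` set, the loops above skip trace and log index entries for every address except the chain's deposit contract, so even a heavily pruned node keeps the one index the consensus layer needs for deposit processing. A standalone sketch of that allowlist filter, using toy types rather than Erigon's:

```go
// Sketch: keep only deposit-contract logs when non-essential data is pruned.
package main

import "fmt"

type Address [20]byte

type Log struct{ Address Address }

// indexable returns the logs that should still be indexed: everything in
// archive/full mode, only deposit-contract logs when pruning non-essentials.
func indexable(logs []Log, pruneNonEssentials bool, depositContract Address) []Log {
	if !pruneNonEssentials {
		return logs
	}
	kept := make([]Log, 0, len(logs))
	for _, lg := range logs {
		if lg.Address == depositContract {
			kept = append(kept, lg)
		}
	}
	return kept
}

func main() {
	deposit, other := Address{0x01}, Address{0x02}
	logs := []Log{{Address: deposit}, {Address: other}}
	fmt.Println("minimal keeps:", len(indexable(logs, true, deposit)))  // 1
	fmt.Println("archive keeps:", len(indexable(logs, false, deposit))) // 2
}
```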
38 changes: 20 additions & 18 deletions core/state/txtask.go
@@ -36,24 +36,25 @@ import (
// which is processed by a single thread that writes into the ReconState1 and
// flushes to the database
type TxTask struct {
TxNum uint64
BlockNum uint64
Rules *chain.Rules
Header *types.Header
Txs types.Transactions
Uncles []*types.Header
Coinbase libcommon.Address
Withdrawals types.Withdrawals
BlockHash libcommon.Hash
Sender *libcommon.Address
SkipAnalysis bool
TxIndex int // -1 for block initialisation
Final bool
Failed bool
Tx types.Transaction
GetHashFn func(n uint64) libcommon.Hash
TxAsMessage types.Message
EvmBlockContext evmtypes.BlockContext
TxNum uint64
BlockNum uint64
Rules *chain.Rules
Header *types.Header
Txs types.Transactions
Uncles []*types.Header
Coinbase libcommon.Address
Withdrawals types.Withdrawals
BlockHash libcommon.Hash
Sender *libcommon.Address
SkipAnalysis bool
PruneNonEssentials bool
TxIndex int // -1 for block initialisation
Final bool
Failed bool
Tx types.Transaction
GetHashFn func(n uint64) libcommon.Hash
TxAsMessage types.Message
EvmBlockContext evmtypes.BlockContext

HistoryExecution bool // use history reader for that txn instead of state reader

@@ -79,6 +80,7 @@ type TxTask struct {
BlockReceipts types.Receipts

Requests types.Requests
Config *chain.Config
}

func (t *TxTask) CreateReceipt(cumulativeGasUsed uint64) *types.Receipt {
1 change: 1 addition & 0 deletions erigon-lib/state/aggregator.go
@@ -381,6 +381,7 @@ func (a *Aggregator) DiscardHistory(name kv.Domain) *Aggregator {
a.d[name].historyDisabled = true
return a
}

func (a *Aggregator) EnableHistory(name kv.Domain) *Aggregator {
a.d[name].historyDisabled = false
return a
9 changes: 4 additions & 5 deletions erigon-lib/state/domain.go
@@ -1781,11 +1781,8 @@ func (dt *DomainRoTx) Prune(ctx context.Context, rwTx kv.RwTx, step, txFrom, txT
}

var k, v []byte
if prunedKey != nil {
if prunedKey != nil && limit < 100_000 {
k, v, err = valsCursor.Seek(prunedKey)
if err != nil {
return stat, err
}
} else {
k, v, err = valsCursor.First()
}
@@ -1818,10 +1815,12 @@
return stat, nil
}
limit--
stat.Values++
if err := ancientDomainValsCollector.Collect(k, v); err != nil {
return nil, err
}

stat.MinStep = min(stat.MinStep, is)
stat.MaxStep = max(stat.MaxStep, is)
select {
case <-ctx.Done():
// consider ctx exiting as incorrect outcome, error is returned
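The pruning loop above now counts pruned values, tracks the min and max step it touched, and resumes from the previously pruned key only for small batches (`limit < 100_000`), restarting from the first key otherwise. A self-contained sketch of the bounded-batch bookkeeping; the slice below stands in for the real MDBX cursor and ETL collector:

```go
// Sketch of bounded-batch pruning: handle at most `limit` entries per call,
// remember where to resume, and track simple statistics.
package main

import "fmt"

type entry struct {
	key  string
	step uint64
}

type stat struct {
	Values           int
	MinStep, MaxStep uint64
}

// pruneBatch counts at most limit entries newer than resumeKey and returns
// the statistics plus the key to resume from on the next call.
func pruneBatch(entries []entry, resumeKey string, limit int) (stat, string) {
	st := stat{MinStep: ^uint64(0)}
	next := ""
	for _, e := range entries {
		if limit == 0 {
			next = e.key // batch budget exhausted; resume here next time
			break
		}
		if resumeKey != "" && e.key <= resumeKey {
			continue // already handled by a previous batch
		}
		st.Values++
		if e.step < st.MinStep {
			st.MinStep = e.step
		}
		if e.step > st.MaxStep {
			st.MaxStep = e.step
		}
		limit--
	}
	return st, next
}

func main() {
	entries := []entry{{"a", 1}, {"b", 1}, {"c", 2}, {"d", 3}}
	st, next := pruneBatch(entries, "", 3)
	fmt.Printf("pruned=%d minStep=%d maxStep=%d resume=%q\n", st.Values, st.MinStep, st.MaxStep, next)
}
```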
43 changes: 22 additions & 21 deletions eth/stagedsync/exec3.go
@@ -192,15 +192,13 @@ func ExecV3(ctx context.Context,
if initialCycle {
agg.SetCollateAndBuildWorkers(min(2, estimate.StateV3Collate.Workers()))
agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
//if blockNum < cfg.blockReader.FrozenBlocks() {
//defer agg.DiscardHistory(kv.CommitmentDomain).EnableHistory(kv.CommitmentDomain)
//defer agg.LimitRecentHistoryWithoutFiles(0).LimitRecentHistoryWithoutFiles(agg.StepSize() / 10)
//}
} else {
agg.SetCompressWorkers(1)
agg.SetCollateAndBuildWorkers(1)
}

pruneNonEssentials := cfg.prune.History.Enabled() && cfg.prune.History.PruneTo(execStage.BlockNumber) == execStage.BlockNumber

var err error
inMemExec := txc.Doms != nil
var doms *state2.SharedDomains
@@ -718,26 +716,28 @@ Loop:
for txIndex := -1; txIndex <= len(txs); txIndex++ {
// Do not oversend, wait for the result heap to go under certain size
txTask := &state.TxTask{
BlockNum: blockNum,
Header: header,
Coinbase: b.Coinbase(),
Uncles: b.Uncles(),
Rules: rules,
Txs: txs,
TxNum: inputTxNum,
TxIndex: txIndex,
BlockHash: b.Hash(),
SkipAnalysis: skipAnalysis,
Final: txIndex == len(txs),
GetHashFn: getHashFn,
EvmBlockContext: blockContext,
Withdrawals: b.Withdrawals(),
Requests: b.Requests(),
BlockNum: blockNum,
Header: header,
Coinbase: b.Coinbase(),
Uncles: b.Uncles(),
Rules: rules,
Txs: txs,
TxNum: inputTxNum,
TxIndex: txIndex,
BlockHash: b.Hash(),
SkipAnalysis: skipAnalysis,
Final: txIndex == len(txs),
GetHashFn: getHashFn,
EvmBlockContext: blockContext,
Withdrawals: b.Withdrawals(),
Requests: b.Requests(),
PruneNonEssentials: pruneNonEssentials,

// use history reader instead of state reader to catch up to the tx where we left off
HistoryExecution: offsetFromBlockBeginning > 0 && txIndex < int(offsetFromBlockBeginning),

BlockReceipts: receipts,
Config: cfg.genesis.Config,
}
if txTask.TxNum <= txNumInDB && txTask.TxNum > 0 {
inputTxNum++
@@ -884,7 +884,8 @@ Loop:
// }
//}
// If we skip post evaluation, then we should compute root hash ASAP for fail-fast
if !skipPostEvaluation && (rs.SizeEstimate() < commitThreshold || inMemExec) {
aggregatorRo := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx)
if !skipPostEvaluation && (rs.SizeEstimate() < commitThreshold || inMemExec) && !aggregatorRo.CanPrune(applyTx, outputTxNum.Load()) {
break
}
var (
@@ -903,7 +904,7 @@ Loop:
t1 = time.Since(tt) + ts

tt = time.Now()
if _, err := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneSmallBatches(ctx, 10*time.Hour, applyTx); err != nil {
if _, err := aggregatorRo.PruneSmallBatches(ctx, 10*time.Hour, applyTx); err != nil {
return err
}
t3 = time.Since(tt)
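Two related changes appear above: execution marks tasks with `PruneNonEssentials` when history pruning is enabled with distance zero (so `PruneTo(head) == head` and nothing behind the tip is retained), and the commit loop no longer exits early while the aggregator's `CanPrune` still reports work, so pruning keeps pace with execution instead of piling up. A toy sketch of the first predicate; the types are invented and the real logic lives in the prune mode and aggregator code:

```go
// Sketch of the "prune non-essentials" predicate: true only when history
// pruning is enabled with distance 0, i.e. nothing behind the head is kept.
package main

import "fmt"

// distance models "keep this many recent blocks of history"; enabled=false
// means archive (never prune history).
type distance struct {
	enabled bool
	blocks  uint64
}

func (d distance) Enabled() bool { return d.enabled }

func (d distance) PruneTo(head uint64) uint64 {
	if !d.enabled || d.blocks > head {
		return 0
	}
	return head - d.blocks
}

func pruneNonEssentials(history distance, head uint64) bool {
	return history.Enabled() && history.PruneTo(head) == head
}

func main() {
	head := uint64(1_000_000)
	fmt.Println(pruneNonEssentials(distance{true, 0}, head))    // true: minimal-style pruning
	fmt.Println(pruneNonEssentials(distance{true, 2048}, head)) // false: keeps recent history
	fmt.Println(pruneNonEssentials(distance{false, 0}, head))   // false: archive node
}
```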
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_call_traces.go
@@ -393,8 +393,8 @@ func PruneCallTraces(s *PruneState, tx kv.RwTx, cfg CallTracesCfg, ctx context.C
defer tx.Rollback()
}

if cfg.prune.CallTraces.Enabled() {
if err = pruneCallTraces(tx, logPrefix, cfg.prune.CallTraces.PruneTo(s.ForwardProgress), ctx, cfg.tmpdir, logger); err != nil {
if cfg.prune.History.Enabled() {
if err = pruneCallTraces(tx, logPrefix, cfg.prune.History.PruneTo(s.ForwardProgress), ctx, cfg.tmpdir, logger); err != nil {
return err
}
}
6 changes: 3 additions & 3 deletions eth/stagedsync/stage_log_index.go
@@ -102,7 +102,7 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
}

startBlock := s.BlockNumber
pruneTo := cfg.prune.Receipts.PruneTo(endBlock) //endBlock - prune.r.older
pruneTo := cfg.prune.History.PruneTo(endBlock) //endBlock - prune.r.older
// if startBlock < pruneTo {
// startBlock = pruneTo
// }
@@ -435,7 +435,7 @@ func pruneOldLogChunks(tx kv.RwTx, bucket string, inMem *etl.Collector, pruneTo

// Call pruneLogIndex with the current sync progresses and commit the data to db
func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Context, logger log.Logger) (err error) {
if !cfg.prune.Receipts.Enabled() {
if !cfg.prune.History.Enabled() {
return nil
}
logPrefix := s.LogPrefix()
@@ -449,7 +449,7 @@ func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
defer tx.Rollback()
}

pruneTo := cfg.prune.Receipts.PruneTo(s.ForwardProgress)
pruneTo := cfg.prune.History.PruneTo(s.ForwardProgress)
if err = pruneLogIndex(logPrefix, tx, cfg.tmpdir, s.PruneProgress, pruneTo, ctx, logger, cfg.depositContract); err != nil {
return err
}
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_senders.go
@@ -415,8 +415,8 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
}
if cfg.blockReader.FreezingCfg().Enabled {
// noop. in this case senders will be deleted by BlockRetire.PruneAncientBlocks after data-freezing.
} else if cfg.prune.TxIndex.Enabled() {
to := cfg.prune.TxIndex.PruneTo(s.ForwardProgress)
} else if cfg.prune.History.Enabled() {
to := cfg.prune.History.PruneTo(s.ForwardProgress)
if err = rawdb.PruneTable(tx, kv.Senders, to, ctx, 100); err != nil {
return err
}
8 changes: 4 additions & 4 deletions eth/stagedsync/stage_txlookup.go
@@ -87,8 +87,8 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c
}

startBlock := s.BlockNumber
if cfg.prune.TxIndex.Enabled() {
pruneTo := cfg.prune.TxIndex.PruneTo(endBlock)
if cfg.prune.History.Enabled() {
pruneTo := cfg.prune.History.PruneTo(endBlock)
if startBlock < pruneTo {
startBlock = pruneTo
if err = s.UpdatePrune(tx, pruneTo); err != nil { // prune func of this stage will use this value to prevent all ancient blocks traversal
@@ -248,8 +248,8 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte
var pruneBor bool

// Forward stage doesn't write anything before PruneTo point
if cfg.prune.TxIndex.Enabled() {
blockTo = cfg.prune.TxIndex.PruneTo(s.ForwardProgress)
if cfg.prune.History.Enabled() {
blockTo = cfg.prune.History.PruneTo(s.ForwardProgress)
pruneBor = true
} else if cfg.blockReader.FreezingCfg().Enabled {
blockTo = cfg.blockReader.CanPruneTo(s.ForwardProgress)