Skip to content

Commit

Permalink
Fixed storage for download (erigontech#7175)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored and calmbeing committed Apr 24, 2023
1 parent 07a4608 commit f9e1c0f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 62 deletions.
4 changes: 2 additions & 2 deletions cmd/erigon-cl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func runConsensusLayerNode(cliCtx *cli.Context) error {
return err
}
// Execute from genesis to whatever we have.
return stages.SpawnStageBeaconState(stages.StageBeaconState(db, cfg.BeaconCfg, state, nil, true, executionClient), nil, ctx)
return stages.SpawnStageBeaconState(stages.StageBeaconState(db, cfg.BeaconCfg, state, executionClient), nil, ctx)
}

fmt.Println(cfg.CheckpointUri)
Expand Down Expand Up @@ -106,7 +106,7 @@ func runConsensusLayerNode(cliCtx *cli.Context) error {
gossipManager := network.NewGossipReceiver(ctx, s)
gossipManager.AddReceiver(sentinelrpc.GossipType_BeaconBlockGossipType, downloader)
go gossipManager.Loop()
stageloop, err := stages.NewConsensusStagedSync(ctx, db, downloader, bdownloader, genesisCfg, beaconConfig, cpState, nil, false, tmpdir, executionClient, cfg.BeaconDataCfg)
stageloop, err := stages.NewConsensusStagedSync(ctx, db, downloader, bdownloader, genesisCfg, beaconConfig, cpState, tmpdir, executionClient, cfg.BeaconDataCfg)
if err != nil {
return err
}
Expand Down
10 changes: 2 additions & 8 deletions cmd/erigon-cl/stages/stage_beacon_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,8 @@ func SpawnStageBeaconsBlocks(cfg StageBeaconsBlockCfg, s *stagedsync.StageState,
}
defer tx.Rollback()
}
progress := s.BlockNumber
var lastRoot libcommon.Hash
if progress == 0 {
progress = cfg.state.LatestBlockHeader().Slot
lastRoot, err = cfg.state.BlockRoot()
} else {
lastRoot, err = rawdb.ReadFinalizedBlockRoot(tx, progress)
}
progress := cfg.state.LatestBlockHeader().Slot
lastRoot, err := cfg.state.BlockRoot()
if err != nil {
return err
}
Expand Down
17 changes: 9 additions & 8 deletions cmd/erigon-cl/stages/stage_history_reconstruction.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,23 @@ func SpawnStageHistoryReconstruction(cfg StageHistoryReconstructionCfg, s *stage
// Set up onNewBlock callback
cfg.downloader.SetOnNewBlock(func(blk *cltypes.SignedBeaconBlock) (finished bool, err error) {
slot := blk.Block.Slot
blockRoot, err := blk.Block.HashSSZ()
if err != nil {
return false, err
}
key := append(rawdb.EncodeNumber(slot), blockRoot[:]...)
// Collect attestations
encodedAttestations := cltypes.EncodeAttestationsForStorage(blk.Block.Body.Attestations)
if err := attestationsCollector.Collect(rawdb.EncodeNumber(slot), encodedAttestations); err != nil {
if err := attestationsCollector.Collect(key, encodedAttestations); err != nil {
return false, err
}
// Collect beacon blocks
encodedBeaconBlock, err := blk.EncodeForStorage()
if err != nil {
return false, err
}
blockRoot, err := blk.Block.HashSSZ()
if err != nil {
return false, err
}
slotBytes := rawdb.EncodeNumber(slot)
if err := beaconBlocksCollector.Collect(slotBytes, encodedBeaconBlock); err != nil {
if err := beaconBlocksCollector.Collect(key, encodedBeaconBlock); err != nil {
return false, err
}
// Collect hashes
Expand Down Expand Up @@ -146,7 +147,7 @@ func SpawnStageHistoryReconstruction(cfg StageHistoryReconstructionCfg, s *stage
})
prevProgress := cfg.downloader.Progress()

logInterval := time.NewTicker(30 * time.Second)
logInterval := time.NewTicker(logIntervalTime)
finishCh := make(chan struct{})
// Start logging thread
go func() {
Expand All @@ -155,7 +156,7 @@ func SpawnStageHistoryReconstruction(cfg StageHistoryReconstructionCfg, s *stage
case <-logInterval.C:
logArgs := []interface{}{}
currProgress := cfg.downloader.Progress()
speed := (float64(prevProgress) - float64(currProgress)) / (float64(logIntervalTime) / float64(time.Second))
speed := float64(prevProgress-currProgress) / float64(logIntervalTime/time.Second)
prevProgress = currProgress
peerCount, err := cfg.downloader.Peers()
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions cmd/erigon-cl/stages/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ func NewConsensusStagedSync(ctx context.Context,
genesisCfg *clparams.GenesisConfig,
beaconCfg *clparams.BeaconChainConfig,
state *state.BeaconState,
triggerExecution triggerExecutionFunc,
clearEth1Data bool,
tmpdir string,
executionClient *execution_client.ExecutionClient,
beaconDBCfg *rawdb.BeaconDataConfig,
Expand All @@ -77,7 +75,7 @@ func NewConsensusStagedSync(ctx context.Context,
ctx,
StageHistoryReconstruction(db, backwardDownloader, genesisCfg, beaconCfg, beaconDBCfg, state, tmpdir, executionClient),
StageBeaconsBlock(db, forwardDownloader, genesisCfg, beaconCfg, state, executionClient),
StageBeaconState(db, beaconCfg, state, triggerExecution, clearEth1Data, executionClient),
StageBeaconState(db, beaconCfg, state, executionClient),
),
ConsensusUnwindOrder,
ConsensusPruneOrder,
Expand Down
55 changes: 14 additions & 41 deletions cmd/erigon-cl/stages/stages_beacon_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/rawdb"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state"
Expand All @@ -17,27 +15,20 @@ import (
"github.com/ledgerwatch/log/v3"
)

// This function will trigger block execution, hence: insert + validate + fcu.
type triggerExecutionFunc func(*cltypes.SignedBeaconBlock) error

type StageBeaconStateCfg struct {
db kv.RwDB
beaconCfg *clparams.BeaconChainConfig
state *state.BeaconState
clearEth1Data bool // Whether we want to discard eth1 data.
triggerExecution triggerExecutionFunc
executionClient *execution_client.ExecutionClient
db kv.RwDB
beaconCfg *clparams.BeaconChainConfig
state *state.BeaconState
executionClient *execution_client.ExecutionClient
}

func StageBeaconState(db kv.RwDB,
beaconCfg *clparams.BeaconChainConfig, state *state.BeaconState, triggerExecution triggerExecutionFunc, clearEth1Data bool, executionClient *execution_client.ExecutionClient) StageBeaconStateCfg {
beaconCfg *clparams.BeaconChainConfig, state *state.BeaconState, executionClient *execution_client.ExecutionClient) StageBeaconStateCfg {
return StageBeaconStateCfg{
db: db,
beaconCfg: beaconCfg,
state: state,
clearEth1Data: clearEth1Data,
triggerExecution: triggerExecution,
executionClient: executionClient,
db: db,
beaconCfg: beaconCfg,
state: state,
executionClient: executionClient,
}
}

Expand Down Expand Up @@ -75,8 +66,11 @@ func SpawnStageBeaconState(cfg StageBeaconStateCfg, tx kv.RwTx, ctx context.Cont
}
// TODO: Pass this to state transition with the state
if cfg.executionClient != nil {
if block.Block.Body.ExecutionPayload, err = cfg.executionClient.ReadExecutionPayload(eth1Number, eth1Hash); err != nil {
return err
// Query execution engine only if the payload have an hash.
if eth1Hash != (libcommon.Hash{}) {
if block.Block.Body.ExecutionPayload, err = cfg.executionClient.ReadExecutionPayload(eth1Number, eth1Hash); err != nil {
return err
}
}
// validate fully only in current epoch.
fullValidate := utils.GetCurrentEpoch(cfg.state.GenesisTime(), cfg.beaconCfg.SecondsPerSlot, cfg.beaconCfg.SlotsPerEpoch) == cfg.state.Epoch()
Expand Down Expand Up @@ -104,27 +98,6 @@ func SpawnStageBeaconState(cfg StageBeaconStateCfg, tx kv.RwTx, ctx context.Cont
log.Info("Forkchoice Status", "outcome", receipt.Success)
}

// Clear all ETH1 data from CL db
if cfg.clearEth1Data {
if err := tx.ClearBucket(kv.Headers); err != nil {
return err
}
if err := tx.ClearBucket(kv.BlockBody); err != nil {
return err
}
ethTx := kv.EthTx
transactionsV3, _ := kvcfg.TransactionsV3.Enabled(tx)
if transactionsV3 {
ethTx = kv.EthTxV3
}
if err := tx.ClearBucket(ethTx); err != nil {
return err
}
if err := tx.ClearBucket(kv.Sequence); err != nil {
return err
}
}

log.Info("[BeaconState] Finished transitioning state", "from", fromSlot, "to", endSlot)
if !useExternalTx {
if err = tx.Commit(); err != nil {
Expand Down

0 comments on commit f9e1c0f

Please sign in to comment.