Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed storage for download #7175

Merged
merged 2 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/erigon-cl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func runConsensusLayerNode(cliCtx *cli.Context) error {
return err
}
// Execute from genesis to whatever we have.
return stages.SpawnStageBeaconState(stages.StageBeaconState(db, cfg.BeaconCfg, state, nil, true, executionClient), nil, ctx)
return stages.SpawnStageBeaconState(stages.StageBeaconState(db, cfg.BeaconCfg, state, executionClient), nil, ctx)
}

fmt.Println(cfg.CheckpointUri)
Expand Down Expand Up @@ -106,7 +106,7 @@ func runConsensusLayerNode(cliCtx *cli.Context) error {
gossipManager := network.NewGossipReceiver(ctx, s)
gossipManager.AddReceiver(sentinelrpc.GossipType_BeaconBlockGossipType, downloader)
go gossipManager.Loop()
stageloop, err := stages.NewConsensusStagedSync(ctx, db, downloader, bdownloader, genesisCfg, beaconConfig, cpState, nil, false, tmpdir, executionClient, cfg.BeaconDataCfg)
stageloop, err := stages.NewConsensusStagedSync(ctx, db, downloader, bdownloader, genesisCfg, beaconConfig, cpState, tmpdir, executionClient, cfg.BeaconDataCfg)
if err != nil {
return err
}
Expand Down
10 changes: 2 additions & 8 deletions cmd/erigon-cl/stages/stage_beacon_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,8 @@ func SpawnStageBeaconsBlocks(cfg StageBeaconsBlockCfg, s *stagedsync.StageState,
}
defer tx.Rollback()
}
progress := s.BlockNumber
var lastRoot libcommon.Hash
if progress == 0 {
progress = cfg.state.LatestBlockHeader().Slot
lastRoot, err = cfg.state.BlockRoot()
} else {
lastRoot, err = rawdb.ReadFinalizedBlockRoot(tx, progress)
}
progress := cfg.state.LatestBlockHeader().Slot
lastRoot, err := cfg.state.BlockRoot()
if err != nil {
return err
}
Expand Down
17 changes: 9 additions & 8 deletions cmd/erigon-cl/stages/stage_history_reconstruction.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,23 @@ func SpawnStageHistoryReconstruction(cfg StageHistoryReconstructionCfg, s *stage
// Set up onNewBlock callback
cfg.downloader.SetOnNewBlock(func(blk *cltypes.SignedBeaconBlock) (finished bool, err error) {
slot := blk.Block.Slot
blockRoot, err := blk.Block.HashSSZ()
if err != nil {
return false, err
}
key := append(rawdb.EncodeNumber(slot), blockRoot[:]...)
// Collect attestations
encodedAttestations := cltypes.EncodeAttestationsForStorage(blk.Block.Body.Attestations)
if err := attestationsCollector.Collect(rawdb.EncodeNumber(slot), encodedAttestations); err != nil {
if err := attestationsCollector.Collect(key, encodedAttestations); err != nil {
return false, err
}
// Collect beacon blocks
encodedBeaconBlock, err := blk.EncodeForStorage()
if err != nil {
return false, err
}
blockRoot, err := blk.Block.HashSSZ()
if err != nil {
return false, err
}
slotBytes := rawdb.EncodeNumber(slot)
if err := beaconBlocksCollector.Collect(slotBytes, encodedBeaconBlock); err != nil {
if err := beaconBlocksCollector.Collect(key, encodedBeaconBlock); err != nil {
return false, err
}
// Collect hashes
Expand Down Expand Up @@ -146,7 +147,7 @@ func SpawnStageHistoryReconstruction(cfg StageHistoryReconstructionCfg, s *stage
})
prevProgress := cfg.downloader.Progress()

logInterval := time.NewTicker(30 * time.Second)
logInterval := time.NewTicker(logIntervalTime)
finishCh := make(chan struct{})
// Start logging thread
go func() {
Expand All @@ -155,7 +156,7 @@ func SpawnStageHistoryReconstruction(cfg StageHistoryReconstructionCfg, s *stage
case <-logInterval.C:
logArgs := []interface{}{}
currProgress := cfg.downloader.Progress()
speed := (float64(prevProgress) - float64(currProgress)) / (float64(logIntervalTime) / float64(time.Second))
speed := float64(prevProgress-currProgress) / float64(logIntervalTime/time.Second)
prevProgress = currProgress
peerCount, err := cfg.downloader.Peers()
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions cmd/erigon-cl/stages/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ func NewConsensusStagedSync(ctx context.Context,
genesisCfg *clparams.GenesisConfig,
beaconCfg *clparams.BeaconChainConfig,
state *state.BeaconState,
triggerExecution triggerExecutionFunc,
clearEth1Data bool,
tmpdir string,
executionClient *execution_client.ExecutionClient,
beaconDBCfg *rawdb.BeaconDataConfig,
Expand All @@ -77,7 +75,7 @@ func NewConsensusStagedSync(ctx context.Context,
ctx,
StageHistoryReconstruction(db, backwardDownloader, genesisCfg, beaconCfg, beaconDBCfg, state, tmpdir, executionClient),
StageBeaconsBlock(db, forwardDownloader, genesisCfg, beaconCfg, state, executionClient),
StageBeaconState(db, beaconCfg, state, triggerExecution, clearEth1Data, executionClient),
StageBeaconState(db, beaconCfg, state, executionClient),
),
ConsensusUnwindOrder,
ConsensusPruneOrder,
Expand Down
55 changes: 14 additions & 41 deletions cmd/erigon-cl/stages/stages_beacon_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/rawdb"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state"
Expand All @@ -17,27 +15,20 @@ import (
"github.com/ledgerwatch/log/v3"
)

// This function will trigger block execution, hence: insert + validate + fcu.
type triggerExecutionFunc func(*cltypes.SignedBeaconBlock) error

type StageBeaconStateCfg struct {
db kv.RwDB
beaconCfg *clparams.BeaconChainConfig
state *state.BeaconState
clearEth1Data bool // Whether we want to discard eth1 data.
triggerExecution triggerExecutionFunc
executionClient *execution_client.ExecutionClient
db kv.RwDB
beaconCfg *clparams.BeaconChainConfig
state *state.BeaconState
executionClient *execution_client.ExecutionClient
}

func StageBeaconState(db kv.RwDB,
beaconCfg *clparams.BeaconChainConfig, state *state.BeaconState, triggerExecution triggerExecutionFunc, clearEth1Data bool, executionClient *execution_client.ExecutionClient) StageBeaconStateCfg {
beaconCfg *clparams.BeaconChainConfig, state *state.BeaconState, executionClient *execution_client.ExecutionClient) StageBeaconStateCfg {
return StageBeaconStateCfg{
db: db,
beaconCfg: beaconCfg,
state: state,
clearEth1Data: clearEth1Data,
triggerExecution: triggerExecution,
executionClient: executionClient,
db: db,
beaconCfg: beaconCfg,
state: state,
executionClient: executionClient,
}
}

Expand Down Expand Up @@ -75,8 +66,11 @@ func SpawnStageBeaconState(cfg StageBeaconStateCfg, tx kv.RwTx, ctx context.Cont
}
// TODO: Pass this to state transition with the state
if cfg.executionClient != nil {
if block.Block.Body.ExecutionPayload, err = cfg.executionClient.ReadExecutionPayload(eth1Number, eth1Hash); err != nil {
return err
// Query execution engine only if the payload have an hash.
if eth1Hash != (libcommon.Hash{}) {
if block.Block.Body.ExecutionPayload, err = cfg.executionClient.ReadExecutionPayload(eth1Number, eth1Hash); err != nil {
return err
}
}
// validate fully only in current epoch.
fullValidate := utils.GetCurrentEpoch(cfg.state.GenesisTime(), cfg.beaconCfg.SecondsPerSlot, cfg.beaconCfg.SlotsPerEpoch) == cfg.state.Epoch()
Expand Down Expand Up @@ -104,27 +98,6 @@ func SpawnStageBeaconState(cfg StageBeaconStateCfg, tx kv.RwTx, ctx context.Cont
log.Info("Forkchoice Status", "outcome", receipt.Success)
}

// Clear all ETH1 data from CL db
if cfg.clearEth1Data {
if err := tx.ClearBucket(kv.Headers); err != nil {
return err
}
if err := tx.ClearBucket(kv.BlockBody); err != nil {
return err
}
ethTx := kv.EthTx
transactionsV3, _ := kvcfg.TransactionsV3.Enabled(tx)
if transactionsV3 {
ethTx = kv.EthTxV3
}
if err := tx.ClearBucket(ethTx); err != nil {
return err
}
if err := tx.ClearBucket(kv.Sequence); err != nil {
return err
}
}

log.Info("[BeaconState] Finished transitioning state", "from", fromSlot, "to", endSlot)
if !useExternalTx {
if err = tx.Commit(); err != nil {
Expand Down