diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go
index b74d07fbfe7..2565bb40cf1 100644
--- a/cmd/hack/hack.go
+++ b/cmd/hack/hack.go
@@ -530,7 +530,7 @@ func extractBodies(datadir string) error {
 	snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
 		Enabled:    true,
 		KeepBlocks: true,
-		Produce:    false,
+		ProduceE2:  false,
 	}, filepath.Join(datadir, "snapshots"), 0, log.New())
 	snaps.ReopenFolder()
diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 293ee1151ff..6ac020ad458 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -1772,7 +1772,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
 		dirs := datadir.New(datadirCli)
 		//useSnapshots = true
-		snapCfg := ethconfig.NewSnapCfg(useSnapshots, true, true)
+		snapCfg := ethconfig.NewSnapCfg(useSnapshots, true, true, true)
 		_allSnapshotsSingleton = freezeblocks.NewRoSnapshots(snapCfg, dirs.Snap, 0, logger)
 		_allBorSnapshotsSingleton = freezeblocks.NewBorRoSnapshots(snapCfg, dirs.Snap, 0, logger)
@@ -1782,6 +1782,8 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
 			panic(err)
 		}
+		_aggSingleton.SetProduceMod(snapCfg.ProduceE3)
+
 		if useSnapshots {
 			g := &errgroup.Group{}
 			g.Go(func() error {
diff --git a/cmd/snapshots/cmp/cmp.go b/cmd/snapshots/cmp/cmp.go
index d1cb0fc8414..563f84f48c0 100644
--- a/cmd/snapshots/cmp/cmp.go
+++ b/cmd/snapshots/cmp/cmp.go
@@ -461,7 +461,7 @@ func (c comparitor) compareHeaders(ctx context.Context, f1ents []fs.DirEntry, f2
 	f1snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
 		Enabled:      true,
-		Produce:      false,
+		ProduceE2:    false,
 		NoDownloader: true,
 	}, info1.Dir(), info1.From, logger)
@@ -471,7 +471,7 @@ func (c comparitor) compareHeaders(ctx context.Context, f1ents []fs.DirEntry, f2
 	f2snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
 		Enabled:      true,
-		Produce:      false,
+		ProduceE2:    false,
 		NoDownloader: true,
 	}, info2.Dir(), info2.From, logger)
@@ -742,7 +742,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
 	f1snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
 		Enabled:      true,
-		Produce:      false,
+		ProduceE2:    false,
 		NoDownloader: true,
 	}, info1.Dir(), info1.From, logger)
@@ -752,7 +752,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en
 	f2snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
 		Enabled:      true,
-		Produce:      false,
+		ProduceE2:    false,
 		NoDownloader: true,
 	}, info2.Dir(), info2.From, logger)
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index eef9bef0010..29e774296eb 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -686,6 +686,10 @@ var (
 		Name:  ethconfig.FlagSnapStop,
 		Usage: "Workaround to stop producing new snapshots, if you meet some snapshots-related critical bug. It will stop move historical data from DB to new immutable snapshots. DB will grow and may slightly slow-down - and removing this flag in future will not fix this effect (db size will not greatly reduce).",
 	}
+	SnapStateStopFlag = cli.BoolFlag{
+		Name:  ethconfig.FlagSnapStateStop,
+		Usage: "Workaround to stop producing new state files, if you meet some state-related critical bug. It will stop aggregating DB history into state files. DB will grow and may slightly slow down - and removing this flag in the future will not fix this effect (db size will not greatly reduce).",
+	}
 	TorrentVerbosityFlag = cli.IntFlag{
 		Name:  "torrent.verbosity",
 		Value: 2,
@@ -1748,7 +1752,8 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C
 	cfg.Dirs = nodeConfig.Dirs
 	cfg.Snapshot.KeepBlocks = ctx.Bool(SnapKeepBlocksFlag.Name)
-	cfg.Snapshot.Produce = !ctx.Bool(SnapStopFlag.Name)
+	cfg.Snapshot.ProduceE2 = !ctx.Bool(SnapStopFlag.Name)
+	cfg.Snapshot.ProduceE3 = !ctx.Bool(SnapStateStopFlag.Name)
 	cfg.Snapshot.NoDownloader = ctx.Bool(NoDownloaderFlag.Name)
 	cfg.Snapshot.Verify = ctx.Bool(DownloaderVerifyFlag.Name)
 	cfg.Snapshot.DownloaderAddr = strings.TrimSpace(ctx.String(DownloaderAddrFlag.Name))
diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go
index 5020798a74d..ec5d1c8c40a 100644
--- a/erigon-lib/state/aggregator.go
+++ b/erigon-lib/state/aggregator.go
@@ -95,6 +95,8 @@ type Aggregator struct {
 	logger log.Logger
 	ctxAutoIncrement atomic.Uint64
+
+	produce bool
 }
 
 type OnFreezeFunc func(frozenFileNames []string)
@@ -125,6 +127,8 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
 		mergeWorkers: 1,
 		commitmentValuesTransform: AggregatorSqueezeCommitmentValues,
+
+		produce: true,
 	}
 	cfg := domainCfg{
 		hist: histCfg{
@@ -1582,10 +1586,20 @@ func (a *Aggregator) SetSnapshotBuildSema(semaphore *semaphore.Weighted) {
 	a.snapshotBuildSema = semaphore
 }
 
+// SetProduceMod allows setting produce to false in order to stop making state files (default value is true)
+func (a *Aggregator) SetProduceMod(produce bool) {
+	a.produce = produce
+}
+
 // Returns channel which is closed when aggregation is done
 func (a *Aggregator) BuildFilesInBackground(txNum uint64) chan struct{} {
 	fin := make(chan struct{})
 
+	if !a.produce {
+		close(fin)
+		return fin
+	}
+
 	if (txNum + 1) <= a.visibleFilesMinimaxTxNum.Load()+a.aggregationStep {
 		close(fin)
 		return fin
diff --git a/eth/backend.go b/eth/backend.go
index 0209c43b2c5..86c3a2f62b1 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -351,6 +351,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
 	if err != nil {
 		return nil, err
 	}
+
 	backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter
 
 	backend.chainDB, err = temporal.New(backend.chainDB, agg)
@@ -1448,6 +1449,8 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf
 		return nil, nil, nil, nil, nil, err
 	}
+
+	agg.SetProduceMod(snConfig.Snapshot.ProduceE3)
+
 	g := &errgroup.Group{}
 	g.Go(func() error {
 		allSnapshots.OptimisticalyReopenFolder()
diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go
index 4550d65aaa6..bbcf7e777b5 100644
--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -103,7 +103,8 @@ var Defaults = Config{
 	Snapshot: BlocksFreezing{
 		Enabled:    true,
 		KeepBlocks: false,
-		Produce:    true,
+		ProduceE2:  true,
+		ProduceE3:  true,
 	},
 }
@@ -136,7 +137,8 @@ func init() {
 type BlocksFreezing struct {
 	Enabled        bool
 	KeepBlocks     bool // produce new snapshots of blocks but don't remove blocks from DB
-	Produce        bool // produce new snapshots
+	ProduceE2      bool // produce new block files
+	ProduceE3      bool // produce new state files
 	NoDownloader   bool // possible to use snapshots without calling Downloader
 	Verify         bool // verify snapshots on startup
 	DownloaderAddr string
@@ -150,7 +152,7 @@ func (s BlocksFreezing) String() string {
 	if s.KeepBlocks {
 		out = append(out, "--"+FlagSnapKeepBlocks+"=true")
 	}
-	if !s.Produce {
+	if !s.ProduceE2 {
 		out = append(out, "--"+FlagSnapStop+"=true")
 	}
 	return strings.Join(out, " ")
@@ -159,10 +161,11 @@ func (s BlocksFreezing) String() string {
 var (
 	FlagSnapKeepBlocks = "snap.keepblocks"
 	FlagSnapStop       = "snap.stop"
+	FlagSnapStateStop  = "snap.state.stop"
 )
 
-func NewSnapCfg(enabled, keepBlocks, produce bool) BlocksFreezing {
-	return BlocksFreezing{Enabled: enabled, KeepBlocks: keepBlocks, Produce: produce}
+func NewSnapCfg(enabled, keepBlocks, produceE2, produceE3 bool) BlocksFreezing {
+	return BlocksFreezing{Enabled: enabled, KeepBlocks: keepBlocks, ProduceE2: produceE2, ProduceE3: produceE3}
 }
 
 // Config contains configuration options for ETH protocol.
diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go
index 8ebeb88b11a..99c62e54305 100644
--- a/eth/stagedsync/exec3.go
+++ b/eth/stagedsync/exec3.go
@@ -16,11 +16,12 @@ import (
 	"github.com/c2h5oh/datasize"
 	"github.com/erigontech/mdbx-go/mdbx"
-	metrics2 "github.com/ledgerwatch/erigon-lib/common/metrics"
-	"github.com/ledgerwatch/erigon-lib/config3"
 	"github.com/ledgerwatch/log/v3"
 	"golang.org/x/sync/errgroup"
 
+	metrics2 "github.com/ledgerwatch/erigon-lib/common/metrics"
+	"github.com/ledgerwatch/erigon-lib/config3"
+
 	"github.com/ledgerwatch/erigon-lib/chain"
 	"github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/common/cmp"
@@ -106,7 +107,7 @@ state changes (updates) and can "atomically commit all changes to underlying lay
 Layers from top to bottom:
 - IntraBlockState - used to exec txs. It does store inside all updates of given txn.
-Can understan if txn failed or OutOfGas - then revert all changes.
+Can understand if txn failed or OutOfGas - then revert all changes.
 Each parallel-worker hav own IntraBlockState.
 
 IntraBlockState does commit changes to lower-abstraction-level by method `ibs.MakeWriteSet()`
@@ -114,7 +115,7 @@ IntraBlockState does commit changes to lower-abstraction-level by method `ibs.Ma
 This writer does accumulate updates and then send them to conflict-resolution.
 Until conflict-resolution succeed - none of execution updates must pass to lower-abstraction-level.
 Object TxTask it's just set of small buffers (readset + writeset) for each transaction.
-Write to TxTask happends by code like `txTask.ReadLists = rw.stateReader.ReadSet()`.
+Write to TxTask happens by code like `txTask.ReadLists = rw.stateReader.ReadSet()`.
 
 - TxTask - objects coming from parallel-workers to conflict-resolution goroutine (ApplyLoop and method ReadsValid).
 Flush of data to lower-level-of-abstraction is done by method `agg.ApplyState` (method agg.ApplyHistory exists
@@ -136,7 +137,7 @@ rwloop does:
 - commit
 - open new RoTx
 - set new RoTx to all Workers
-- start Workersстартует воркеры
+- start workers
 
 When rwLoop has nothing to do - it does Prune, or flush of WAL to RwTx (agg.rotate+agg.Flush)
 */
@@ -155,7 +156,6 @@ func ExecV3(ctx context.Context,
 	blockReader := cfg.blockReader
 	agg, engine := cfg.agg, cfg.engine
 	chainConfig, genesis := cfg.chainConfig, cfg.genesis
-	blocksFreezeCfg := cfg.blockReader.FreezingCfg()
 
 	applyTx := txc.Tx
 	useExternalTx := applyTx != nil
@@ -307,10 +307,7 @@ func ExecV3(ctx context.Context,
 			"from", blockNum, "to", maxBlockNum, "fromTxNum", doms.TxNum(), "offsetFromBlockBeginning", offsetFromBlockBeginning,
 			"initialCycle", initialCycle, "useExternalTx", useExternalTx)
 	}
-	if blocksFreezeCfg.Produce {
-		//log.Info(fmt.Sprintf("[snapshots] db has steps amount: %s", agg.StepsRangeInDBAsStr(applyTx)))
-		agg.BuildFilesInBackground(outputTxNum.Load())
-	}
+	agg.BuildFilesInBackground(outputTxNum.Load())
 
 	var outputBlockNum = stages.SyncMetrics[stages.Execution]
 	inputBlockNum := &atomic.Uint64{}
@@ -924,9 +921,7 @@ Loop:
 				}
 				t2 = time.Since(tt)
 
-				if blocksFreezeCfg.Produce {
-					agg.BuildFilesInBackground(outputTxNum.Load())
-				}
+				agg.BuildFilesInBackground(outputTxNum.Load())
 
 				applyTx, err = cfg.db.BeginRw(context.Background()) //nolint
 				if err != nil {
@@ -955,7 +950,7 @@ Loop:
 			}
 		}
 
-		if parallel && blocksFreezeCfg.Produce { // sequential exec - does aggregate right after commit
+		if parallel { // sequential exec - does aggregate right after commit
 			agg.BuildFilesInBackground(outputTxNum.Load())
 		}
 		select {
@@ -994,9 +989,7 @@ Loop:
 		}
 	}
 
-	if blocksFreezeCfg.Produce {
-		agg.BuildFilesInBackground(outputTxNum.Load())
-	}
+	agg.BuildFilesInBackground(outputTxNum.Load())
 
 	return nil
 }
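With the `produce` check now living inside `Aggregator.BuildFilesInBackground` (see the aggregator.go hunk above), `ExecV3` no longer needs the `blocksFreezeCfg.Produce` guards: when producing is disabled the method simply returns an already-closed channel. A standalone sketch of that pattern follows; it is not erigon code, all names are made up for illustration:

```go
package main

import "fmt"

type builder struct {
	produce bool
}

// buildInBackground mirrors the shape of Aggregator.BuildFilesInBackground:
// it always returns a channel that is closed once there is nothing (more) to do,
// so callers never need to check the produce switch themselves.
func (b *builder) buildInBackground() chan struct{} {
	fin := make(chan struct{})
	if !b.produce {
		close(fin) // producing disabled: report "done" immediately
		return fin
	}
	go func() {
		defer close(fin)
		fmt.Println("building files...")
	}()
	return fin
}

func main() {
	b := &builder{produce: false}
	<-b.buildInBackground() // safe to call unconditionally; returns at once
}
```

Gating inside the producer keeps the policy in one place, which is why the call sites in exec3.go above become unconditional.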
diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go
index 86b4950631e..4f2a2996658 100644
--- a/eth/stagedsync/stage_snapshots.go
+++ b/eth/stagedsync/stage_snapshots.go
@@ -116,7 +116,7 @@ func StageSnapshotsCfg(db kv.RwDB,
 	freezingCfg := cfg.blockReader.FreezingCfg()
 
-	if freezingCfg.Enabled && freezingCfg.Produce {
+	if freezingCfg.Enabled && freezingCfg.ProduceE2 {
 		u := cfg.snapshotUploader
 
 		if maxSeedable := u.maxSeedableHeader(); u.cfg.syncConfig.FrozenBlockLimit > 0 && maxSeedable > u.cfg.syncConfig.FrozenBlockLimit {
@@ -488,18 +488,19 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv.
 	freezingCfg := cfg.blockReader.FreezingCfg()
 
 	if freezingCfg.Enabled {
-		if freezingCfg.Produce {
-			//TODO: initialSync maybe save files progress here
-			if cfg.blockRetire.HasNewFrozenFiles() || cfg.agg.HasNewFrozenFiles() {
-				ac := cfg.agg.BeginFilesRo()
-				defer ac.Close()
-				aggFiles := ac.Files()
-				ac.Close()
+		if cfg.blockRetire.HasNewFrozenFiles() || cfg.agg.HasNewFrozenFiles() {
+			ac := cfg.agg.BeginFilesRo()
+			defer ac.Close()
+			aggFiles := ac.Files()
+			ac.Close()
 
-				if err := rawdb.WriteSnapshots(tx, cfg.blockReader.FrozenFiles(), aggFiles); err != nil {
-					return err
-				}
+			if err := rawdb.WriteSnapshots(tx, cfg.blockReader.FrozenFiles(), aggFiles); err != nil {
+				return err
 			}
+		}
+
+		if freezingCfg.ProduceE2 {
+			//TODO: initialSync maybe save files progress here
 
 			var minBlockNumber uint64
@@ -540,7 +541,8 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv.
 			return err
 		})
 
-		//cfg.agg.BuildFilesInBackground()
+		// cfg.agg.BuildFilesInBackground()
+	}
 
 	pruneLimit := 100
@@ -673,7 +675,7 @@ func (u *snapshotUploader) init(ctx context.Context, logger log.Logger) {
 	if u.files == nil {
 		freezingCfg := u.cfg.blockReader.FreezingCfg()
 
-		if freezingCfg.Enabled && freezingCfg.Produce {
+		if freezingCfg.Enabled && freezingCfg.ProduceE2 {
 			u.files = map[string]*uploadState{}
 			u.start(ctx, logger)
 		}
diff --git a/p2p/sentry/simulator/sentry_simulator.go b/p2p/sentry/simulator/sentry_simulator.go
index b003b0e469b..200be8980db 100644
--- a/p2p/sentry/simulator/sentry_simulator.go
+++ b/p2p/sentry/simulator/sentry_simulator.go
@@ -69,7 +69,7 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC
 	knownSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
 		Enabled:      true,
-		Produce:      false,
+		ProduceE2:    false,
 		NoDownloader: true,
 	}, "", 0, logger)
@@ -84,7 +84,7 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC
 	//s.knownSnapshots.ReopenList([]string{ent2.Name()}, false)
 	activeSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
 		Enabled:      true,
-		Produce:      false,
+		ProduceE2:    false,
 		NoDownloader: true,
 	}, torrentDir, 0, logger)
diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go
index eb1047344e2..b0f9006cb48 100644
--- a/turbo/app/snapshots_cmd.go
+++ b/turbo/app/snapshots_cmd.go
@@ -383,7 +383,7 @@ func doIntegrity(cliCtx *cli.Context) error {
 	chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
 	defer chainDB.Close()
 
-	cfg := ethconfig.NewSnapCfg(true, false, true)
+	cfg := ethconfig.NewSnapCfg(true, false, true, true)
 
 	blockSnaps, borSnaps, caplinSnaps, blockRetire, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
 	if err != nil {
@@ -549,7 +549,7 @@ func doIndicesCommand(cliCtx *cli.Context) error {
 		return err
 	}
 
-	cfg := ethconfig.NewSnapCfg(true, false, true)
+	cfg := ethconfig.NewSnapCfg(true, false, true, true)
 	chainConfig := fromdb.ChainConfig(chainDB)
 	blockSnaps, borSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
 	if err != nil {
@@ -743,7 +743,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
 	db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
 	defer db.Close()
 
-	cfg := ethconfig.NewSnapCfg(true, false, true)
+	cfg := ethconfig.NewSnapCfg(true, false, true, true)
 	blockSnaps, borSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, db, logger)
 	if err != nil {
 		return err
diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go
index e634edf9fea..f808053c026 100644
--- a/turbo/cli/default_flags.go
+++ b/turbo/cli/default_flags.go
@@ -96,6 +96,7 @@ var DefaultFlags = []cli.Flag{
 	&utils.SnapKeepBlocksFlag,
 	&utils.SnapStopFlag,
+	&utils.SnapStateStopFlag,
 	&utils.DbPageSizeFlag,
 	&utils.DbSizeLimitFlag,
 	&utils.TorrentPortFlag,
diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go
index f610ab038b0..b7f11ac62a1 100644
--- a/turbo/snapshotsync/freezeblocks/block_snapshots.go
+++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go
@@ -696,10 +696,10 @@ func (s *RoSnapshots) buildMissedIndicesIfNeed(ctx context.Context, logPrefix st
 	if s.IndicesMax() >= s.SegmentsMax() {
 		return nil
 	}
-	if !s.Cfg().Produce && s.IndicesMax() == 0 {
+	if !s.Cfg().ProduceE2 && s.IndicesMax() == 0 {
 		return fmt.Errorf("please remove --snap.stop, erigon can't work without creating basic indices")
 	}
-	if !s.Cfg().Produce {
+	if !s.Cfg().ProduceE2 {
 		return nil
 	}
 	if !s.SegmentsReady() {
diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go
index 32af673e4f6..ad6abd1dcc4 100644
--- a/turbo/stages/mock/mock_sentry.go
+++ b/turbo/stages/mock/mock_sentry.go
@@ -489,6 +489,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
 	blockRetire := freezeblocks.NewBlockRetire(1, dirs, mock.BlockReader, blockWriter, mock.DB, mock.ChainConfig, mock.Notifications.Events, blockSnapBuildSema, logger)
 
 	historyV3 := true
+	mock.agg.SetProduceMod(mock.BlockReader.FreezingCfg().ProduceE3)
 	mock.Sync = stagedsync.New(
 		cfg.Sync,
 		stagedsync.DefaultStages(mock.Ctx,
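One behavioural detail worth noting from the block_snapshots.go hunk: the index-building gate keys off `ProduceE2` only, i.e. `--snap.stop`; the new `--snap.state.stop` does not affect block indices. A standalone sketch of that decision logic follows; it is not erigon code, only the conditions and the error text are taken from the hunk above, all other names are illustrative:

```go
package main

import (
	"errors"
	"fmt"
)

// checkIndexGate mirrors the guard at the top of buildMissedIndicesIfNeed:
// only the block-file switch (ProduceE2 / --snap.stop) controls index building.
func checkIndexGate(produceE2 bool, indicesMax, segmentsMax uint64) error {
	if indicesMax >= segmentsMax {
		return nil // nothing missing, nothing to build
	}
	if !produceE2 && indicesMax == 0 {
		// without any basic indices erigon cannot work at all
		return errors.New("please remove --snap.stop, erigon can't work without creating basic indices")
	}
	if !produceE2 {
		return nil // indices exist; just don't build new ones
	}
	return nil // would proceed to build the missed indices here
}

func main() {
	fmt.Println(checkIndexGate(false, 0, 10))
}
```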