Skip to content

Commit

Permalink
[wip] Splitting snap.stop logic into e2 and e3 parts: snap.stop/snap.sta…
Browse files Browse the repository at this point in the history
…te.stop (corresponds to ProduceE2 and ProduceE3) (#10622)

Split the Produce flag into ProduceE2 and ProduceE3 flags. Also changed the
logic for freezing files in snapshots. Added a produce flag and its setter
to the aggregator. Closes #10318

---------

Co-authored-by: Ilya Miheev <ilya.miheev@lamoda.ru>
  • Loading branch information
JkLondon and Ilya Miheev committed Jun 13, 2024
1 parent 3c0ddf0 commit cff37c6
Show file tree
Hide file tree
Showing 14 changed files with 73 additions and 49 deletions.
2 changes: 1 addition & 1 deletion cmd/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func extractBodies(datadir string) error {
snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
Enabled: true,
KeepBlocks: true,
Produce: false,
ProduceE2: false,
}, filepath.Join(datadir, "snapshots"), 0, log.New())
snaps.ReopenFolder()

Expand Down
4 changes: 3 additions & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1772,7 +1772,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
dirs := datadir.New(datadirCli)

//useSnapshots = true
snapCfg := ethconfig.NewSnapCfg(useSnapshots, true, true)
snapCfg := ethconfig.NewSnapCfg(useSnapshots, true, true, true)

_allSnapshotsSingleton = freezeblocks.NewRoSnapshots(snapCfg, dirs.Snap, 0, logger)
_allBorSnapshotsSingleton = freezeblocks.NewBorRoSnapshots(snapCfg, dirs.Snap, 0, logger)
Expand All @@ -1782,6 +1782,8 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
panic(err)
}

_aggSingleton.SetProduceMod(snapCfg.ProduceE3)

if useSnapshots {
g := &errgroup.Group{}
g.Go(func() error {
Expand Down
8 changes: 4 additions & 4 deletions cmd/snapshots/cmp/cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (c comparitor) compareHeaders(ctx context.Context, f1ents []fs.DirEntry, f2

f1snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
Enabled: true,
Produce: false,
ProduceE2: false,
NoDownloader: true,
}, info1.Dir(), info1.From, logger)

Expand All @@ -471,7 +471,7 @@ func (c comparitor) compareHeaders(ctx context.Context, f1ents []fs.DirEntry, f2

f2snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
Enabled: true,
Produce: false,
ProduceE2: false,
NoDownloader: true,
}, info2.Dir(), info2.From, logger)

Expand Down Expand Up @@ -742,7 +742,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en

f1snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
Enabled: true,
Produce: false,
ProduceE2: false,
NoDownloader: true,
}, info1.Dir(), info1.From, logger)

Expand All @@ -752,7 +752,7 @@ func (c comparitor) compareBodies(ctx context.Context, f1ents []*BodyEntry, f2en

f2snaps := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
Enabled: true,
Produce: false,
ProduceE2: false,
NoDownloader: true,
}, info2.Dir(), info2.From, logger)

Expand Down
7 changes: 6 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,10 @@ var (
Name: ethconfig.FlagSnapStop,
Usage: "Workaround to stop producing new snapshots, if you meet some snapshots-related critical bug. It will stop move historical data from DB to new immutable snapshots. DB will grow and may slightly slow-down - and removing this flag in future will not fix this effect (db size will not greatly reduce).",
}
SnapStateStopFlag = cli.BoolFlag{
Name: ethconfig.FlagSnapStateStop,
Usage: "Workaround to stop producing new state files, if you meet some state-related critical bug. It will stop aggregate DB history in a state files. DB will grow and may slightly slow-down - and removing this flag in future will not fix this effect (db size will not greatly reduce).",
}
TorrentVerbosityFlag = cli.IntFlag{
Name: "torrent.verbosity",
Value: 2,
Expand Down Expand Up @@ -1748,7 +1752,8 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C

cfg.Dirs = nodeConfig.Dirs
cfg.Snapshot.KeepBlocks = ctx.Bool(SnapKeepBlocksFlag.Name)
cfg.Snapshot.Produce = !ctx.Bool(SnapStopFlag.Name)
cfg.Snapshot.ProduceE2 = !ctx.Bool(SnapStopFlag.Name)
cfg.Snapshot.ProduceE3 = !ctx.Bool(SnapStateStopFlag.Name)
cfg.Snapshot.NoDownloader = ctx.Bool(NoDownloaderFlag.Name)
cfg.Snapshot.Verify = ctx.Bool(DownloaderVerifyFlag.Name)
cfg.Snapshot.DownloaderAddr = strings.TrimSpace(ctx.String(DownloaderAddrFlag.Name))
Expand Down
14 changes: 14 additions & 0 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type Aggregator struct {
logger log.Logger

ctxAutoIncrement atomic.Uint64

produce bool
}

type OnFreezeFunc func(frozenFileNames []string)
Expand Down Expand Up @@ -125,6 +127,8 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
mergeWorkers: 1,

commitmentValuesTransform: AggregatorSqueezeCommitmentValues,

produce: true,
}
cfg := domainCfg{
hist: histCfg{
Expand Down Expand Up @@ -1582,10 +1586,20 @@ func (a *Aggregator) SetSnapshotBuildSema(semaphore *semaphore.Weighted) {
a.snapshotBuildSema = semaphore
}

// SetProduceMod sets the aggregator's produce flag, which controls whether new
// state files are built. NewAggregator initializes the flag to true; when it is
// false, BuildFilesInBackground returns an already-closed channel without
// starting any work.
// NOTE(review): produce is a plain bool assigned without visible synchronization -
// confirm callers set it before background file building can run.
func (a *Aggregator) SetProduceMod(produce bool) {
a.produce = produce
}

// Returns channel which is closed when aggregation is done
func (a *Aggregator) BuildFilesInBackground(txNum uint64) chan struct{} {
fin := make(chan struct{})

if !a.produce {
close(fin)
return fin
}

if (txNum + 1) <= a.visibleFilesMinimaxTxNum.Load()+a.aggregationStep {
close(fin)
return fin
Expand Down
3 changes: 3 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
if err != nil {
return nil, err
}

backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter

backend.chainDB, err = temporal.New(backend.chainDB, agg)
Expand Down Expand Up @@ -1448,6 +1449,8 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf
return nil, nil, nil, nil, nil, err
}

agg.SetProduceMod(snConfig.Snapshot.ProduceE3)

g := &errgroup.Group{}
g.Go(func() error {
allSnapshots.OptimisticalyReopenFolder()
Expand Down
13 changes: 8 additions & 5 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ var Defaults = Config{
Snapshot: BlocksFreezing{
Enabled: true,
KeepBlocks: false,
Produce: true,
ProduceE2: true,
ProduceE3: true,
},
}

Expand Down Expand Up @@ -136,7 +137,8 @@ func init() {
type BlocksFreezing struct {
Enabled bool
KeepBlocks bool // produce new snapshots of blocks but don't remove blocks from DB
Produce bool // produce new snapshots
ProduceE2 bool // produce new block files
ProduceE3 bool // produce new state files
NoDownloader bool // possible to use snapshots without calling Downloader
Verify bool // verify snapshots on startup
DownloaderAddr string
Expand All @@ -150,7 +152,7 @@ func (s BlocksFreezing) String() string {
if s.KeepBlocks {
out = append(out, "--"+FlagSnapKeepBlocks+"=true")
}
if !s.Produce {
if !s.ProduceE2 {
out = append(out, "--"+FlagSnapStop+"=true")
}
return strings.Join(out, " ")
Expand All @@ -159,10 +161,11 @@ func (s BlocksFreezing) String() string {
var (
FlagSnapKeepBlocks = "snap.keepblocks"
FlagSnapStop = "snap.stop"
FlagSnapStateStop = "snap.state.stop"
)

func NewSnapCfg(enabled, keepBlocks, produce bool) BlocksFreezing {
return BlocksFreezing{Enabled: enabled, KeepBlocks: keepBlocks, Produce: produce}
func NewSnapCfg(enabled, keepBlocks, produceE2, produceE3 bool) BlocksFreezing {
return BlocksFreezing{Enabled: enabled, KeepBlocks: keepBlocks, ProduceE2: produceE2, ProduceE3: produceE3}
}

// Config contains configuration options for ETH protocol.
Expand Down
27 changes: 10 additions & 17 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (

"github.com/c2h5oh/datasize"
"github.com/erigontech/mdbx-go/mdbx"
metrics2 "github.com/ledgerwatch/erigon-lib/common/metrics"
"github.com/ledgerwatch/erigon-lib/config3"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/errgroup"

metrics2 "github.com/ledgerwatch/erigon-lib/common/metrics"
"github.com/ledgerwatch/erigon-lib/config3"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
Expand Down Expand Up @@ -106,15 +107,15 @@ state changes (updates) and can "atomically commit all changes to underlying lay
Layers from top to bottom:
- IntraBlockState - used to exec txs. It does store inside all updates of given txn.
Can understan if txn failed or OutOfGas - then revert all changes.
Can understand if txn failed or OutOfGas - then revert all changes.
Each parallel-worker has its own IntraBlockState.
IntraBlockState does commit changes to lower-abstraction-level by method `ibs.MakeWriteSet()`
- StateWriterBufferedV3 - txs which are executed by parallel workers can conflict with each other.
This writer does accumulate updates and then send them to conflict-resolution.
Until conflict-resolution succeed - none of execution updates must pass to lower-abstraction-level.
Object TxTask is just a set of small buffers (readset + writeset) for each transaction.
Write to TxTask happends by code like `txTask.ReadLists = rw.stateReader.ReadSet()`.
Write to TxTask happens by code like `txTask.ReadLists = rw.stateReader.ReadSet()`.
- TxTask - objects coming from parallel-workers to conflict-resolution goroutine (ApplyLoop and method ReadsValid).
Flush of data to lower-level-of-abstraction is done by method `agg.ApplyState` (method agg.ApplyHistory exists
Expand All @@ -136,7 +137,7 @@ rwloop does:
- commit
- open new RoTx
- set new RoTx to all Workers
- start Workersстартует воркеры
- start Workers
When rwLoop has nothing to do - it does Prune, or flush of WAL to RwTx (agg.rotate+agg.Flush)
*/
Expand All @@ -155,7 +156,6 @@ func ExecV3(ctx context.Context,
blockReader := cfg.blockReader
agg, engine := cfg.agg, cfg.engine
chainConfig, genesis := cfg.chainConfig, cfg.genesis
blocksFreezeCfg := cfg.blockReader.FreezingCfg()

applyTx := txc.Tx
useExternalTx := applyTx != nil
Expand Down Expand Up @@ -307,10 +307,7 @@ func ExecV3(ctx context.Context,
"from", blockNum, "to", maxBlockNum, "fromTxNum", doms.TxNum(), "offsetFromBlockBeginning", offsetFromBlockBeginning, "initialCycle", initialCycle, "useExternalTx", useExternalTx)
}

if blocksFreezeCfg.Produce {
//log.Info(fmt.Sprintf("[snapshots] db has steps amount: %s", agg.StepsRangeInDBAsStr(applyTx)))
agg.BuildFilesInBackground(outputTxNum.Load())
}
agg.BuildFilesInBackground(outputTxNum.Load())

var outputBlockNum = stages.SyncMetrics[stages.Execution]
inputBlockNum := &atomic.Uint64{}
Expand Down Expand Up @@ -924,9 +921,7 @@ Loop:
}

t2 = time.Since(tt)
if blocksFreezeCfg.Produce {
agg.BuildFilesInBackground(outputTxNum.Load())
}
agg.BuildFilesInBackground(outputTxNum.Load())

applyTx, err = cfg.db.BeginRw(context.Background()) //nolint
if err != nil {
Expand Down Expand Up @@ -955,7 +950,7 @@ Loop:
}
}

if parallel && blocksFreezeCfg.Produce { // sequential exec - does aggregate right after commit
if parallel { // sequential exec - does aggregate right after commit
agg.BuildFilesInBackground(outputTxNum.Load())
}
select {
Expand Down Expand Up @@ -994,9 +989,7 @@ Loop:
}
}

if blocksFreezeCfg.Produce {
agg.BuildFilesInBackground(outputTxNum.Load())
}
agg.BuildFilesInBackground(outputTxNum.Load())

return nil
}
Expand Down
28 changes: 15 additions & 13 deletions eth/stagedsync/stage_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func StageSnapshotsCfg(db kv.RwDB,

freezingCfg := cfg.blockReader.FreezingCfg()

if freezingCfg.Enabled && freezingCfg.Produce {
if freezingCfg.Enabled && freezingCfg.ProduceE2 {
u := cfg.snapshotUploader

if maxSeedable := u.maxSeedableHeader(); u.cfg.syncConfig.FrozenBlockLimit > 0 && maxSeedable > u.cfg.syncConfig.FrozenBlockLimit {
Expand Down Expand Up @@ -488,18 +488,19 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv.

freezingCfg := cfg.blockReader.FreezingCfg()
if freezingCfg.Enabled {
if freezingCfg.Produce {
//TODO: initialSync maybe save files progress here
if cfg.blockRetire.HasNewFrozenFiles() || cfg.agg.HasNewFrozenFiles() {
ac := cfg.agg.BeginFilesRo()
defer ac.Close()
aggFiles := ac.Files()
ac.Close()
if cfg.blockRetire.HasNewFrozenFiles() || cfg.agg.HasNewFrozenFiles() {
ac := cfg.agg.BeginFilesRo()
defer ac.Close()
aggFiles := ac.Files()
ac.Close()

if err := rawdb.WriteSnapshots(tx, cfg.blockReader.FrozenFiles(), aggFiles); err != nil {
return err
}
if err := rawdb.WriteSnapshots(tx, cfg.blockReader.FrozenFiles(), aggFiles); err != nil {
return err
}
}

if freezingCfg.ProduceE2 {
//TODO: initialSync maybe save files progress here

var minBlockNumber uint64

Expand Down Expand Up @@ -540,7 +541,8 @@ func SnapshotsPrune(s *PruneState, cfg SnapshotsCfg, ctx context.Context, tx kv.
return err
})

//cfg.agg.BuildFilesInBackground()
// cfg.agg.BuildFilesInBackground()

}

pruneLimit := 100
Expand Down Expand Up @@ -673,7 +675,7 @@ func (u *snapshotUploader) init(ctx context.Context, logger log.Logger) {
if u.files == nil {
freezingCfg := u.cfg.blockReader.FreezingCfg()

if freezingCfg.Enabled && freezingCfg.Produce {
if freezingCfg.Enabled && freezingCfg.ProduceE2 {
u.files = map[string]*uploadState{}
u.start(ctx, logger)
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/sentry/simulator/sentry_simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC

knownSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
Enabled: true,
Produce: false,
ProduceE2: false,
NoDownloader: true,
}, "", 0, logger)

Expand All @@ -84,7 +84,7 @@ func NewSentry(ctx context.Context, chain string, snapshotLocation string, peerC
//s.knownSnapshots.ReopenList([]string{ent2.Name()}, false)
activeSnapshots := freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{
Enabled: true,
Produce: false,
ProduceE2: false,
NoDownloader: true,
}, torrentDir, 0, logger)

Expand Down
6 changes: 3 additions & 3 deletions turbo/app/snapshots_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func doIntegrity(cliCtx *cli.Context) error {
chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
defer chainDB.Close()

cfg := ethconfig.NewSnapCfg(true, false, true)
cfg := ethconfig.NewSnapCfg(true, false, true, true)

blockSnaps, borSnaps, caplinSnaps, blockRetire, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
if err != nil {
Expand Down Expand Up @@ -549,7 +549,7 @@ func doIndicesCommand(cliCtx *cli.Context) error {
return err
}

cfg := ethconfig.NewSnapCfg(true, false, true)
cfg := ethconfig.NewSnapCfg(true, false, true, true)
chainConfig := fromdb.ChainConfig(chainDB)
blockSnaps, borSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, chainDB, logger)
if err != nil {
Expand Down Expand Up @@ -743,7 +743,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
defer db.Close()

cfg := ethconfig.NewSnapCfg(true, false, true)
cfg := ethconfig.NewSnapCfg(true, false, true, true)
blockSnaps, borSnaps, caplinSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, db, logger)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ var DefaultFlags = []cli.Flag{

&utils.SnapKeepBlocksFlag,
&utils.SnapStopFlag,
&utils.SnapStateStopFlag,
&utils.DbPageSizeFlag,
&utils.DbSizeLimitFlag,
&utils.TorrentPortFlag,
Expand Down
4 changes: 2 additions & 2 deletions turbo/snapshotsync/freezeblocks/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,10 +696,10 @@ func (s *RoSnapshots) buildMissedIndicesIfNeed(ctx context.Context, logPrefix st
if s.IndicesMax() >= s.SegmentsMax() {
return nil
}
if !s.Cfg().Produce && s.IndicesMax() == 0 {
if !s.Cfg().ProduceE2 && s.IndicesMax() == 0 {
return fmt.Errorf("please remove --snap.stop, erigon can't work without creating basic indices")
}
if !s.Cfg().Produce {
if !s.Cfg().ProduceE2 {
return nil
}
if !s.SegmentsReady() {
Expand Down
Loading

0 comments on commit cff37c6

Please sign in to comment.