From b11a83b15619487850cd2589263575d6ed8ba173 Mon Sep 17 00:00:00 2001
From: Alex Sharov
Date: Thu, 26 Dec 2024 16:39:55 +0700
Subject: [PATCH] agg: schema (#13232)

- extract domain `compressCfg` from domain to schema
- extract schema out of agg constructor
---
 erigon-lib/state/aggregator.go          | 237 ++++++++++++++----------
 erigon-lib/state/aggregator_test.go     |   4 +-
 erigon-lib/state/domain.go              |  25 +--
 erigon-lib/state/domain_test.go         |  91 ++++-----
 erigon-lib/state/history.go             |  14 +-
 erigon-lib/state/history_test.go        |  80 ++++----
 erigon-lib/state/inverted_index.go      |  15 +-
 erigon-lib/state/inverted_index_test.go |  34 ++--
 erigon-lib/state/merge_test.go          | 208 ++++++++++-----------
 migrations/reset_stage_txn_lookup.go    |   1 +
 10 files changed, 367 insertions(+), 342 deletions(-)

diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go
index 495a49d0a01..ea739577034 100644
--- a/erigon-lib/state/aggregator.go
+++ b/erigon-lib/state/aggregator.go
@@ -100,106 +100,84 @@ type OnFreezeFunc func(frozenFileNames []string)
 const AggregatorSqueezeCommitmentValues = true
 const MaxNonFuriousDirtySpacePerTx = 64 * datasize.MB
 
-func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint64, db kv.RoDB, logger log.Logger) (*Aggregator, error) {
-	tmpdir := dirs.Tmp
-	salt, err := getStateIndicesSalt(dirs.Snap)
+func commitmentFileMustExist(dirs datadir.Dirs, fromStep, toStep uint64) bool {
+	fPath := filepath.Join(dirs.SnapDomain, fmt.Sprintf("v1-%s.%d-%d.kv", kv.CommitmentDomain, fromStep, toStep))
+	exists, err := dir.FileExist(fPath)
 	if err != nil {
-		return nil, err
+		panic(err)
 	}
+	return exists
+}
 
-	ctx, ctxCancel := context.WithCancel(ctx)
-	a := &Aggregator{
-		ctx: ctx,
-		ctxCancel: ctxCancel,
-		onFreeze: func(frozenFileNames []string) {},
-		dirs: dirs,
-		tmpdir: tmpdir,
-		aggregationStep: aggregationStep,
-		db: db,
-		leakDetector: dbg.NewLeakDetector("agg", dbg.SlowTx()),
-		ps: background.NewProgressSet(),
-		logger: logger,
-		collateAndBuildWorkers: 1,
-		mergeWorkers: 1,
-
-		commitmentValuesTransform: AggregatorSqueezeCommitmentValues,
-
-		produce: true,
-	}
-	commitmentFileMustExist := func(fromStep, toStep uint64) bool {
-		fPath := filepath.Join(dirs.SnapDomain, fmt.Sprintf("v1-%s.%d-%d.kv", kv.CommitmentDomain, fromStep, toStep))
-		exists, err := dir.FileExist(fPath)
-		if err != nil {
-			panic(err)
-		}
-		return exists
-	}
-
-	integrityCheck := func(name kv.Domain, fromStep, toStep uint64) bool {
-		// case1: `kill -9` during building new .kv
-		//  - `accounts` domain may be at step X and `commitment` domain at step X-1
-		//  - not a problem because `commitment` domain still has step X in DB
-		// case2: `kill -9` during building new .kv and `rm -rf chaindata`
-		//  - `accounts` domain may be at step X and `commitment` domain at step X-1
-		//  - problem! `commitment` domain doesn't have step X in DB
-		// solution: ignore step X files in both cases
-		switch name {
-		case kv.AccountsDomain, kv.StorageDomain, kv.CodeDomain:
-			if toStep-fromStep > 1 { // only recently built files
-				return true
-			}
-			return commitmentFileMustExist(fromStep, toStep)
-		default:
+func domainIntegrityCheck(name kv.Domain, dirs datadir.Dirs, fromStep, toStep uint64) bool {
+	// case1: `kill -9` during building new .kv
+	//  - `accounts` domain may be at step X and `commitment` domain at step X-1
+	//  - not a problem because `commitment` domain still has step X in DB
+	// case2: `kill -9` during building new .kv and `rm -rf chaindata`
+	//  - `accounts` domain may be at step X and `commitment` domain at step X-1
+	//  - problem! `commitment` domain doesn't have step X in DB
+	// solution: ignore step X files in both cases
+	switch name {
+	case kv.AccountsDomain, kv.StorageDomain, kv.CodeDomain:
+		if toStep-fromStep > 1 { // only recently built files
 			return true
 		}
+		return commitmentFileMustExist(dirs, fromStep, toStep)
+	default:
+		return true
 	}
+}
 
-	cfg := domainCfg{
+var Schema = map[kv.Domain]domainCfg{
+	kv.AccountsDomain: {
 		name: kv.AccountsDomain, valuesTable: kv.TblAccountVals,
-		restrictSubsetFileDeletions: a.commitmentValuesTransform,
-		integrity: integrityCheck,
-		compression: seg.CompressNone,
+		indexList: withBTree | withExistence,
+		crossDomainIntegrity: domainIntegrityCheck,
+		compression: seg.CompressNone,
+		compressCfg: DomainCompressCfg,
 
 		hist: histCfg{
 			valuesTable: kv.TblAccountHistoryVals,
 			compression: seg.CompressNone,
 
 			historyLargeValues: false,
+			filenameBase: kv.AccountsDomain.String(), //TODO: looks redundant
 
-			iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
-				aggregationStep: aggregationStep, keysTable: kv.TblAccountHistoryKeys, valuesTable: kv.TblAccountIdx},
+			iiCfg: iiCfg{
+				keysTable: kv.TblAccountHistoryKeys, valuesTable: kv.TblAccountIdx,
+				withExistence: false, compressorCfg: seg.DefaultCfg,
+				filenameBase: kv.AccountsDomain.String(), //TODO: looks redundant
+			},
 		},
-	}
-	if a.d[kv.AccountsDomain], err = NewDomain(cfg, logger); err != nil {
-		return nil, err
-	}
-	cfg = domainCfg{
+	},
+	kv.StorageDomain: {
 		name: kv.StorageDomain, valuesTable: kv.TblStorageVals,
-		restrictSubsetFileDeletions: a.commitmentValuesTransform,
-		integrity: integrityCheck,
+		indexList: withBTree | withExistence,
 		compression: seg.CompressKeys,
+		compressCfg: DomainCompressCfg,
 
 		hist: histCfg{
 			valuesTable: kv.TblStorageHistoryVals,
 			compression: seg.CompressNone,
 
 			historyLargeValues: false,
+			filenameBase: kv.StorageDomain.String(),
 
-			iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
-				aggregationStep: aggregationStep, keysTable: kv.TblStorageHistoryKeys, valuesTable: kv.TblStorageIdx},
+			iiCfg: iiCfg{
+				keysTable: kv.TblStorageHistoryKeys, valuesTable: kv.TblStorageIdx,
+				withExistence: false, compressorCfg: seg.DefaultCfg,
+				filenameBase: kv.StorageDomain.String(),
+			},
 		},
-	}
-	if a.d[kv.StorageDomain], err = NewDomain(cfg, logger); err != nil {
-		return nil, err
-	}
-	cfg = domainCfg{
+	},
+	kv.CodeDomain: {
 		name: kv.CodeDomain, valuesTable: kv.TblCodeVals,
-		restrictSubsetFileDeletions: a.commitmentValuesTransform,
-		integrity: integrityCheck,
+		indexList: withBTree | withExistence,
 		compression: seg.CompressVals, // compressing Code together with keys doesn't show any profit; compressing values shows a 4x ratio on eth-mainnet and a 2.5x ratio on bor-mainnet
+		compressCfg: DomainCompressCfg,
 		largeValues: true,
 
 		hist: histCfg{
@@ -207,21 +185,21 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
 			compression: seg.CompressKeys | seg.CompressVals,
 
 			historyLargeValues: true,
+			filenameBase: kv.CodeDomain.String(),
 
-			iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
-				aggregationStep: aggregationStep, keysTable: kv.TblCodeHistoryKeys, valuesTable: kv.TblCodeIdx},
+			iiCfg: iiCfg{
+				withExistence: false, compressorCfg: seg.DefaultCfg,
+				keysTable: kv.TblCodeHistoryKeys, valuesTable: kv.TblCodeIdx,
+				filenameBase: kv.CodeDomain.String(),
+			},
 		},
-	}
-	if a.d[kv.CodeDomain], err = NewDomain(cfg, logger); err != nil {
-		return nil, err
-	}
-	cfg = domainCfg{
+	},
+	kv.CommitmentDomain: {
 		name: kv.CommitmentDomain, valuesTable: kv.TblCommitmentVals,
-		restrictSubsetFileDeletions: a.commitmentValuesTransform,
-		replaceKeysInValues: a.commitmentValuesTransform,
-		integrity: integrityCheck,
-		compression: seg.CompressKeys,
+		indexList: withBTree | withExistence,
+		compression: seg.CompressKeys,
+		compressCfg: DomainCompressCfg,
 
 		hist: histCfg{
 			valuesTable: kv.TblCommitmentHistoryVals,
@@ -229,42 +207,90 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
 			snapshotsDisabled: true,
 
 			historyLargeValues: false,
+			filenameBase: kv.CommitmentDomain.String(),
 
-			iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
-				aggregationStep: aggregationStep, keysTable: kv.TblCommitmentHistoryKeys, valuesTable: kv.TblCommitmentIdx},
+			iiCfg: iiCfg{
+				keysTable: kv.TblCommitmentHistoryKeys, valuesTable: kv.TblCommitmentIdx,
+				withExistence: false, compressorCfg: seg.DefaultCfg,
+				filenameBase: kv.CommitmentDomain.String(),
+			},
 		},
-	}
-	if a.d[kv.CommitmentDomain], err = NewDomain(cfg, logger); err != nil {
-		return nil, err
-	}
-	cfg = domainCfg{
+	},
+	kv.ReceiptDomain: {
 		name: kv.ReceiptDomain, valuesTable: kv.TblReceiptVals,
+
+		indexList: withBTree | withExistence,
 		compression: seg.CompressNone, //seg.CompressKeys | seg.CompressVals,
-		integrity: integrityCheck,
+		compressCfg: DomainCompressCfg,
 
 		hist: histCfg{
 			valuesTable: kv.TblReceiptHistoryVals,
 			compression: seg.CompressNone,
 
 			historyLargeValues: false,
+			filenameBase: kv.ReceiptDomain.String(),
 
-			iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
-				aggregationStep: aggregationStep, keysTable: kv.TblReceiptHistoryKeys, valuesTable: kv.TblReceiptIdx},
+			iiCfg: iiCfg{
+				keysTable: kv.TblReceiptHistoryKeys, valuesTable: kv.TblReceiptIdx,
+				withExistence: false, compressorCfg: seg.DefaultCfg,
+				filenameBase: kv.ReceiptDomain.String(),
+			},
 		},
+	},
+}
+
+func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint64, db kv.RoDB, logger log.Logger) (*Aggregator, error) {
+	tmpdir := dirs.Tmp
+	salt, err := getStateIndicesSalt(dirs.Snap)
+	if err != nil {
+		return nil, err
 	}
-	if a.d[kv.ReceiptDomain], err = NewDomain(cfg, logger); err != nil {
+
+	ctx, ctxCancel := context.WithCancel(ctx)
+	a := &Aggregator{
+		ctx: ctx,
+		ctxCancel: ctxCancel,
+		onFreeze: func(frozenFileNames []string) {},
+		dirs: dirs,
+		tmpdir: tmpdir,
+		aggregationStep: aggregationStep,
+		db: db,
+		leakDetector: dbg.NewLeakDetector("agg", dbg.SlowTx()),
+		ps: background.NewProgressSet(),
+		logger: logger,
+		collateAndBuildWorkers: 1,
+		mergeWorkers: 1,
+
+		commitmentValuesTransform: AggregatorSqueezeCommitmentValues,
+
+		produce: true,
+	}
+
+	if err := a.registerDomain(kv.AccountsDomain, salt, dirs, aggregationStep, logger); err != nil {
 		return nil, err
 	}
-	if err := a.registerII(kv.LogAddrIdxPos, salt, dirs, db, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger); err != nil {
+	if err := a.registerDomain(kv.StorageDomain, salt, dirs, aggregationStep, logger); err != nil {
 		return nil, err
 	}
-	if err := a.registerII(kv.LogTopicIdxPos, salt, dirs, db, aggregationStep, kv.FileLogTopicsIdx, kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, logger); err != nil {
+	if err := a.registerDomain(kv.CodeDomain, salt, dirs, aggregationStep, logger); err != nil {
 		return nil, err
 	}
-	if err := a.registerII(kv.TracesFromIdxPos, salt, dirs, db, aggregationStep, kv.FileTracesFromIdx, kv.TblTracesFromKeys, kv.TblTracesFromIdx, logger); err != nil {
+	if err := a.registerDomain(kv.CommitmentDomain, salt, dirs, aggregationStep, logger); err != nil {
 		return nil, err
 	}
-	if err := a.registerII(kv.TracesToIdxPos, salt, dirs, db, aggregationStep, kv.FileTracesToIdx, kv.TblTracesToKeys, kv.TblTracesToIdx, logger); err != nil {
+	if err := a.registerDomain(kv.ReceiptDomain, salt, dirs, aggregationStep, logger); err != nil {
+		return nil, err
+	}
+	if err := a.registerII(kv.LogAddrIdxPos, salt, dirs, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger); err != nil {
+		return nil, err
+	}
+	if err := a.registerII(kv.LogTopicIdxPos, salt, dirs, aggregationStep, kv.FileLogTopicsIdx, kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, logger); err != nil {
+		return nil, err
+	}
+	if err := a.registerII(kv.TracesFromIdxPos, salt, dirs, aggregationStep, kv.FileTracesFromIdx, kv.TblTracesFromKeys, kv.TblTracesFromIdx, logger); err != nil {
+		return nil, err
+	}
+	if err := a.registerII(kv.TracesToIdxPos, salt, dirs, aggregationStep, kv.FileTracesToIdx, kv.TblTracesToKeys, kv.TblTracesToIdx, logger); err != nil {
 		return nil, err
 	}
 	a.KeepRecentTxnsOfHistoriesWithDisabledSnapshots(100_000) // ~1k blocks of history
@@ -317,9 +343,26 @@ func getStateIndicesSalt(baseDir string) (salt *uint32, err error) {
 	return salt, nil
 }
 
-func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadir.Dirs, db kv.RoDB, aggregationStep uint64, filenameBase, indexKeysTable, indexTable string, logger log.Logger) error {
+func (a *Aggregator) registerDomain(name kv.Domain, salt *uint32, dirs datadir.Dirs, aggregationStep uint64, logger log.Logger) (err error) {
+	cfg := Schema[name]
+	//TODO: move dynamic part of config to InvertedIndex
+	cfg.restrictSubsetFileDeletions = a.commitmentValuesTransform
+	if name == kv.CommitmentDomain {
+		cfg.replaceKeysInValues = a.commitmentValuesTransform
+	}
+	cfg.hist.iiCfg.salt = salt
+	cfg.hist.iiCfg.dirs = dirs
+	cfg.hist.iiCfg.aggregationStep = aggregationStep
+	a.d[name], err = NewDomain(cfg, logger)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadir.Dirs, aggregationStep uint64, filenameBase, indexKeysTable, indexTable string, logger log.Logger) error {
 	idxCfg := iiCfg{
-		salt: salt, dirs: dirs, db: db,
+		salt: salt, dirs: dirs,
 		aggregationStep: aggregationStep,
 		filenameBase: filenameBase,
 		keysTable: indexKeysTable,
diff --git a/erigon-lib/state/aggregator_test.go b/erigon-lib/state/aggregator_test.go
index a5d89ad3d57..f70f655d650 100644
--- a/erigon-lib/state/aggregator_test.go
+++ b/erigon-lib/state/aggregator_test.go
@@ -1088,9 +1088,7 @@ func testDbAndAggregatorv3(tb testing.TB, aggStep uint64) (kv.RwDB, *Aggregator)
 	tb.Helper()
 	require, logger := require.New(tb), log.New()
 	dirs := datadir.New(tb.TempDir())
-	db := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
-		return kv.ChaindataTablesCfg
-	}).MustOpen()
+	db := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).MustOpen()
 	tb.Cleanup(db.Close)
 
 	agg, err := NewAggregator(context.Background(), dirs, aggStep, db, logger)
diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go
index d34e39be8c9..25e28680a8f 100644
--- a/erigon-lib/state/domain.go
+++ b/erigon-lib/state/domain.go
@@ -101,7 +101,7 @@ type domainCfg struct {
 	valuesTable string // bucket to store domain values; key -> inverted_step + values (Dupsort)
 	largeValues bool
 
-	integrity rangeDomainIntegrityChecker
+	crossDomainIntegrity rangeDomainIntegrityChecker
 
 	// replaceKeysInValues allows to replace commitment branch values with shorter keys.
 	// for commitment domain only
@@ -129,12 +129,11 @@ var DomainCompressCfg = seg.Cfg{
 
 func NewDomain(cfg domainCfg, logger log.Logger) (*Domain, error) {
 	if cfg.hist.iiCfg.dirs.SnapDomain == "" {
-		panic("empty `dirs` variable")
+		panic("assert: empty `dirs`")
 	}
-	if cfg.indexList == 0 {
-		cfg.indexList = withBTree | withExistence
+	if cfg.hist.filenameBase == "" {
+		panic("assert: empty `filenameBase` " + cfg.name.String())
 	}
-	cfg.compressCfg = DomainCompressCfg
 
 	d := &Domain{
 		domainCfg: cfg,
@@ -143,10 +142,6 @@ func NewDomain(cfg domainCfg, logger log.Logger) (*Domain, error) {
 	}
 
 	var err error
-	if cfg.hist.filenameBase == "" {
-		cfg.hist.filenameBase = cfg.name.String()
-		cfg.hist.iiCfg.filenameBase = cfg.hist.filenameBase
-	}
 	if d.History, err = NewHistory(cfg.hist, logger); err != nil {
 		return nil, err
 	}
@@ -298,10 +293,18 @@ func (d *Domain) closeFilesAfterStep(lowerBound uint64) {
 }
 
 func (d *Domain) scanDirtyFiles(fileNames []string) (garbageFiles []*filesItem) {
-	for _, dirtyFile := range scanDirtyFiles(fileNames, d.aggregationStep, d.filenameBase, "kv", d.logger) {
+	if d.filenameBase == "" {
+		panic("assert: empty `filenameBase`")
+	}
+	if d.aggregationStep == 0 {
+		panic("assert: empty `aggregationStep`")
+	}
+
+	l := scanDirtyFiles(fileNames, d.aggregationStep, d.filenameBase, "kv", d.logger)
+	for _, dirtyFile := range l {
 		startStep, endStep := dirtyFile.startTxNum/d.aggregationStep, dirtyFile.endTxNum/d.aggregationStep
 		domainName, _ := kv.String2Domain(d.filenameBase)
-		if d.integrity != nil && !d.integrity(domainName, startStep, endStep) {
+		if d.crossDomainIntegrity != nil && !d.crossDomainIntegrity(domainName, d.dirs, startStep, endStep) {
 			d.logger.Debug("[agg] skip garbage file", "name", d.filenameBase, "startStep", startStep, "endStep", endStep)
 			continue
 		}
diff --git a/erigon-lib/state/domain_test.go b/erigon-lib/state/domain_test.go
index ae5c32c7bc3..18451c4bc6b 100644
--- a/erigon-lib/state/domain_test.go
+++ b/erigon-lib/state/domain_test.go
@@ -34,10 +34,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/holiman/uint256"
-	"github.com/stretchr/testify/require"
-	btree2 "github.com/tidwall/btree"
-
 	"github.com/erigontech/erigon-lib/common"
 	"github.com/erigontech/erigon-lib/common/background"
 	datadir2 "github.com/erigontech/erigon-lib/common/datadir"
@@ -51,6 +47,8 @@ import (
 	"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon-lib/seg" "github.com/erigontech/erigon-lib/types" + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" ) type rndGen struct { @@ -75,35 +73,16 @@ func testDbAndDomain(t *testing.T, logger log.Logger) (kv.RwDB, *Domain) { func testDbAndDomainOfStep(t *testing.T, aggStep uint64, logger log.Logger) (kv.RwDB, *Domain) { t.Helper() dirs := datadir2.New(t.TempDir()) - keysTable := "Keys" - valsTable := "Vals" - historyKeysTable := "HistoryKeys" - historyValsTable := "HistoryVals" - settingsTable := "Settings" //nolint - indexTable := "Index" - db := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - tcfg := kv.TableCfg{ - keysTable: kv.TableCfgItem{Flags: kv.DupSort}, - valsTable: kv.TableCfgItem{Flags: kv.DupSort}, - historyKeysTable: kv.TableCfgItem{Flags: kv.DupSort}, - historyValsTable: kv.TableCfgItem{Flags: kv.DupSort}, - settingsTable: kv.TableCfgItem{}, - indexTable: kv.TableCfgItem{Flags: kv.DupSort}, - kv.TblPruningProgress: kv.TableCfgItem{}, - } - return tcfg - }).MustOpen() + cfg := Schema[kv.AccountsDomain] + cfg.crossDomainIntegrity = nil //no other domains + + db := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).MustOpen() t.Cleanup(db.Close) salt := uint32(1) - cfg := domainCfg{ - name: kv.AccountsDomain, valuesTable: valsTable, - hist: histCfg{ - valuesTable: historyValsTable, - compression: seg.CompressNone, historyLargeValues: true, - - iiCfg: iiCfg{salt: &salt, dirs: dirs, db: db, withExistence: false, - aggregationStep: aggStep, keysTable: historyKeysTable, valuesTable: indexTable}, - }} + + cfg.hist.iiCfg.aggregationStep = aggStep + cfg.hist.iiCfg.dirs = dirs + cfg.hist.iiCfg.salt = &salt d, err := NewDomain(cfg, logger) require.NoError(t, err) d.DisableFsync() @@ -532,9 +511,9 @@ func collateAndMerge(t *testing.T, db kv.RwDB, tx kv.RwTx, d *Domain, txs uint64 valuesOuts, indexOuts, historyOuts := dc.staticFilesInRange(r) valuesIn, indexIn, historyIn, err := dc.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, nil, background.NewProgressSet()) require.NoError(t, err) - if valuesIn != nil && valuesIn.decompressor != nil { - fmt.Printf("merge: %s\n", valuesIn.decompressor.FileName()) - } + //if valuesIn != nil && valuesIn.decompressor != nil { + //fmt.Printf("merge: %s\n", valuesIn.decompressor.FileName()) + //} d.integrateMergedDirtyFiles(valuesOuts, indexOuts, historyOuts, valuesIn, indexIn, historyIn) d.reCalcVisibleFiles(d.dirtyFilesEndTxNumMinimax()) return false @@ -1044,28 +1023,38 @@ func TestDomain_OpenFilesWithDeletions(t *testing.T) { dom.Close() } +func emptyTestDomain(aggStep uint64) *Domain { + cfg := Schema[kv.AccountsDomain] + cfg.crossDomainIntegrity = nil + + salt := uint32(1) + cfg.hist.iiCfg.salt = &salt + cfg.hist.iiCfg.dirs = datadir2.New(os.TempDir()) + cfg.hist.iiCfg.aggregationStep = aggStep + + d, err := NewDomain(cfg, log.New()) + if err != nil { + panic(err) + } + return d +} + func TestScanStaticFilesD(t *testing.T) { t.Parallel() - ii := &Domain{ - History: &History{ - histCfg: histCfg{ - filenameBase: "test", - }, - InvertedIndex: emptyTestInvertedIndex(1)}, - dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess), - } + d := emptyTestDomain(1) + files := []string{ - "v1-test.0-1.kv", - "v1-test.1-2.kv", - "v1-test.0-4.kv", - "v1-test.2-3.kv", - "v1-test.3-4.kv", - "v1-test.4-5.kv", - } - ii.scanDirtyFiles(files) + "v1-accounts.0-1.kv", + "v1-accounts.1-2.kv", + "v1-accounts.0-4.kv", + "v1-accounts.2-3.kv", + 
"v1-accounts.3-4.kv", + "v1-accounts.4-5.kv", + } + d.scanDirtyFiles(files) var found []string - ii.dirtyFiles.Walk(func(items []*filesItem) bool { + d.dirtyFiles.Walk(func(items []*filesItem) bool { for _, item := range items { found = append(found, fmt.Sprintf("%d-%d", item.startTxNum, item.endTxNum)) } diff --git a/erigon-lib/state/history.go b/erigon-lib/state/history.go index c0c4be4d4bb..c23d11f85fb 100644 --- a/erigon-lib/state/history.go +++ b/erigon-lib/state/history.go @@ -28,6 +28,7 @@ import ( "sync" "time" + "github.com/erigontech/erigon-lib/common/datadir" btree2 "github.com/tidwall/btree" "golang.org/x/sync/errgroup" @@ -69,7 +70,7 @@ type History struct { _visibleFiles []visibleFile } -type rangeDomainIntegrityChecker func(d kv.Domain, fromStep, toStep uint64) bool +type rangeDomainIntegrityChecker func(d kv.Domain, dirs datadir.Dirs, fromStep, toStep uint64) bool type rangeIntegrityChecker func(fromStep, toStep uint64) bool type histCfg struct { @@ -136,8 +137,11 @@ func NewHistory(cfg histCfg, logger log.Logger) (*History, error) { return &h, nil } +func (h *History) vFileName(fromStep, toStep uint64) string { + return fmt.Sprintf("v1-%s.%d-%d.v", h.filenameBase, fromStep, toStep) +} func (h *History) vFilePath(fromStep, toStep uint64) string { - return filepath.Join(h.dirs.SnapHistory, fmt.Sprintf("v1-%s.%d-%d.v", h.filenameBase, fromStep, toStep)) + return filepath.Join(h.dirs.SnapHistory, h.vFileName(fromStep, toStep)) } func (h *History) vAccessorFilePath(fromStep, toStep uint64) string { return filepath.Join(h.dirs.SnapAccessors, fmt.Sprintf("v1-%s.%d-%d.vi", h.filenameBase, fromStep, toStep)) @@ -169,6 +173,12 @@ func (h *History) openFolder() error { } func (h *History) scanDirtyFiles(fileNames []string) { + if h.filenameBase == "" { + panic("assert: empty `filenameBase`") + } + if h.aggregationStep == 0 { + panic("assert: empty `aggregationStep`") + } for _, dirtyFile := range scanDirtyFiles(fileNames, h.aggregationStep, h.filenameBase, "v", h.logger) { startStep, endStep := dirtyFile.startTxNum/h.aggregationStep, dirtyFile.endTxNum/h.aggregationStep if h.integrity != nil && !h.integrity(startStep, endStep) { diff --git a/erigon-lib/state/history_test.go b/erigon-lib/state/history_test.go index d8462ca7e14..24272c8e01e 100644 --- a/erigon-lib/state/history_test.go +++ b/erigon-lib/state/history_test.go @@ -28,9 +28,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - btree2 "github.com/tidwall/btree" - "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/background" "github.com/erigontech/erigon-lib/common/datadir" @@ -45,45 +42,28 @@ import ( "github.com/erigontech/erigon-lib/recsplit" "github.com/erigontech/erigon-lib/recsplit/eliasfano32" "github.com/erigontech/erigon-lib/seg" + "github.com/stretchr/testify/require" ) func testDbAndHistory(tb testing.TB, largeValues bool, logger log.Logger) (kv.RwDB, *History) { tb.Helper() dirs := datadir.New(tb.TempDir()) - keysTable := "AccountKeys" - indexTable := "AccountIndex" - valsTable := "AccountVals" - settingsTable := "Settings" - db := mdbx.New(kv.ChainDB, logger).InMem(dirs.SnapDomain).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - if largeValues { - return kv.TableCfg{ - keysTable: kv.TableCfgItem{Flags: kv.DupSort}, - indexTable: kv.TableCfgItem{Flags: kv.DupSort}, - valsTable: kv.TableCfgItem{Flags: kv.DupSort}, - settingsTable: kv.TableCfgItem{}, - kv.TblPruningProgress: kv.TableCfgItem{}, - } - } - return kv.TableCfg{ - keysTable: 
kv.TableCfgItem{Flags: kv.DupSort}, - indexTable: kv.TableCfgItem{Flags: kv.DupSort}, - valsTable: kv.TableCfgItem{Flags: kv.DupSort}, - settingsTable: kv.TableCfgItem{}, - kv.TblPruningProgress: kv.TableCfgItem{}, - } - }).MustOpen() + db := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).MustOpen() //TODO: tests will fail if set histCfg.compression = CompressKeys | CompressValues salt := uint32(1) - cfg := histCfg{ - filenameBase: "hist", - valuesTable: valsTable, - - iiCfg: iiCfg{salt: &salt, dirs: dirs, db: db, withExistence: false, - aggregationStep: 16, filenameBase: "hist", keysTable: keysTable, valuesTable: indexTable, - }, - compression: seg.CompressNone, historyLargeValues: largeValues, - } - h, err := NewHistory(cfg, logger) + cfg := Schema[kv.AccountsDomain] + + cfg.hist.iiCfg.aggregationStep = 16 + cfg.hist.iiCfg.dirs = dirs + cfg.hist.iiCfg.salt = &salt + + cfg.hist.historyLargeValues = largeValues + + //perf of tests + cfg.hist.iiCfg.withExistence = false + cfg.hist.iiCfg.compression = seg.CompressNone + cfg.hist.compression = seg.CompressNone + h, err := NewHistory(cfg.hist, logger) require.NoError(tb, err) h.DisableFsync() tb.Cleanup(db.Close) @@ -235,7 +215,8 @@ func TestHistoryCollationBuild(t *testing.T) { c, err := h.collate(ctx, 0, 0, 8, tx) require.NoError(err) - require.True(strings.HasSuffix(c.historyPath, "v1-hist.0-1.v")) + + require.True(strings.HasSuffix(c.historyPath, h.vFileName(0, 1))) require.Equal(6, c.historyCount) require.Equal(3, c.efHistoryComp.Count()/2) @@ -1387,20 +1368,23 @@ func TestIterateChanged2(t *testing.T) { func TestScanStaticFilesH(t *testing.T) { t.Parallel() - h := &History{ - histCfg: histCfg{ - filenameBase: "test", - }, - InvertedIndex: emptyTestInvertedIndex(1), - dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess), + newTestDomain := func() (*InvertedIndex, *History) { + d := emptyTestDomain(1) + d.History.InvertedIndex.integrity = nil + d.History.InvertedIndex.indexList = 0 + d.History.indexList = 0 + return d.History.InvertedIndex, d.History } + + _, h := newTestDomain() + files := []string{ - "v1-test.0-1.v", - "v1-test.1-2.v", - "v1-test.0-4.v", - "v1-test.2-3.v", - "v1-test.3-4.v", - "v1-test.4-5.v", + "v1-accounts.0-1.v", + "v1-accounts.1-2.v", + "v1-accounts.0-4.v", + "v1-accounts.2-3.v", + "v1-accounts.3-4.v", + "v1-accounts.4-5.v", } h.scanDirtyFiles(files) require.Equal(t, 6, h.dirtyFiles.Len()) diff --git a/erigon-lib/state/inverted_index.go b/erigon-lib/state/inverted_index.go index 0c805f06e9b..e5ce42f88b9 100644 --- a/erigon-lib/state/inverted_index.go +++ b/erigon-lib/state/inverted_index.go @@ -77,7 +77,6 @@ type InvertedIndex struct { type iiCfg struct { salt *uint32 dirs datadir.Dirs - db kv.RoDB // global db pointer. mostly for background warmup. 
 
 	filenameBase    string // filename base for all files of this inverted index
 	aggregationStep uint64 // amount of transactions inside single aggregation step
@@ -101,7 +100,13 @@ type iiVisible struct {
 
 func NewInvertedIndex(cfg iiCfg, logger log.Logger) (*InvertedIndex, error) {
 	if cfg.dirs.SnapDomain == "" {
-		panic("empty `dirs` varialbe")
+		panic("assert: empty `dirs`")
+	}
+	if cfg.filenameBase == "" {
+		panic("assert: empty `filenameBase`")
+	}
+	if cfg.aggregationStep == 0 {
+		panic("assert: empty `aggregationStep`")
 	}
 	//if cfg.compressorCfg.MaxDictPatterns == 0 && cfg.compressorCfg.MaxPatternLen == 0 {
 	cfg.compressorCfg = seg.DefaultCfg
@@ -177,6 +182,12 @@ func (ii *InvertedIndex) openFolder() error {
 }
 
 func (ii *InvertedIndex) scanDirtyFiles(fileNames []string) {
+	if ii.filenameBase == "" {
+		panic("assert: empty `filenameBase`")
+	}
+	if ii.aggregationStep == 0 {
+		panic("assert: empty `aggregationStep`")
+	}
 	for _, dirtyFile := range scanDirtyFiles(fileNames, ii.aggregationStep, ii.filenameBase, "ef", ii.logger) {
 		startStep, endStep := dirtyFile.startTxNum/ii.aggregationStep, dirtyFile.endTxNum/ii.aggregationStep
 		if ii.integrity != nil && !ii.integrity(startStep, endStep) {
diff --git a/erigon-lib/state/inverted_index_test.go b/erigon-lib/state/inverted_index_test.go
index e9e19640887..e173e4f7df4 100644
--- a/erigon-lib/state/inverted_index_test.go
+++ b/erigon-lib/state/inverted_index_test.go
@@ -56,7 +56,7 @@ func testDbAndInvertedIndex(tb testing.TB, aggStep uint64, logger log.Logger) (k
 	}).MustOpen()
 	tb.Cleanup(db.Close)
 	salt := uint32(1)
-	cfg := iiCfg{salt: &salt, dirs: dirs, db: db, aggregationStep: aggStep, filenameBase: "inv", keysTable: keysTable, valuesTable: indexTable}
+	cfg := iiCfg{salt: &salt, dirs: dirs, aggregationStep: aggStep, filenameBase: "inv", keysTable: keysTable, valuesTable: indexTable}
 	ii, err := NewInvertedIndex(cfg, logger)
 	require.NoError(tb, err)
 	ii.DisableFsync()
@@ -636,12 +636,12 @@ func TestScanStaticFiles(t *testing.T) {
 	ii := emptyTestInvertedIndex(1)
 	files := []string{
-		"v1-test.0-1.ef",
-		"v1-test.1-2.ef",
-		"v1-test.0-4.ef",
-		"v1-test.2-3.ef",
-		"v1-test.3-4.ef",
-		"v1-test.4-5.ef",
+		"v1-accounts.0-1.ef",
+		"v1-accounts.1-2.ef",
+		"v1-accounts.0-4.ef",
+		"v1-accounts.2-3.ef",
+		"v1-accounts.3-4.ef",
+		"v1-accounts.4-5.ef",
 	}
 	ii.scanDirtyFiles(files)
 	require.Equal(t, 6, ii.dirtyFiles.Len())
@@ -656,16 +656,16 @@ func TestCtxFiles(t *testing.T) {
 	ii := emptyTestInvertedIndex(1)
 	files := []string{
-		"v1-test.0-1.ef", // overlap with same `endTxNum=4`
-		"v1-test.1-2.ef",
-		"v1-test.0-4.ef",
-		"v1-test.2-3.ef",
-		"v1-test.3-4.ef",
-		"v1-test.4-5.ef", // no overlap
-		"v1-test.480-484.ef", // overlap with same `startTxNum=480`
-		"v1-test.480-488.ef",
-		"v1-test.480-496.ef",
-		"v1-test.480-512.ef",
+		"v1-accounts.0-1.ef", // overlap with same `endTxNum=4`
+		"v1-accounts.1-2.ef",
+		"v1-accounts.0-4.ef",
+		"v1-accounts.2-3.ef",
+		"v1-accounts.3-4.ef",
+		"v1-accounts.4-5.ef", // no overlap
+		"v1-accounts.480-484.ef", // overlap with same `startTxNum=480`
+		"v1-accounts.480-488.ef",
+		"v1-accounts.480-496.ef",
+		"v1-accounts.480-512.ef",
 	}
 	ii.scanDirtyFiles(files)
 	require.Equal(t, 10, ii.dirtyFiles.Len())
diff --git a/erigon-lib/state/merge_test.go b/erigon-lib/state/merge_test.go
index 3838440dec1..794514f5f2e 100644
--- a/erigon-lib/state/merge_test.go
+++ b/erigon-lib/state/merge_test.go
@@ -23,20 +23,23 @@ import (
 	"testing"
 
 	"github.com/erigontech/erigon-lib/common/datadir"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	btree2 "github.com/tidwall/btree"
+	"github.com/erigontech/erigon-lib/kv"
 	"github.com/erigontech/erigon-lib/log/v3"
 	"github.com/erigontech/erigon-lib/seg"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	"github.com/erigontech/erigon-lib/recsplit/eliasfano32"
 )
 
 func emptyTestInvertedIndex(aggStep uint64) *InvertedIndex {
 	salt := uint32(1)
-	cfg := iiCfg{salt: &salt, dirs: datadir.New(os.TempDir()), db: nil, aggregationStep: aggStep, filenameBase: "test"}
+	cfg := Schema[kv.AccountsDomain].hist.iiCfg
+
+	cfg.salt = &salt
+	cfg.dirs = datadir.New(os.TempDir())
+	cfg.aggregationStep = aggStep
 
 	ii, err := NewInvertedIndex(cfg, log.New())
 	ii.indexList = 0
@@ -45,15 +48,23 @@
 	}
 	return ii
 }
+
 func TestFindMergeRangeCornerCases(t *testing.T) {
 	t.Parallel()
+	newTestDomain := func() (*InvertedIndex, *History) {
+		d := emptyTestDomain(1)
+		d.History.InvertedIndex.integrity = nil
+		d.History.InvertedIndex.indexList = 0
+		d.History.indexList = 0
+		return d.History.InvertedIndex, d.History
+	}
 	t.Run("ii: > 2 unmerged files", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, _ := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-2.ef",
-			"v1-test.2-3.ef",
-			"v1-test.3-4.ef",
+			"v1-accounts.0-2.ef",
+			"v1-accounts.2-3.ef",
+			"v1-accounts.3-4.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -74,12 +85,12 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		assert.Equal(t, 3, len(idxF))
 	})
 	t.Run("hist: > 2 unmerged files", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, h := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-1.ef",
-			"v1-test.1-2.ef",
-			"v1-test.2-3.ef",
-			"v1-test.3-4.ef",
+			"v1-accounts.0-1.ef",
+			"v1-accounts.1-2.ef",
+			"v1-accounts.2-3.ef",
+			"v1-accounts.3-4.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -88,14 +99,11 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		})
 		ii.reCalcVisibleFiles(ii.dirtyFilesEndTxNumMinimax())
 
-		h := &History{
-			histCfg: histCfg{filenameBase: "test"},
-			InvertedIndex: ii, dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess)}
 		h.scanDirtyFiles([]string{
-			"v1-test.0-1.v",
-			"v1-test.1-2.v",
-			"v1-test.2-3.v",
-			"v1-test.3-4.v",
+			"v1-accounts.0-1.v",
+			"v1-accounts.1-2.v",
+			"v1-accounts.2-3.v",
+			"v1-accounts.3-4.v",
 		})
 		h.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := h.vFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -112,12 +120,12 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		assert.Equal(t, 2, int(r.index.to))
 	})
 	t.Run("not equal amount of files", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, h := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-1.ef",
-			"v1-test.1-2.ef",
-			"v1-test.2-3.ef",
-			"v1-test.3-4.ef",
+			"v1-accounts.0-1.ef",
+			"v1-accounts.1-2.ef",
+			"v1-accounts.2-3.ef",
+			"v1-accounts.3-4.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -126,12 +134,9 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		})
 		ii.reCalcVisibleFiles(ii.dirtyFilesEndTxNumMinimax())
 
-		h := &History{
-			histCfg: histCfg{filenameBase: "test"},
-			InvertedIndex: ii, dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess)}
 		h.scanDirtyFiles([]string{
-			"v1-test.0-1.v",
-			"v1-test.1-2.v",
+			"v1-accounts.0-1.v",
+			"v1-accounts.1-2.v",
 		})
 		h.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := h.vFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -151,11 +156,11 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		assert.Equal(t, 2, int(r.index.to))
 	})
 	t.Run("idx merged, history not yet", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, h := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-2.ef",
-			"v1-test.2-3.ef",
-			"v1-test.3-4.ef",
+			"v1-accounts.0-2.ef",
+			"v1-accounts.2-3.ef",
+			"v1-accounts.3-4.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -164,12 +169,9 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		})
 		ii.reCalcVisibleFiles(ii.dirtyFilesEndTxNumMinimax())
 
-		h := &History{
-			histCfg: histCfg{filenameBase: "test"},
-			InvertedIndex: ii, dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess)}
 		h.scanDirtyFiles([]string{
-			"v1-test.0-1.v",
-			"v1-test.1-2.v",
+			"v1-accounts.0-1.v",
+			"v1-accounts.1-2.v",
 		})
 		h.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := h.vFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -188,13 +190,13 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		assert.Equal(t, 2, int(r.history.to))
 	})
 	t.Run("idx merged, history not yet, 2", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, h := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-1.ef",
-			"v1-test.1-2.ef",
-			"v1-test.2-3.ef",
-			"v1-test.3-4.ef",
-			"v1-test.0-4.ef",
+			"v1-accounts.0-1.ef",
+			"v1-accounts.1-2.ef",
+			"v1-accounts.2-3.ef",
+			"v1-accounts.3-4.ef",
+			"v1-accounts.0-4.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -203,14 +205,11 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		})
 		ii.reCalcVisibleFiles(ii.dirtyFilesEndTxNumMinimax())
 
-		h := &History{
-			histCfg: histCfg{filenameBase: "test"},
-			InvertedIndex: ii, dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess)}
 		h.scanDirtyFiles([]string{
-			"v1-test.0-1.v",
-			"v1-test.1-2.v",
-			"v1-test.2-3.v",
-			"v1-test.3-4.v",
+			"v1-accounts.0-1.v",
+			"v1-accounts.1-2.v",
+			"v1-accounts.2-3.v",
+			"v1-accounts.3-4.v",
 		})
 		h.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := h.vFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -232,9 +231,9 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		require.Equal(t, 2, len(histFiles))
 	})
 	t.Run("idx merged and small files lost", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, h := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-4.ef",
+			"v1-accounts.0-4.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -243,14 +242,11 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		})
 		ii.reCalcVisibleFiles(ii.dirtyFilesEndTxNumMinimax())
 
-		h := &History{
-			histCfg: histCfg{filenameBase: "test"},
-			InvertedIndex: ii, dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess)}
 		h.scanDirtyFiles([]string{
-			"v1-test.0-1.v",
-			"v1-test.1-2.v",
-			"v1-test.2-3.v",
-			"v1-test.3-4.v",
+			"v1-accounts.0-1.v",
+			"v1-accounts.1-2.v",
+			"v1-accounts.2-3.v",
+			"v1-accounts.3-4.v",
 		})
 		h.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := h.vFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -271,10 +267,10 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 	})
 
 	t.Run("history merged, but index not and history garbage left", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, h := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-1.ef",
-			"v1-test.1-2.ef",
+			"v1-accounts.0-1.ef",
+			"v1-accounts.1-2.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -284,13 +280,10 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		})
 		ii.reCalcVisibleFiles(ii.dirtyFilesEndTxNumMinimax())
 
 		// `kill -9` may leave small garbage files, but if big one already exists we assume it's good(fsynced) and no reason to merge again
-		h := &History{
-			histCfg: histCfg{filenameBase: "test"},
-			InvertedIndex: ii, dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess)}
 		h.scanDirtyFiles([]string{
-			"v1-test.0-1.v",
-			"v1-test.1-2.v",
-			"v1-test.0-2.v",
+			"v1-accounts.0-1.v",
+			"v1-accounts.1-2.v",
+			"v1-accounts.0-2.v",
 		})
 		h.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := h.vFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -312,13 +305,13 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		require.Equal(t, 0, len(histFiles))
 	})
 	t.Run("history merge progress ahead of idx", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, h := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-1.ef",
-			"v1-test.1-2.ef",
-			"v1-test.0-2.ef",
-			"v1-test.2-3.ef",
-			"v1-test.3-4.ef",
+			"v1-accounts.0-1.ef",
+			"v1-accounts.1-2.ef",
+			"v1-accounts.0-2.ef",
+			"v1-accounts.2-3.ef",
+			"v1-accounts.3-4.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -327,15 +320,12 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		})
 		ii.reCalcVisibleFiles(ii.dirtyFilesEndTxNumMinimax())
 
-		h := &History{
-			histCfg: histCfg{filenameBase: "test"},
-			InvertedIndex: ii, dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess)}
 		h.scanDirtyFiles([]string{
-			"v1-test.0-1.v",
-			"v1-test.1-2.v",
-			"v1-test.0-2.v",
-			"v1-test.2-3.v",
-			"v1-test.3-4.v",
+			"v1-accounts.0-1.v",
+			"v1-accounts.1-2.v",
+			"v1-accounts.0-2.v",
+			"v1-accounts.2-3.v",
+			"v1-accounts.3-4.v",
 		})
 		h.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := h.vFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -357,12 +347,12 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		require.Equal(t, 3, len(histFiles))
 	})
 	t.Run("idx merge progress ahead of history", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, h := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-1.ef",
-			"v1-test.1-2.ef",
-			"v1-test.0-2.ef",
-			"v1-test.2-3.ef",
+			"v1-accounts.0-1.ef",
+			"v1-accounts.1-2.ef",
+			"v1-accounts.0-2.ef",
+			"v1-accounts.2-3.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -371,13 +361,10 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		})
 		ii.reCalcVisibleFiles(ii.dirtyFilesEndTxNumMinimax())
 
-		h := &History{
-			histCfg: histCfg{filenameBase: "test"},
-			InvertedIndex: ii, dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess)}
 		h.scanDirtyFiles([]string{
-			"v1-test.0-1.v",
-			"v1-test.1-2.v",
-			"v1-test.2-3.v",
+			"v1-accounts.0-1.v",
+			"v1-accounts.1-2.v",
+			"v1-accounts.2-3.v",
 		})
 		h.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := h.vFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -399,11 +386,11 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		require.Equal(t, 2, len(histFiles))
 	})
 	t.Run("idx merged, but garbage left", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, h := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-1.ef",
-			"v1-test.1-2.ef",
-			"v1-test.0-2.ef",
+			"v1-accounts.0-1.ef",
+			"v1-accounts.1-2.ef",
+			"v1-accounts.0-2.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -412,12 +399,11 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		})
 		ii.reCalcVisibleFiles(ii.dirtyFilesEndTxNumMinimax())
 
-		h := &History{InvertedIndex: ii, dirtyFiles: btree2.NewBTreeG[*filesItem](filesItemLess)}
 		h.scanDirtyFiles([]string{
-			"v1-test.0-1.v",
-			"v1-test.1-2.v",
-			"v1-test.0-2.v",
-			"v1-test.2-3.v",
+			"v1-accounts.0-1.v",
+			"v1-accounts.1-2.v",
+			"v1-accounts.0-2.v",
+			"v1-accounts.2-3.v",
 		})
 		h.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := h.vFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
@@ -433,13 +419,13 @@ func TestFindMergeRangeCornerCases(t *testing.T) {
 		assert.False(t, r.history.needMerge)
 	})
 	t.Run("idx merged, but garbage left2", func(t *testing.T) {
-		ii := emptyTestInvertedIndex(1)
+		ii, _ := newTestDomain()
 		ii.scanDirtyFiles([]string{
-			"v1-test.0-1.ef",
-			"v1-test.1-2.ef",
-			"v1-test.0-2.ef",
-			"v1-test.2-3.ef",
-			"v1-test.3-4.ef",
+			"v1-accounts.0-1.ef",
+			"v1-accounts.1-2.ef",
+			"v1-accounts.0-2.ef",
+			"v1-accounts.2-3.ef",
+			"v1-accounts.3-4.ef",
 		})
 		ii.dirtyFiles.Scan(func(item *filesItem) bool {
 			fName := ii.efFilePath(item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep)
diff --git a/migrations/reset_stage_txn_lookup.go b/migrations/reset_stage_txn_lookup.go
index d4c9e7cb8b2..bccd9e2db50 100644
--- a/migrations/reset_stage_txn_lookup.go
+++ b/migrations/reset_stage_txn_lookup.go
@@ -18,6 +18,7 @@ package migrations
 
 import (
 	"context"
+	"github.com/erigontech/erigon-lib/common/datadir"
 	"github.com/erigontech/erigon-lib/kv"
 	"github.com/erigontech/erigon-lib/log/v3"
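
Usage sketch (not part of the patch): after this change, the static per-domain configuration lives in the package-level Schema map, and only the dynamic parts (salt, dirs, aggregationStep) are filled in at construction time, which is exactly what registerDomain and the test helpers above do. The helper name newDomainFromSchema below is hypothetical, and the code assumes package-internal access, since domainCfg fields such as hist and iiCfg are unexported:

// newDomainFromSchema is a hypothetical package-internal helper mirroring
// registerDomain/emptyTestDomain: static config is copied from Schema, then
// the dynamic fields are set before calling NewDomain.
func newDomainFromSchema(name kv.Domain, dirs datadir.Dirs, aggStep uint64, salt *uint32, logger log.Logger) (*Domain, error) {
	cfg := Schema[name] // value copy: the shared Schema entry stays untouched
	cfg.hist.iiCfg.salt = salt
	cfg.hist.iiCfg.dirs = dirs
	cfg.hist.iiCfg.aggregationStep = aggStep
	return NewDomain(cfg, logger)
}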