Skip to content

Commit

Permalink
agg: schema (#13232)
Browse files Browse the repository at this point in the history
- `extract domain compresscfg` from domain to schema
- extract schema out of agg constructor
  • Loading branch information
AskAlexSharov authored Dec 26, 2024
1 parent 2b3ae9a commit b11a83b
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 342 deletions.
237 changes: 140 additions & 97 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,171 +100,197 @@ type OnFreezeFunc func(frozenFileNames []string)
const AggregatorSqueezeCommitmentValues = true
const MaxNonFuriousDirtySpacePerTx = 64 * datasize.MB

func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint64, db kv.RoDB, logger log.Logger) (*Aggregator, error) {
tmpdir := dirs.Tmp
salt, err := getStateIndicesSalt(dirs.Snap)
func commitmentFileMustExist(dirs datadir.Dirs, fromStep, toStep uint64) bool {
fPath := filepath.Join(dirs.SnapDomain, fmt.Sprintf("v1-%s.%d-%d.kv", kv.CommitmentDomain, fromStep, toStep))
exists, err := dir.FileExist(fPath)
if err != nil {
return nil, err
panic(err)
}
return exists
}

ctx, ctxCancel := context.WithCancel(ctx)
a := &Aggregator{
ctx: ctx,
ctxCancel: ctxCancel,
onFreeze: func(frozenFileNames []string) {},
dirs: dirs,
tmpdir: tmpdir,
aggregationStep: aggregationStep,
db: db,
leakDetector: dbg.NewLeakDetector("agg", dbg.SlowTx()),
ps: background.NewProgressSet(),
logger: logger,
collateAndBuildWorkers: 1,
mergeWorkers: 1,

commitmentValuesTransform: AggregatorSqueezeCommitmentValues,

produce: true,
}
commitmentFileMustExist := func(fromStep, toStep uint64) bool {
fPath := filepath.Join(dirs.SnapDomain, fmt.Sprintf("v1-%s.%d-%d.kv", kv.CommitmentDomain, fromStep, toStep))
exists, err := dir.FileExist(fPath)
if err != nil {
panic(err)
}
return exists
}

integrityCheck := func(name kv.Domain, fromStep, toStep uint64) bool {
// case1: `kill -9` during building new .kv
// - `accounts` domain may be at step X and `commitment` domain at step X-1
// - not a problem because `commitment` domain still has step X in DB
// case2: `kill -9` during building new .kv and `rm -rf chaindata`
// - `accounts` domain may be at step X and `commitment` domain at step X-1
// - problem! `commitment` domain doesn't have step X in DB
// solution: ignore step X files in both cases
switch name {
case kv.AccountsDomain, kv.StorageDomain, kv.CodeDomain:
if toStep-fromStep > 1 { // only recently built files
return true
}
return commitmentFileMustExist(fromStep, toStep)
default:
func domainIntegrityCheck(name kv.Domain, dirs datadir.Dirs, fromStep, toStep uint64) bool {
// case1: `kill -9` during building new .kv
// - `accounts` domain may be at step X and `commitment` domain at step X-1
// - not a problem because `commitment` domain still has step X in DB
// case2: `kill -9` during building new .kv and `rm -rf chaindata`
// - `accounts` domain may be at step X and `commitment` domain at step X-1
// - problem! `commitment` domain doesn't have step X in DB
// solution: ignore step X files in both cases
switch name {
case kv.AccountsDomain, kv.StorageDomain, kv.CodeDomain:
if toStep-fromStep > 1 { // only recently built files
return true
}
return commitmentFileMustExist(dirs, fromStep, toStep)
default:
return true
}
}

cfg := domainCfg{
var Schema = map[kv.Domain]domainCfg{
kv.AccountsDomain: {
name: kv.AccountsDomain, valuesTable: kv.TblAccountVals,
restrictSubsetFileDeletions: a.commitmentValuesTransform,

integrity: integrityCheck,
compression: seg.CompressNone,
indexList: withBTree | withExistence,
crossDomainIntegrity: domainIntegrityCheck,
compression: seg.CompressNone,
compressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblAccountHistoryVals,
compression: seg.CompressNone,

historyLargeValues: false,
filenameBase: kv.AccountsDomain.String(), //TODO: looks redundant

iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
aggregationStep: aggregationStep, keysTable: kv.TblAccountHistoryKeys, valuesTable: kv.TblAccountIdx},
iiCfg: iiCfg{
keysTable: kv.TblAccountHistoryKeys, valuesTable: kv.TblAccountIdx,
withExistence: false, compressorCfg: seg.DefaultCfg,
filenameBase: kv.AccountsDomain.String(), //TODO: looks redundant
},
},
}
if a.d[kv.AccountsDomain], err = NewDomain(cfg, logger); err != nil {
return nil, err
}
cfg = domainCfg{
},
kv.StorageDomain: {
name: kv.StorageDomain, valuesTable: kv.TblStorageVals,
restrictSubsetFileDeletions: a.commitmentValuesTransform,

integrity: integrityCheck,
indexList: withBTree | withExistence,
compression: seg.CompressKeys,
compressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblStorageHistoryVals,
compression: seg.CompressNone,

historyLargeValues: false,
filenameBase: kv.StorageDomain.String(),

iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
aggregationStep: aggregationStep, keysTable: kv.TblStorageHistoryKeys, valuesTable: kv.TblStorageIdx},
iiCfg: iiCfg{
keysTable: kv.TblStorageHistoryKeys, valuesTable: kv.TblStorageIdx,
withExistence: false, compressorCfg: seg.DefaultCfg,
filenameBase: kv.StorageDomain.String(),
},
},
}
if a.d[kv.StorageDomain], err = NewDomain(cfg, logger); err != nil {
return nil, err
}
cfg = domainCfg{
},
kv.CodeDomain: {
name: kv.CodeDomain, valuesTable: kv.TblCodeVals,
restrictSubsetFileDeletions: a.commitmentValuesTransform,

integrity: integrityCheck,
indexList: withBTree | withExistence,
compression: seg.CompressVals, // compress Code with keys doesn't show any profit. compress of values show 4x ratio on eth-mainnet and 2.5x ratio on bor-mainnet
compressCfg: DomainCompressCfg,
largeValues: true,

hist: histCfg{
valuesTable: kv.TblCodeHistoryVals,
compression: seg.CompressKeys | seg.CompressVals,

historyLargeValues: true,
filenameBase: kv.CodeDomain.String(),

iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
aggregationStep: aggregationStep, keysTable: kv.TblCodeHistoryKeys, valuesTable: kv.TblCodeIdx},
iiCfg: iiCfg{
withExistence: false, compressorCfg: seg.DefaultCfg,
keysTable: kv.TblCodeHistoryKeys, valuesTable: kv.TblCodeIdx,
filenameBase: kv.CodeDomain.String(),
},
},
}
if a.d[kv.CodeDomain], err = NewDomain(cfg, logger); err != nil {
return nil, err
}
cfg = domainCfg{
},
kv.CommitmentDomain: {
name: kv.CommitmentDomain, valuesTable: kv.TblCommitmentVals,
restrictSubsetFileDeletions: a.commitmentValuesTransform,

replaceKeysInValues: a.commitmentValuesTransform,
integrity: integrityCheck,
compression: seg.CompressKeys,
indexList: withBTree | withExistence,
compression: seg.CompressKeys,
compressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblCommitmentHistoryVals,
compression: seg.CompressNone,

snapshotsDisabled: true,
historyLargeValues: false,
filenameBase: kv.CommitmentDomain.String(),

iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
aggregationStep: aggregationStep, keysTable: kv.TblCommitmentHistoryKeys, valuesTable: kv.TblCommitmentIdx},
iiCfg: iiCfg{
keysTable: kv.TblCommitmentHistoryKeys, valuesTable: kv.TblCommitmentIdx,
withExistence: false, compressorCfg: seg.DefaultCfg,
filenameBase: kv.CommitmentDomain.String(),
},
},
}
if a.d[kv.CommitmentDomain], err = NewDomain(cfg, logger); err != nil {
return nil, err
}
cfg = domainCfg{
},
kv.ReceiptDomain: {
name: kv.ReceiptDomain, valuesTable: kv.TblReceiptVals,

indexList: withBTree | withExistence,
compression: seg.CompressNone, //seg.CompressKeys | seg.CompressVals,
integrity: integrityCheck,
compressCfg: DomainCompressCfg,

hist: histCfg{
valuesTable: kv.TblReceiptHistoryVals,
compression: seg.CompressNone,

historyLargeValues: false,
filenameBase: kv.ReceiptDomain.String(),

iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg,
aggregationStep: aggregationStep, keysTable: kv.TblReceiptHistoryKeys, valuesTable: kv.TblReceiptIdx},
iiCfg: iiCfg{
keysTable: kv.TblReceiptHistoryKeys, valuesTable: kv.TblReceiptIdx,
withExistence: false, compressorCfg: seg.DefaultCfg,
filenameBase: kv.ReceiptDomain.String(),
},
},
},
}

func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint64, db kv.RoDB, logger log.Logger) (*Aggregator, error) {
tmpdir := dirs.Tmp
salt, err := getStateIndicesSalt(dirs.Snap)
if err != nil {
return nil, err
}
if a.d[kv.ReceiptDomain], err = NewDomain(cfg, logger); err != nil {

ctx, ctxCancel := context.WithCancel(ctx)
a := &Aggregator{
ctx: ctx,
ctxCancel: ctxCancel,
onFreeze: func(frozenFileNames []string) {},
dirs: dirs,
tmpdir: tmpdir,
aggregationStep: aggregationStep,
db: db,
leakDetector: dbg.NewLeakDetector("agg", dbg.SlowTx()),
ps: background.NewProgressSet(),
logger: logger,
collateAndBuildWorkers: 1,
mergeWorkers: 1,

commitmentValuesTransform: AggregatorSqueezeCommitmentValues,

produce: true,
}

if err := a.registerDomain(kv.AccountsDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.LogAddrIdxPos, salt, dirs, db, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger); err != nil {
if err := a.registerDomain(kv.StorageDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.LogTopicIdxPos, salt, dirs, db, aggregationStep, kv.FileLogTopicsIdx, kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, logger); err != nil {
if err := a.registerDomain(kv.CodeDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.TracesFromIdxPos, salt, dirs, db, aggregationStep, kv.FileTracesFromIdx, kv.TblTracesFromKeys, kv.TblTracesFromIdx, logger); err != nil {
if err := a.registerDomain(kv.CommitmentDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.TracesToIdxPos, salt, dirs, db, aggregationStep, kv.FileTracesToIdx, kv.TblTracesToKeys, kv.TblTracesToIdx, logger); err != nil {
if err := a.registerDomain(kv.ReceiptDomain, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.LogAddrIdxPos, salt, dirs, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.LogTopicIdxPos, salt, dirs, aggregationStep, kv.FileLogTopicsIdx, kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.TracesFromIdxPos, salt, dirs, aggregationStep, kv.FileTracesFromIdx, kv.TblTracesFromKeys, kv.TblTracesFromIdx, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.TracesToIdxPos, salt, dirs, aggregationStep, kv.FileTracesToIdx, kv.TblTracesToKeys, kv.TblTracesToIdx, logger); err != nil {
return nil, err
}
a.KeepRecentTxnsOfHistoriesWithDisabledSnapshots(100_000) // ~1k blocks of history
Expand Down Expand Up @@ -317,9 +343,26 @@ func getStateIndicesSalt(baseDir string) (salt *uint32, err error) {
return salt, nil
}

func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadir.Dirs, db kv.RoDB, aggregationStep uint64, filenameBase, indexKeysTable, indexTable string, logger log.Logger) error {
func (a *Aggregator) registerDomain(name kv.Domain, salt *uint32, dirs datadir.Dirs, aggregationStep uint64, logger log.Logger) (err error) {
cfg := Schema[name]
//TODO: move dynamic part of config to InvertedIndex
cfg.restrictSubsetFileDeletions = a.commitmentValuesTransform
if name == kv.CommitmentDomain {
cfg.replaceKeysInValues = a.commitmentValuesTransform
}
cfg.hist.iiCfg.salt = salt
cfg.hist.iiCfg.dirs = dirs
cfg.hist.iiCfg.aggregationStep = aggregationStep
a.d[name], err = NewDomain(cfg, logger)
if err != nil {
return err
}
return nil
}

func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadir.Dirs, aggregationStep uint64, filenameBase, indexKeysTable, indexTable string, logger log.Logger) error {
idxCfg := iiCfg{
salt: salt, dirs: dirs, db: db,
salt: salt, dirs: dirs,
aggregationStep: aggregationStep,
filenameBase: filenameBase,
keysTable: indexKeysTable,
Expand Down
4 changes: 1 addition & 3 deletions erigon-lib/state/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,9 +1088,7 @@ func testDbAndAggregatorv3(tb testing.TB, aggStep uint64) (kv.RwDB, *Aggregator)
tb.Helper()
require, logger := require.New(tb), log.New()
dirs := datadir.New(tb.TempDir())
db := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
return kv.ChaindataTablesCfg
}).MustOpen()
db := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).MustOpen()
tb.Cleanup(db.Close)

agg, err := NewAggregator(context.Background(), dirs, aggStep, db, logger)
Expand Down
25 changes: 14 additions & 11 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type domainCfg struct {
valuesTable string // bucket to store domain values; key -> inverted_step + values (Dupsort)
largeValues bool

integrity rangeDomainIntegrityChecker
crossDomainIntegrity rangeDomainIntegrityChecker

// replaceKeysInValues allows to replace commitment branch values with shorter keys.
// for commitment domain only
Expand Down Expand Up @@ -129,12 +129,11 @@ var DomainCompressCfg = seg.Cfg{

func NewDomain(cfg domainCfg, logger log.Logger) (*Domain, error) {
if cfg.hist.iiCfg.dirs.SnapDomain == "" {
panic("empty `dirs` variable")
panic("assert: empty `dirs`")
}
if cfg.indexList == 0 {
cfg.indexList = withBTree | withExistence
if cfg.hist.filenameBase == "" {
panic("assert: emtpy `filenameBase`" + cfg.name.String())
}
cfg.compressCfg = DomainCompressCfg

d := &Domain{
domainCfg: cfg,
Expand All @@ -143,10 +142,6 @@ func NewDomain(cfg domainCfg, logger log.Logger) (*Domain, error) {
}

var err error
if cfg.hist.filenameBase == "" {
cfg.hist.filenameBase = cfg.name.String()
cfg.hist.iiCfg.filenameBase = cfg.hist.filenameBase
}
if d.History, err = NewHistory(cfg.hist, logger); err != nil {
return nil, err
}
Expand Down Expand Up @@ -298,10 +293,18 @@ func (d *Domain) closeFilesAfterStep(lowerBound uint64) {
}

func (d *Domain) scanDirtyFiles(fileNames []string) (garbageFiles []*filesItem) {
for _, dirtyFile := range scanDirtyFiles(fileNames, d.aggregationStep, d.filenameBase, "kv", d.logger) {
if d.filenameBase == "" {
panic("assert: empty `filenameBase`")
}
if d.aggregationStep == 0 {
panic("assert: empty `aggregationStep`")
}

l := scanDirtyFiles(fileNames, d.aggregationStep, d.filenameBase, "kv", d.logger)
for _, dirtyFile := range l {
startStep, endStep := dirtyFile.startTxNum/d.aggregationStep, dirtyFile.endTxNum/d.aggregationStep
domainName, _ := kv.String2Domain(d.filenameBase)
if d.integrity != nil && !d.integrity(domainName, startStep, endStep) {
if d.crossDomainIntegrity != nil && !d.crossDomainIntegrity(domainName, d.dirs, startStep, endStep) {
d.logger.Debug("[agg] skip garbage file", "name", d.filenameBase, "startStep", startStep, "endStep", endStep)
continue
}
Expand Down
Loading

0 comments on commit b11a83b

Please sign in to comment.