From 62c8e8193e4bac74ad4a875d4efb131825297f28 Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Wed, 8 Jan 2025 22:03:13 +0100 Subject: [PATCH] Erigon 3: Purified states representation (#13227) Purification of states works by pruning historical states from dangling nodes. The process is to collect all keys into a temporary MDBX with mapping `node -> layer`, then we iterate over each node again and remove every entry whose `node -> layer` mapping does not match in the historical states DB. --------- Co-authored-by: alex.sharov --- RELEASE_INSTRUCTIONS.md | 8 + cmd/integration/commands/state_domains.go | 358 +++++++++++++++++++++- erigon-lib/state/aggregator.go | 6 +- erigon-lib/state/aggregator2.go | 20 +- erigon-lib/state/aggregator_test.go | 2 +- erigon-lib/state/domain.go | 22 +- erigon-lib/state/domain_committed.go | 12 +- erigon-lib/state/domain_shared.go | 4 +- erigon-lib/state/domain_test.go | 18 +- erigon-lib/state/merge.go | 8 +- erigon-lib/state/squeeze.go | 12 +- turbo/app/snapshots_cmd.go | 5 +- 12 files changed, 419 insertions(+), 56 deletions(-) diff --git a/RELEASE_INSTRUCTIONS.md b/RELEASE_INSTRUCTIONS.md index f4f064b12ad..07dbf96f359 100644 --- a/RELEASE_INSTRUCTIONS.md +++ b/RELEASE_INSTRUCTIONS.md @@ -31,3 +31,11 @@ In most cases, it is enough to bump minor version. In the file `ethdb/remote/remotedbserver/server.go` there is variable `KvServiceAPIVersion` that needs to be updated if there are any changes in the remote KV interface, or database schema, leading to data migrations. In most cases, it is enough to bump minor version. It is best to change both DB schema version and remove KV version together. + +## Purify the state domains if a regeneration is done + +If a regeneration is done, the state domains need to be purified. 
This can be done by running the following command: +```` +make integration +./build/bin/integration purify_domains --datadir=/path/to/datadir --replace-in-datadir +```` diff --git a/cmd/integration/commands/state_domains.go b/cmd/integration/commands/state_domains.go index e754165af75..0c673f48a3b 100644 --- a/cmd/integration/commands/state_domains.go +++ b/cmd/integration/commands/state_domains.go @@ -18,14 +18,20 @@ package commands import ( "context" + "encoding/binary" "encoding/hex" "errors" "fmt" + "os" + "path" "path/filepath" + "runtime" + "sort" "strings" + "github.com/erigontech/erigon-lib/etl" + "github.com/erigontech/erigon-lib/seg" state3 "github.com/erigontech/erigon-lib/state" - "github.com/spf13/cobra" "github.com/erigontech/erigon-lib/log/v3" @@ -33,8 +39,11 @@ import ( libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/datadir" "github.com/erigontech/erigon-lib/common/length" + downloadertype "github.com/erigontech/erigon-lib/downloader/snaptype" "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/mdbx" kv2 "github.com/erigontech/erigon-lib/kv/mdbx" + statelib "github.com/erigontech/erigon-lib/state" "github.com/erigontech/erigon/cmd/utils" "github.com/erigontech/erigon/core" "github.com/erigontech/erigon/core/state" @@ -52,12 +61,23 @@ func init() { withStartTx(readDomains) rootCmd.AddCommand(readDomains) + + withDataDir(purifyDomains) + purifyDomains.Flags().StringVar(&outDatadir, "out", "out-purified", "") + purifyDomains.Flags().BoolVar(&purifyOnlyCommitment, "only-commitment", true, "purify only commitment domain") + purifyDomains.Flags().BoolVar(&replaceInDatadir, "replace-in-datadir", false, "replace the purified domains directly in datadir (will remove .kvei and .bt too)") + purifyDomains.Flags().Float64Var(&minSkipRatioL0, "min-skip-ratio-l0", 0.1, "minimum ratio of keys to skip in L0") + rootCmd.AddCommand(purifyDomains) } // if trie variant is not hex, we could not have another rootHash 
with to verify it var ( - stepSize uint64 - lastStep uint64 + stepSize uint64 + lastStep uint64 + minSkipRatioL0 float64 + outDatadir string + purifyOnlyCommitment bool + replaceInDatadir bool ) // write command to just seek and query state by addr and domain from state db and files (if any) @@ -120,6 +140,322 @@ var readDomains = &cobra.Command{ }, } +var purifyDomains = &cobra.Command{ + Use: "purify_domains", + Short: `Regenerate kv files without repeating keys.`, + Example: "go run ./cmd/integration purify_domains --datadir=... --verbosity=3", + Args: cobra.ArbitraryArgs, + Run: func(cmd *cobra.Command, args []string) { + dirs := datadir.New(datadirCli) + // Iterate over all the files in dirs.SnapDomain and print them + domainDir := dirs.SnapDomain + + // make a temporary dir + tmpDir, err := os.MkdirTemp(dirs.Tmp, "purifyTemp") // make a temporary dir to store the keys + if err != nil { + fmt.Println("Error creating temporary directory: ", err) + return + } + // make a temporary DB to store the keys + + purifyDB := mdbx.MustOpen(tmpDir) + defer purifyDB.Close() + var purificationDomains []string + if purifyOnlyCommitment { + purificationDomains = []string{"commitment"} + } else { + purificationDomains = []string{"account", "storage" /*"code",*/, "commitment", "receipt"} + } + //purificationDomains := []string{"commitment"} + for _, domain := range purificationDomains { + if err := makePurifiableIndexDB(purifyDB, dirs, log.New(), domain); err != nil { + fmt.Println("Error making purifiable index DB: ", err) + return + } + } + for _, domain := range purificationDomains { + if err := makePurifiedDomains(purifyDB, dirs, log.New(), domain); err != nil { + fmt.Println("Error making purifiable index DB: ", err) + return + } + } + if err != nil { + fmt.Printf("error walking the path %q: %v\n", domainDir, err) + } + }, +} + +func makePurifiableIndexDB(db kv.RwDB, dirs datadir.Dirs, logger log.Logger, domain string) error { + var tbl string + switch domain { + case 
"account": + tbl = kv.MaxTxNum + case "storage": + tbl = kv.HeaderNumber + case "code": + tbl = kv.HeaderCanonical + case "commitment": + tbl = kv.HeaderTD + case "receipt": + tbl = kv.BadHeaderNumber + default: + return fmt.Errorf("invalid domain %s", domain) + } + // Iterate over all the files in dirs.SnapDomain and print them + filesNamesToIndex := []string{} + if err := filepath.Walk(dirs.SnapDomain, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + // Skip directories + if info.IsDir() { + return nil + } + if !strings.Contains(info.Name(), domain) { + return nil + } + // Here you can decide if you only want to process certain file extensions + // e.g., .kv files + if filepath.Ext(path) != ".kv" { + // Skip non-kv files if that's your domain’s format + return nil + } + + fmt.Printf("Add file to indexing of %s: %s\n", domain, path) + + filesNamesToIndex = append(filesNamesToIndex, info.Name()) + return nil + }); err != nil { + return fmt.Errorf("failed to walk through the domainDir %s: %w", domain, err) + } + + collector := etl.NewCollector("Purification", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), logger) + defer collector.Close() + // sort the files by name + sort.Slice(filesNamesToIndex, func(i, j int) bool { + res, ok, _ := downloadertype.ParseFileName(dirs.SnapDomain, filesNamesToIndex[i]) + if !ok { + panic("invalid file name") + } + res2, ok, _ := downloadertype.ParseFileName(dirs.SnapDomain, filesNamesToIndex[j]) + if !ok { + panic("invalid file name") + } + return res.From < res2.From + }) + tx, err := db.BeginRw(context.Background()) + if err != nil { + return fmt.Errorf("failed to start transaction: %w", err) + } + defer tx.Rollback() + + // now start the file indexing + for i, fileName := range filesNamesToIndex { + if i == 0 { + continue // we can skip first layer as all the keys are already mapped to 0. 
+ } + layerBytes := make([]byte, 4) + binary.BigEndian.PutUint32(layerBytes, uint32(i)) + count := 0 + + dec, err := seg.NewDecompressor(path.Join(dirs.SnapDomain, fileName)) + if err != nil { + return fmt.Errorf("failed to create decompressor: %w", err) + } + defer dec.Close() + getter := dec.MakeGetter() + fmt.Printf("Indexing file %s\n", fileName) + var buf []byte + for getter.HasNext() { + buf = buf[:0] + buf, _ = getter.Next(buf) + + collector.Collect(buf, layerBytes) + count++ + //fmt.Println("count: ", count, "keyLength: ", len(buf)) + if count%100000 == 0 { + fmt.Printf("Indexed %d keys in file %s\n", count, fileName) + } + // skip values + getter.Skip() + } + fmt.Printf("Indexed %d keys in file %s\n", count, fileName) + } + fmt.Println("Loading the keys to DB") + if err := collector.Load(tx, tbl, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil { + return fmt.Errorf("failed to load: %w", err) + } + + return tx.Commit() +} + +func makePurifiedDomains(db kv.RwDB, dirs datadir.Dirs, logger log.Logger, domainName string) error { + domain, err := kv.String2Domain(domainName) + if err != nil { + return err + } + + compressionType := statelib.Schema[domain].Compression + compressCfg := statelib.Schema[domain].CompressCfg + compressCfg.Workers = runtime.NumCPU() + + var tbl string + switch domainName { + case "account": + tbl = kv.MaxTxNum + case "storage": + tbl = kv.HeaderNumber + case "code": + tbl = kv.HeaderCanonical + case "commitment": + tbl = kv.HeaderTD + case "receipt": + tbl = kv.BadHeaderNumber + default: + return fmt.Errorf("invalid domainName %s", domainName) + } + // Iterate over all the files in dirs.SnapDomain and print them + filesNamesToPurify := []string{} + if err := filepath.Walk(dirs.SnapDomain, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + // Skip directories + if info.IsDir() { + return nil + } + if !strings.Contains(info.Name(), domainName) { + return nil + } + // Here you can decide if 
you only want to process certain file extensions + // e.g., .kv files + if filepath.Ext(path) != ".kv" { + // Skip non-kv files if that's your domainName’s format + return nil + } + + fmt.Printf("Add file to purification of %s: %s\n", domainName, path) + + filesNamesToPurify = append(filesNamesToPurify, info.Name()) + return nil + }); err != nil { + return fmt.Errorf("failed to walk through the domainDir %s: %w", domainName, err) + } + // sort the files by name + sort.Slice(filesNamesToPurify, func(i, j int) bool { + res, ok, _ := downloadertype.ParseFileName(dirs.SnapDomain, filesNamesToPurify[i]) + if !ok { + panic("invalid file name") + } + res2, ok, _ := downloadertype.ParseFileName(dirs.SnapDomain, filesNamesToPurify[j]) + if !ok { + panic("invalid file name") + } + return res.From < res2.From + }) + + tx, err := db.BeginRo(context.Background()) + if err != nil { + return fmt.Errorf("failed to start transaction: %w", err) + } + defer tx.Rollback() + outD := datadir.New(outDatadir) + + // now start the file indexing + for currentLayer, fileName := range filesNamesToPurify { + count := 0 + skipped := 0 + + dec, err := seg.NewDecompressor(filepath.Join(dirs.SnapDomain, fileName)) + if err != nil { + return fmt.Errorf("failed to create decompressor: %w", err) + } + defer dec.Close() + getter := dec.MakeGetter() + + valuesComp, err := seg.NewCompressor(context.Background(), "Purification", filepath.Join(outD.SnapDomain, fileName), dirs.Tmp, compressCfg, log.LvlTrace, log.New()) + if err != nil { + return fmt.Errorf("create %s values compressor: %w", filepath.Join(outD.SnapDomain, fileName), err) + } + defer valuesComp.Close() + + comp := seg.NewWriter(valuesComp, compressionType) + defer comp.Close() + + fmt.Printf("Indexing file %s\n", fileName) + var ( + bufKey []byte + bufVal []byte + ) + + var layer uint32 + for getter.HasNext() { + // get the key and value for the current entry + bufKey = bufKey[:0] + bufKey, _ = getter.Next(bufKey) + bufVal = bufVal[:0] + 
bufVal, _ = getter.Next(bufVal) + + layerBytes, err := tx.GetOne(tbl, bufKey) + if err != nil { + return fmt.Errorf("failed to get key %x: %w", bufKey, err) + } + // if the key is not found, then the layer is 0 + layer = 0 + if len(layerBytes) == 4 { + layer = binary.BigEndian.Uint32(layerBytes) + } + if layer != uint32(currentLayer) { + skipped++ + continue + } + if err := comp.AddWord(bufKey); err != nil { + return fmt.Errorf("failed to add key %x: %w", bufKey, err) + } + if err := comp.AddWord(bufVal); err != nil { + return fmt.Errorf("failed to add val %x: %w", bufVal, err) + } + count++ + if count%100000 == 0 { + skipRatio := float64(skipped) / float64(count) + fmt.Printf("Indexed %d keys, skipped %d, in file %s. skip ratio: %.2f\n", count, skipped, fileName, skipRatio) + } + } + + skipRatio := float64(skipped) / float64(count) + if skipRatio < minSkipRatioL0 && currentLayer == 0 { + fmt.Printf("Skip ratio %.2f is less than min-skip-ratio-l0 %.2f, skipping the domainName and file %s\n", skipRatio, minSkipRatioL0, fileName) + return nil + } + fmt.Printf("Loaded %d keys in file %s. 
now compressing...\n", count, fileName) + if err := comp.Compress(); err != nil { + return fmt.Errorf("failed to compress: %w", err) + } + fmt.Printf("Compressed %d keys in file %s\n", count, fileName) + comp.Close() + if replaceInDatadir { + fmt.Printf("Replacing the file %s in datadir\n", fileName) + if err := os.Rename(filepath.Join(outD.SnapDomain, fileName), filepath.Join(dirs.SnapDomain, fileName)); err != nil { + return fmt.Errorf("failed to replace the file %s: %w", fileName, err) + } + kveiFile := strings.ReplaceAll(fileName, ".kv", ".kvei") + btFile := strings.ReplaceAll(fileName, ".kv", ".bt") + kviFile := strings.ReplaceAll(fileName, ".kv", ".kvi") + removeManyIgnoreError( + filepath.Join(dirs.SnapDomain, fileName+".torrent"), + filepath.Join(dirs.SnapDomain, btFile), + filepath.Join(dirs.SnapDomain, btFile+".torrent"), + filepath.Join(dirs.SnapDomain, kveiFile), + filepath.Join(dirs.SnapDomain, kveiFile+".torrent"), + filepath.Join(dirs.SnapDomain, kviFile), + filepath.Join(dirs.SnapDomain, kviFile+".torrent"), + ) + fmt.Printf("Removed the files %s and %s\n", kveiFile, btFile) + } + } + return nil +} + func requestDomains(chainDb, stateDb kv.RwDB, ctx context.Context, readDomain string, addrs [][]byte, logger log.Logger) error { sn, bsn, agg, _, _, _ := allSnapshots(ctx, chainDb, logger) defer sn.Close() @@ -181,3 +517,19 @@ func requestDomains(chainDb, stateDb kv.RwDB, ctx context.Context, readDomain st } return nil } + +func removeMany(filePaths ...string) error { + for _, filePath := range filePaths { + if err := os.Remove(filePath); err != nil { + _, fileName := filepath.Split(filePath) + return fmt.Errorf("failed to remove the file: %s, %w", fileName, err) + } + } + return nil +} + +func removeManyIgnoreError(filePaths ...string) { + for _, filePath := range filePaths { + os.Remove(filePath) + } +} diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go index 870cb049934..8f17d44acc9 100644 --- 
a/erigon-lib/state/aggregator.go +++ b/erigon-lib/state/aggregator.go @@ -314,7 +314,7 @@ func (a *Aggregator) SetCollateAndBuildWorkers(i int) { a.collateAndBuildWorkers func (a *Aggregator) SetMergeWorkers(i int) { a.mergeWorkers = i } func (a *Aggregator) SetCompressWorkers(i int) { for _, d := range a.d { - d.compressCfg.Workers = i + d.CompressCfg.Workers = i d.History.compressorCfg.Workers = i d.History.InvertedIndex.compressorCfg.Workers = i } @@ -504,7 +504,7 @@ func (sf AggV3StaticFiles) CleanupOnError() { } func (a *Aggregator) buildFiles(ctx context.Context, step uint64) error { - a.logger.Debug("[agg] collate and build", "step", step, "collate_workers", a.collateAndBuildWorkers, "merge_workers", a.mergeWorkers, "compress_workers", a.d[kv.AccountsDomain].compressCfg.Workers) + a.logger.Debug("[agg] collate and build", "step", step, "collate_workers", a.collateAndBuildWorkers, "merge_workers", a.mergeWorkers, "compress_workers", a.d[kv.AccountsDomain].CompressCfg.Workers) var ( logEvery = time.NewTicker(time.Second * 30) @@ -690,7 +690,7 @@ func (a *Aggregator) BuildFiles2(ctx context.Context, fromStep, toStep uint64) e } func (a *Aggregator) mergeLoopStep(ctx context.Context, toTxNum uint64) (somethingDone bool, err error) { - a.logger.Debug("[agg] merge", "collate_workers", a.collateAndBuildWorkers, "merge_workers", a.mergeWorkers, "compress_workers", a.d[kv.AccountsDomain].compressCfg.Workers) + a.logger.Debug("[agg] merge", "collate_workers", a.collateAndBuildWorkers, "merge_workers", a.mergeWorkers, "compress_workers", a.d[kv.AccountsDomain].CompressCfg.Workers) aggTx := a.BeginFilesRo() defer aggTx.Close() diff --git a/erigon-lib/state/aggregator2.go b/erigon-lib/state/aggregator2.go index f1f51d09121..aa8983a7cd6 100644 --- a/erigon-lib/state/aggregator2.go +++ b/erigon-lib/state/aggregator2.go @@ -71,8 +71,8 @@ var Schema = map[kv.Domain]domainCfg{ IndexList: AccessorBTree | AccessorExistence, crossDomainIntegrity: domainIntegrityCheck, - 
compression: seg.CompressNone, - compressCfg: DomainCompressCfg, + Compression: seg.CompressNone, + CompressCfg: DomainCompressCfg, hist: histCfg{ valuesTable: kv.TblAccountHistoryVals, @@ -92,8 +92,8 @@ var Schema = map[kv.Domain]domainCfg{ name: kv.StorageDomain, valuesTable: kv.TblStorageVals, IndexList: AccessorBTree | AccessorExistence, - compression: seg.CompressKeys, - compressCfg: DomainCompressCfg, + Compression: seg.CompressKeys, + CompressCfg: DomainCompressCfg, hist: histCfg{ valuesTable: kv.TblStorageHistoryVals, @@ -113,8 +113,8 @@ var Schema = map[kv.Domain]domainCfg{ name: kv.CodeDomain, valuesTable: kv.TblCodeVals, IndexList: AccessorBTree | AccessorExistence, - compression: seg.CompressVals, // compress Code with keys doesn't show any profit. compress of values show 4x ratio on eth-mainnet and 2.5x ratio on bor-mainnet - compressCfg: DomainCompressCfg, + Compression: seg.CompressVals, // compress Code with keys doesn't show any profit. compress of values show 4x ratio on eth-mainnet and 2.5x ratio on bor-mainnet + CompressCfg: DomainCompressCfg, largeValues: true, hist: histCfg{ @@ -135,8 +135,8 @@ var Schema = map[kv.Domain]domainCfg{ name: kv.CommitmentDomain, valuesTable: kv.TblCommitmentVals, IndexList: AccessorHashMap, - compression: seg.CompressKeys, - compressCfg: DomainCompressCfg, + Compression: seg.CompressKeys, + CompressCfg: DomainCompressCfg, hist: histCfg{ valuesTable: kv.TblCommitmentHistoryVals, @@ -157,8 +157,8 @@ var Schema = map[kv.Domain]domainCfg{ name: kv.ReceiptDomain, valuesTable: kv.TblReceiptVals, IndexList: AccessorBTree | AccessorExistence, - compression: seg.CompressNone, //seg.CompressKeys | seg.CompressVals, - compressCfg: DomainCompressCfg, + Compression: seg.CompressNone, //seg.CompressKeys | seg.CompressVals, + CompressCfg: DomainCompressCfg, hist: histCfg{ valuesTable: kv.TblReceiptHistoryVals, diff --git a/erigon-lib/state/aggregator_test.go b/erigon-lib/state/aggregator_test.go index 759df463408..8a89bcb56eb 
100644 --- a/erigon-lib/state/aggregator_test.go +++ b/erigon-lib/state/aggregator_test.go @@ -1270,7 +1270,7 @@ func TestAggregator_RebuildCommitmentBasedOnFiles(t *testing.T) { roots := make([]common.Hash, 0) // collect latest root from each available file - compression := ac.d[kv.CommitmentDomain].d.compression + compression := ac.d[kv.CommitmentDomain].d.Compression fnames := []string{} for _, f := range ac.d[kv.CommitmentDomain].files { var k, stateVal []byte diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index 7d9c2770f0a..58d0e1ffc9e 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -95,8 +95,8 @@ type domainCfg struct { hist histCfg name kv.Domain - compression seg.FileCompression - compressCfg seg.Cfg + Compression seg.FileCompression + CompressCfg seg.Cfg IndexList Accessors // list of indexes for given domain valuesTable string // bucket to store domain values; key -> inverted_step + values (Dupsort) largeValues bool @@ -385,7 +385,7 @@ func (d *Domain) openDirtyFiles() (err error) { if toStep == 0 && d.filenameBase == "commitment" { btM = 128 } - if item.bindex, err = OpenBtreeIndexWithDecompressor(fPath, btM, item.decompressor, d.compression); err != nil { + if item.bindex, err = OpenBtreeIndexWithDecompressor(fPath, btM, item.decompressor, d.Compression); err != nil { _, fName := filepath.Split(fPath) d.logger.Warn("[agg] Domain.openDirtyFiles", "err", err, "f", fName) // don't interrupt on error. 
other files may be good @@ -774,7 +774,7 @@ func (d *Domain) collateETL(ctx context.Context, stepFrom, stepTo uint64, wal *e }() coll.valuesPath = d.kvFilePath(stepFrom, stepTo) - if coll.valuesComp, err = seg.NewCompressor(ctx, d.filenameBase+".domain.collate", coll.valuesPath, d.dirs.Tmp, d.compressCfg, log.LvlTrace, d.logger); err != nil { + if coll.valuesComp, err = seg.NewCompressor(ctx, d.filenameBase+".domain.collate", coll.valuesPath, d.dirs.Tmp, d.CompressCfg, log.LvlTrace, d.logger); err != nil { return Collation{}, fmt.Errorf("create %s values compressor: %w", d.filenameBase, err) } @@ -783,7 +783,7 @@ func (d *Domain) collateETL(ctx context.Context, stepFrom, stepTo uint64, wal *e //comp := seg.NewWriter(coll.valuesComp, seg.CompressNone) // compress := seg.CompressNone if stepTo-stepFrom > DomainMinStepsToCompress { - compress = d.compression + compress = d.Compression } comp := seg.NewWriter(coll.valuesComp, compress) @@ -886,7 +886,7 @@ func (d *Domain) collate(ctx context.Context, step, txFrom, txTo uint64, roTx kv }() coll.valuesPath = d.kvFilePath(step, step+1) - if coll.valuesComp, err = seg.NewCompressor(ctx, d.filenameBase+".domain.collate", coll.valuesPath, d.dirs.Tmp, d.compressCfg, log.LvlTrace, d.logger); err != nil { + if coll.valuesComp, err = seg.NewCompressor(ctx, d.filenameBase+".domain.collate", coll.valuesPath, d.dirs.Tmp, d.CompressCfg, log.LvlTrace, d.logger); err != nil { return Collation{}, fmt.Errorf("create %s values compressor: %w", d.filenameBase, err) } @@ -1068,7 +1068,7 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co btM = 128 } - bt, err = CreateBtreeIndexWithDecompressor(btPath, btM, valuesDecomp, d.compression, *d.salt, ps, d.dirs.Tmp, d.logger, d.noFsync) + bt, err = CreateBtreeIndexWithDecompressor(btPath, btM, valuesDecomp, d.Compression, *d.salt, ps, d.dirs.Tmp, d.logger, d.noFsync) if err != nil { return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err) } @@ 
-1170,7 +1170,7 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio if step == 0 && d.filenameBase == "commitment" { btM = 128 } - bt, err = CreateBtreeIndexWithDecompressor(btPath, btM, valuesDecomp, d.compression, *d.salt, ps, d.dirs.Tmp, d.logger, d.noFsync) + bt, err = CreateBtreeIndexWithDecompressor(btPath, btM, valuesDecomp, d.Compression, *d.salt, ps, d.dirs.Tmp, d.logger, d.noFsync) if err != nil { return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err) } @@ -1211,7 +1211,7 @@ func (d *Domain) buildAccessor(ctx context.Context, fromStep, toStep uint64, dat Salt: d.salt, NoFsync: d.noFsync, } - return buildAccessor(ctx, data, d.compression, idxPath, false, cfg, ps, d.logger) + return buildAccessor(ctx, data, d.Compression, idxPath, false, cfg, ps, d.logger) } func (d *Domain) missedBtreeAccessors() (l []*filesItem) { @@ -1274,7 +1274,7 @@ func (d *Domain) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps g.Go(func() error { fromStep, toStep := item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep idxPath := d.kvBtFilePath(fromStep, toStep) - if err := BuildBtreeIndexWithDecompressor(idxPath, item.decompressor, d.compression, ps, d.dirs.Tmp, *d.salt, d.logger, d.noFsync); err != nil { + if err := BuildBtreeIndexWithDecompressor(idxPath, item.decompressor, d.Compression, ps, d.dirs.Tmp, *d.salt, d.logger, d.noFsync); err != nil { return fmt.Errorf("failed to build btree index for %s: %w", item.decompressor.FileName(), err) } return nil @@ -1613,7 +1613,7 @@ func (dt *DomainRoTx) statelessGetter(i int) *seg.Reader { } r := dt.getters[i] if r == nil { - r = seg.NewReader(dt.files[i].src.decompressor.MakeGetter(), dt.d.compression) + r = seg.NewReader(dt.files[i].src.decompressor.MakeGetter(), dt.d.Compression) dt.getters[i] = r } return r diff --git a/erigon-lib/state/domain_committed.go b/erigon-lib/state/domain_committed.go index 8daef3f6b26..b9456dc231d 100644 --- 
a/erigon-lib/state/domain_committed.go +++ b/erigon-lib/state/domain_committed.go @@ -255,7 +255,7 @@ func (dt *DomainRoTx) commitmentValTransformDomain(rng MergeRange, accounts, sto if _, ok := accountFileMap[f.startTxNum]; !ok { accountFileMap[f.startTxNum] = make(map[uint64]*seg.Reader) } - accountFileMap[f.startTxNum][f.endTxNum] = seg.NewReader(f.decompressor.MakeGetter(), accounts.d.compression) + accountFileMap[f.startTxNum][f.endTxNum] = seg.NewReader(f.decompressor.MakeGetter(), accounts.d.Compression) } } storageFileMap := make(map[uint64]map[uint64]*seg.Reader) @@ -264,12 +264,12 @@ func (dt *DomainRoTx) commitmentValTransformDomain(rng MergeRange, accounts, sto if _, ok := storageFileMap[f.startTxNum]; !ok { storageFileMap[f.startTxNum] = make(map[uint64]*seg.Reader) } - storageFileMap[f.startTxNum][f.endTxNum] = seg.NewReader(f.decompressor.MakeGetter(), storage.d.compression) + storageFileMap[f.startTxNum][f.endTxNum] = seg.NewReader(f.decompressor.MakeGetter(), storage.d.Compression) } } - ms := seg.NewReader(mergedStorage.decompressor.MakeGetter(), storage.d.compression) - ma := seg.NewReader(mergedAccount.decompressor.MakeGetter(), accounts.d.compression) + ms := seg.NewReader(mergedStorage.decompressor.MakeGetter(), storage.d.Compression) + ma := seg.NewReader(mergedAccount.decompressor.MakeGetter(), accounts.d.Compression) dt.d.logger.Debug("prepare commitmentValTransformDomain", "merge", rng.String("range", dt.d.aggregationStep), "Mstorage", hadToLookupStorage, "Maccount", hadToLookupAccount) vt := func(valBuf []byte, keyFromTxNum, keyEndTxNum uint64) (transValBuf []byte, err error) { @@ -285,7 +285,7 @@ func (dt *DomainRoTx) commitmentValTransformDomain(rng MergeRange, accounts, sto if dirty == nil { return nil, fmt.Errorf("dirty storage file not found %d-%d", keyFromTxNum/dt.d.aggregationStep, keyEndTxNum/dt.d.aggregationStep) } - sig = seg.NewReader(dirty.decompressor.MakeGetter(), storage.d.compression) + sig = 
seg.NewReader(dirty.decompressor.MakeGetter(), storage.d.Compression) storageFileMap[keyFromTxNum][keyEndTxNum] = sig } @@ -298,7 +298,7 @@ func (dt *DomainRoTx) commitmentValTransformDomain(rng MergeRange, accounts, sto if dirty == nil { return nil, fmt.Errorf("dirty account file not found %d-%d", keyFromTxNum/dt.d.aggregationStep, keyEndTxNum/dt.d.aggregationStep) } - aig = seg.NewReader(dirty.decompressor.MakeGetter(), accounts.d.compression) + aig = seg.NewReader(dirty.decompressor.MakeGetter(), accounts.d.Compression) accountFileMap[keyFromTxNum][keyEndTxNum] = aig } diff --git a/erigon-lib/state/domain_shared.go b/erigon-lib/state/domain_shared.go index e570c8c7263..495caa8462e 100644 --- a/erigon-lib/state/domain_shared.go +++ b/erigon-lib/state/domain_shared.go @@ -491,8 +491,8 @@ func (sd *SharedDomains) replaceShortenedKeysInBranch(prefix []byte, branch comm sd.logger.Crit("dereference key during commitment read", "failed", err.Error()) return nil, err } - storageGetter := seg.NewReader(storageItem.decompressor.MakeGetter(), sto.d.compression) - accountGetter := seg.NewReader(accountItem.decompressor.MakeGetter(), acc.d.compression) + storageGetter := seg.NewReader(storageItem.decompressor.MakeGetter(), sto.d.Compression) + accountGetter := seg.NewReader(accountItem.decompressor.MakeGetter(), acc.d.Compression) metricI := 0 for i, f := range sd.aggTx.d[kv.CommitmentDomain].files { if i > 5 { diff --git a/erigon-lib/state/domain_test.go b/erigon-lib/state/domain_test.go index 18451c4bc6b..cf125bd5728 100644 --- a/erigon-lib/state/domain_test.go +++ b/erigon-lib/state/domain_test.go @@ -135,7 +135,7 @@ func testCollationBuild(t *testing.T, compressDomainVals bool) { ctx := context.Background() if compressDomainVals { - d.compression = seg.CompressKeys | seg.CompressVals + d.Compression = seg.CompressKeys | seg.CompressVals } tx, err := db.BeginRw(ctx) @@ -206,7 +206,7 @@ func testCollationBuild(t *testing.T, compressDomainVals bool) { defer 
sf.CleanupOnError() c.Close() - g := seg.NewReader(sf.valuesDecomp.MakeGetter(), d.compression) + g := seg.NewReader(sf.valuesDecomp.MakeGetter(), d.Compression) g.Reset(0) var words []string for g.HasNext() { @@ -1122,7 +1122,7 @@ func TestDomain_CollationBuildInMem(t *testing.T) { defer sf.CleanupOnError() c.Close() - g := seg.NewReader(sf.valuesDecomp.MakeGetter(), d.compression) + g := seg.NewReader(sf.valuesDecomp.MakeGetter(), d.Compression) g.Reset(0) var words []string for g.HasNext() { @@ -1459,7 +1459,7 @@ func TestDomain_GetAfterAggregation(t *testing.T) { d.historyLargeValues = false d.History.compression = seg.CompressNone //seg.CompressKeys | seg.CompressVals - d.compression = seg.CompressNone //seg.CompressKeys | seg.CompressVals + d.Compression = seg.CompressNone //seg.CompressKeys | seg.CompressVals d.filenameBase = kv.FileCommitmentDomain dc := d.BeginFilesRo() @@ -1529,7 +1529,7 @@ func TestDomainRange(t *testing.T) { d.historyLargeValues = false d.History.compression = seg.CompressNone // seg.CompressKeys | seg.CompressVals - d.compression = seg.CompressNone // seg.CompressKeys | seg.CompressVals + d.Compression = seg.CompressNone // seg.CompressKeys | seg.CompressVals d.filenameBase = kv.FileAccountDomain dc := d.BeginFilesRo() @@ -1624,7 +1624,7 @@ func TestDomain_CanPruneAfterAggregation(t *testing.T) { d.historyLargeValues = false d.History.compression = seg.CompressKeys | seg.CompressVals - d.compression = seg.CompressKeys | seg.CompressVals + d.Compression = seg.CompressKeys | seg.CompressVals d.filenameBase = kv.FileCommitmentDomain dc := d.BeginFilesRo() @@ -1720,7 +1720,7 @@ func TestDomain_PruneAfterAggregation(t *testing.T) { d.historyLargeValues = false d.History.compression = seg.CompressNone //seg.CompressKeys | seg.CompressVals - d.compression = seg.CompressNone //seg.CompressKeys | seg.CompressVals + d.Compression = seg.CompressNone //seg.CompressKeys | seg.CompressVals dc := d.BeginFilesRo() defer dc.Close() @@ -1863,7 +1863,7 
@@ func TestDomain_PruneProgress(t *testing.T) { d.historyLargeValues = false d.History.compression = seg.CompressKeys | seg.CompressVals - d.compression = seg.CompressKeys | seg.CompressVals + d.Compression = seg.CompressKeys | seg.CompressVals dc := d.BeginFilesRo() defer dc.Close() @@ -2479,7 +2479,7 @@ func TestDomainContext_findShortenedKey(t *testing.T) { lastFile := findFile(st, en) require.NotNilf(t, lastFile, "%d-%d", st/dc.d.aggregationStep, en/dc.d.aggregationStep) - lf := seg.NewReader(lastFile.decompressor.MakeGetter(), d.compression) + lf := seg.NewReader(lastFile.decompressor.MakeGetter(), d.Compression) shortenedKey, found := dc.findShortenedKey([]byte(key), lf, lastFile) require.Truef(t, found, "key %d/%d %x file %d %d %s", ki, len(data), []byte(key), lastFile.startTxNum, lastFile.endTxNum, lastFile.decompressor.FileName()) diff --git a/erigon-lib/state/merge.go b/erigon-lib/state/merge.go index 2c22e540a30..bfa69b354a3 100644 --- a/erigon-lib/state/merge.go +++ b/erigon-lib/state/merge.go @@ -406,12 +406,12 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h fromStep, toStep := r.values.from/r.aggStep, r.values.to/r.aggStep kvFilePath := dt.d.kvFilePath(fromStep, toStep) - kvFile, err := seg.NewCompressor(ctx, "merge domain "+dt.d.filenameBase, kvFilePath, dt.d.dirs.Tmp, dt.d.compressCfg, log.LvlTrace, dt.d.logger) + kvFile, err := seg.NewCompressor(ctx, "merge domain "+dt.d.filenameBase, kvFilePath, dt.d.dirs.Tmp, dt.d.CompressCfg, log.LvlTrace, dt.d.logger) if err != nil { return nil, nil, nil, fmt.Errorf("merge %s compressor: %w", dt.d.filenameBase, err) } - compression := dt.d.compression + compression := dt.d.Compression if toStep-fromStep < DomainMinStepsToCompress { compression = seg.CompressNone } @@ -425,7 +425,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h var cp CursorHeap heap.Init(&cp) for _, item := range domainFiles { - g := 
seg.NewReader(item.decompressor.MakeGetter(), dt.d.compression) + g := seg.NewReader(item.decompressor.MakeGetter(), dt.d.Compression) g.Reset(0) if g.HasNext() { key, _ := g.Next(nil) @@ -521,7 +521,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h if toStep == 0 && dt.d.filenameBase == "commitment" { btM = 128 } - valuesIn.bindex, err = CreateBtreeIndexWithDecompressor(btPath, btM, valuesIn.decompressor, dt.d.compression, *dt.d.salt, ps, dt.d.dirs.Tmp, dt.d.logger, dt.d.noFsync) + valuesIn.bindex, err = CreateBtreeIndexWithDecompressor(btPath, btM, valuesIn.decompressor, dt.d.Compression, *dt.d.salt, ps, dt.d.dirs.Tmp, dt.d.logger, dt.d.noFsync) if err != nil { return nil, nil, nil, fmt.Errorf("merge %s btindex [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err) } diff --git a/erigon-lib/state/squeeze.go b/erigon-lib/state/squeeze.go index ad929203d53..4a611627ed5 100644 --- a/erigon-lib/state/squeeze.go +++ b/erigon-lib/state/squeeze.go @@ -75,8 +75,8 @@ func (a *Aggregator) sqeezeDomainFile(ctx context.Context, domain kv.Domain, fro panic("please use SqueezeCommitmentFiles func") } - compression := a.d[domain].compression - compressCfg := a.d[domain].compressCfg + compression := a.d[domain].Compression + compressCfg := a.d[domain].CompressCfg a.logger.Info("[sqeeze] file", "f", to, "cfg", compressCfg, "c", compression) decompressor, err := seg.NewDecompressor(from) @@ -200,24 +200,24 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error { err = func() error { steps := cf.endTxNum/ac.a.aggregationStep - cf.startTxNum/ac.a.aggregationStep - compression := commitment.d.compression + compression := commitment.d.Compression if steps < DomainMinStepsToCompress { compression = seg.CompressNone } ac.a.logger.Info("[squeeze_migration] file start", "original", cf.decompressor.FileName(), - "progress", fmt.Sprintf("%d/%d", ri+1, len(ranges)), "compress_cfg", commitment.d.compressCfg, "compress", compression) + 
"progress", fmt.Sprintf("%d/%d", ri+1, len(ranges)), "compress_cfg", commitment.d.CompressCfg, "compress", compression) originalPath := cf.decompressor.FilePath() squeezedTmpPath := originalPath + sqExt + ".tmp" squeezedCompr, err := seg.NewCompressor(context.Background(), "squeeze", squeezedTmpPath, ac.a.dirs.Tmp, - commitment.d.compressCfg, log.LvlInfo, commitment.d.logger) + commitment.d.CompressCfg, log.LvlInfo, commitment.d.logger) if err != nil { return err } defer squeezedCompr.Close() - writer := seg.NewWriter(squeezedCompr, commitment.d.compression) + writer := seg.NewWriter(squeezedCompr, commitment.d.Compression) reader := seg.NewReader(cf.decompressor.MakeGetter(), compression) reader.Reset(0) diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index 6fa77e759cf..3ea6d4366b1 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -951,7 +951,7 @@ func doMeta(cliCtx *cli.Context) error { panic(err) } defer src.Close() - log.Info("meta", "count", src.Count(), "size", datasize.ByteSize(src.Size()).HumanReadable(), "serialized_dict", datasize.ByteSize(src.SerializedDictSize()).HumanReadable(), "dict_words", src.DictWords(), "name", src.FileName()) + log.Info("meta", "count", src.Count(), "size", datasize.ByteSize(src.Size()).HumanReadable(), "serialized_dict", datasize.ByteSize(src.SerializedDictSize()).HumanReadable(), "dict_words", src.DictWords(), "name", src.FileName(), "detected_compression_type", seg.DetectCompressType(src.MakeGetter())) } else if strings.HasSuffix(fname, ".bt") { kvFPath := strings.TrimSuffix(fname, ".bt") + ".kv" src, err := seg.NewDecompressor(kvFPath) @@ -1262,6 +1262,9 @@ func doCompress(cliCtx *cli.Context) error { if dbg.EnvBool("OnlyVals", false) { compression = seg.CompressVals } + if dbg.EnvBool("NoCompress", false) { + compression = seg.CompressNone + } logger.Info("[compress] file", "datadir", dirs.DataDir, "f", f, "cfg", compressCfg) c, err := seg.NewCompressor(ctx, "compress", f, 
dirs.Tmp, compressCfg, log.LvlInfo, logger)