From 476d5200f7b41ebfa6e47c6c40f5c721e33318a3 Mon Sep 17 00:00:00 2001 From: John <25290445+Mercybudda@users.noreply.github.com> Date: Wed, 19 Jan 2022 18:07:49 +0800 Subject: [PATCH] [R4R]offline block prune (#543) * offline block prune * update * update * update and add unit test * addressed comments from walt * Addressed comments from walt and Igor * ensure MPT and snapshot matched * add one more parameter to indicate blockprune * update the logic of creating freezerDb * update flag command description * expose the function for db inspect the offset/startBlockNumber * add flags to inspect prune info * rename flag of reserved-recent-blocks to block-amount-reserved * addressed comments from walt * handle the case of command interruption * refined goimports * addressed comments from walt * change the logic as restarting prune after interruption * addressed comments * reclaimed freezer logic * introduce flag to enable/disable check between MPT and snapshot * update the logic of frozen field in freezerDB * update the code in all places related to freezer change * addressed comments from dylan * update the logic for backup block difficulty * addressed comments from dylan --- .gitignore | 2 + cmd/geth/chaincmd.go | 6 +- cmd/geth/dbcmd.go | 34 +++- cmd/geth/main.go | 2 + cmd/geth/pruneblock_test.go | 242 ++++++++++++++++++++++++ cmd/geth/snapshot.go | 194 ++++++++++++++++++- cmd/geth/usage.go | 2 + cmd/utils/flags.go | 25 ++- core/blockchain.go | 10 +- core/blockchain_repair_test.go | 4 +- core/blockchain_sethead_test.go | 2 +- core/blockchain_snapshot_test.go | 4 +- core/blockchain_test.go | 18 +- core/rawdb/accessors_chain_test.go | 2 +- core/rawdb/chain_iterator.go | 5 +- core/rawdb/database.go | 113 ++++++++++- core/rawdb/freezer.go | 28 ++- core/rawdb/schema.go | 6 + core/rawdb/table.go | 10 + core/state/pruner/pruner.go | 211 +++++++++++++++++++++ eth/downloader/downloader.go | 4 +- ethdb/database.go | 6 + node/node.go | 9 +- rlp/safe.go | 2 +- 
tests/fuzzers/bls12381/bls12381_fuzz.go | 3 +- 25 files changed, 879 insertions(+), 65 deletions(-) create mode 100644 cmd/geth/pruneblock_test.go diff --git a/.gitignore b/.gitignore index 3d05da8816..0271db9ecd 100644 --- a/.gitignore +++ b/.gitignore @@ -48,3 +48,5 @@ profile.cov /dashboard/assets/package-lock.json **/yarn-error.log +cmd/geth/node/ +cmd/geth/__debug_bin diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 1152bfdfdd..a30efbbdea 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -458,7 +458,7 @@ func importPreimages(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, false) + db := utils.MakeChainDatabase(ctx, stack, false, false) start := time.Now() if err := utils.ImportPreimages(db, ctx.Args().First()); err != nil { @@ -477,7 +477,7 @@ func exportPreimages(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) start := time.Now() if err := utils.ExportPreimages(db, ctx.Args().First()); err != nil { @@ -491,7 +491,7 @@ func dump(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) for _, arg := range ctx.Args() { var header *types.Header if hashish(arg) { diff --git a/cmd/geth/dbcmd.go b/cmd/geth/dbcmd.go index 4c70373e9a..2ac8b803cb 100644 --- a/cmd/geth/dbcmd.go +++ b/cmd/geth/dbcmd.go @@ -62,6 +62,7 @@ Remove blockchain and state databases`, dbPutCmd, dbGetSlotsCmd, dbDumpFreezerIndex, + ancientInspectCmd, }, } dbInspectCmd = cli.Command{ @@ -195,6 +196,16 @@ WARNING: This is a low-level operation which may cause database corruption!`, }, Description: "This command displays information about the freezer index.", } + ancientInspectCmd = cli.Command{ + Action: utils.MigrateFlags(ancientInspect), 
+ Name: "inspect-reserved-oldest-blocks", + Flags: []cli.Flag{ + utils.DataDirFlag, + }, + Usage: "Inspect the ancientStore information", + Description: `This commands will read current offset from kvdb, which is the current offset and starting BlockNumber +of ancientStore, will also displays the reserved number of blocks in ancientStore `, + } ) func removeDB(ctx *cli.Context) error { @@ -282,12 +293,21 @@ func inspect(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() return rawdb.InspectDatabase(db, prefix, start) } +func ancientInspect(ctx *cli.Context) error { + stack, _ := makeConfigNode(ctx) + defer stack.Close() + + db := utils.MakeChainDatabase(ctx, stack, true, true) + defer db.Close() + return rawdb.AncientInspect(db) +} + func showLeveldbStats(db ethdb.Stater) { if stats, err := db.Stat("leveldb.stats"); err != nil { log.Warn("Failed to read database stats", "error", err) @@ -305,7 +325,7 @@ func dbStats(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() showLeveldbStats(db) @@ -316,7 +336,7 @@ func dbCompact(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, false) + db := utils.MakeChainDatabase(ctx, stack, false, false) defer db.Close() log.Info("Stats before compaction") @@ -340,7 +360,7 @@ func dbGet(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() key, err := hexutil.Decode(ctx.Args().Get(0)) @@ -365,7 +385,7 @@ func dbDelete(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := 
utils.MakeChainDatabase(ctx, stack, false) + db := utils.MakeChainDatabase(ctx, stack, false, false) defer db.Close() key, err := hexutil.Decode(ctx.Args().Get(0)) @@ -392,7 +412,7 @@ func dbPut(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, false) + db := utils.MakeChainDatabase(ctx, stack, false, false) defer db.Close() var ( @@ -426,7 +446,7 @@ func dbDumpTrie(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() var ( root []byte diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 3e2585fed6..3576fe2e46 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -165,6 +165,8 @@ var ( utils.MinerNotifyFullFlag, configFileFlag, utils.CatalystFlag, + utils.BlockAmountReserved, + utils.CheckSnapshotWithMPT, } rpcFlags = []cli.Flag{ diff --git a/cmd/geth/pruneblock_test.go b/cmd/geth/pruneblock_test.go new file mode 100644 index 0000000000..7d78f444fa --- /dev/null +++ b/cmd/geth/pruneblock_test.go @@ -0,0 +1,242 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . 
+ +package main + +import ( + "bytes" + "encoding/hex" + "fmt" + "io/ioutil" + "math/big" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ethereum/go-ethereum/cmd/utils" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state/pruner" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" +) + +var ( + canonicalSeed = 1 + blockPruneBackUpBlockNumber = 128 + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + balance = big.NewInt(10000000) + gspec = &core.Genesis{Config: params.TestChainConfig, Alloc: core.GenesisAlloc{address: {Balance: balance}}} + signer = types.LatestSigner(gspec.Config) + config = &core.CacheConfig{ + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieTimeLimit: 5 * time.Minute, + SnapshotLimit: 0, // Disable snapshot + TriesInMemory: 128, + } + engine = ethash.NewFullFaker() +) + +func TestOfflineBlockPrune(t *testing.T) { + //Corner case for 0 remain in ancientStore. + testOfflineBlockPruneWithAmountReserved(t, 0) + //General case.
+ testOfflineBlockPruneWithAmountReserved(t, 100) +} + +func testOfflineBlockPruneWithAmountReserved(t *testing.T, amountReserved uint64) { + datadir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("Failed to create temporary datadir: %v", err) + } + os.RemoveAll(datadir) + + chaindbPath := filepath.Join(datadir, "chaindata") + oldAncientPath := filepath.Join(chaindbPath, "ancient") + newAncientPath := filepath.Join(chaindbPath, "ancient_back") + + db, blocks, blockList, receiptsList, externTdList, startBlockNumber, _ := BlockchainCreator(t, chaindbPath, oldAncientPath, amountReserved) + node, _ := startEthService(t, gspec, blocks, chaindbPath) + defer node.Close() + + //Initialize a block pruner for pruning, only remain amountReserved blocks backward. + testBlockPruner := pruner.NewBlockPruner(db, node, oldAncientPath, newAncientPath, amountReserved) + if err != nil { + t.Fatalf("failed to make new blockpruner: %v", err) + } + if err := testBlockPruner.BlockPruneBackUp(chaindbPath, 512, utils.MakeDatabaseHandles(), "", false, false); err != nil { + t.Fatalf("Failed to back up block: %v", err) + } + + dbBack, err := rawdb.NewLevelDBDatabaseWithFreezer(chaindbPath, 0, 0, newAncientPath, "", false, true, false) + if err != nil { + t.Fatalf("failed to create database with ancient backend") + } + defer dbBack.Close() + + //check against if the backup data matched original one + for blockNumber := startBlockNumber; blockNumber < startBlockNumber+amountReserved; blockNumber++ { + blockHash := rawdb.ReadCanonicalHash(dbBack, blockNumber) + block := rawdb.ReadBlock(dbBack, blockHash, blockNumber) + + if block.Hash() != blockHash { + t.Fatalf("block data did not match between oldDb and backupDb") + } + if blockList[blockNumber-startBlockNumber].Hash() != blockHash { + t.Fatalf("block data did not match between oldDb and backupDb") + } + + receipts := rawdb.ReadRawReceipts(dbBack, blockHash, blockNumber) + if err := checkReceiptsRLP(receipts, 
receiptsList[blockNumber-startBlockNumber]); err != nil { + t.Fatalf("receipts did not match between oldDb and backupDb") + } + // // Calculate the total difficulty of the block + td := rawdb.ReadTd(dbBack, blockHash, blockNumber) + if td == nil { + t.Fatalf("Failed to ReadTd: %v", consensus.ErrUnknownAncestor) + } + if td.Cmp(externTdList[blockNumber-startBlockNumber]) != 0 { + t.Fatalf("externTd did not match between oldDb and backupDb") + } + } + + //check if ancientDb freezer replaced successfully + testBlockPruner.AncientDbReplacer() + if _, err := os.Stat(newAncientPath); err != nil { + if !os.IsNotExist(err) { + t.Fatalf("ancientDb replaced unsuccessfully") + } + } + if _, err := os.Stat(oldAncientPath); err != nil { + t.Fatalf("ancientDb replaced unsuccessfully") + } +} + +func BlockchainCreator(t *testing.T, chaindbPath, AncientPath string, blockRemain uint64) (ethdb.Database, []*types.Block, []*types.Block, []types.Receipts, []*big.Int, uint64, *core.BlockChain) { + //create a database with ancient freezer + db, err := rawdb.NewLevelDBDatabaseWithFreezer(chaindbPath, 0, 0, AncientPath, "", false, false, false) + if err != nil { + t.Fatalf("failed to create database with ancient backend") + } + defer db.Close() + genesis := gspec.MustCommit(db) + // Initialize a fresh chain with only a genesis block + blockchain, err := core.NewBlockChain(db, config, gspec.Config, engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("Failed to create chain: %v", err) + } + + // Make chain starting from genesis + blocks, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 500, func(i int, block *core.BlockGen) { + block.SetCoinbase(common.Address{0: byte(canonicalSeed), 19: byte(i)}) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, nil, nil), signer, key) + if err != nil { + panic(err) + } + block.AddTx(tx) + block.SetDifficulty(big.NewInt(1000000)) + }) + if _, err := 
blockchain.InsertChain(blocks); err != nil { + t.Fatalf("Failed to import canonical chain start: %v", err) + } + + // Force run a freeze cycle + type freezer interface { + Freeze(threshold uint64) error + Ancients() (uint64, error) + } + db.(freezer).Freeze(10) + + frozen, err := db.Ancients() + //make sure there're frozen items + if err != nil || frozen == 0 { + t.Fatalf("Failed to import canonical chain start: %v", err) + } + if frozen < blockRemain { + t.Fatalf("block amount is not enough for pruning: %v", err) + } + + oldOffSet := rawdb.ReadOffSetOfCurrentAncientFreezer(db) + // Get the actual start block number. + startBlockNumber := frozen - blockRemain + oldOffSet + // Initialize the slice to buffer the block data left. + blockList := make([]*types.Block, 0, blockPruneBackUpBlockNumber) + receiptsList := make([]types.Receipts, 0, blockPruneBackUpBlockNumber) + externTdList := make([]*big.Int, 0, blockPruneBackUpBlockNumber) + // All ancient data within the most recent 128 blocks write into memory buffer for future new ancient_back directory usage. 
+ for blockNumber := startBlockNumber; blockNumber < frozen+oldOffSet; blockNumber++ { + blockHash := rawdb.ReadCanonicalHash(db, blockNumber) + block := rawdb.ReadBlock(db, blockHash, blockNumber) + blockList = append(blockList, block) + receipts := rawdb.ReadRawReceipts(db, blockHash, blockNumber) + receiptsList = append(receiptsList, receipts) + // Calculate the total difficulty of the block + td := rawdb.ReadTd(db, blockHash, blockNumber) + if td == nil { + t.Fatalf("Failed to ReadTd: %v", consensus.ErrUnknownAncestor) + } + externTdList = append(externTdList, td) + } + + return db, blocks, blockList, receiptsList, externTdList, startBlockNumber, blockchain +} + +func checkReceiptsRLP(have, want types.Receipts) error { + if len(have) != len(want) { + return fmt.Errorf("receipts sizes mismatch: have %d, want %d", len(have), len(want)) + } + for i := 0; i < len(want); i++ { + rlpHave, err := rlp.EncodeToBytes(have[i]) + if err != nil { + return err + } + rlpWant, err := rlp.EncodeToBytes(want[i]) + if err != nil { + return err + } + if !bytes.Equal(rlpHave, rlpWant) { + return fmt.Errorf("receipt #%d: receipt mismatch: have %s, want %s", i, hex.EncodeToString(rlpHave), hex.EncodeToString(rlpWant)) + } + } + return nil +} + +// startEthService creates a full node instance for testing. 
+func startEthService(t *testing.T, genesis *core.Genesis, blocks []*types.Block, chaindbPath string) (*node.Node, *eth.Ethereum) { + t.Helper() + n, err := node.New(&node.Config{DataDir: chaindbPath}) + if err != nil { + t.Fatal("can't create node:", err) + } + + if err := n.Start(); err != nil { + t.Fatal("can't start node:", err) + } + + return n, nil +} diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go index 2f615a2c18..20920a0f94 100644 --- a/cmd/geth/snapshot.go +++ b/cmd/geth/snapshot.go @@ -19,8 +19,14 @@ package main import ( "bytes" "errors" + "fmt" + "os" + "path/filepath" "time" + "github.com/prometheus/tsdb/fileutil" + cli "gopkg.in/urfave/cli.v1" + "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" @@ -28,10 +34,11 @@ import ( "github.com/ethereum/go-ethereum/core/state/pruner" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" - cli "gopkg.in/urfave/cli.v1" ) var ( @@ -78,6 +85,30 @@ WARNING: It's necessary to delete the trie clean cache after the pruning. If you specify another directory for the trie clean cache via "--cache.trie.journal" during the use of Geth, please also specify it here for correct deletion. Otherwise the trie clean cache with default directory will be deleted. +`, + }, + { + Name: "prune-block", + Usage: "Prune block data offline", + Action: utils.MigrateFlags(pruneBlock), + Category: "MISCELLANEOUS COMMANDS", + Flags: []cli.Flag{ + utils.DataDirFlag, + utils.AncientFlag, + utils.BlockAmountReserved, + utils.TriesInMemoryFlag, + utils.CheckSnapshotWithMPT, + }, + Description: ` +geth offline prune-block for block data in ancientdb. 
+The amount of blocks expected for remaining after prune can be specified via block-amount-reserved in this command, +will prune and only remain the specified amount of old block data in ancientdb. +the brief workflow is to backup the the number of this specified amount blocks backward in original ancientdb +into new ancient_backup, then delete the original ancientdb dir and rename the ancient_backup to original one for replacement, +finally assemble the statedb and new ancientDb together. +The purpose of doing it is because the block data will be moved into the ancient store when it +becomes old enough(exceed the Threshold 90000), the disk usage will be very large over time, and is occupied mainly by ancientDb, +so it's very necessary to do block data prune, this feature will handle it. `, }, { @@ -149,11 +180,164 @@ It's also usable without snapshot enabled. } ) +func accessDb(ctx *cli.Context, stack *node.Node) (ethdb.Database, error) { + //The layer of tries trees that keep in memory. + TriesInMemory := int(ctx.GlobalUint64(utils.TriesInMemoryFlag.Name)) + chaindb := utils.MakeChainDatabase(ctx, stack, false, true) + defer chaindb.Close() + + if !ctx.GlobalBool(utils.CheckSnapshotWithMPT.Name) { + return chaindb, nil + } + headBlock := rawdb.ReadHeadBlock(chaindb) + if headBlock == nil { + return nil, errors.New("failed to load head block") + } + headHeader := headBlock.Header() + //Make sure the MPT and snapshot matches before pruning, otherwise the node can not start. + snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, TriesInMemory, headBlock.Root(), false, false, false) + if err != nil { + log.Error("snaptree error", "err", err) + return nil, err // The relevant snapshot(s) might not exist + } + + // Use the HEAD-(n-1) as the target root. 
The reason for picking it is: + // - in most of the normal cases, the related state is available + // - the probability of this layer being reorg is very low + + // Retrieve all snapshot layers from the current HEAD. + // In theory there are n difflayers + 1 disk layer present, + // so n diff layers are expected to be returned. + layers := snaptree.Snapshots(headHeader.Root, TriesInMemory, true) + if len(layers) != TriesInMemory { + // Reject if the accumulated diff layers are less than n. It + // means in most of normal cases, there is no associated state + // with bottom-most diff layer. + log.Error("snapshot layers != TriesInMemory", "err", err) + return nil, fmt.Errorf("snapshot not old enough yet: need %d more blocks", TriesInMemory-len(layers)) + } + // Use the bottom-most diff layer as the target + targetRoot := layers[len(layers)-1].Root() + + // Ensure the root is really present. The weak assumption + // is the presence of root can indicate the presence of the + // entire trie. + if blob := rawdb.ReadTrieNode(chaindb, targetRoot); len(blob) == 0 { + // The special case is for clique based networks(rinkeby, goerli + // and some other private networks), it's possible that two + // consecutive blocks will have same root. In this case snapshot + // difflayer won't be created. So HEAD-(n-1) may not paired with + // head-(n-1) layer. Instead the paired layer is higher than the + // bottom-most diff layer. Try to find the bottom-most snapshot + // layer with state available. + // + // Note HEAD is ignored. Usually there is the associated + // state available, but we don't want to use the topmost state + // as the pruning target. 
+ var found bool + for i := len(layers) - 2; i >= 1; i-- { + if blob := rawdb.ReadTrieNode(chaindb, layers[i].Root()); len(blob) != 0 { + targetRoot = layers[i].Root() + found = true + log.Info("Selecting middle-layer as the pruning target", "root", targetRoot, "depth", i) + break + } + } + if !found { + if blob := rawdb.ReadTrieNode(chaindb, snaptree.DiskRoot()); len(blob) != 0 { + targetRoot = snaptree.DiskRoot() + found = true + log.Info("Selecting disk-layer as the pruning target", "root", targetRoot) + } + } + if !found { + if len(layers) > 0 { + log.Error("no snapshot paired state") + return nil, errors.New("no snapshot paired state") + } + return nil, fmt.Errorf("associated state[%x] is not present", targetRoot) + } + } else { + if len(layers) > 0 { + log.Info("Selecting bottom-most difflayer as the pruning target", "root", targetRoot, "height", headHeader.Number.Uint64()-uint64(len(layers)-1)) + } else { + log.Info("Selecting user-specified state as the pruning target", "root", targetRoot) + } + } + return chaindb, nil +} + +func pruneBlock(ctx *cli.Context) error { + stack, config := makeConfigNode(ctx) + defer stack.Close() + blockAmountReserved := ctx.GlobalUint64(utils.BlockAmountReserved.Name) + chaindb, err := accessDb(ctx, stack) + if err != nil { + return err + } + var newAncientPath string + oldAncientPath := ctx.GlobalString(utils.AncientFlag.Name) + if !filepath.IsAbs(oldAncientPath) { + oldAncientPath = stack.ResolvePath(oldAncientPath) + } + + path, _ := filepath.Split(oldAncientPath) + if path == "" { + return errors.New("prune failed, did not specify the AncientPath") + } + newAncientPath = filepath.Join(path, "ancient_back") + + blockpruner := pruner.NewBlockPruner(chaindb, stack, oldAncientPath, newAncientPath, blockAmountReserved) + + lock, exist, err := fileutil.Flock(filepath.Join(oldAncientPath, "PRUNEFLOCK")) + if err != nil { + log.Error("file lock error", "err", err) + return err + } + if exist { + defer lock.Release() + 
log.Info("file lock existed, waiting for prune recovery and continue", "err", err) + if err := blockpruner.RecoverInterruption("chaindata", config.Eth.DatabaseCache, utils.MakeDatabaseHandles(), "", false); err != nil { + log.Error("Pruning failed", "err", err) + return err + } + log.Info("Block prune successfully") + return nil + } + + if _, err := os.Stat(newAncientPath); err == nil { + // No file lock found for old ancientDB but new ancientDB existed, indicating the geth was interrupted + after old ancientDB removal, this happened after backup successfully, so just rename the new ancientDB + if err := blockpruner.AncientDbReplacer(); err != nil { + log.Error("Failed to rename new ancient directory") + return err + } + log.Info("Block prune successfully") + return nil + } + name := "chaindata" + if err := blockpruner.BlockPruneBackUp(name, config.Eth.DatabaseCache, utils.MakeDatabaseHandles(), "", false, false); err != nil { + log.Error("Failed to back up block", "err", err) + return err + } + + log.Info("backup block successfully") + + //After backing up successfully, rename the new ancientdb name to the original one, and delete the old ancientdb + if err := blockpruner.AncientDbReplacer(); err != nil { + return err + } + + lock.Release() + log.Info("Block prune successfully") + return nil +} + func pruneState(ctx *cli.Context) error { stack, config := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, false) + chaindb := utils.MakeChainDatabase(ctx, stack, false, false) pruner, err := pruner.NewPruner(chaindb, stack.ResolvePath(""), stack.ResolvePath(config.Eth.TrieCleanCacheJournal), ctx.GlobalUint64(utils.BloomFilterSizeFlag.Name), ctx.GlobalUint64(utils.TriesInMemoryFlag.Name)) if err != nil { log.Error("Failed to open snapshot tree", "err", err) @@ -182,7 +366,7 @@ func verifyState(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) +
chaindb := utils.MakeChainDatabase(ctx, stack, true, false) headBlock := rawdb.ReadHeadBlock(chaindb) if headBlock == nil { log.Error("Failed to load head block") @@ -220,7 +404,7 @@ func traverseState(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) + chaindb := utils.MakeChainDatabase(ctx, stack, true, false) headBlock := rawdb.ReadHeadBlock(chaindb) if headBlock == nil { log.Error("Failed to load head block") @@ -310,7 +494,7 @@ func traverseRawState(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) + chaindb := utils.MakeChainDatabase(ctx, stack, true, false) headBlock := rawdb.ReadHeadBlock(chaindb) if headBlock == nil { log.Error("Failed to load head block") diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index fba14530b5..a1a0a7d0b4 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -58,6 +58,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.LightKDFFlag, utils.WhitelistFlag, utils.TriesInMemoryFlag, + utils.BlockAmountReserved, + utils.CheckSnapshotWithMPT, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 67632c031b..58f4798aee 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -33,6 +33,10 @@ import ( "text/template" "time" + pcsclite "github.com/gballet/go-libpcsclite" + gopsutil "github.com/shirou/gopsutil/mem" + "gopkg.in/urfave/cli.v1" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" @@ -66,9 +70,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/nat" "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/ethereum/go-ethereum/params" - pcsclite "github.com/gballet/go-libpcsclite" - gopsutil "github.com/shirou/gopsutil/mem" - "gopkg.in/urfave/cli.v1" ) func init() { @@ -827,6 +828,16 @@ var ( Name: "catalyst", Usage: "Catalyst mode (eth2 integration 
testing)", } + + BlockAmountReserved = cli.Uint64Flag{ + Name: "block-amount-reserved", + Usage: "Sets the expected remained amount of blocks for offline block prune", + } + + CheckSnapshotWithMPT = cli.BoolFlag{ + Name: "check-snapshot-with-mpt", + Usage: "Enable checking between snapshot and MPT ", + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1764,7 +1775,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(DataDirFlag.Name) { // Check if we have an already initialized chain and fall back to // that if so. Otherwise we need to generate a new genesis spec. - chaindb := MakeChainDatabase(ctx, stack, false) // TODO (MariusVanDerWijden) make this read only + chaindb := MakeChainDatabase(ctx, stack, false, false) // TODO (MariusVanDerWijden) make this read only if rawdb.ReadCanonicalHash(chaindb, 0) != (common.Hash{}) { cfg.Genesis = nil // fallback to db content } @@ -1883,7 +1894,7 @@ func SplitTagsFlag(tagsFlag string) map[string]string { } // MakeChainDatabase open an LevelDB using the flags passed to the client and will hard crash if it fails. -func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb.Database { +func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly, disableFreeze bool) ethdb.Database { var ( cache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheDatabaseFlag.Name) / 100 handles = MakeDatabaseHandles() @@ -1896,7 +1907,7 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb. 
chainDb, err = stack.OpenDatabase(name, cache, handles, "", readonly) } else { name := "chaindata" - chainDb, err = stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "", readonly) + chainDb, err = stack.OpenDatabaseWithFreezer(name, cache, handles, ctx.GlobalString(AncientFlag.Name), "", readonly, disableFreeze, false) } if err != nil { Fatalf("Could not open database: %v", err) @@ -1926,7 +1937,7 @@ func MakeGenesis(ctx *cli.Context) *core.Genesis { // MakeChain creates a chain manager from set command line flags. func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chainDb ethdb.Database) { var err error - chainDb = MakeChainDatabase(ctx, stack, false) // TODO(rjl493456442) support read-only database + chainDb = MakeChainDatabase(ctx, stack, false, false) // TODO(rjl493456442) support read-only database config, _, err := core.SetupGenesisBlock(chainDb, MakeGenesis(ctx)) if err != nil { Fatalf("%v", err) diff --git a/core/blockchain.go b/core/blockchain.go index 6aa5916218..f4602bc847 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -325,9 +325,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par rawdb.InitDatabaseFromFreezer(bc.db) // If ancient database is not empty, reconstruct all missing // indices in the background. 
- frozen, _ := bc.db.Ancients() + frozen, _ := bc.db.ItemAmountInAncient() if frozen > 0 { - txIndexBlock = frozen + txIndexBlock, _ = bc.db.Ancients() } } if err := bc.loadLastState(); err != nil { @@ -362,7 +362,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } // Ensure that a previous crash in SetHead doesn't leave extra ancients - if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { + if frozen, err := bc.db.ItemAmountInAncient(); err == nil && frozen > 0 { + frozen, err = bc.db.Ancients() + if err != nil { + return nil, err + } var ( needRewind bool low uint64 diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index 5ddb6d2e07..b85f9c9e00 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -1762,7 +1762,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { } os.RemoveAll(datadir) - db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false) + db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false) if err != nil { t.Fatalf("Failed to create persistent database: %v", err) } @@ -1832,7 +1832,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { db.Close() // Start a new blockchain back up and see where the repait leads us - db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false) + db, err = rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false) if err != nil { t.Fatalf("Failed to reopen persistent database: %v", err) } diff --git a/core/blockchain_sethead_test.go b/core/blockchain_sethead_test.go index 6caec36eab..bdcca988a3 100644 --- a/core/blockchain_sethead_test.go +++ b/core/blockchain_sethead_test.go @@ -1961,7 +1961,7 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) { } os.RemoveAll(datadir) - db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false) + db, err := 
rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false) if err != nil { t.Fatalf("Failed to create persistent database: %v", err) } diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index b61eb741f0..530ad035dd 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -64,7 +64,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo } os.RemoveAll(datadir) - db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false) + db, err := rawdb.NewLevelDBDatabaseWithFreezer(datadir, 0, 0, datadir, "", false, false, false) if err != nil { t.Fatalf("Failed to create persistent database: %v", err) } @@ -248,7 +248,7 @@ func (snaptest *crashSnapshotTest) test(t *testing.T) { db.Close() // Start a new blockchain back up and see where the repair leads us - newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "", false) + newdb, err := rawdb.NewLevelDBDatabaseWithFreezer(snaptest.datadir, 0, 0, snaptest.datadir, "", false, false, false) if err != nil { t.Fatalf("Failed to reopen persistent database: %v", err) } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 6874534817..8078db774f 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -653,7 +653,7 @@ func TestFastVsFullChains(t *testing.T) { t.Fatalf("failed to create temp freezer dir: %v", err) } defer os.Remove(frdir) - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -727,7 +727,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { t.Fatalf("failed to create temp freezer dir: %v", err) } defer os.Remove(dir) - db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", 
false) + db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -1594,7 +1594,7 @@ func TestBlockchainRecovery(t *testing.T) { } defer os.Remove(frdir) - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -1651,7 +1651,7 @@ func TestIncompleteAncientReceiptChainInsertion(t *testing.T) { t.Fatalf("failed to create temp freezer dir: %v", err) } defer os.Remove(frdir) - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -1850,7 +1850,7 @@ func testInsertKnownChainData(t *testing.T, typ string) { t.Fatalf("failed to create temp freezer dir: %v", err) } defer os.Remove(dir) - chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false) + chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), dir, "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2130,7 +2130,7 @@ func TestTransactionIndices(t *testing.T) { t.Fatalf("failed to create temp freezer dir: %v", err) } defer os.Remove(frdir) - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2158,7 +2158,7 @@ func TestTransactionIndices(t *testing.T) { // Init block chain with external ancients, check all needed indices has been indexed. 
limit := []uint64{0, 32, 64, 128} for _, l := range limit { - ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2178,7 +2178,7 @@ func TestTransactionIndices(t *testing.T) { } // Reconstruct a block chain which only reserves HEAD-64 tx indices - ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + ancientDb, err = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2257,7 +2257,7 @@ func TestSkipStaleTxIndicesInFastSync(t *testing.T) { t.Fatalf("failed to create temp freezer dir: %v", err) } defer os.Remove(frdir) - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index ea9dc436cf..f99511e456 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -440,7 +440,7 @@ func TestAncientStorage(t *testing.T) { } defer os.Remove(frdir) - db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create database with ancient backend") } diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 4dca06765e..22d5188e91 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -35,7 +35,7 @@ import ( // injects into the database the block hash->number mappings. 
func InitDatabaseFromFreezer(db ethdb.Database) { // If we can't access the freezer or it's empty, abort - frozen, err := db.Ancients() + frozen, err := db.ItemAmountInAncient() if err != nil || frozen == 0 { return } @@ -44,8 +44,9 @@ func InitDatabaseFromFreezer(db ethdb.Database) { start = time.Now() logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log hash common.Hash + offset = db.AncientOffSet() ) - for i := uint64(0); i < frozen; i++ { + for i := uint64(0) + offset; i < frozen+offset; i++ { // Since the freezer has all data in sequential order on a file, // it would be 'neat' to read more data in one go, and let the // freezerdb return N items (e.g up to 1000 items per go) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 82d5df06ce..3342b36f36 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -20,16 +20,18 @@ import ( "bytes" "errors" "fmt" + "math/big" "os" "sync/atomic" "time" + "github.com/olekukonko/tablewriter" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/leveldb" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/log" - "github.com/olekukonko/tablewriter" ) // freezerdb is a database wrapper that enabled freezer data retrievals. @@ -112,6 +114,11 @@ func (db *nofreezedb) Ancients() (uint64, error) { return 0, errNotSupported } +// Ancients returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) ItemAmountInAncient() (uint64, error) { + return 0, errNotSupported +} + // AncientSize returns an error as we don't have a backing chain freezer. 
func (db *nofreezedb) AncientSize(kind string) (uint64, error) { return 0, errNotSupported @@ -140,6 +147,10 @@ func (db *nofreezedb) SetDiffStore(diff ethdb.KeyValueStore) { db.diffStore = diff } +func (db *nofreezedb) AncientOffSet() uint64 { + return 0 +} + // NewDatabase creates a high level database on top of a given key-value data // store without a freezer moving immutable chain segments into cold storage. func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { @@ -148,15 +159,69 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { } } +func ReadOffSetOfCurrentAncientFreezer(db ethdb.KeyValueReader) uint64 { + offset, _ := db.Get(offSetOfCurrentAncientFreezer) + if offset == nil { + return 0 + } + return new(big.Int).SetBytes(offset).Uint64() +} + +func ReadOffSetOfLastAncientFreezer(db ethdb.KeyValueReader) uint64 { + offset, _ := db.Get(offSetOfLastAncientFreezer) + if offset == nil { + return 0 + } + return new(big.Int).SetBytes(offset).Uint64() +} + +func WriteOffSetOfCurrentAncientFreezer(db ethdb.KeyValueWriter, offset uint64) { + if err := db.Put(offSetOfCurrentAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil { + log.Crit("Failed to store offSetOfAncientFreezer", "err", err) + } +} +func WriteOffSetOfLastAncientFreezer(db ethdb.KeyValueWriter, offset uint64) { + if err := db.Put(offSetOfLastAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil { + log.Crit("Failed to store offSetOfAncientFreezer", "err", err) + } +} + +// NewFreezerDb only creates a freezer without statedb. +func NewFreezerDb(db ethdb.KeyValueStore, frz, namespace string, readonly bool, newOffSet uint64) (*freezer, error) { + // Create the idle freezer instance, this operation should be atomic to avoid mismatch between offset and ancientDB.
+ frdb, err := newFreezer(frz, namespace, readonly) + if err != nil { + return nil, err + } + frdb.offset = newOffSet + frdb.frozen += newOffSet + return frdb, nil +} + // NewDatabaseWithFreezer creates a high level database on top of a given key- // value data store with a freezer moving immutable chain segments into cold // storage. -func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly bool) (ethdb.Database, error) { +func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) { // Create the idle freezer instance frdb, err := newFreezer(freezer, namespace, readonly) if err != nil { return nil, err } + + var offset uint64 + // The offset of ancientDB should be handled differently in different scenarios. + if isLastOffset { + offset = ReadOffSetOfLastAncientFreezer(db) + } else { + offset = ReadOffSetOfCurrentAncientFreezer(db) + } + + frdb.offset = offset + + // Some blocks in ancientDB may have already been frozen and been pruned, so adding the offset to + // represent the absolute number of blocks already frozen. + frdb.frozen += offset + // Since the freezer can be stored separately from the user's key-value database, // there's a fairly high probability that the user requests invalid combinations // of the freezer and database. Ensure that we don't shoot ourselves in the foot @@ -179,7 +244,10 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st // If the genesis hash is empty, we have a new key-value store, so nothing to // validate in this method. If, however, the genesis hash is not nil, compare // it to the freezer content. - if kvgenesis, _ := db.Get(headerHashKey(0)); len(kvgenesis) > 0 { + // Only check the following when offset equals 0, otherwise the block number + // in ancientdb did not start with 0, no genesis block in ancientdb as well.
+ + if kvgenesis, _ := db.Get(headerHashKey(0)); offset == 0 && len(kvgenesis) > 0 { if frozen, _ := frdb.Ancients(); frozen > 0 { // If the freezer already contains something, ensure that the genesis blocks // match, otherwise we might mix up freezers across chains and destroy both @@ -221,8 +289,9 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st // feezer. } } + // Freezer is consistent with the key-value database, permit combining the two - if !frdb.readonly { + if !disableFreeze && !frdb.readonly { go frdb.freeze(db) } return &freezerdb{ @@ -256,12 +325,12 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string, r // NewLevelDBDatabaseWithFreezer creates a persistent key-value database with a // freezer moving immutable chain segments into cold storage. -func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string, readonly bool) (ethdb.Database, error) { +func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) { kvdb, err := leveldb.New(file, cache, handles, namespace, readonly) if err != nil { return nil, err } - frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace, readonly) + frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace, readonly, disableFreeze, isLastOffset) if err != nil { kvdb.Close() return nil, err @@ -298,6 +367,35 @@ func (s *stat) Size() string { func (s *stat) Count() string { return s.count.String() } +func AncientInspect(db ethdb.Database) error { + offset := counter(ReadOffSetOfCurrentAncientFreezer(db)) + // Get number of ancient rows inside the freezer. 
+ ancients := counter(0) + if count, err := db.ItemAmountInAncient(); err != nil { + log.Error("failed to get the items amount in ancientDB", "err", err) + return err + } else { + ancients = counter(count) + } + var endNumber counter + if offset+ancients <= 0 { + endNumber = 0 + } else { + endNumber = offset + ancients - 1 + } + stats := [][]string{ + {"Offset/StartBlockNumber", "Offset/StartBlockNumber of ancientDB", offset.String()}, + {"Amount of remained items in AncientStore", "Remaining items of ancientDB", ancients.String()}, + {"The last BlockNumber within ancientDB", "The last BlockNumber", endNumber.String()}, + } + table := tablewriter.NewWriter(os.Stdout) + table.SetHeader([]string{"Database", "Category", "Items"}) + table.SetFooter([]string{"", "AncientStore information", ""}) + table.AppendBulk(stats) + table.Render() + + return nil +} // InspectDatabase traverses the entire database and checks the size // of all different categories of data. @@ -431,9 +529,10 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { } // Get number of ancient rows inside the freezer ancients := counter(0) - if count, err := db.Ancients(); err == nil { + if count, err := db.ItemAmountInAncient(); err == nil { ancients = counter(count) } + // Display the database statistic. stats := [][]string{ {"Key-Value store", "Headers", headers.Size(), headers.Count()}, diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 94b99a64eb..92dfc9604a 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -85,6 +85,8 @@ type freezer struct { quit chan struct{} closeOnce sync.Once + + offset uint64 // Starting BlockNumber in current freezer } // newFreezer creates a chain freezer that moves ancient chain data into @@ -164,7 +166,7 @@ func (f *freezer) Close() error { // in the freezer. 
func (f *freezer) HasAncient(kind string, number uint64) (bool, error) { if table := f.tables[kind]; table != nil { - return table.has(number), nil + return table.has(number - f.offset), nil } return false, nil } @@ -172,7 +174,7 @@ func (f *freezer) HasAncient(kind string, number uint64) (bool, error) { // Ancient retrieves an ancient binary blob from the append-only immutable files. func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { if table := f.tables[kind]; table != nil { - return table.Retrieve(number) + return table.Retrieve(number - f.offset) } return nil, errUnknownTable } @@ -182,6 +184,16 @@ func (f *freezer) Ancients() (uint64, error) { return atomic.LoadUint64(&f.frozen), nil } +// ItemAmountInAncient returns the actual length of current ancientDB. +func (f *freezer) ItemAmountInAncient() (uint64, error) { + return atomic.LoadUint64(&f.frozen) - atomic.LoadUint64(&f.offset), nil +} + +// AncientOffSet returns the offset of current ancientDB. +func (f *freezer) AncientOffSet() uint64 { + return atomic.LoadUint64(&f.offset) +} + // AncientSize returns the ancient size of the specified category. 
func (f *freezer) AncientSize(kind string) (uint64, error) { if table := f.tables[kind]; table != nil { @@ -216,23 +228,23 @@ func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td } }() // Inject all the components into the relevant data tables - if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil { + if err := f.tables[freezerHashTable].Append(f.frozen-f.offset, hash[:]); err != nil { log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err) return err } - if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil { + if err := f.tables[freezerHeaderTable].Append(f.frozen-f.offset, header); err != nil { log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err) return err } - if err := f.tables[freezerBodiesTable].Append(f.frozen, body); err != nil { + if err := f.tables[freezerBodiesTable].Append(f.frozen-f.offset, body); err != nil { log.Error("Failed to append ancient body", "number", f.frozen, "hash", hash, "err", err) return err } - if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil { + if err := f.tables[freezerReceiptTable].Append(f.frozen-f.offset, receipts); err != nil { log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err) return err } - if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil { + if err := f.tables[freezerDifficultyTable].Append(f.frozen-f.offset, td); err != nil { log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err) return err } @@ -249,7 +261,7 @@ func (f *freezer) TruncateAncients(items uint64) error { return nil } for _, table := range f.tables { - if err := table.truncate(items); err != nil { + if err := table.truncate(items - f.offset); err != nil { return err } } diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index b4fb99e451..ece5006c39 100644 --- 
a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -69,6 +69,12 @@ var ( // fastTxLookupLimitKey tracks the transaction lookup limit during fast sync. fastTxLookupLimitKey = []byte("FastTransactionLookupLimit") + //offSet of new updated ancientDB. + offSetOfCurrentAncientFreezer = []byte("offSetOfCurrentAncientFreezer") + + //offSet of the ancientDB before updated version. + offSetOfLastAncientFreezer = []byte("offSetOfLastAncientFreezer") + // badBlockKey tracks the list of bad blocks seen by local badBlockKey = []byte("InvalidBlock") diff --git a/core/rawdb/table.go b/core/rawdb/table.go index a27f5f8ed7..0dcaf8cfb0 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -68,6 +68,16 @@ func (t *table) Ancients() (uint64, error) { return t.db.Ancients() } +// ItemAmountInAncient returns the actual length of current ancientDB. +func (t *table) ItemAmountInAncient() (uint64, error) { + return t.db.ItemAmountInAncient() +} + +// AncientOffSet returns the offset of current ancientDB. +func (t *table) AncientOffSet() uint64 { + return t.db.AncientOffSet() +} + // AncientSize is a noop passthrough that just forwards the request to the underlying // database. 
func (t *table) AncientSize(kind string) (uint64, error) { diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index bb599fcd87..5b070f3afa 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -27,7 +27,10 @@ import ( "strings" "time" + "github.com/prometheus/tsdb/fileutil" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state/snapshot" @@ -35,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) @@ -85,6 +89,14 @@ type Pruner struct { triesInMemory uint64 } +type BlockPruner struct { + db ethdb.Database + oldAncientPath string + newAncientPath string + node *node.Node + BlockAmountReserved uint64 +} + // NewPruner creates the pruner instance. 
func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize, triesInMemory uint64) (*Pruner, error) { headBlock := rawdb.ReadHeadBlock(db) @@ -101,6 +113,7 @@ func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize, trie bloomSize = 256 } stateBloom, err := newStateBloomWithSize(bloomSize) + if err != nil { return nil, err } @@ -115,6 +128,16 @@ func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize, trie }, nil } +func NewBlockPruner(db ethdb.Database, n *node.Node, oldAncientPath, newAncientPath string, BlockAmountReserved uint64) *BlockPruner { + return &BlockPruner{ + db: db, + oldAncientPath: oldAncientPath, + newAncientPath: newAncientPath, + node: n, + BlockAmountReserved: BlockAmountReserved, + } +} + func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, middleStateRoots map[common.Hash]struct{}, start time.Time) error { // Delete all stale trie nodes in the disk. With the help of state bloom // the trie nodes(and codes) belong to the active state will be filtered @@ -233,6 +256,194 @@ func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, sta return nil } +func (p *BlockPruner) backUpOldDb(name string, cache, handles int, namespace string, readonly, interrupt bool) error { + // Open old db wrapper. + chainDb, err := p.node.OpenDatabaseWithFreezer(name, cache, handles, p.oldAncientPath, namespace, readonly, true, interrupt) + if err != nil { + log.Error("Failed to open ancient database", "err=", err) + return err + } + defer chainDb.Close() + log.Info("chainDB opened successfully") + + // Get the number of items in old ancient db. + itemsOfAncient, err := chainDb.ItemAmountInAncient() + log.Info("the number of items in ancientDB is ", "itemsOfAncient", itemsOfAncient) + + // If we can't access the freezer or it's empty, abort. 
+ if err != nil || itemsOfAncient == 0 { + log.Error("can't access the freezer or it's empty, abort") + return errors.New("can't access the freezer or it's empty, abort") + } + + // If the items in freezer is less than the block amount that we want to reserve, it is not enough, should stop. + if itemsOfAncient < p.BlockAmountReserved { + log.Error("the number of old blocks is not enough to reserve,", "ancient items", itemsOfAncient, "the amount specified", p.BlockAmountReserved) + return errors.New("the number of old blocks is not enough to reserve") + } + + var oldOffSet uint64 + if interrupt { + // The interrupt scenario within this function is specific for old and new ancientDB existed concurrently, + // should use last version of offset for oldAncientDB, because current offset is + // actually of the new ancientDB_Backup, but what we want is the offset of ancientDB being backup. + oldOffSet = rawdb.ReadOffSetOfLastAncientFreezer(chainDb) + } else { + // Using current version of ancientDB for oldOffSet because the db for backup is current version. + oldOffSet = rawdb.ReadOffSetOfCurrentAncientFreezer(chainDb) + } + log.Info("the oldOffSet is ", "oldOffSet", oldOffSet) + + // Get the start BlockNumber for pruning. + startBlockNumber := oldOffSet + itemsOfAncient - p.BlockAmountReserved + log.Info("new offset/new startBlockNumber is ", "new offset", startBlockNumber) + + // Create new ancientdb backup and record the new and last version of offset in kvDB as well. + // For every round, newoffset actually equals to the startBlockNumber in ancient backup db.
+ frdbBack, err := rawdb.NewFreezerDb(chainDb, p.newAncientPath, namespace, readonly, startBlockNumber) + if err != nil { + log.Error("Failed to create ancient freezer backup", "err=", err) + return err + } + defer frdbBack.Close() + + offsetBatch := chainDb.NewBatch() + rawdb.WriteOffSetOfCurrentAncientFreezer(offsetBatch, startBlockNumber) + rawdb.WriteOffSetOfLastAncientFreezer(offsetBatch, oldOffSet) + if err := offsetBatch.Write(); err != nil { + log.Crit("Failed to write offset into disk", "err", err) + } + + // It's guaranteed that the old/new offsets are updated as well as the new ancientDB are created if this flock exist. + lock, _, err := fileutil.Flock(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK")) + if err != nil { + log.Error("file lock error", "err", err) + return err + } + + log.Info("prune info", "old offset", oldOffSet, "number of items in ancientDB", itemsOfAncient, "amount to reserve", p.BlockAmountReserved) + log.Info("new offset/new startBlockNumber recorded successfully ", "new offset", startBlockNumber) + + start := time.Now() + // All ancient data after and including startBlockNumber should write into new ancientDB ancient_back. + for blockNumber := startBlockNumber; blockNumber < itemsOfAncient+oldOffSet; blockNumber++ { + blockHash := rawdb.ReadCanonicalHash(chainDb, blockNumber) + block := rawdb.ReadBlock(chainDb, blockHash, blockNumber) + receipts := rawdb.ReadRawReceipts(chainDb, blockHash, blockNumber) + // Calculate the total difficulty of the block + td := rawdb.ReadTd(chainDb, blockHash, blockNumber) + if td == nil { + return consensus.ErrUnknownAncestor + } + // Write into new ancient_back db. + rawdb.WriteAncientBlock(frdbBack, block, receipts, td) + // Print the log every 5s for better trace. 
+ if common.PrettyDuration(time.Since(start)) > common.PrettyDuration(5*time.Second) { + log.Info("block backup process running successfully", "current blockNumber for backup", blockNumber) + start = time.Now() + } + } + lock.Release() + log.Info("block back up done", "current start blockNumber in ancientDB", startBlockNumber) + return nil +} + +// Backup the ancient data for the old ancient db, i.e. the most recent 128 blocks in ancient db. +func (p *BlockPruner) BlockPruneBackUp(name string, cache, handles int, namespace string, readonly, interrupt bool) error { + + start := time.Now() + + if err := p.backUpOldDb(name, cache, handles, namespace, readonly, interrupt); err != nil { + return err + } + + log.Info("Block pruning BackUp successfully", "time duration since start is", common.PrettyDuration(time.Since(start))) + return nil +} + +func (p *BlockPruner) RecoverInterruption(name string, cache, handles int, namespace string, readonly bool) error { + log.Info("RecoverInterruption for block prune") + newExist, err := CheckFileExist(p.newAncientPath) + if err != nil { + log.Error("newAncientDb path error") + return err + } + + if newExist { + log.Info("New ancientDB_backup existed in interruption scenario") + flockOfAncientBack, err := CheckFileExist(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK")) + if err != nil { + log.Error("Failed to check flock of ancientDB_Back %v", err) + return err + } + + // Indicating both old and new ancientDB existed concurrently. + // Delete directly for the new ancientdb to prune from start, e.g.: path ../chaindb/ancient_backup + if err := os.RemoveAll(p.newAncientPath); err != nil { + log.Error("Failed to remove old ancient directory %v", err) + return err + } + if flockOfAncientBack { + // Indicating the oldOffset/newOffset have already been updated. 
+ if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, true); err != nil { + log.Error("Failed to prune") + return err + } + } else { + // Indicating the flock did not exist and the new offset did not be updated, so just handle this case as usual. + if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, false); err != nil { + log.Error("Failed to prune") + return err + } + } + + if err := p.AncientDbReplacer(); err != nil { + log.Error("Failed to replace ancientDB") + return err + } + } else { + log.Info("New ancientDB_backup did not exist in interruption scenario") + // Indicating new ancientDB even did not be created, just prune starting at backup from startBlockNumber as usual, + // in this case, the new offset have not been written into kvDB. + if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, false); err != nil { + log.Error("Failed to prune") + return err + } + if err := p.AncientDbReplacer(); err != nil { + log.Error("Failed to replace ancientDB") + return err + } + } + + return nil +} + +func CheckFileExist(path string) (bool, error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + // Indicating the file didn't exist. + return false, nil + } + return true, err + } + return true, nil +} + +func (p *BlockPruner) AncientDbReplacer() error { + // Delete directly for the old ancientdb, e.g.: path ../chaindb/ancient + if err := os.RemoveAll(p.oldAncientPath); err != nil { + log.Error("Failed to remove old ancient directory %v", err) + return err + } + + // Rename the new ancientdb path same to the old + if err := os.Rename(p.newAncientPath, p.oldAncientPath); err != nil { + log.Error("Failed to rename new ancient directory") + return err + } + return nil +} + // Prune deletes all historical state nodes except the nodes belong to the // specified state version. If user doesn't specify the state version, use // the bottom-most snapshot diff layer as the target. 
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 0009f41786..e1225e7a1c 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -599,10 +599,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I d.ancientLimit = 0 } frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. - + itemAmountInAncient, _ := d.stateDB.ItemAmountInAncient() // If a part of blockchain data has already been written into active store, // disable the ancient style insertion explicitly. - if origin >= frozen && frozen != 0 { + if origin >= frozen && itemAmountInAncient != 0 { d.ancientLimit = 0 log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1) } else if d.ancientLimit > 0 { diff --git a/ethdb/database.go b/ethdb/database.go index 40d0e01d57..e437817a20 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -81,6 +81,12 @@ type AncientReader interface { // AncientSize returns the ancient size of the specified category. AncientSize(kind string) (uint64, error) + + // ItemAmountInAncient returns the actual length of current ancientDB. + ItemAmountInAncient() (uint64, error) + + // AncientOffSet returns the offset of current ancientDB. + AncientOffSet() uint64 } // AncientWriter contains the methods required to write to immutable ancient data. diff --git a/node/node.go b/node/node.go index c122dad32c..b6e1246b3d 100644 --- a/node/node.go +++ b/node/node.go @@ -27,6 +27,8 @@ import ( "strings" "sync" + "github.com/prometheus/tsdb/fileutil" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" @@ -35,7 +37,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" - "github.com/prometheus/tsdb/fileutil" ) // Node is a container on which services can be registered. 
@@ -586,7 +587,7 @@ func (n *Node) OpenAndMergeDatabase(name string, cache, handles int, freezer, di if persistDiff { chainDataHandles = handles * chainDataHandlesPercentage / 100 } - chainDB, err := n.OpenDatabaseWithFreezer(name, cache, chainDataHandles, freezer, namespace, readonly) + chainDB, err := n.OpenDatabaseWithFreezer(name, cache, chainDataHandles, freezer, namespace, readonly, false, false) if err != nil { return nil, err } @@ -606,7 +607,7 @@ func (n *Node) OpenAndMergeDatabase(name string, cache, handles int, freezer, di // also attaching a chain freezer to it that moves ancient chain data from the // database to immutable append-only files. If the node is an ephemeral one, a // memory database is returned. -func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string, readonly bool) (ethdb.Database, error) { +func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() if n.state == closedState { @@ -625,7 +626,7 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, case !filepath.IsAbs(freezer): freezer = n.ResolvePath(freezer) } - db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace, readonly) + db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace, readonly, disableFreeze, isLastOffset) } if err == nil { diff --git a/rlp/safe.go b/rlp/safe.go index c881650a0d..a80380aef4 100644 --- a/rlp/safe.go +++ b/rlp/safe.go @@ -22,5 +22,5 @@ import "reflect" // byteArrayBytes returns a slice of the byte array v. 
func byteArrayBytes(v reflect.Value) []byte { - return v.Slice(0, v.Len()).Bytes() + return v.Slice(0, v.Len()).Bytes() } diff --git a/tests/fuzzers/bls12381/bls12381_fuzz.go b/tests/fuzzers/bls12381/bls12381_fuzz.go index 298050ad36..5c28b952d3 100644 --- a/tests/fuzzers/bls12381/bls12381_fuzz.go +++ b/tests/fuzzers/bls12381/bls12381_fuzz.go @@ -28,6 +28,7 @@ import ( gnark "github.com/consensys/gnark-crypto/ecc/bls12-381" "github.com/consensys/gnark-crypto/ecc/bls12-381/fp" "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" + "github.com/ethereum/go-ethereum/crypto/bls12381" ) @@ -159,7 +160,7 @@ func FuzzCrossG1MultiExp(data []byte) int { gethPoints = append(gethPoints, new(bls12381.PointG1).Set(kp1)) gnarkPoints = append(gnarkPoints, *cp1) } - if len(gethScalars) == 0{ + if len(gethScalars) == 0 { return 0 } // compute multi exponentiation