Skip to content

Commit

Permalink
eth,ethdb,node,core/state: backport from bnb-chain/bsc#543
Browse files Browse the repository at this point in the history
Signed-off-by: Delweng <delweng@gmail.com>
  • Loading branch information
jsvisa committed Feb 17, 2023
1 parent 842faa8 commit af405bf
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 3 deletions.
213 changes: 213 additions & 0 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/prometheus/tsdb/fileutil"
)

const (
Expand Down Expand Up @@ -335,6 +338,216 @@ func (p *Pruner) Prune(root common.Hash) error {
return prune(p.snaptree, root, p.db, p.stateBloom, filterName, middleRoots, start)
}

type BlockPruner struct {
db ethdb.Database
oldAncientPath string
newAncientPath string
node *node.Node
BlockAmountReserved uint64
}

func NewBlockPruner(db ethdb.Database, n *node.Node, oldAncientPath, newAncientPath string, BlockAmountReserved uint64) *BlockPruner {
return &BlockPruner{
db: db,
oldAncientPath: oldAncientPath,
newAncientPath: newAncientPath,
node: n,
BlockAmountReserved: BlockAmountReserved,
}
}

func (p *BlockPruner) backUpOldDb(name string, cache, handles int, namespace string, readonly, interrupt bool) error {
// Open old db wrapper.
chainDb, err := p.node.OpenDatabaseWithFreezer(name, cache, handles, p.oldAncientPath, namespace, readonly, true, interrupt)
if err != nil {
log.Error("Failed to open ancient database", "err=", err)
return err
}
defer chainDb.Close()
log.Info("chainDB opened successfully")

// Get the number of items in old ancient db.
itemsOfAncient, err := chainDb.ItemAmountInAncient()
log.Info("the number of items in ancientDB is ", "itemsOfAncient", itemsOfAncient)

// If we can't access the freezer or it's empty, abort.
if err != nil || itemsOfAncient == 0 {
log.Error("can't access the freezer or it's empty, abort")
return errors.New("can't access the freezer or it's empty, abort")
}

// If the items in freezer is less than the block amount that we want to reserve, it is not enough, should stop.
if itemsOfAncient < p.BlockAmountReserved {
log.Error("the number of old blocks is not enough to reserve,", "ancient items", itemsOfAncient, "the amount specified", p.BlockAmountReserved)
return errors.New("the number of old blocks is not enough to reserve")
}

var oldOffSet uint64
if interrupt {
// The interrupt scecario within this function is specific for old and new ancientDB exsisted concurrently,
// should use last version of offset for oldAncientDB, because current offset is
// actually of the new ancientDB_Backup, but what we want is the offset of ancientDB being backup.
oldOffSet = rawdb.ReadOffSetOfLastAncientFreezer(chainDb)
} else {
// Using current version of ancientDB for oldOffSet because the db for backup is current version.
oldOffSet = rawdb.ReadOffSetOfCurrentAncientFreezer(chainDb)
}
log.Info("the oldOffSet is ", "oldOffSet", oldOffSet)

// Get the start BlockNumber for pruning.
startBlockNumber := oldOffSet + itemsOfAncient - p.BlockAmountReserved
log.Info("new offset/new startBlockNumber is ", "new offset", startBlockNumber)

// Create new ancientdb backup and record the new and last version of offset in kvDB as well.
// For every round, newoffset actually equals to the startBlockNumber in ancient backup db.
frdbBack, err := rawdb.NewFreezerDb(chainDb, p.newAncientPath, namespace, readonly, startBlockNumber)
if err != nil {
log.Error("Failed to create ancient freezer backup", "err=", err)
return err
}
defer frdbBack.Close()

offsetBatch := chainDb.NewBatch()
rawdb.WriteOffSetOfCurrentAncientFreezer(offsetBatch, startBlockNumber)
rawdb.WriteOffSetOfLastAncientFreezer(offsetBatch, oldOffSet)
if err := offsetBatch.Write(); err != nil {
log.Crit("Failed to write offset into disk", "err", err)
}

// It's guaranteed that the old/new offsets are updated as well as the new ancientDB are created if this flock exist.
lock, _, err := fileutil.Flock(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK"))
if err != nil {
log.Error("file lock error", "err", err)
return err
}

log.Info("prune info", "old offset", oldOffSet, "number of items in ancientDB", itemsOfAncient, "amount to reserve", p.BlockAmountReserved)
log.Info("new offset/new startBlockNumber recorded successfully ", "new offset", startBlockNumber)

start := time.Now()
// All ancient data after and including startBlockNumber should write into new ancientDB ancient_back.
for blockNumber := startBlockNumber; blockNumber < itemsOfAncient+oldOffSet; blockNumber++ {
blockHash := rawdb.ReadCanonicalHash(chainDb, blockNumber)
block := rawdb.ReadBlock(chainDb, blockHash, blockNumber)
receipts := rawdb.ReadRawReceipts(chainDb, blockHash, blockNumber)
borReceipts := []*types.Receipt{rawdb.ReadBorReceipt(chainDb, blockHash, blockNumber)}

// Calculate the total difficulty of the block
td := rawdb.ReadTd(chainDb, blockHash, blockNumber)
if td == nil {
return consensus.ErrUnknownAncestor
}
// Write into new ancient_back db.
if _, err := rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, []types.Receipts{borReceipts}, td); err != nil {
log.Error("failed to write new ancient", "error", err)
return err
}
// Print the log every 5s for better trace.
if common.PrettyDuration(time.Since(start)) > common.PrettyDuration(5*time.Second) {
log.Info("block backup process running successfully", "current blockNumber for backup", blockNumber)
start = time.Now()
}
}
lock.Release()
log.Info("block back up done", "current start blockNumber in ancientDB", startBlockNumber)
return nil
}

// Backup the ancient data for the old ancient db, i.e. the most recent 128 blocks in ancient db.
func (p *BlockPruner) BlockPruneBackUp(name string, cache, handles int, namespace string, readonly, interrupt bool) error {
start := time.Now()

if err := p.backUpOldDb(name, cache, handles, namespace, readonly, interrupt); err != nil {
return err
}

log.Info("Block pruning BackUp successfully", "time duration since start is", common.PrettyDuration(time.Since(start)))
return nil
}

func (p *BlockPruner) RecoverInterruption(name string, cache, handles int, namespace string, readonly bool) error {
log.Info("RecoverInterruption for block prune")
newExist, err := CheckFileExist(p.newAncientPath)
if err != nil {
log.Error("newAncientDb path error")
return err
}

if newExist {
log.Info("New ancientDB_backup existed in interruption scenario")
flockOfAncientBack, err := CheckFileExist(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK"))
if err != nil {
log.Error("Failed to check flock of ancientDB_Back %v", err)
return err
}

// Indicating both old and new ancientDB existed concurrently.
// Delete directly for the new ancientdb to prune from start, e.g.: path ../chaindb/ancient_backup
if err := os.RemoveAll(p.newAncientPath); err != nil {
log.Error("Failed to remove old ancient directory %v", err)
return err
}
if flockOfAncientBack {
// Indicating the oldOffset/newOffset have already been updated.
if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, true); err != nil {
log.Error("Failed to prune")
return err
}
} else {
// Indicating the flock did not exist and the new offset did not be updated, so just handle this case as usual.
if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, false); err != nil {
log.Error("Failed to prune")
return err
}
}

if err := p.AncientDbReplacer(); err != nil {
log.Error("Failed to replace ancientDB")
return err
}
} else {
log.Info("New ancientDB_backup did not exist in interruption scenario")
// Indicating new ancientDB even did not be created, just prune starting at backup from startBlockNumber as usual,
// in this case, the new offset have not been written into kvDB.
if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, false); err != nil {
log.Error("Failed to prune")
return err
}
if err := p.AncientDbReplacer(); err != nil {
log.Error("Failed to replace ancientDB")
return err
}
}

return nil
}

func CheckFileExist(path string) (bool, error) {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
// Indicating the file didn't exist.
return false, nil
}
return true, err
}
return true, nil
}

func (p *BlockPruner) AncientDbReplacer() error {
// Delete directly for the old ancientdb, e.g.: path ../chaindb/ancient
if err := os.RemoveAll(p.oldAncientPath); err != nil {
log.Error("Failed to remove old ancient directory %v", err)
return err
}

// Rename the new ancientdb path same to the old
if err := os.Rename(p.newAncientPath, p.oldAncientPath); err != nil {
log.Error("Failed to rename new ancient directory")
return err
}
return nil
}

// RecoverPruning will resume the pruning procedure during the system restart.
// This function is used in this case: user tries to prune state data, but the
// system was interrupted midway because of crash or manual-kill. In this case
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
} else {
d.ancientLimit = 0
}
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
frozen, _ := d.stateDB.ItemAmountInAncient() // Ignore the error here since light client can also hit here.

// If a part of blockchain data has already been written into active store,
// disable the ancient style insertion explicitly.
Expand Down
7 changes: 7 additions & 0 deletions ethdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ type AncientReader interface {

// AncientSize returns the ancient size of the specified category.
AncientSize(kind string) (uint64, error)

// ItemAmountInAncient returns the actual length of current ancientDB.
ItemAmountInAncient() (uint64, error)

// AncientOffSet returns the offset of current ancientDB.
AncientOffSet() uint64
}

// AncientBatchReader is the interface for 'batched' or 'atomic' reading.
Expand Down Expand Up @@ -164,6 +170,7 @@ type AncientStore interface {

// Database contains all the methods required by the high level database to not
// only access the key-value data store but also the chain freezer.
//
//go:generate mockgen -destination=../eth/filters/IDatabase.go -package=filters . Database
type Database interface {
Reader
Expand Down
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
// also attaching a chain freezer to it that moves ancient chain data from the
// database to immutable append-only files. If the node is an ephemeral one, a
// memory database is returned.
func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string, readonly bool) (ethdb.Database, error) {
func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.state == closedState {
Expand All @@ -738,7 +738,7 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer,
case !filepath.IsAbs(freezer):
freezer = n.ResolvePath(freezer)
}
db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace, readonly)
db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace, readonly, disableFreeze, isLastOffset)
}

if err == nil {
Expand Down

0 comments on commit af405bf

Please sign in to comment.