Skip to content

Commit

Permalink
Stabilize Snapshots (ethereum#1529)
Browse files Browse the repository at this point in the history
Cherry-pick upstream bug fixes for snapshots, which will enable higher transaction throughput. This change also enables snapshots by default (that change is itself one of the commits pulled from upstream).

Upstream commits included:

68754f3 cmd/utils: grant snapshot cache to trie if disabled (ethereum#21416)
3ee91b9 core/state/snapshot: reduce disk layer depth during generation
a15d71a core/state/snapshot: stop generator if it hits missing trie nodes (ethereum#21649)
43c278c core/state: disable snapshot iteration if it's not fully constructed (ethereum#21682)
b63e3c3 core: improve snapshot journal recovery (ethereum#21594)
e640267 core/state/snapshot: fix journal recovery from generating old journal (ethereum#21775)
7b7b327 core/state/snapshot: update generator marker in sync with flushes
167ff56 core/state/snapshot: gethring -> gathering typo (ethereum#22104)
d2e1b17 snapshot, trie: fixed typos, mostly in snapshot pkg (ethereum#22133)
c4deebb core/state/snapshot: add generation logs to storage too
5e9f5ca core/state/snapshot: write snapshot generator in batch (ethereum#22163)
18145ad core/state: maintain one more diff layer (ethereum#21730)
04a7226 snapshot: merge loops for better performance (ethereum#22160)
994cdc6 cmd/utils: enable snapshots by default
9ec3329 core/state/snapshot: ensure Cap retains a min number of layers
52e5c38 core/state: copy the snap when copying the state (ethereum#22340)
a31f6d5 core/state/snapshot: fix panic on missing parent
61ff3e8 core/state/snapshot, ethdb: track deletions more accurately (ethereum#22582)
c79fc20 core/state/snapshot: fix data race in diff layer (ethereum#22540)

Other changes
Commit f9b5530 (not from upstream) fixes an incorrect default DatabaseCache value due to an earlier bad merge.

Tested
Automated tests
Testing on a private testnet
Backwards compatibility
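Enabling snapshots by default is a breaking change to the CLI flags: the snapshot flag is now on unless explicitly turned off (e.g. `--snapshot=false`), whereas previously it had to be passed to enable snapshots. It does not cause any backwards incompatibility between this node and other nodes.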

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
Co-authored-by: gary rong <garyrong0905@gmail.com>
Co-authored-by: Melvin Junhee Woo <melvin.woo@groundx.xyz>
Co-authored-by: Martin Holst Swende <martin@swende.se>
Co-authored-by: Edgar Aroutiounian <edgar.factorial@gmail.com>
  • Loading branch information
6 people committed May 5, 2021
1 parent 4acd3d0 commit 77a4294
Show file tree
Hide file tree
Showing 26 changed files with 2,402 additions and 282 deletions.
9 changes: 5 additions & 4 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ var (
Usage: `Blockchain garbage collection mode ("full", "archive")`,
Value: "full",
}
SnapshotFlag = cli.BoolFlag{
SnapshotFlag = cli.BoolTFlag{
Name: "snapshot",
Usage: `Enables snapshot-database mode -- experimental work in progress feature`,
Usage: `Enables snapshot-database mode (default = enable)`,
}
TxLookupLimitFlag = cli.Int64Flag{
Name: "txlookuplimit",
Expand Down Expand Up @@ -1695,7 +1695,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheSnapshotFlag.Name) {
cfg.SnapshotCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheSnapshotFlag.Name) / 100
}
if !ctx.GlobalIsSet(SnapshotFlag.Name) {
if !ctx.GlobalBool(SnapshotFlag.Name) {
cfg.TrieCleanCache += cfg.SnapshotCache
cfg.SnapshotCache = 0 // Disabled
}
if ctx.GlobalIsSet(DocRootFlag.Name) {
Expand Down Expand Up @@ -1946,7 +1947,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readOnly bool) (chain *core.B
TrieTimeLimit: eth.DefaultConfig.TrieTimeout,
SnapshotLimit: eth.DefaultConfig.SnapshotCache,
}
if !ctx.GlobalIsSet(SnapshotFlag.Name) {
if !ctx.GlobalBool(SnapshotFlag.Name) {
cache.SnapshotLimit = 0 // Disabled
}
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) {
Expand Down
97 changes: 84 additions & 13 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config

badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
writeLegacyJournal bool // Testing flag used to flush the snapshot journal in legacy format.
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -277,9 +278,29 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if err := bc.SetHead(head.NumberU64()); err != nil {
return nil, err
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash(), "snaproot", diskRoot)

snapDisk, err := bc.SetHeadBeyondRoot(head.NumberU64(), diskRoot)
if err != nil {
return nil, err
}
// Chain rewound, persist old snapshot number to indicate recovery procedure
if snapDisk != 0 {
rawdb.WriteSnapshotRecoveryNumber(bc.db, snapDisk)
}
} else {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if err := bc.SetHead(head.NumberU64()); err != nil {
return nil, err
}
}
}
// Ensure that a previous crash in SetHead doesn't leave extra ancients
Expand Down Expand Up @@ -335,7 +356,18 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root(), !bc.cacheConfig.SnapshotWait)
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool

head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer > head.NumberU64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer)
recover = true
}
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, recover)
}
// Take ownership of this particular state
go bc.update()
Expand Down Expand Up @@ -435,9 +467,25 @@ func (bc *BlockChain) loadLastState() error {
// was fast synced or full synced and in which state, the method will try to
// delete minimal data from disk whilst retaining chain consistency.
func (bc *BlockChain) SetHead(head uint64) error {
// Passing the zero hash requests no root threshold, so this is a plain
// rewind; the block number returned for the root is not needed here.
_, err := bc.SetHeadBeyondRoot(head, common.Hash{})
return err
}

// SetHeadBeyondRoot rewinds the local chain to a new head with the extra condition
// that the rewind must pass the specified state root. This method is meant to be
// used when rewinding with snapshots enabled to ensure that we go back further than the
// persistent disk layer. Depending on whether the node was fast synced or full, and
// in which state, the method will try to delete minimal data from disk whilst
// retaining chain consistency.
//
// The method returns the block number where the requested root cap was found.
func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

// Track the block number of the requested root hash
var rootNumber uint64 // (no root == always 0)

// Retrieve the last pivot block to short circuit rollbacks beyond it and the
// current freezer limit to start nuking if underflown
pivot := rawdb.ReadLastPivotNumber(bc.db)
Expand All @@ -453,8 +501,16 @@ func (bc *BlockChain) SetHead(head uint64) error {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
} else {
// Block exists, keep rewinding until we find one with state
// Block exists, keep rewinding until we find one with state,
// keeping rewinding until we exceed the optional threshold
// root hash
beyondRoot := (root == common.Hash{}) // Flag whether we're beyond the requested root (no root, always true)

for {
// If a root threshold was requested but not yet crossed, check
if root != (common.Hash{}) && !beyondRoot && newHeadBlock.Root() == root {
beyondRoot, rootNumber = true, newHeadBlock.NumberU64()
}
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
log.Info("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
if pivot == nil || newHeadBlock.NumberU64() > *pivot {
Expand All @@ -465,8 +521,12 @@ func (bc *BlockChain) SetHead(head uint64) error {
newHeadBlock = bc.genesisBlock
}
}
log.Info("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
if beyondRoot || newHeadBlock.NumberU64() == 0 {
log.Info("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
log.Info("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1) // Keep rewinding
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
Expand Down Expand Up @@ -547,7 +607,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()

return bc.loadLastState()
return rootNumber, bc.loadLastState()
}

// FastSyncCommitHead sets the current head block to the one defined by the hash
Expand Down Expand Up @@ -590,6 +650,11 @@ func (bc *BlockChain) Snapshot() *snapshot.Tree {
return bc.snaps
}

// Snapshots returns the blockchain snapshot tree by delegating to Snapshot.
// It was added to resolve conflicts that arose from cherry-picking.
func (bc *BlockChain) Snapshots() *snapshot.Tree {
return bc.Snapshot()
}

// CurrentFastBlock retrieves the current fast-sync head block of the canonical
// chain. The block is retrieved from the blockchain's internal cache.
func (bc *BlockChain) CurrentFastBlock() *types.Block {
Expand Down Expand Up @@ -899,8 +964,14 @@ func (bc *BlockChain) Stop() {
var snapBase common.Hash
if bc.snaps != nil {
var err error
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
if bc.writeLegacyJournal {
if snapBase, err = bc.snaps.LegacyJournal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}
} else {
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}
}
}
// Ensure the state of a recent block is also stored to disk before exiting.
Expand Down
Loading

0 comments on commit 77a4294

Please sign in to comment.