Skip to content

Commit

Permalink
core: define and test chain rewind corner cases (#21409)
Browse files Browse the repository at this point in the history
* core: define and test chain reparation cornercases

* core: write up a variety of set-head tests

* core, eth: unify chain rollbacks, handle all the cases

* core: make linter smile

* core: remove commented out legacy code

* core, eth/downloader: fix review comments

* core: revert a removed recovery mechanism
  • Loading branch information
karalabe authored Aug 20, 2020
1 parent 0bdd295 commit 8cbdc86
Show file tree
Hide file tree
Showing 13 changed files with 3,952 additions and 214 deletions.
203 changes: 86 additions & 117 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ type CacheConfig struct {
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}

// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
}

// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
Expand Down Expand Up @@ -204,13 +214,7 @@ type BlockChain struct {
// Processor.
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64) (*BlockChain, error) {
if cacheConfig == nil {
cacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
}
cacheConfig = defaultCacheConfig
}
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
Expand Down Expand Up @@ -268,15 +272,18 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
txIndexBlock = frozen
}
}

if err := bc.loadLastState(); err != nil {
return nil, err
}
// The first thing the node will do is reconstruct the verification data for
// the head block (ethash cache or clique voting snapshot). Might as well do
// it in advance.
bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)

// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if err := bc.SetHead(head.NumberU64()); err != nil {
return nil, err
}
}
// Ensure that a previous crash in SetHead doesn't leave extra ancients
if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 {
var (
needRewind bool
Expand All @@ -286,7 +293,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// blockchain repair. If the head full block is even lower than the ancient
// chain, truncate the ancient store.
fullBlock := bc.CurrentBlock()
if fullBlock != nil && fullBlock != bc.genesisBlock && fullBlock.NumberU64() < frozen-1 {
if fullBlock != nil && fullBlock.Hash() != bc.genesisBlock.Hash() && fullBlock.NumberU64() < frozen-1 {
needRewind = true
low = fullBlock.NumberU64()
}
Expand All @@ -301,15 +308,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
}
if needRewind {
var hashes []common.Hash
previous := bc.CurrentHeader().Number.Uint64()
for i := low + 1; i <= bc.CurrentHeader().Number.Uint64(); i++ {
hashes = append(hashes, rawdb.ReadCanonicalHash(bc.db, i))
log.Error("Truncating ancient chain", "from", bc.CurrentHeader().Number.Uint64(), "to", low)
if err := bc.SetHead(low); err != nil {
return nil, err
}
bc.Rollback(hashes)
log.Warn("Truncate ancient chain", "from", previous, "to", low)
}
}
// The first thing the node will do is reconstruct the verification data for
// the head block (ethash cache or clique voting snapshot). Might as well do
// it in advance.
bc.engine.VerifyHeader(bc, bc.CurrentHeader(), true)

// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for hash := range BadHashes {
if header := bc.GetHeaderByHash(hash); header != nil {
Expand All @@ -318,7 +327,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// make sure the headerByNumber (if present) is in our current canonical chain
if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
bc.SetHead(header.Number.Uint64() - 1)
if err := bc.SetHead(header.Number.Uint64() - 1); err != nil {
return nil, err
}
log.Error("Chain rewind was successful, resuming normal operation")
}
}
Expand Down Expand Up @@ -385,15 +396,6 @@ func (bc *BlockChain) loadLastState() error {
log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset()
}
// Make sure the state associated with the block is available
if _, err := state.New(currentBlock.Root(), bc.stateCache, bc.snaps); err != nil {
// Dangling block without a state associated, init from scratch
log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
if err := bc.repair(&currentBlock); err != nil {
return err
}
rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
}
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
headBlockGauge.Update(int64(currentBlock.NumberU64()))
Expand Down Expand Up @@ -427,30 +429,48 @@ func (bc *BlockChain) loadLastState() error {
log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(currentHeader.Time), 0)))
log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(currentBlock.Time()), 0)))
log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(int64(currentFastBlock.Time()), 0)))

if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil {
log.Info("Loaded last fast-sync pivot marker", "number", *pivot)
}
return nil
}

// SetHead rewinds the local chain to a new head. In the case of headers, everything
// above the new head will be deleted and the new one set. In the case of blocks
// though, the head may be further rewound if block bodies are missing (non-archive
// nodes after a fast sync).
// SetHead rewinds the local chain to a new head. Depending on whether the node
// was fast synced or full synced and in which state, the method will try to
// delete minimal data from disk whilst retaining chain consistency.
func (bc *BlockChain) SetHead(head uint64) error {
log.Warn("Rewinding blockchain", "target", head)

bc.chainmu.Lock()
defer bc.chainmu.Unlock()

updateFn := func(db ethdb.KeyValueWriter, header *types.Header) {
// Rewind the block chain, ensuring we don't end up with a stateless head block
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() {
// Retrieve the last pivot block to short circuit rollbacks beyond it and the
// current freezer limit to start nuking id underflown
pivot := rawdb.ReadLastPivotNumber(bc.db)
frozen, _ := bc.db.Ancients()

updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (uint64, bool) {
// Rewind the block chain, ensuring we don't end up with a stateless head
// block. Note, depth equality is permitted to allow using SetHead as a
// chain reparation mechanism without deleting any data!
if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.NumberU64() {
newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
} else {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
// Rewound state missing, rolled back to before pivot, reset to genesis
newHeadBlock = bc.genesisBlock
// Block exists, keep rewinding until we find one with state
for {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
if pivot == nil || newHeadBlock.NumberU64() > *pivot {
newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1)
continue
} else {
log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
newHeadBlock = bc.genesisBlock
}
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
Expand All @@ -462,7 +482,6 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}

// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
Expand All @@ -479,19 +498,27 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
}
head := bc.CurrentBlock().NumberU64()

// If setHead underflown the freezer threshold and the block processing
// intent afterwards is full block importing, delete the chain segment
// between the stateful-block and the sethead target.
var wipe bool
if head+1 < frozen {
wipe = pivot == nil || head >= *pivot
}
return head, wipe // Only force wipe if full synced
}
// Rewind the header chain, deleting all block bodies until then
delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) {
// Ignore the error here since light client won't hit this path
frozen, _ := bc.db.Ancients()
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if err := bc.db.TruncateAncients(num + 1); err != nil {
if err := bc.db.TruncateAncients(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}

// Remove the hash <-> number mapping from the active store.
rawdb.DeleteHeaderNumber(db, hash)
} else {
Expand All @@ -503,8 +530,18 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
// Todo(rjl493456442) txlookup, bloombits, etc
}
bc.hc.SetHead(head, updateFn, delFn)

// If SetHead was only called as a chain reparation method, try to skip
// touching the header chain altogether, unless the freezer is broken
if block := bc.CurrentBlock(); block.NumberU64() == head {
if target, force := updateFn(bc.db, block.Header()); force {
bc.hc.SetHead(target, updateFn, delFn)
}
} else {
// Rewind the chain to the requested head and keep going backwards until a
// block with a state is found or fast sync pivot is passed
log.Warn("Rewinding blockchain", "target", head)
bc.hc.SetHead(head, updateFn, delFn)
}
// Clear out any stale content from the caches
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
Expand Down Expand Up @@ -627,28 +664,6 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
return nil
}

// repair tries to repair the current blockchain by rolling back the current block
// until one with associated state is found. This is needed to fix incomplete db
// writes caused either by crashes/power outages, or simply non-committed tries.
//
// This method only rolls back the current block. The current header and current
// fast block are left intact.
func (bc *BlockChain) repair(head **types.Block) error {
for {
// Abort if we've rewound to a head block that does have associated state
if _, err := state.New((*head).Root(), bc.stateCache, bc.snaps); err == nil {
log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash())
return nil
}
// Otherwise rewind one block and recheck state availability there
block := bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1)
if block == nil {
return fmt.Errorf("missing block %d [%x]", (*head).NumberU64()-1, (*head).ParentHash())
}
*head = block
}
}

// Export writes the active chain to the given writer.
func (bc *BlockChain) Export(w io.Writer) error {
return bc.ExportN(w, uint64(0), bc.CurrentBlock().NumberU64())
Expand Down Expand Up @@ -985,52 +1000,6 @@ const (
SideStatTy
)

// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (bc *BlockChain) Rollback(chain []common.Hash) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

batch := bc.db.NewBatch()
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]

// Degrade the chain markers if they are explicitly reverted.
// In theory we should update all in-memory markers in the
// last step, however the direction of rollback is from high
// to low, so it's safe the update in-memory markers directly.
currentHeader := bc.hc.CurrentHeader()
if currentHeader.Hash() == hash {
newHeadHeader := bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)
rawdb.WriteHeadHeaderHash(batch, currentHeader.ParentHash)
bc.hc.SetCurrentHeader(newHeadHeader)
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
rawdb.WriteHeadFastBlockHash(batch, currentFastBlock.ParentHash())
bc.currentFastBlock.Store(newFastBlock)
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
rawdb.WriteHeadBlockHash(batch, currentBlock.ParentHash())
bc.currentBlock.Store(newBlock)
headBlockGauge.Update(int64(newBlock.NumberU64()))
}
}
if err := batch.Write(); err != nil {
log.Crit("Failed to rollback chain markers", "err", err)
}
// Truncate ancient data which exceeds the current header.
//
// Notably, it can happen that system crashes without truncating the ancient data
// but the head indicator has been updated in the active store. Regarding this issue,
// system will self recovery by truncating the extra data during the setup phase.
if err := bc.truncateAncient(bc.hc.CurrentHeader().Number.Uint64()); err != nil {
log.Crit("Truncate ancient store failed", "err", err)
}
}

// truncateAncient rewinds the blockchain to the specified header and deletes all
// data in the ancient store that exceeds the specified header.
func (bc *BlockChain) truncateAncient(head uint64) error {
Expand Down
Loading

0 comments on commit 8cbdc86

Please sign in to comment.