Skip to content

Commit

Permalink
Merge "[FAB-1500] Recovery of history database"
Browse files Browse the repository at this point in the history
  • Loading branch information
christo4ferris authored and Gerrit Code Review committed Jan 17, 2017
2 parents 1b9bb80 + 4b2947c commit 906bd6f
Show file tree
Hide file tree
Showing 3 changed files with 333 additions and 21 deletions.
1 change: 1 addition & 0 deletions core/ledger/history/histmgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ import "github.com/hyperledger/fabric/core/ledger"
type HistMgr interface {
NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error)
Commit(block *common.Block) error
GetBlockNumFromSavepoint() (uint64, error)
}
128 changes: 110 additions & 18 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,48 +85,140 @@ func NewKVLedger(versionedDBProvider statedb.VersionedDBProvider, ledgerID strin

l := &KVLedger{ledgerID, blockStore, txmgmt, historymgmt}

if err := recoverStateDB(l); err != nil {
//Recover both state DB and history DB if they are out of sync with block storage
if err := recoverDB(l); err != nil {
panic(fmt.Errorf(`Error during state DB recovery:%s`, err))
}

return l, nil
}

//Recover the state database by recommitting last valid blocks
func recoverStateDB(l *KVLedger) error {
//Recover the state database and history database (if exist)
//by recommitting last valid blocks
func recoverDB(l *KVLedger) error {
//If there is no block in blockstorage, nothing to recover.
info, _ := l.blockStore.GetBlockchainInfo()
if info.Height == 0 {
logger.Debugf("Block storage is empty.")
return nil
}

//Getting savepointValue stored in the state DB
var err error
var savepointValue uint64
if savepointValue, err = l.txtmgmt.GetBlockNumFromSavepoint(); err != nil {
var stateDBSavepoint, historyDBSavepoint uint64
//Default value for bool is false
var recoverStateDB, recoverHistoryDB bool

//Getting savepointValue stored in the state DB
if stateDBSavepoint, err = l.txtmgmt.GetBlockNumFromSavepoint(); err != nil {
return err
}

//Checking whether the savepointValue is in sync with block storage height
if savepointValue == info.Height {
//Check whether the state DB is in sync with block storage
if recoverStateDB, err = isRecoveryNeeded(stateDBSavepoint, info.Height); err != nil {
return err
}

if ledgerconfig.IsHistoryDBEnabled() == true {
//Getting savepointValue stored in the history DB
if historyDBSavepoint, err = l.historymgmt.GetBlockNumFromSavepoint(); err != nil {
return err
}
//Check whether the history DB is in sync with block storage
if recoverHistoryDB, err = isRecoveryNeeded(historyDBSavepoint, info.Height); err != nil {
return err
}
}

if recoverHistoryDB == false && recoverStateDB == false {
//If nothing needs recovery, return
if ledgerconfig.IsHistoryDBEnabled() == true {
logger.Debugf("Both state database and history database are in sync with the block storage. No need to perform recovery operation.")
} else {
logger.Debugf("State database is in sync with the block storage.")
}
return nil
} else if savepointValue > info.Height {
return errors.New("BlockStorage height is behind savepoint by %d blocks. Recovery the BlockStore first")
} else if recoverHistoryDB == false && recoverStateDB == true {
logger.Debugf("State database is behind block storage by %d blocks. Recovering state database.", info.Height-stateDBSavepoint)
if err = recommitLostBlocks(l, stateDBSavepoint, info.Height, true, false); err != nil {
return err
}
} else if recoverHistoryDB == true && recoverStateDB == false {
logger.Debugf("History database is behind block storage by %d blocks. Recovering history database.", info.Height-historyDBSavepoint)
if err = recommitLostBlocks(l, historyDBSavepoint, info.Height, false, true); err != nil {
return err
}
} else if recoverHistoryDB == true && recoverStateDB == true {
logger.Debugf("State database is behind block storage by %d blocks, and history database is behind block storage by %d blocks. Recovering both state and history database.", info.Height-stateDBSavepoint, info.Height-historyDBSavepoint)
//If both state DB and history DB need to be recovered, first
//we need to ensure that the state DB and history DB are in same state
//before recommitting lost blocks.
if stateDBSavepoint > historyDBSavepoint {
logger.Debugf("History database is behind the state database by %d blocks", stateDBSavepoint-historyDBSavepoint)
logger.Debugf("Making the history DB in sync with state DB")
if err = recommitLostBlocks(l, historyDBSavepoint, stateDBSavepoint, false, true); err != nil {
return err
}
logger.Debugf("Making both history DB and state DB in sync with the block storage")
if err = recommitLostBlocks(l, stateDBSavepoint, info.Height, true, true); err != nil {
return err
}
} else if stateDBSavepoint < historyDBSavepoint {
logger.Debugf("State database is behind the history database by %d blocks", historyDBSavepoint-stateDBSavepoint)
logger.Debugf("Making the state DB in sync with history DB")
if err = recommitLostBlocks(l, stateDBSavepoint, historyDBSavepoint, true, false); err != nil {
return err
}
logger.Debugf("Making both state DB and history DB in sync with the block storage")
if err = recommitLostBlocks(l, historyDBSavepoint, info.Height, true, true); err != nil {
return err
}
} else {
logger.Debugf("State and history database are in same state but behind block storage")
logger.Debugf("Making both state DB and history DB in sync with the block storage")
if err = recommitLostBlocks(l, stateDBSavepoint, info.Height, true, true); err != nil {
return err
}
}
}
return nil
}

//isRecoveryNeeded compares savepoint and current block height to decide whether
//to initiate recovery process
func isRecoveryNeeded(savepoint uint64, blockHeight uint64) (bool, error) {
if savepoint > blockHeight {
return false, errors.New("BlockStorage height is behind savepoint by %d blocks. Recovery the BlockStore first")
} else if savepoint == blockHeight {
return false, nil
} else {
return true, nil
}
}

//recommitLostBlocks retrieves blocks in specified range and commit the write set to either
//state DB or history DB or both
func recommitLostBlocks(l *KVLedger, savepoint uint64, blockHeight uint64, recoverStateDB bool, recoverHistoryDB bool) error {
//Compute updateSet for each missing savepoint and commit to state DB
for blockNumber := savepointValue + 1; blockNumber <= info.Height; blockNumber++ {
var block *common.Block
var err error
var block *common.Block
for blockNumber := savepoint + 1; blockNumber <= blockHeight; blockNumber++ {
if block, err = l.GetBlockByNumber(blockNumber); err != nil {
return err
}
logger.Debugf("Constructing updateSet for the block %d", blockNumber)
if err = l.txtmgmt.ValidateAndPrepare(block, false); err != nil {
return err
if recoverStateDB == true {
logger.Debugf("Constructing updateSet for the block %d", blockNumber)
if err = l.txtmgmt.ValidateAndPrepare(block, false); err != nil {
return err
}
logger.Debugf("Committing block %d to state database", blockNumber)
if err = l.txtmgmt.Commit(); err != nil {
return err
}
}
logger.Debugf("Committing block %d to state database", blockNumber)
if err = l.txtmgmt.Commit(); err != nil {
return err
if ledgerconfig.IsHistoryDBEnabled() == true && recoverHistoryDB == true {
if err = l.historymgmt.Commit(block); err != nil {
return err
}
}
}

Expand Down
Loading

0 comments on commit 906bd6f

Please sign in to comment.