Skip to content

Commit

Permalink
ChainDB: write sync to VolDB, async chain selection
Browse files Browse the repository at this point in the history
Fixes #2487.
  • Loading branch information
mrBliss committed Aug 10, 2020
1 parent 62e57b9 commit 009613a
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
-- TODO vary these
, cdbGcDelay = 0
, cdbGcInterval = 1
, cdbBlocksToAddSize = 2
, cdbChainSelQueueSize = 2
}
where
prj af = case AF.headBlockNo af of
Expand Down
2 changes: 1 addition & 1 deletion ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ initBlockFetchConsensusInterface cfg chainDB getCandidates blockFetchSize btime
-- Waits until the block has been written to disk, but not until chain
-- selection has processed the block.
addFetchedBlock :: Point blk -> blk -> m ()
addFetchedBlock _pt = void . ChainDB.addBlockWaitWrittenToDisk chainDB
addFetchedBlock _pt = ChainDB.addBlockWaitWrittenToDisk chainDB

readFetchedMaxSlotNo :: STM m MaxSlotNo
readFetchedMaxSlotNo = ChainDB.getMaxSlotNo chainDB
Expand Down
39 changes: 12 additions & 27 deletions ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ data ChainDB m blk = ChainDB {
-- part of the chain if there are other chains available that are
-- preferred by the consensus algorithm (typically, longer chains).
--
-- This function typically returns immediately, yielding a
-- 'AddBlockPromise' which can be used to wait for the result. You can
-- use 'addBlock' to add the block synchronously.
-- This function typically returns immediately after writing the block
-- to disk, yielding a 'AddBlockPromise' which can be used to wait until
-- the block has been processed by chain selection. You can use
-- 'addBlock' to add the block synchronously.
--
-- NOTE: back pressure can be applied when overloaded.
addBlockAsync :: blk -> m (AddBlockPromise m blk)
Expand Down Expand Up @@ -361,24 +362,10 @@ instance DB (ChainDB m blk) where
Adding a block
-------------------------------------------------------------------------------}

data AddBlockPromise m blk = AddBlockPromise
{ blockWrittenToDisk :: STM m Bool
-- ^ Use this 'STM' transaction to wait until the block has been written
-- to disk.
--
-- Returns 'True' when the block was written to disk or 'False' when it
-- was ignored, e.g., because it was older than @k@.
--
-- If the 'STM' transaction has returned 'True' then 'getIsFetched' will
-- return 'True' for the added block.
--
-- NOTE: Even when the result is 'False', 'getIsFetched' might still
-- return 'True', e.g., the block was older than @k@, but it has been
-- downloaded and stored on disk before.
, blockProcessed :: STM m (Point blk)
-- ^ Use this 'STM' transaction to wait until the block has been
-- processed: the block has been written to disk and chain selection has
-- been performed for the block, /unless/ the block is from the future.
newtype AddBlockPromise m blk = AddBlockPromise
{ blockProcessed :: STM m (Point blk)
-- ^ Use this 'STM' transaction to wait until chain selection has been
-- performed for the block, /unless/ the block is from the future.
--
-- The ChainDB's tip after chain selection is returned. When this tip
-- doesn't match the added block, it doesn't necessarily mean the block
Expand All @@ -393,12 +380,10 @@ data AddBlockPromise m blk = AddBlockPromise
-- disk.
}

-- | Add a block synchronously: wait until the block has been written to disk
-- (see 'blockWrittenToDisk').
addBlockWaitWrittenToDisk :: IOLike m => ChainDB m blk -> blk -> m Bool
addBlockWaitWrittenToDisk chainDB blk = do
promise <- addBlockAsync chainDB blk
atomically $ blockWrittenToDisk promise
-- | Add a block, return after the block has been written to disk, but do not
-- wait until the block has been processed.
addBlockWaitWrittenToDisk :: IOLike m => ChainDB m blk -> blk -> m ()
addBlockWaitWrittenToDisk chainDB blk = void $ addBlockAsync chainDB blk

-- | Add a block synchronously: wait until the block has been processed (see
-- 'blockProcessed'). The new tip of the ChainDB is returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ openDBInternal args launchBgTasks = do
varNextReaderKey <- newTVarM (ReaderKey 0)
varCopyLock <- newMVar ()
varKillBgThreads <- newTVarM $ return ()
blocksToAdd <- newBlocksToAdd (Args.cdbBlocksToAddSize args)
chainSelQueue <- newChainSelectionQueue (Args.cdbChainSelQueueSize args)

let env = CDB { cdbImmDB = immDB
, cdbVolDB = volDB
Expand All @@ -177,7 +177,7 @@ openDBInternal args launchBgTasks = do
, cdbChunkInfo = Args.cdbChunkInfo args
, cdbCheckIntegrity = Args.cdbCheckIntegrity args
, cdbCheckInFuture = Args.cdbCheckInFuture args
, cdbBlocksToAdd = blocksToAdd
, cdbChainSelQueue = chainSelQueue
, cdbFutureBlocks = varFutureBlocks
}
h <- fmap CDBHandle $ newTVarM $ ChainDbOpen env
Expand Down Expand Up @@ -206,7 +206,7 @@ openDBInternal args launchBgTasks = do
{ intCopyToImmDB = getEnv h Background.copyToImmDB
, intGarbageCollect = getEnv1 h Background.garbageCollect
, intUpdateLedgerSnapshots = getEnv h Background.updateLedgerSnapshots
, intAddBlockRunner = getEnv h Background.addBlockRunner
, intAddBlockRunner = getEnv h Background.chainSelectionRunner
, intKillBgThreads = varKillBgThreads
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,57 +35,57 @@ import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.VolDB as VolDB
data ChainDbArgs m blk = forall h1 h2 h3. (Eq h1, Eq h2, Eq h3) => ChainDbArgs {

-- HasFS instances
cdbHasFSImmDb :: HasFS m h1
, cdbHasFSVolDb :: HasFS m h2
, cdbHasFSLgrDB :: HasFS m h3
cdbHasFSImmDb :: HasFS m h1
, cdbHasFSVolDb :: HasFS m h2
, cdbHasFSLgrDB :: HasFS m h3

-- Policy
, cdbImmValidation :: ImmDB.ValidationPolicy
, cdbVolValidation :: VolDB.BlockValidationPolicy
, cdbBlocksPerFile :: VolDB.BlocksPerFile
, cdbParamsLgrDB :: LgrDB.LedgerDbParams
, cdbDiskPolicy :: LgrDB.DiskPolicy
, cdbImmValidation :: ImmDB.ValidationPolicy
, cdbVolValidation :: VolDB.BlockValidationPolicy
, cdbBlocksPerFile :: VolDB.BlocksPerFile
, cdbParamsLgrDB :: LgrDB.LedgerDbParams
, cdbDiskPolicy :: LgrDB.DiskPolicy

-- Integration
, cdbTopLevelConfig :: TopLevelConfig blk
, cdbChunkInfo :: ChunkInfo
, cdbCheckIntegrity :: blk -> Bool
, cdbGenesis :: m (ExtLedgerState blk)
, cdbCheckInFuture :: CheckInFuture m blk
, cdbImmDbCacheConfig :: ImmDB.CacheConfig
, cdbTopLevelConfig :: TopLevelConfig blk
, cdbChunkInfo :: ChunkInfo
, cdbCheckIntegrity :: blk -> Bool
, cdbGenesis :: m (ExtLedgerState blk)
, cdbCheckInFuture :: CheckInFuture m blk
, cdbImmDbCacheConfig :: ImmDB.CacheConfig

-- Misc
, cdbTracer :: Tracer m (TraceEvent blk)
, cdbTraceLedger :: Tracer m (LgrDB.LedgerDB blk)
, cdbRegistry :: ResourceRegistry m
, cdbGcDelay :: DiffTime
, cdbGcInterval :: DiffTime
, cdbBlocksToAddSize :: Word
-- ^ Size of the queue used to store asynchronously added blocks. This
-- is the maximum number of blocks that could be kept in memory at the
-- same time when the background thread processing the blocks can't keep
-- up.
, cdbTracer :: Tracer m (TraceEvent blk)
, cdbTraceLedger :: Tracer m (LgrDB.LedgerDB blk)
, cdbRegistry :: ResourceRegistry m
, cdbGcDelay :: DiffTime
, cdbGcInterval :: DiffTime
, cdbChainSelQueueSize :: Word
-- ^ Size of the queue used to asynchronously perform chain selection on
-- newly qadded blocks. This is the maximum number of blocks that could
-- be kept in memory at the same time when the background thread
-- processing the blocks can't keep up.
}

-- | Arguments specific to the ChainDB, not to the ImmutableDB, VolatileDB, or
-- LedgerDB.
data ChainDbSpecificArgs m blk = ChainDbSpecificArgs {
cdbsTracer :: Tracer m (TraceEvent blk)
, cdbsRegistry :: ResourceRegistry m
cdbsTracer :: Tracer m (TraceEvent blk)
, cdbsRegistry :: ResourceRegistry m
-- ^ TODO: the ImmutableDB takes a 'ResourceRegistry' too, but we're
-- using it for ChainDB-specific things. Revisit these arguments.
, cdbsGcDelay :: DiffTime
, cdbsGcDelay :: DiffTime
-- ^ Delay between copying a block to the ImmutableDB and triggering a
-- garbage collection for the corresponding slot on the VolatileDB.
--
-- The goal of the delay is to ensure that the write to the ImmutableDB
-- has been flushed to disk before deleting the block from the
-- VolatileDB, so that a crash won't result in the loss of the block.
, cdbsGcInterval :: DiffTime
, cdbsGcInterval :: DiffTime
-- ^ Batch all scheduled GCs so that at most one GC happens every
-- 'cdbsGcInterval'.
, cdbsCheckInFuture :: CheckInFuture m blk
, cdbsBlocksToAddSize :: Word
, cdbsCheckInFuture :: CheckInFuture m blk
, cdbsChainSelQueueSize :: Word
}

-- | Default arguments
Expand All @@ -112,13 +112,13 @@ data ChainDbSpecificArgs m blk = ChainDbSpecificArgs {
-- normal operation, we receive 1 block/20s, meaning at most 1 block.
defaultSpecificArgs :: ChainDbSpecificArgs m blk
defaultSpecificArgs = ChainDbSpecificArgs{
cdbsGcDelay = secondsToDiffTime 60
, cdbsGcInterval = secondsToDiffTime 10
, cdbsBlocksToAddSize = 10
cdbsGcDelay = secondsToDiffTime 60
, cdbsGcInterval = secondsToDiffTime 10
, cdbsChainSelQueueSize = 10
-- Fields without a default
, cdbsTracer = error "no default for cdbsTracer"
, cdbsRegistry = error "no default for cdbsRegistry"
, cdbsCheckInFuture = error "no default for cdbsCheckInFuture"
, cdbsTracer = error "no default for cdbsTracer"
, cdbsRegistry = error "no default for cdbsRegistry"
, cdbsCheckInFuture = error "no default for cdbsCheckInFuture"
}

-- | Default arguments for use within IO
Expand Down Expand Up @@ -170,12 +170,12 @@ fromChainDbArgs ChainDbArgs{..} = (
, lgrTraceLedger = cdbTraceLedger
}
, ChainDbSpecificArgs {
cdbsTracer = cdbTracer
, cdbsRegistry = cdbRegistry
, cdbsGcDelay = cdbGcDelay
, cdbsGcInterval = cdbGcInterval
, cdbsCheckInFuture = cdbCheckInFuture
, cdbsBlocksToAddSize = cdbBlocksToAddSize
cdbsTracer = cdbTracer
, cdbsRegistry = cdbRegistry
, cdbsGcDelay = cdbGcDelay
, cdbsGcInterval = cdbGcInterval
, cdbsCheckInFuture = cdbCheckInFuture
, cdbsChainSelQueueSize = cdbChainSelQueueSize
}
)

Expand All @@ -193,27 +193,27 @@ toChainDbArgs ImmDB.ImmDbArgs{..}
LgrDB.LgrDbArgs{..}
ChainDbSpecificArgs{..} = ChainDbArgs{
-- HasFS instances
cdbHasFSImmDb = immHasFS
, cdbHasFSVolDb = volHasFS
, cdbHasFSLgrDB = lgrHasFS
cdbHasFSImmDb = immHasFS
, cdbHasFSVolDb = volHasFS
, cdbHasFSLgrDB = lgrHasFS
-- Policy
, cdbImmValidation = immValidation
, cdbVolValidation = volValidation
, cdbBlocksPerFile = volBlocksPerFile
, cdbParamsLgrDB = lgrParams
, cdbDiskPolicy = lgrDiskPolicy
, cdbImmValidation = immValidation
, cdbVolValidation = volValidation
, cdbBlocksPerFile = volBlocksPerFile
, cdbParamsLgrDB = lgrParams
, cdbDiskPolicy = lgrDiskPolicy
-- Integration
, cdbTopLevelConfig = lgrTopLevelConfig
, cdbChunkInfo = immChunkInfo
, cdbCheckIntegrity = immCheckIntegrity
, cdbGenesis = lgrGenesis
, cdbCheckInFuture = cdbsCheckInFuture
, cdbImmDbCacheConfig = immCacheConfig
, cdbTopLevelConfig = lgrTopLevelConfig
, cdbChunkInfo = immChunkInfo
, cdbCheckIntegrity = immCheckIntegrity
, cdbGenesis = lgrGenesis
, cdbCheckInFuture = cdbsCheckInFuture
, cdbImmDbCacheConfig = immCacheConfig
-- Misc
, cdbTracer = cdbsTracer
, cdbTraceLedger = lgrTraceLedger
, cdbRegistry = immRegistry
, cdbGcDelay = cdbsGcDelay
, cdbGcInterval = cdbsGcInterval
, cdbBlocksToAddSize = cdbsBlocksToAddSize
, cdbTracer = cdbsTracer
, cdbTraceLedger = lgrTraceLedger
, cdbRegistry = immRegistry
, cdbGcDelay = cdbsGcDelay
, cdbGcInterval = cdbsGcInterval
, cdbChainSelQueueSize = cdbsChainSelQueueSize
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background
-- ** Testing
, ScheduledGc (..)
, dumpGcSchedule
-- * Adding blocks to the ChainDB
, addBlockRunner
-- * Performing chain selection
, chainSelectionRunner
) where

import Control.Exception (assert)
Expand Down Expand Up @@ -68,7 +68,7 @@ import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Storage.ChainDB.API (BlockRef (..),
ChainDbFailure (..))
import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
(addBlockSync)
(chainSelectionForBlock)
import Ouroboros.Consensus.Storage.ChainDB.Impl.ImmDB
(ImmDbSerialiseConstraints)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.ImmDB as ImmDB
Expand Down Expand Up @@ -98,8 +98,8 @@ launchBgTasks
-> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup
-> m ()
launchBgTasks cdb@CDB{..} replayed = do
!addBlockThread <- launch "ChainDB.addBlockRunner" $
addBlockRunner cdb
!addBlockThread <- launch "ChainDB.chainSelectionRunner" $
chainSelectionRunner cdb
gcSchedule <- newGcSchedule
!gcThread <- launch "ChainDB.gcScheduleRunner" $
gcScheduleRunner gcSchedule $ garbageCollect cdb
Expand Down Expand Up @@ -519,12 +519,12 @@ dumpGcSchedule :: IOLike m => GcSchedule m -> STM m [ScheduledGc]
dumpGcSchedule (GcSchedule varQueue) = toList <$> readTVar varQueue

{-------------------------------------------------------------------------------
Adding blocks to the ChainDB
Performing chain selection
-------------------------------------------------------------------------------}

-- | Read blocks from 'cdbBlocksToAdd' and add them synchronously to the
-- ChainDB.
addBlockRunner
-- | Read blocks from 'ChainSelectionQueue' and add perform chain selection
-- for them.
chainSelectionRunner
:: ( IOLike m
, LedgerSupportsProtocol blk
, InspectLedger blk
Expand All @@ -534,6 +534,12 @@ addBlockRunner
)
=> ChainDbEnv m blk
-> m Void
addBlockRunner cdb@CDB{..} = forever $ do
blockToAdd <- getBlockToAdd cdbBlocksToAdd
addBlockSync cdb blockToAdd
chainSelectionRunner cdb@CDB{..} = forever $ do
(BlockToProcess { blockToProcess, varBlockProcessed }, cache) <-
getBlockToProcess cdbChainSelQueue
newTip <-
chainSelectionForBlock
cdb
cache
(getHeader blockToProcess)
atomically $ putTMVar varBlockProcessed newTip
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache
, empty
, singleton
, cacheBlock
, fromList
, lookup
) where

import Prelude hiding (lookup)

import Data.Map (Map)
import qualified Data.Map as Map
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

import Ouroboros.Consensus.Block

Expand All @@ -33,3 +34,6 @@ cacheBlock blk (BlockCache cache) = BlockCache (Map.insert (blockHash blk) blk c

lookup :: HasHeader blk => HeaderHash blk -> BlockCache blk -> Maybe blk
lookup hash (BlockCache cache) = Map.lookup hash cache

fromList :: HasHeader blk => [(HeaderHash blk, blk)] -> BlockCache blk
fromList = BlockCache . Map.fromList
Loading

0 comments on commit 009613a

Please sign in to comment.