Skip to content

Commit

Permalink
ChainDB: let the BlockFetch client add blocks asynchronously
Browse files Browse the repository at this point in the history
Port of IntersectMBO/ouroboros-network#2721

Co-Authored-By: Thomas Winant <thomas@well-typed.com>
  • Loading branch information
facundominguez and mrBliss committed Aug 6, 2024
1 parent f27e7c6 commit 4adf7d9
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
(LedgerSupportsProtocol)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as CSJumping
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise,
ChainDB)
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment
(InvalidBlockPunishment)
Expand All @@ -57,16 +58,16 @@ data ChainDbView m blk = ChainDbView {
getCurrentChain :: STM m (AnchoredFragment (Header blk))
, getIsFetched :: STM m (Point blk -> Bool)
, getMaxSlotNo :: STM m MaxSlotNo
, addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool
, addBlockAsync :: InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk)
, getChainSelStarvation :: STM m ChainSelStarvation
}

defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk
defaultChainDbView :: ChainDB m blk -> ChainDbView m blk
defaultChainDbView chainDB = ChainDbView {
getCurrentChain = ChainDB.getCurrentChain chainDB
, getIsFetched = ChainDB.getIsFetched chainDB
, getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
, addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
, addBlockAsync = ChainDB.addBlockAsync chainDB
, getChainSelStarvation = ChainDB.getChainSelStarvation chainDB
}

Expand Down Expand Up @@ -217,8 +218,8 @@ mkBlockFetchConsensusInterface
pipeliningPunishment <- InvalidBlockPunishment.mkForDiffusionPipelining
pure $ mkAddFetchedBlock_ pipeliningPunishment enabledPipelining

-- Waits until the block has been written to disk, but not until chain
-- selection has processed the block.
-- Hand over the block to the ChainDB, but don't wait until it has been
-- written to disk or processed.
mkAddFetchedBlock_ ::
( BlockConfig blk
-> Header blk
Expand Down Expand Up @@ -262,7 +263,7 @@ mkBlockFetchConsensusInterface
NotReceivingTentativeBlocks -> disconnect
ReceivingTentativeBlocks ->
pipeliningPunishment bcfg (getHeader blk) disconnect
addBlockWaitWrittenToDisk
addBlockAsync
chainDB
punishment
blk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
ChainSelAddBlock BlockToAdd{blockToAdd} ->
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
blockRealPoint blockToAdd
chainSelSync cdb message)
chainSelSync cdb message
lift $ case message of
ChainSelAddBlock blockToAdd ->
deleteBlockToAdd blockToAdd cdbChainSelQueue
_ -> pure ()
)
where
starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,11 @@ chainSelSync cdb@CDB {..} (ChainSelAddBlock BlockToAdd { blockToAdd = b, .. }) =
let immBlockNo = AF.anchorBlockNo curChain

-- We follow the steps from section "## Adding a block" in ChainDB.md

-- Note: we call 'chainSelectionForFutureBlocks' in all branches instead
-- of once, before branching, because we want to do it /after/ writing the
-- block to the VolatileDB and delivering the 'varBlockWrittenToDisk'
-- promise, as this is the promise the BlockFetch client waits for.
-- Otherwise, the BlockFetch client would have to wait for
-- 'chainSelectionForFutureBlocks'.
--
-- Note: we call 'chainSelectionForFutureBlocks' in all branches instead of
-- once, before branching, because we want to do it /after/ writing the
-- block to the VolatileDB so that any threads waiting on the
-- 'varBlockWrittenToDisk' promise don't have to wait for the result of

-- ### Ignore
newTip <- if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,15 @@ getBlockComponent ::
getBlockComponent CDB{..} = getAnyBlockComponent cdbImmutableDB cdbVolatileDB

getIsFetched ::
forall m blk. IOLike m
forall m blk. (IOLike m, HasHeader blk)
=> ChainDbEnv m blk -> STM m (Point blk -> Bool)
getIsFetched CDB{..} = basedOnHash <$> VolatileDB.getIsMember cdbVolatileDB
where
-- The volatile DB indexes by hash only, not by points. However, it should
-- not be possible to have two points with the same hash but different
-- slot numbers.
basedOnHash :: (HeaderHash blk -> Bool) -> Point blk -> Bool
basedOnHash f p =
case pointHash p of
BlockHash hash -> f hash
GenesisHash -> False
getIsFetched CDB{..} = do
checkBlocksToAdd <- memberBlocksToAdd cdbChainSelQueue
checkVolDb <- VolatileDB.getIsMember cdbVolatileDB
return $ \pt ->
case pointToWithOriginRealPoint pt of
Origin -> False
NotOrigin pt' -> checkBlocksToAdd pt' || checkVolDb (realPointHash pt')

getIsInvalidBlock ::
forall m blk. (IOLike m, HasHeader blk)
Expand Down Expand Up @@ -185,10 +182,13 @@ getMaxSlotNo CDB{..} = do
-- contains block 9'. The ImmutableDB contains blocks 1-10. The max slot
-- of the current chain will be 10 (being the anchor point of the empty
-- current chain), while the max slot of the VolatileDB will be 9.
curChainMaxSlotNo <- maxSlotNoFromWithOrigin . AF.headSlot
<$> readTVar cdbChain
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo
--
-- Moreover, we have to look in 'ChainSelQueue' too.
curChainMaxSlotNo <-
maxSlotNoFromWithOrigin . AF.headSlot <$> readTVar cdbChain
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
blocksToAddMaxSlotNo <- getBlocksToAddMaxSlotNo cdbChainSelQueue
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo `max` blocksToAddMaxSlotNo

{-------------------------------------------------------------------------------
Unifying interface over the immutable DB and volatile DB, but independent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
-- * Blocks to add
, BlockToAdd (..)
, ChainSelMessage (..)
, ChainSelQueue
, ChainSelQueue -- opaque
, addBlockToAdd
, addReprocessLoEBlocks
, closeChainSelQueue
, deleteBlockToAdd
, getBlocksToAddMaxSlotNo
, getChainSelMessage
, memberBlocksToAdd
, newChainSelQueue
-- * Trace types
, SelectionChangedInfo (..)
Expand All @@ -64,12 +67,11 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
, TraceValidationEvent (..)
) where

import Cardano.Prelude (whenM)
import Control.Monad (when)
import Control.Monad (join, when)
import Control.Tracer
import Data.Foldable (traverse_)
import Data.Foldable (for_)
import Data.Map.Strict (Map)
import Data.Maybe (mapMaybe)
import qualified Data.Map.Strict as Map
import Data.Maybe.Strict (StrictMaybe (..))
import Data.Set (Set)
import Data.Typeable
Expand Down Expand Up @@ -109,7 +111,7 @@ import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (WithFingerprint)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.Block (MaxSlotNo (..))
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))

Expand Down Expand Up @@ -255,6 +257,17 @@ data ChainDbEnv m blk = CDB
, cdbCheckInFuture :: !(CheckInFuture m blk)
, cdbChainSelQueue :: !(ChainSelQueue m blk)
-- ^ Queue of blocks that still have to be added.
--
-- NOTE: the set of blocks in this queue are /not/ disjoint from the set of
-- blocks in the VolatileDB. When processing the next block in the queue, we
-- do not remove the block from the queue /until/ it has been added to the
-- VolatileDB and processed by chain selection. This means the block
-- currently being added will be both in the queue and the VolatileDB for a
-- short while.
--
-- If we would remove the block from the queue before adding it to the
-- VolatileDB, then it would be in /neither/ for a short time, and
-- 'getIsFetched' would incorrectly return 'False'.
, cdbFutureBlocks :: !(StrictTVar m (FutureBlocks m blk))
-- ^ Blocks from the future
--
Expand Down Expand Up @@ -450,8 +463,21 @@ type FutureBlocks m blk = Map (HeaderHash blk) (Header blk, InvalidBlockPunishme
-- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are
-- read from this queue by a background thread, which processes the blocks
-- synchronously.
newtype ChainSelQueue m blk = ChainSelQueue (TBQueue m (ChainSelMessage m blk))
deriving NoThunks via OnlyCheckWhnfNamed "ChainSelQueue" (ChainSelQueue m blk)
data ChainSelQueue m blk = ChainSelQueue {
-- TODO use a better data structure, e.g., a heap from the @heaps@
-- package. Wish list:
-- + O(1) pop min value
-- + O(log n) insert
-- + O(n) get all
-- + Bounded in size
--
-- TODO join consecutive blocks into a fragment that can be added at
-- once.
varChainSelQueue :: !(StrictTVar m (Map (RealPoint blk) (BlockToAdd m blk)))
, chainSelQueueCapacity :: !Word
, varChainSelReprocessLoEBlocks :: !(StrictTVar m Bool)
}
deriving (NoThunks) via OnlyCheckWhnfNamed "ChainSelQueue" (ChainSelQueue m blk)

-- | Entry in the 'ChainSelQueue' queue: a block together with the 'TMVar's used
-- to implement 'AddBlockPromise'.
Expand All @@ -465,6 +491,7 @@ data BlockToAdd m blk = BlockToAdd
, varBlockProcessed :: !(StrictTMVar m (AddBlockResult blk))
-- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'.
}
deriving NoThunks via OnlyCheckWhnfNamed "BlockToAdd" (BlockToAdd m blk)

-- | Different async tasks for triggering ChainSel
data ChainSelMessage m blk
Expand All @@ -474,9 +501,11 @@ data ChainSelMessage m blk
| ChainSelReprocessLoEBlocks

-- | Create a new 'ChainSelQueue' with the given size.
newChainSelQueue :: IOLike m => Word -> m (ChainSelQueue m blk)
newChainSelQueue queueSize = ChainSelQueue <$>
atomically (newTBQueue (fromIntegral queueSize))
newChainSelQueue :: (IOLike m, StandardHash blk, Typeable blk) => Word -> m (ChainSelQueue m blk)
newChainSelQueue chainSelQueueCapacity = do
varChainSelQueue <- newTVarIO mempty
varChainSelReprocessLoEBlocks <- newTVarIO False
return $ ChainSelQueue {varChainSelQueue, chainSelQueueCapacity, varChainSelReprocessLoEBlocks}

-- | Add a block to the 'ChainSelQueue' queue. Can block when the queue is full.
addBlockToAdd ::
Expand All @@ -486,7 +515,7 @@ addBlockToAdd ::
-> InvalidBlockPunishment m
-> blk
-> m (AddBlockPromise m blk)
addBlockToAdd tracer (ChainSelQueue queue) punish blk = do
addBlockToAdd tracer (ChainSelQueue {varChainSelQueue, chainSelQueueCapacity}) punish blk = do
varBlockWrittenToDisk <- newEmptyTMVarIO
varBlockProcessed <- newEmptyTMVarIO
let !toAdd = BlockToAdd
Expand All @@ -497,8 +526,12 @@ addBlockToAdd tracer (ChainSelQueue queue) punish blk = do
}
traceWith tracer $ AddedBlockToQueue (blockRealPoint blk) RisingEdge
queueSize <- atomically $ do
writeTBQueue queue (ChainSelAddBlock toAdd)
lengthTBQueue queue
chainSelQueue <- readTVar varChainSelQueue
let chainSelQueue' = Map.insert (blockRealPoint blk) toAdd chainSelQueue
chainSelQueueSize = Map.size chainSelQueue'
check (fromIntegral chainSelQueueSize <= chainSelQueueCapacity)
writeTVar varChainSelQueue chainSelQueue'
return chainSelQueueSize
traceWith tracer $
AddedBlockToQueue (blockRealPoint blk) (FallingEdgeWith (fromIntegral queueSize))
return AddBlockPromise
Expand All @@ -512,51 +545,102 @@ addReprocessLoEBlocks
=> Tracer m (TraceAddBlockEvent blk)
-> ChainSelQueue m blk
-> m ()
addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
addReprocessLoEBlocks tracer (ChainSelQueue {varChainSelReprocessLoEBlocks}) = do
traceWith tracer $ AddedReprocessLoEBlocksToQueue
atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks
atomically $ writeTVar varChainSelReprocessLoEBlocks True

-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
-- queue is empty; in that case, reports the starvation (and its end) to the
-- callback.
getChainSelMessage
:: (IOLike m, HasHeader blk)
:: forall m blk. (HasHeader blk, IOLike m)
=> Tracer m (TraceChainSelStarvationEvent blk)
-> StrictTVar m ChainSelStarvation
-> ChainSelQueue m blk
-> m (ChainSelMessage m blk)
getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) = do
-- NOTE: The test of emptiness and the blocking read are in different STM
-- transactions on purpose.
whenM (atomically $ isEmptyTBQueue queue) $ do
writeTVarIO starvationVar ChainSelStarvationOngoing
traceWith starvationTracer . ChainSelStarvationStarted =<< getMonotonicTime
message <- atomically $ readTBQueue queue
-- If there was a starvation ongoing, we need to report that it is done.
whenM ((== ChainSelStarvationOngoing) <$> readTVarIO starvationVar) $
case message of
ChainSelAddBlock BlockToAdd {blockToAdd} -> do
time <- getMonotonicTime
traceWith starvationTracer $ ChainSelStarvationEnded time $ blockRealPoint blockToAdd
writeTVarIO starvationVar $ ChainSelStarvationEndedAt time
ChainSelReprocessLoEBlocks -> pure ()
return message
getChainSelMessage starvationTracer starvationVar queue = go
where
writeTVarIO v x = atomically $ writeTVar v x
go = join $ atomically $
readTVar varChainSelReprocessLoEBlocks >>= \case
True -> do
writeTVar varChainSelReprocessLoEBlocks False
pure $ pure ChainSelReprocessLoEBlocks
False -> do
chainSelQueue <- readTVar varChainSelQueue
case Map.minView chainSelQueue of
Just (blockToAdd, chainSelQueue') -> do
writeTVar varChainSelQueue chainSelQueue'
pure $ do
terminateStarvationMeasure blockToAdd
pure $ ChainSelAddBlock blockToAdd
Nothing -> pure $ do
startStarvationMeasure
blockUntilMoreWork
go

ChainSelQueue {varChainSelQueue, varChainSelReprocessLoEBlocks} = queue

-- Wait until we either need to reprocess blocks due to the LoE, or until a
-- new block arrives.
blockUntilMoreWork :: m ()
blockUntilMoreWork = atomically $ do
reprocessLoEBlocks <- readTVar varChainSelReprocessLoEBlocks
chainSelQueue <- readTVar varChainSelQueue
check $ reprocessLoEBlocks || not (Map.null chainSelQueue)

startStarvationMeasure = do
prevStarvation <- atomically $ swapTVar starvationVar ChainSelStarvationOngoing
when (prevStarvation /= ChainSelStarvationOngoing) $
traceWith starvationTracer . ChainSelStarvationStarted =<< getMonotonicTime

terminateStarvationMeasure BlockToAdd{blockToAdd=block} = do
prevStarvation <- readTVarIO starvationVar
when (prevStarvation == ChainSelStarvationOngoing) $ do
tf <- getMonotonicTime
traceWith starvationTracer (ChainSelStarvationEnded tf $ blockRealPoint block)
atomically $ writeTVar starvationVar (ChainSelStarvationEndedAt tf)

-- | Flush the 'ChainSelQueue' queue and notify the waiting threads.
--
-- REVIEW: What about all the threads that are waiting to write in the queue and
-- will write after the flush?!
closeChainSelQueue :: IOLike m => ChainSelQueue m blk -> STM m ()
closeChainSelQueue (ChainSelQueue queue) = do
as <- mapMaybe blockAdd <$> flushTBQueue queue
traverse_ (\a -> tryPutTMVar (varBlockProcessed a)
(FailedToAddBlock "Queue flushed"))
as
where
blockAdd = \case
ChainSelAddBlock ab -> Just ab
ChainSelReprocessLoEBlocks -> Nothing
closeChainSelQueue ChainSelQueue {varChainSelQueue} = do
chainSelQueue <- swapTVar varChainSelQueue Map.empty
for_ chainSelQueue $ \BlockToAdd {varBlockProcessed} ->
putTMVar varBlockProcessed $ FailedToAddBlock "Queue flushed"

-- | Delete the given 'BlockToAdd' from the 'ChainSelQueue'.
--
-- PRECONDITION: the given 'BlockToAdd' is in 'ChainSelQueue'.
deleteBlockToAdd ::
(IOLike m, HasHeader blk)
=> BlockToAdd m blk
-> ChainSelQueue m blk
-> m ()
deleteBlockToAdd (BlockToAdd _ blk _ _) (ChainSelQueue {varChainSelQueue}) =
atomically $ modifyTVar varChainSelQueue $ Map.delete (blockRealPoint blk)

-- | Return a function to test the membership for the given 'BlocksToAdd'.
memberBlocksToAdd ::
(IOLike m, HasHeader blk)
=> ChainSelQueue m blk
-> STM m (RealPoint blk -> Bool)
memberBlocksToAdd (ChainSelQueue {varChainSelQueue}) =
flip Map.member <$> readTVar varChainSelQueue

getBlocksToAddMaxSlotNo ::
IOLike m
=> ChainSelQueue m blk
-> STM m MaxSlotNo
getBlocksToAddMaxSlotNo (ChainSelQueue {varChainSelQueue}) = aux <$> readTVar varChainSelQueue
where
-- | The 'Ord' instance of 'RealPoint' orders by 'SlotNo' first, so the
-- maximal key of the map has the greatest 'SlotNo'.
aux :: Map (RealPoint blk) (BlockToAdd m blk) -> MaxSlotNo
aux queue = case Map.lookupMax queue of
Nothing -> NoMaxSlotNo
Just (RealPoint s _, _) -> MaxSlotNo s

{-------------------------------------------------------------------------------
Trace types
Expand Down

0 comments on commit 4adf7d9

Please sign in to comment.