Skip to content

Commit

Permalink
ChainDB: let the BlockFetch client add blocks asynchronously
Browse files Browse the repository at this point in the history
This is a port of PR IntersectMBO/ouroboros-network#2721 to the new ChainSelQueue.

Co-authored-by: mrBliss <thomas@well-typed.com>
  • Loading branch information
Niols and mrBliss committed Jul 11, 2024
1 parent c896eb4 commit 09e9837
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
(LedgerSupportsProtocol)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as Jumping
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise,
ChainDB)
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment
(InvalidBlockPunishment)
Expand All @@ -56,16 +57,16 @@ data ChainDbView m blk = ChainDbView {
getCurrentChain :: STM m (AnchoredFragment (Header blk))
, getIsFetched :: STM m (Point blk -> Bool)
, getMaxSlotNo :: STM m MaxSlotNo
, addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool
, addBlockAsync :: InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk)
, getChainSelStarvation :: STM m ChainSelStarvation
}

defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk
defaultChainDbView :: ChainDB m blk -> ChainDbView m blk
defaultChainDbView chainDB = ChainDbView {
getCurrentChain = ChainDB.getCurrentChain chainDB
, getIsFetched = ChainDB.getIsFetched chainDB
, getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
, addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
, addBlockAsync = ChainDB.addBlockAsync chainDB
, getChainSelStarvation = ChainDB.getChainSelStarvation chainDB
}

Expand Down Expand Up @@ -215,8 +216,8 @@ mkBlockFetchConsensusInterface
pipeliningPunishment <- InvalidBlockPunishment.mkForDiffusionPipelining
pure $ mkAddFetchedBlock_ pipeliningPunishment enabledPipelining

-- Waits until the block has been written to disk, but not until chain
-- selection has processed the block.
-- Hand over the block to the ChainDB, but don't wait until it has been
-- written to disk or processed.
mkAddFetchedBlock_ ::
( BlockConfig blk
-> Header blk
Expand Down Expand Up @@ -260,7 +261,7 @@ mkBlockFetchConsensusInterface
NotReceivingTentativeBlocks -> disconnect
ReceivingTentativeBlocks ->
pipeliningPunishment bcfg (getHeader blk) disconnect
addBlockWaitWrittenToDisk
addBlockAsync
chainDB
punishment
blk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
ChainSelAddBlock BlockToAdd{blockToAdd} ->
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
blockRealPoint blockToAdd
chainSelSync cdb message)
chainSelSync cdb message
lift $ case message of
ChainSelAddBlock blockToAdd ->
deleteBlockToAdd blockToAdd cdbChainSelQueue
_ -> pure ()
)
where
starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,11 @@ chainSelSync cdb@CDB {..} (ChainSelAddBlock BlockToAdd { blockToAdd = b, .. }) =
let immBlockNo = AF.anchorBlockNo curChain

-- We follow the steps from section "## Adding a block" in ChainDB.md

-- Note: we call 'chainSelectionForFutureBlocks' in all branches instead
-- of once, before branching, because we want to do it /after/ writing the
-- block to the VolatileDB and delivering the 'varBlockWrittenToDisk'
-- promise, as this is the promise the BlockFetch client waits for.
-- Otherwise, the BlockFetch client would have to wait for
-- 'chainSelectionForFutureBlocks'.
--
-- Note: we call 'chainSelectionForFutureBlocks' in all branches instead of
-- once, before branching, because we want to do it /after/ writing the
-- block to the VolatileDB so that any threads waiting on the
-- 'varBlockWrittenToDisk' promise don't have to wait for the result of

-- ### Ignore
newTip <- if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,15 @@ getBlockComponent ::
getBlockComponent CDB{..} = getAnyBlockComponent cdbImmutableDB cdbVolatileDB

getIsFetched ::
forall m blk. IOLike m
forall m blk. (IOLike m, HasHeader blk)
=> ChainDbEnv m blk -> STM m (Point blk -> Bool)
getIsFetched CDB{..} = basedOnHash <$> VolatileDB.getIsMember cdbVolatileDB
where
-- The volatile DB indexes by hash only, not by points. However, it should
-- not be possible to have two points with the same hash but different
-- slot numbers.
basedOnHash :: (HeaderHash blk -> Bool) -> Point blk -> Bool
basedOnHash f p =
case pointHash p of
BlockHash hash -> f hash
GenesisHash -> False
getIsFetched CDB{..} = do
checkBlocksToAdd <- memberBlocksToAdd cdbChainSelQueue
checkVolDb <- VolatileDB.getIsMember cdbVolatileDB
return $ \pt ->
case pointToWithOriginRealPoint pt of
Origin -> False
NotOrigin pt' -> checkBlocksToAdd pt' || checkVolDb (realPointHash pt')

getIsInvalidBlock ::
forall m blk. (IOLike m, HasHeader blk)
Expand Down Expand Up @@ -185,10 +182,13 @@ getMaxSlotNo CDB{..} = do
-- contains block 9'. The ImmutableDB contains blocks 1-10. The max slot
-- of the current chain will be 10 (being the anchor point of the empty
-- current chain), while the max slot of the VolatileDB will be 9.
curChainMaxSlotNo <- maxSlotNoFromWithOrigin . AF.headSlot
<$> readTVar cdbChain
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo
--
-- Moreover, we have to look in 'ChainSelQueue' too.
curChainMaxSlotNo <-
maxSlotNoFromWithOrigin . AF.headSlot <$> readTVar cdbChain
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
blocksToAddMaxSlotNo <- getBlocksToAddMaxSlotNo cdbChainSelQueue
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo `max` blocksToAddMaxSlotNo

{-------------------------------------------------------------------------------
Unifying interface over the immutable DB and volatile DB, but independent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
-- * Blocks to add
, BlockToAdd (..)
, ChainSelMessage (..)
, ChainSelQueue
, ChainSelQueue -- opaque
, addBlockToAdd
, addReprocessLoEBlocks
, closeChainSelQueue
, deleteBlockToAdd
, getBlocksToAddMaxSlotNo
, getChainSelMessage
, memberBlocksToAdd
, newChainSelQueue
-- * Trace types
, SelectionChangedInfo (..)
Expand All @@ -64,11 +67,12 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
, TraceValidationEvent (..)
) where

import Cardano.Prelude (whenM)
import Cardano.Prelude (Bifunctor (second))
import Control.Monad (void)
import Control.Tracer
import Data.Foldable (traverse_)
import Data.Foldable (for_)
import Data.Map.Strict (Map)
import Data.Maybe (mapMaybe)
import qualified Data.Map.Strict as Map
import Data.Maybe.Strict (StrictMaybe (..))
import Data.Set (Set)
import Data.Typeable
Expand Down Expand Up @@ -106,9 +110,10 @@ import Ouroboros.Consensus.Util.CallStack
import Ouroboros.Consensus.Util.Enclose (Enclosing, Enclosing' (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (WithFingerprint)
import Ouroboros.Consensus.Util.STM (WithFingerprint,
blockUntilChanged)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.Block (MaxSlotNo (..))
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))

Expand Down Expand Up @@ -254,6 +259,17 @@ data ChainDbEnv m blk = CDB
, cdbCheckInFuture :: !(CheckInFuture m blk)
, cdbChainSelQueue :: !(ChainSelQueue m blk)
-- ^ Queue of blocks that still have to be added.
--
-- NOTE: the set of blocks in this queue are /not/ disjoint from the set of
-- blocks in the VolatileDB. When processing the next block in the queue, we
-- do not remove the block from the queue /until/ it has been added to the
-- VolatileDB and processed by chain selection. This means the block
-- currently being added will be both in the queue and the VolatileDB for a
-- short while.
--
-- If we would remove the block from the queue before adding it to the
-- VolatileDB, then it would be in /neither/ for a short time, and
-- 'getIsFetched' would incorrectly return 'False'.
, cdbFutureBlocks :: !(StrictTVar m (FutureBlocks m blk))
-- ^ Blocks from the future
--
Expand Down Expand Up @@ -449,8 +465,21 @@ type FutureBlocks m blk = Map (HeaderHash blk) (Header blk, InvalidBlockPunishme
-- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are
-- read from this queue by a background thread, which processes the blocks
-- synchronously.
newtype ChainSelQueue m blk = ChainSelQueue (TBQueue m (ChainSelMessage m blk))
deriving NoThunks via OnlyCheckWhnfNamed "ChainSelQueue" (ChainSelQueue m blk)
data ChainSelQueue m blk = ChainSelQueue {
-- TODO use a better data structure, e.g., a heap from the @heaps@
-- package. Wish list:
-- + O(1) pop min value
-- + O(log n) insert
-- + O(n) get all
-- + Bounded in size
--
-- TODO join consecutive blocks into a fragment that can be added at
-- once.
varChainSelQueue :: !(StrictTVar m (Map (RealPoint blk) (BlockToAdd m blk)))
, chainSelQueueCapacity :: !Word
, varChainSelReprocessLoEBlocks :: !(StrictTVar m Bool)
}
deriving (NoThunks) via OnlyCheckWhnfNamed "ChainSelQueue" (ChainSelQueue m blk)

-- | Entry in the 'ChainSelQueue' queue: a block together with the 'TMVar's used
-- to implement 'AddBlockPromise'.
Expand All @@ -464,6 +493,7 @@ data BlockToAdd m blk = BlockToAdd
, varBlockProcessed :: !(StrictTMVar m (AddBlockResult blk))
-- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'.
}
deriving NoThunks via OnlyCheckWhnfNamed "BlockToAdd" (BlockToAdd m blk)

-- | Different async tasks for triggering ChainSel
data ChainSelMessage m blk
Expand All @@ -473,9 +503,11 @@ data ChainSelMessage m blk
| ChainSelReprocessLoEBlocks

-- | Create a new 'ChainSelQueue' with the given size.
newChainSelQueue :: IOLike m => Word -> m (ChainSelQueue m blk)
newChainSelQueue queueSize = ChainSelQueue <$>
atomically (newTBQueue (fromIntegral queueSize))
newChainSelQueue :: (IOLike m, StandardHash blk, Typeable blk) => Word -> m (ChainSelQueue m blk)
newChainSelQueue chainSelQueueCapacity = do
varChainSelQueue <- newTVarIO mempty
varChainSelReprocessLoEBlocks <- newTVarIO False
return $ ChainSelQueue {varChainSelQueue, chainSelQueueCapacity, varChainSelReprocessLoEBlocks}

-- | Add a block to the 'ChainSelQueue' queue. Can block when the queue is full.
addBlockToAdd ::
Expand All @@ -485,7 +517,7 @@ addBlockToAdd ::
-> InvalidBlockPunishment m
-> blk
-> m (AddBlockPromise m blk)
addBlockToAdd tracer (ChainSelQueue queue) punish blk = do
addBlockToAdd tracer (ChainSelQueue {varChainSelQueue, chainSelQueueCapacity}) punish blk = do
varBlockWrittenToDisk <- newEmptyTMVarIO
varBlockProcessed <- newEmptyTMVarIO
let !toAdd = BlockToAdd
Expand All @@ -496,8 +528,12 @@ addBlockToAdd tracer (ChainSelQueue queue) punish blk = do
}
traceWith tracer $ AddedBlockToQueue (blockRealPoint blk) RisingEdge
queueSize <- atomically $ do
writeTBQueue queue (ChainSelAddBlock toAdd)
lengthTBQueue queue
chainSelQueue <- readTVar varChainSelQueue
let chainSelQueue' = Map.insert (blockRealPoint blk) toAdd chainSelQueue
chainSelQueueSize = Map.size chainSelQueue'
check (fromIntegral chainSelQueueSize <= chainSelQueueCapacity)
writeTVar varChainSelQueue chainSelQueue'
return chainSelQueueSize
traceWith tracer $
AddedBlockToQueue (blockRealPoint blk) (FallingEdgeWith (fromIntegral queueSize))
return AddBlockPromise
Expand All @@ -511,51 +547,82 @@ addReprocessLoEBlocks
=> Tracer m (TraceAddBlockEvent blk)
-> ChainSelQueue m blk
-> m ()
addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
addReprocessLoEBlocks tracer (ChainSelQueue {varChainSelReprocessLoEBlocks}) = do
traceWith tracer $ AddedReprocessLoEBlocksToQueue
atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks
atomically $ writeTVar varChainSelReprocessLoEBlocks True

-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
-- queue is empty; in that case, reports the starvation (and its end) to the
-- callback.
getChainSelMessage
:: (IOLike m, HasHeader blk)
:: IOLike m
=> Tracer m (TraceChainSelStarvationEvent blk)
-> StrictTVar m ChainSelStarvation
-> ChainSelQueue m blk
-> m (ChainSelMessage m blk)
getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) = do
-- NOTE: The test of emptiness and the blocking read are in different STM
-- transactions on purpose.
whenM (atomically $ isEmptyTBQueue queue) $ do
writeTVarIO starvationVar ChainSelStarvationOngoing
traceWith starvationTracer . ChainSelStarvationStarted =<< getMonotonicTime
message <- atomically $ readTBQueue queue
-- If there was a starvation ongoing, we need to report that it is done.
whenM ((== ChainSelStarvationOngoing) <$> readTVarIO starvationVar) $
case message of
ChainSelAddBlock BlockToAdd {blockToAdd} -> do
time <- getMonotonicTime
traceWith starvationTracer $ ChainSelStarvationEnded time $ blockRealPoint blockToAdd
writeTVarIO starvationVar $ ChainSelStarvationEndedAt time
ChainSelReprocessLoEBlocks -> pure ()
return message
getChainSelMessage starvationTracer starvationVar queue = go
where
go = do
(reprocessLoEBlocks, chainSelQueue) <- atomically readBoth
case reprocessLoEBlocks of
True -> do
writeTVarIO varChainSelReprocessLoEBlocks False
return ChainSelReprocessLoEBlocks
False ->
case Map.minView chainSelQueue of
Just (blockToAdd, chainSelQueue') -> do
writeTVarIO varChainSelQueue chainSelQueue'
return $ ChainSelAddBlock blockToAdd
Nothing -> do
writeTVarIO starvationVar ChainSelStarvationOngoing
traceWith starvationTracer . ChainSelStarvationStarted =<< getMonotonicTime -- FIXME: only trace if first time
void $ atomically $ blockUntilChanged (second Map.null) (False, True) readBoth
go
ChainSelQueue {varChainSelQueue, varChainSelReprocessLoEBlocks} = queue
writeTVarIO v x = atomically $ writeTVar v x
readBoth = (,) <$> readTVar varChainSelReprocessLoEBlocks <*> readTVar varChainSelQueue

-- | Flush the 'ChainSelQueue' queue and notify the waiting threads.
--
-- REVIEW: What about all the threads that are waiting to write in the queue and
-- will write after the flush?!
closeChainSelQueue :: IOLike m => ChainSelQueue m blk -> STM m ()
closeChainSelQueue (ChainSelQueue queue) = do
as <- mapMaybe blockAdd <$> flushTBQueue queue
traverse_ (\a -> tryPutTMVar (varBlockProcessed a)
(FailedToAddBlock "Queue flushed"))
as
where
blockAdd = \case
ChainSelAddBlock ab -> Just ab
ChainSelReprocessLoEBlocks -> Nothing
closeChainSelQueue ChainSelQueue {varChainSelQueue} = do
chainSelQueue <- readTVar varChainSelQueue
for_ chainSelQueue $ \BlockToAdd {varBlockProcessed} ->
putTMVar varBlockProcessed $ FailedToAddBlock "Queue flushed"

-- | Delete the given 'BlockToAdd' from the 'ChainSelQueue'.
--
-- PRECONDITION: the given 'BlockToAdd' is in 'ChainSelQueue'.
deleteBlockToAdd ::
(IOLike m, HasHeader blk)
=> BlockToAdd m blk
-> ChainSelQueue m blk
-> m ()
deleteBlockToAdd (BlockToAdd _ blk _ _) (ChainSelQueue {varChainSelQueue}) =
atomically $ modifyTVar varChainSelQueue $ Map.delete (blockRealPoint blk)

-- | Return a function to test the membership for the given 'BlocksToAdd'.
memberBlocksToAdd ::
(IOLike m, HasHeader blk)
=> ChainSelQueue m blk
-> STM m (RealPoint blk -> Bool)
memberBlocksToAdd (ChainSelQueue {varChainSelQueue}) =
flip Map.member <$> readTVar varChainSelQueue

getBlocksToAddMaxSlotNo ::
IOLike m
=> ChainSelQueue m blk
-> STM m MaxSlotNo
getBlocksToAddMaxSlotNo (ChainSelQueue {varChainSelQueue}) = aux <$> readTVar varChainSelQueue
where
-- | The 'Ord' instance of 'RealPoint' orders by 'SlotNo' first, so the
-- maximal key of the map has the greatest 'SlotNo'.
aux :: Map (RealPoint blk) (BlockToAdd m blk) -> MaxSlotNo
aux queue = case Map.lookupMax queue of
Nothing -> NoMaxSlotNo
Just (RealPoint s _, _) -> MaxSlotNo s

{-------------------------------------------------------------------------------
Trace types
Expand Down

0 comments on commit 09e9837

Please sign in to comment.