Skip to content

Commit

Permalink
ChainDB: allow to trigger chain selection synchronously
Browse files Browse the repository at this point in the history
This is useful for tests.

Co-authored-by: Nicolas “Niols” Jeannerod <nicolas.jeannerod@tweag.io>
  • Loading branch information
amesgen and Niols committed Jul 25, 2024
1 parent d16178f commit 6442d81
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Breaking

- ChainDB: allow to trigger chain selection synchronously
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module Ouroboros.Consensus.Genesis.Governor (
, sharedCandidatePrefix
) where

import Control.Monad (guard, when)
import Control.Monad (guard, void, when)
import Control.Tracer (Tracer, traceWith)
import Data.Bifunctor (second)
import Data.Containers.ListUtils (nubOrd)
Expand Down Expand Up @@ -142,7 +142,7 @@ gddWatcher cfg tracer chainDb getGsmState getHandles varLoEFrag =
-- The chain selection only depends on the LoE tip, so there
-- is no point in retriggering it if the LoE tip hasn't changed.
when (AF.headHash oldLoEFrag /= AF.headHash loeFrag) $
ChainDB.triggerChainSelectionAsync chainDb
void $ ChainDB.triggerChainSelectionAsync chainDb

-- | Pure snapshot of the dynamic data the GDD operates on.
data GDDStateView m blk peer = GDDStateView {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ module Ouroboros.Consensus.Storage.ChainDB.API (
, addBlock
, addBlockWaitWrittenToDisk
, addBlock_
-- * Trigger chain selection
, ChainSelectionPromise (..)
, triggerChainSelection
, triggerChainSelectionAsync
-- * Serialised block/header with its point
, WithPoint (..)
Expand Down Expand Up @@ -135,7 +138,7 @@ data ChainDB m blk = ChainDB {
addBlockAsync :: InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk)

-- | Trigger reprocessing of blocks postponed by the LoE.
, chainSelAsync :: m ()
, chainSelAsync :: m (ChainSelectionPromise m)

-- | Get the current chain fragment
--
Expand Down Expand Up @@ -462,9 +465,25 @@ addBlock_ = void ..: addBlock

-- | Alias for naming consistency.
-- The short name was chosen to avoid a larger diff from alignment changes.
triggerChainSelectionAsync :: ChainDB m blk -> m ()
triggerChainSelectionAsync :: ChainDB m blk -> m (ChainSelectionPromise m)
triggerChainSelectionAsync = chainSelAsync

-- | A promise that the chain selection will be performed. It is returned by
-- 'triggerChainSelectionAsync' and contains a monadic action that waits until
-- the corresponding run of Chain Selection is done.
newtype ChainSelectionPromise m = ChainSelectionPromise {
-- NOTE: We might want a mechanism similar to 'AddBlockPromise' and
-- 'AddBlockResult', in case the background ChainDB thread dies; but we
-- currently only use the synchronous variant in tests.
waitChainSelectionPromise :: m ()
}

-- | Trigger selection synchronously: wait until the chain selection has been
-- performed. This is a partial function, only to support tests.
triggerChainSelection :: IOLike m => ChainDB m blk -> m ()
triggerChainSelection chainDB =
waitChainSelectionPromise =<< chainSelAsync chainDB

{-------------------------------------------------------------------------------
Serialised block/header with its point
-------------------------------------------------------------------------------}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,8 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
(lift $ getChainSelMessage cdbChainSelQueue)
(\message -> lift $ atomically $ do
case message of
ChainSelReprocessLoEBlocks -> pure ()
ChainSelReprocessLoEBlocks varProcessed ->
void $ tryPutTMVar varProcessed ()
ChainSelAddBlock BlockToAdd{varBlockWrittenToDisk, varBlockProcessed} -> do
_ <- tryPutTMVar varBlockWrittenToDisk
False
Expand All @@ -535,7 +536,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
closeChainSelQueue cdbChainSelQueue)
(\message -> do
lift $ case message of
ChainSelReprocessLoEBlocks ->
ChainSelReprocessLoEBlocks _ ->
trace PoppedReprocessLoEBlocksFromQueue
ChainSelAddBlock BlockToAdd{blockToAdd} ->
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ triggerChainSelectionAsync ::
forall m blk.
IOLike m =>
ChainDbEnv m blk ->
m ()
m (ChainSelectionPromise m)
triggerChainSelectionAsync CDB {cdbTracer, cdbChainSelQueue} =
addReprocessLoEBlocks (TraceAddBlockEvent >$< cdbTracer) cdbChainSelQueue

Expand Down Expand Up @@ -303,27 +303,29 @@ chainSelSync ::
-- 'ChainSelReprocessLoEBlocks' whenever we receive a new header or lose a
-- peer.
-- If 'cdbLoE' is 'LoEDisabled', this task is skipped.
chainSelSync cdb@CDB{..} ChainSelReprocessLoEBlocks = lift cdbLoE >>= \case
LoEDisabled -> pure ()
LoEEnabled _ -> do
(succsOf, chain) <- lift $ atomically $ do
invalid <- forgetFingerprint <$> readTVar cdbInvalid
(,)
<$> (ignoreInvalidSuc cdbVolatileDB invalid <$>
VolatileDB.filterByPredecessor cdbVolatileDB)
<*> Query.getCurrentChain cdb
let
succsOf' = Set.toList . succsOf . pointHash . castPoint
loeHashes = succsOf' (AF.anchorPoint chain)
firstHeader = either (const Nothing) Just $ AF.last chain
-- We avoid the VolatileDB for the headers we already have in the chain
getHeaderFromHash hash =
case firstHeader of
Just header | headerHash header == hash -> pure header
_ -> VolatileDB.getKnownBlockComponent cdbVolatileDB GetHeader hash
loeHeaders <- lift (mapM getHeaderFromHash loeHashes)
for_ loeHeaders $ \hdr ->
void (chainSelectionForBlock cdb BlockCache.empty hdr noPunishment)
chainSelSync cdb@CDB{..} (ChainSelReprocessLoEBlocks varProcessed) = do
lift cdbLoE >>= \case
LoEDisabled -> pure ()
LoEEnabled _ -> do
(succsOf, chain) <- lift $ atomically $ do
invalid <- forgetFingerprint <$> readTVar cdbInvalid
(,)
<$> (ignoreInvalidSuc cdbVolatileDB invalid <$>
VolatileDB.filterByPredecessor cdbVolatileDB)
<*> Query.getCurrentChain cdb
let
succsOf' = Set.toList . succsOf . pointHash . castPoint
loeHashes = succsOf' (AF.anchorPoint chain)
firstHeader = either (const Nothing) Just $ AF.last chain
-- We avoid the VolatileDB for the headers we already have in the chain
getHeaderFromHash hash =
case firstHeader of
Just header | headerHash header == hash -> pure header
_ -> VolatileDB.getKnownBlockComponent cdbVolatileDB GetHeader hash
loeHeaders <- lift (mapM getHeaderFromHash loeHashes)
for_ loeHeaders $ \hdr ->
void (chainSelectionForBlock cdb BlockCache.empty hdr noPunishment)
lift $ atomically $ putTMVar varProcessed ()

chainSelSync cdb@CDB {..} (ChainSelAddBlock BlockToAdd { blockToAdd = b, .. }) = do
(isMember, invalid, curChain) <- lift $ atomically $ (,,)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
ChainDbEnv (..)
, ChainDbHandle (..)
, ChainDbState (..)
, ChainSelectionPromise (..)
, SerialiseDiskConstraints
, getEnv
, getEnv1
Expand Down Expand Up @@ -83,9 +84,9 @@ import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise (..),
AddBlockResult (..), ChainDbError (..), ChainType,
InvalidBlockReason, LoE, StreamFrom, StreamTo,
UnknownRange)
AddBlockResult (..), ChainDbError (..),
ChainSelectionPromise (..), ChainType, InvalidBlockReason,
LoE, StreamFrom, StreamTo, UnknownRange)
import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment
(InvalidBlockPunishment)
import Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB (LgrDB,
Expand Down Expand Up @@ -462,8 +463,10 @@ data BlockToAdd m blk = BlockToAdd
data ChainSelMessage m blk
-- | Add a new block
= ChainSelAddBlock !(BlockToAdd m blk)
-- | Reprocess blocks that have been postponed by the LoE
-- | Reprocess blocks that have been postponed by the LoE.
| ChainSelReprocessLoEBlocks
!(StrictTMVar m ())
-- ^ Used for 'ChainSelectionPromise'.

-- | Create a new 'ChainSelQueue' with the given size.
newChainSelQueue :: IOLike m => Word -> m (ChainSelQueue m blk)
Expand Down Expand Up @@ -503,10 +506,13 @@ addReprocessLoEBlocks
:: IOLike m
=> Tracer m (TraceAddBlockEvent blk)
-> ChainSelQueue m blk
-> m ()
-> m (ChainSelectionPromise m)
addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
varProcessed <- newEmptyTMVarIO
let waitUntilRan = atomically $ readTMVar varProcessed
traceWith tracer $ AddedReprocessLoEBlocksToQueue
atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks
atomically $ writeTBQueue queue $ ChainSelReprocessLoEBlocks varProcessed
return $ ChainSelectionPromise waitUntilRan

-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
-- queue is empty.
Expand All @@ -524,7 +530,7 @@ closeChainSelQueue (ChainSelQueue queue) = do
where
blockAdd = \case
ChainSelAddBlock ab -> Just ab
ChainSelReprocessLoEBlocks -> Nothing
ChainSelReprocessLoEBlocks _ -> Nothing


{-------------------------------------------------------------------------------
Expand Down

0 comments on commit 6442d81

Please sign in to comment.