Skip to content

Commit

Permalink
Add LoE to the ChainDB QSM test (#1119)
Browse files Browse the repository at this point in the history
Closes #541

This PR adds a new instruction to the ChainDB q-s-m test, which updates
the Limit on Eagerness (LoE) fragment and retriggers chain selection.
This requires a model implementation of the LoE.

As a preparatory change, we add a way to trigger chain selection
synchronously (ie such that we can wait for it to finish), which is only
used in tests.
  • Loading branch information
amesgen committed Aug 30, 2024
2 parents 242f513 + 1357c1b commit 0cae17c
Show file tree
Hide file tree
Showing 13 changed files with 361 additions and 118 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Breaking

- ChainDB: allow to trigger chain selection synchronously
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module Ouroboros.Consensus.Genesis.Governor (
, sharedCandidatePrefix
) where

import Control.Monad (guard, when)
import Control.Monad (guard, void, when)
import Control.Tracer (Tracer, traceWith)
import Data.Bifunctor (second)
import Data.Containers.ListUtils (nubOrd)
Expand Down Expand Up @@ -142,7 +142,7 @@ gddWatcher cfg tracer chainDb getGsmState getHandles varLoEFrag =
-- The chain selection only depends on the LoE tip, so there
-- is no point in retriggering it if the LoE tip hasn't changed.
when (AF.headHash oldLoEFrag /= AF.headHash loeFrag) $
ChainDB.triggerChainSelectionAsync chainDb
void $ ChainDB.triggerChainSelectionAsync chainDb

-- | Pure snapshot of the dynamic data the GDD operates on.
data GDDStateView m blk peer = GDDStateView {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ module Ouroboros.Consensus.Storage.ChainDB.API (
, addBlock
, addBlockWaitWrittenToDisk
, addBlock_
-- * Trigger chain selection
, ChainSelectionPromise (..)
, triggerChainSelection
, triggerChainSelectionAsync
-- * Serialised block/header with its point
, WithPoint (..)
Expand Down Expand Up @@ -135,7 +138,7 @@ data ChainDB m blk = ChainDB {
addBlockAsync :: InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk)

-- | Trigger reprocessing of blocks postponed by the LoE.
, chainSelAsync :: m ()
, chainSelAsync :: m (ChainSelectionPromise m)

-- | Get the current chain fragment
--
Expand Down Expand Up @@ -462,9 +465,25 @@ addBlock_ = void ..: addBlock

-- | Alias for naming consistency.
-- The short name was chosen to avoid a larger diff from alignment changes.
triggerChainSelectionAsync :: ChainDB m blk -> m ()
triggerChainSelectionAsync :: ChainDB m blk -> m (ChainSelectionPromise m)
triggerChainSelectionAsync = chainSelAsync

-- | A promise that the chain selection will be performed. It is returned by
-- 'triggerChainSelectionAsync' and contains a monadic action that waits until
-- the corresponding run of Chain Selection is done.
newtype ChainSelectionPromise m = ChainSelectionPromise {
-- NOTE: We might want a mechanism similar to 'AddBlockPromise' and
-- 'AddBlockResult', in case the background ChainDB thread dies; but we
-- currently only use the synchronous variant in tests.
waitChainSelectionPromise :: m ()
}

-- | Trigger selection synchronously: wait until the chain selection has been
-- performed. This is a partial function, only to support tests.
triggerChainSelection :: IOLike m => ChainDB m blk -> m ()
triggerChainSelection chainDB =
waitChainSelectionPromise =<< chainSelAsync chainDB

{-------------------------------------------------------------------------------
Serialised block/header with its point
-------------------------------------------------------------------------------}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,8 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
(lift $ getChainSelMessage cdbChainSelQueue)
(\message -> lift $ atomically $ do
case message of
ChainSelReprocessLoEBlocks -> pure ()
ChainSelReprocessLoEBlocks varProcessed ->
void $ tryPutTMVar varProcessed ()
ChainSelAddBlock BlockToAdd{varBlockWrittenToDisk, varBlockProcessed} -> do
_ <- tryPutTMVar varBlockWrittenToDisk
False
Expand All @@ -535,7 +536,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
closeChainSelQueue cdbChainSelQueue)
(\message -> do
lift $ case message of
ChainSelReprocessLoEBlocks ->
ChainSelReprocessLoEBlocks _ ->
trace PoppedReprocessLoEBlocksFromQueue
ChainSelAddBlock BlockToAdd{blockToAdd} ->
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ triggerChainSelectionAsync ::
forall m blk.
IOLike m =>
ChainDbEnv m blk ->
m ()
m (ChainSelectionPromise m)
triggerChainSelectionAsync CDB {cdbTracer, cdbChainSelQueue} =
addReprocessLoEBlocks (TraceAddBlockEvent >$< cdbTracer) cdbChainSelQueue

Expand Down Expand Up @@ -303,27 +303,29 @@ chainSelSync ::
-- 'ChainSelReprocessLoEBlocks' whenever we receive a new header or lose a
-- peer.
-- If 'cdbLoE' is 'LoEDisabled', this task is skipped.
chainSelSync cdb@CDB{..} ChainSelReprocessLoEBlocks = lift cdbLoE >>= \case
LoEDisabled -> pure ()
LoEEnabled _ -> do
(succsOf, chain) <- lift $ atomically $ do
invalid <- forgetFingerprint <$> readTVar cdbInvalid
(,)
<$> (ignoreInvalidSuc cdbVolatileDB invalid <$>
VolatileDB.filterByPredecessor cdbVolatileDB)
<*> Query.getCurrentChain cdb
let
succsOf' = Set.toList . succsOf . pointHash . castPoint
loeHashes = succsOf' (AF.anchorPoint chain)
firstHeader = either (const Nothing) Just $ AF.last chain
-- We avoid the VolatileDB for the headers we already have in the chain
getHeaderFromHash hash =
case firstHeader of
Just header | headerHash header == hash -> pure header
_ -> VolatileDB.getKnownBlockComponent cdbVolatileDB GetHeader hash
loeHeaders <- lift (mapM getHeaderFromHash loeHashes)
for_ loeHeaders $ \hdr ->
void (chainSelectionForBlock cdb BlockCache.empty hdr noPunishment)
chainSelSync cdb@CDB{..} (ChainSelReprocessLoEBlocks varProcessed) = do
lift cdbLoE >>= \case
LoEDisabled -> pure ()
LoEEnabled _ -> do
(succsOf, chain) <- lift $ atomically $ do
invalid <- forgetFingerprint <$> readTVar cdbInvalid
(,)
<$> (ignoreInvalidSuc cdbVolatileDB invalid <$>
VolatileDB.filterByPredecessor cdbVolatileDB)
<*> Query.getCurrentChain cdb
let
succsOf' = Set.toList . succsOf . pointHash . castPoint
loeHashes = succsOf' (AF.anchorPoint chain)
firstHeader = either (const Nothing) Just $ AF.last chain
-- We avoid the VolatileDB for the headers we already have in the chain
getHeaderFromHash hash =
case firstHeader of
Just header | headerHash header == hash -> pure header
_ -> VolatileDB.getKnownBlockComponent cdbVolatileDB GetHeader hash
loeHeaders <- lift (mapM getHeaderFromHash loeHashes)
for_ loeHeaders $ \hdr ->
void (chainSelectionForBlock cdb BlockCache.empty hdr noPunishment)
lift $ atomically $ putTMVar varProcessed ()

chainSelSync cdb@CDB {..} (ChainSelAddBlock BlockToAdd { blockToAdd = b, .. }) = do
(isMember, invalid, curChain) <- lift $ atomically $ (,,)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
ChainDbEnv (..)
, ChainDbHandle (..)
, ChainDbState (..)
, ChainSelectionPromise (..)
, SerialiseDiskConstraints
, getEnv
, getEnv1
Expand Down Expand Up @@ -83,9 +84,9 @@ import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise (..),
AddBlockResult (..), ChainDbError (..), ChainType,
InvalidBlockReason, LoE, StreamFrom, StreamTo,
UnknownRange)
AddBlockResult (..), ChainDbError (..),
ChainSelectionPromise (..), ChainType, InvalidBlockReason,
LoE, StreamFrom, StreamTo, UnknownRange)
import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment
(InvalidBlockPunishment)
import Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB (LgrDB,
Expand Down Expand Up @@ -462,8 +463,10 @@ data BlockToAdd m blk = BlockToAdd
data ChainSelMessage m blk
-- | Add a new block
= ChainSelAddBlock !(BlockToAdd m blk)
-- | Reprocess blocks that have been postponed by the LoE
-- | Reprocess blocks that have been postponed by the LoE.
| ChainSelReprocessLoEBlocks
!(StrictTMVar m ())
-- ^ Used for 'ChainSelectionPromise'.

-- | Create a new 'ChainSelQueue' with the given size.
newChainSelQueue :: IOLike m => Word -> m (ChainSelQueue m blk)
Expand Down Expand Up @@ -503,10 +506,13 @@ addReprocessLoEBlocks
:: IOLike m
=> Tracer m (TraceAddBlockEvent blk)
-> ChainSelQueue m blk
-> m ()
-> m (ChainSelectionPromise m)
addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
varProcessed <- newEmptyTMVarIO
let waitUntilRan = atomically $ readTMVar varProcessed
traceWith tracer $ AddedReprocessLoEBlocksToQueue
atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks
atomically $ writeTBQueue queue $ ChainSelReprocessLoEBlocks varProcessed
return $ ChainSelectionPromise waitUntilRan

-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
-- queue is empty.
Expand All @@ -524,7 +530,7 @@ closeChainSelQueue (ChainSelQueue queue) = do
where
blockAdd = \case
ChainSelAddBlock ab -> Just ab
ChainSelReprocessLoEBlocks -> Nothing
ChainSelReprocessLoEBlocks _ -> Nothing


{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import Ouroboros.Consensus.Ledger.Query
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Node.ProtocolInfo
import Ouroboros.Consensus.Protocol.Abstract (ChainDepState)
import Ouroboros.Consensus.Storage.ChainDB.API (LoE (..))
import Ouroboros.Consensus.Storage.ImmutableDB.Chunks.Internal
(ChunkNo (..), ChunkSize (..), RelativeSlot (..))
import Ouroboros.Consensus.Storage.ImmutableDB.Chunks.Layout
Expand Down Expand Up @@ -407,3 +408,12 @@ instance Arbitrary Index.CacheConfig where
-- TODO create a Cmd that advances time, so this is being exercised too.
expireUnusedAfter <- (fromIntegral :: Int -> DiffTime) <$> choose (1, 100)
return Index.CacheConfig {Index.pastChunksToCache, Index.expireUnusedAfter}

{-------------------------------------------------------------------------------
LoE
-------------------------------------------------------------------------------}

instance Arbitrary a => Arbitrary (LoE a) where
arbitrary = oneof [pure LoEDisabled, LoEEnabled <$> arbitrary]
shrink LoEDisabled = []
shrink (LoEEnabled x) = LoEDisabled : map LoEEnabled (shrink x)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}

Expand All @@ -18,9 +19,12 @@ import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB (InvalidBlockReason)
import Ouroboros.Consensus.Storage.ChainDB.API (LoE (..))
import Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB
import Ouroboros.Consensus.Storage.ImmutableDB
import Ouroboros.Consensus.Util.STM (Fingerprint, WithFingerprint)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as Fragment
import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.Mock.Chain
import Ouroboros.Network.Mock.ProducerState
Expand All @@ -37,6 +41,14 @@ instance ToExpr (HeaderHash blk) => ToExpr (Point blk)
instance ToExpr (HeaderHash blk) => ToExpr (RealPoint blk)
instance (ToExpr slot, ToExpr hash) => ToExpr (Block slot hash)

deriving instance ( ToExpr blk
, ToExpr (HeaderHash blk)
)
=> ToExpr (Fragment.Anchor blk)

instance (ToExpr blk, ToExpr (HeaderHash blk)) => ToExpr (AnchoredFragment blk) where
toExpr f = toExpr (Fragment.anchor f, Fragment.toOldestFirst f)

{-------------------------------------------------------------------------------
ouroboros-consensus
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -73,6 +85,8 @@ instance ToExpr ChunkInfo where
instance ToExpr FsError where
toExpr fsError = App (show fsError) []

deriving instance ToExpr a => ToExpr (LoE a)


{-------------------------------------------------------------------------------
si-timers
Expand Down
Loading

0 comments on commit 0cae17c

Please sign in to comment.