diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs index 93750475a9..6bebea9d1f 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs @@ -30,7 +30,8 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol (LedgerSupportsProtocol) import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as Jumping -import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) +import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise, + ChainDB) import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment (InvalidBlockPunishment) @@ -56,16 +57,16 @@ data ChainDbView m blk = ChainDbView { getCurrentChain :: STM m (AnchoredFragment (Header blk)) , getIsFetched :: STM m (Point blk -> Bool) , getMaxSlotNo :: STM m MaxSlotNo - , addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool + , addBlockAsync :: InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk) , getChainSelStarvation :: STM m ChainSelStarvation } -defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk +defaultChainDbView :: ChainDB m blk -> ChainDbView m blk defaultChainDbView chainDB = ChainDbView { getCurrentChain = ChainDB.getCurrentChain chainDB , getIsFetched = ChainDB.getIsFetched chainDB , getMaxSlotNo = ChainDB.getMaxSlotNo chainDB - , addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB + , addBlockAsync = ChainDB.addBlockAsync chainDB , getChainSelStarvation = ChainDB.getChainSelStarvation chainDB } @@ -215,8 +216,8 @@ mkBlockFetchConsensusInterface pipeliningPunishment <- InvalidBlockPunishment.mkForDiffusionPipelining pure $ mkAddFetchedBlock_ pipeliningPunishment enabledPipelining - -- Waits until the block has been written to disk, but not until chain - -- selection has processed the block. + -- Hand over the block to the ChainDB, but don't wait until it has been + -- written to disk or processed. mkAddFetchedBlock_ :: ( BlockConfig blk -> Header blk @@ -260,7 +261,7 @@ mkBlockFetchConsensusInterface NotReceivingTentativeBlocks -> disconnect ReceivingTentativeBlocks -> pipeliningPunishment bcfg (getHeader blk) disconnect - addBlockWaitWrittenToDisk + addBlockAsync chainDB punishment blk diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs index a854904cae..82a0a228be 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs @@ -540,6 +540,11 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do ChainSelAddBlock BlockToAdd{blockToAdd} -> trace $ PoppedBlockFromQueue $ FallingEdgeWith $ blockRealPoint blockToAdd - chainSelSync cdb message) + chainSelSync cdb message + lift $ case message of + ChainSelAddBlock blockToAdd -> + deleteBlockToAdd blockToAdd cdbChainSelQueue + _ -> pure () + ) where starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs index 9965743b4f..c688119201 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs @@ -335,13 +335,11 @@ chainSelSync cdb@CDB {..} (ChainSelAddBlock BlockToAdd { blockToAdd = b, .. }) = let immBlockNo = AF.anchorBlockNo curChain -- We follow the steps from section "## Adding a block" in ChainDB.md - - -- Note: we call 'chainSelectionForFutureBlocks' in all branches instead - -- of once, before branching, because we want to do it /after/ writing the - -- block to the VolatileDB and delivering the 'varBlockWrittenToDisk' - -- promise, as this is the promise the BlockFetch client waits for. - -- Otherwise, the BlockFetch client would have to wait for - -- 'chainSelectionForFutureBlocks'. + -- + -- Note: we call 'chainSelectionForFutureBlocks' in all branches instead of + -- once, before branching, because we want to do it /after/ writing the + -- block to the VolatileDB so that any threads waiting on the + -- 'varBlockWrittenToDisk' promise don't have to wait for the result of -- ### Ignore newTip <- if diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs index 96bb57eac3..c697f11804 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs @@ -131,18 +131,15 @@ getBlockComponent :: getBlockComponent CDB{..} = getAnyBlockComponent cdbImmutableDB cdbVolatileDB getIsFetched :: - forall m blk. IOLike m + forall m blk. (IOLike m, HasHeader blk) => ChainDbEnv m blk -> STM m (Point blk -> Bool) -getIsFetched CDB{..} = basedOnHash <$> VolatileDB.getIsMember cdbVolatileDB - where - -- The volatile DB indexes by hash only, not by points. However, it should - -- not be possible to have two points with the same hash but different - -- slot numbers. - basedOnHash :: (HeaderHash blk -> Bool) -> Point blk -> Bool - basedOnHash f p = - case pointHash p of - BlockHash hash -> f hash - GenesisHash -> False +getIsFetched CDB{..} = do + checkBlocksToAdd <- memberBlocksToAdd cdbChainSelQueue + checkVolDb <- VolatileDB.getIsMember cdbVolatileDB + return $ \pt -> + case pointToWithOriginRealPoint pt of + Origin -> False + NotOrigin pt' -> checkBlocksToAdd pt' || checkVolDb (realPointHash pt') getIsInvalidBlock :: forall m blk. (IOLike m, HasHeader blk) @@ -185,10 +182,13 @@ getMaxSlotNo CDB{..} = do -- contains block 9'. The ImmutableDB contains blocks 1-10. The max slot -- of the current chain will be 10 (being the anchor point of the empty -- current chain), while the max slot of the VolatileDB will be 9. - curChainMaxSlotNo <- maxSlotNoFromWithOrigin . AF.headSlot - <$> readTVar cdbChain - volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB - return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo + -- + -- Moreover, we have to look in 'ChainSelQueue' too. + curChainMaxSlotNo <- + maxSlotNoFromWithOrigin . AF.headSlot <$> readTVar cdbChain + volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB + blocksToAddMaxSlotNo <- getBlocksToAddMaxSlotNo cdbChainSelQueue + return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo `max` blocksToAddMaxSlotNo {------------------------------------------------------------------------------- Unifying interface over the immutable DB and volatile DB, but independent diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs index c1e1141b76..83c6476a9d 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs @@ -43,11 +43,14 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types ( -- * Blocks to add , BlockToAdd (..) , ChainSelMessage (..) - , ChainSelQueue + , ChainSelQueue -- opaque , addBlockToAdd , addReprocessLoEBlocks , closeChainSelQueue + , deleteBlockToAdd + , getBlocksToAddMaxSlotNo , getChainSelMessage + , memberBlocksToAdd , newChainSelQueue -- * Trace types , SelectionChangedInfo (..) @@ -64,11 +67,12 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types ( , TraceValidationEvent (..) ) where -import Cardano.Prelude (whenM) +import Cardano.Prelude (Bifunctor (second)) +import Control.Monad (void) import Control.Tracer -import Data.Foldable (traverse_) +import Data.Foldable (for_) import Data.Map.Strict (Map) -import Data.Maybe (mapMaybe) +import qualified Data.Map.Strict as Map import Data.Maybe.Strict (StrictMaybe (..)) import Data.Set (Set) import Data.Typeable @@ -106,9 +110,10 @@ import Ouroboros.Consensus.Util.CallStack import Ouroboros.Consensus.Util.Enclose (Enclosing, Enclosing' (..)) import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.ResourceRegistry -import Ouroboros.Consensus.Util.STM (WithFingerprint) +import Ouroboros.Consensus.Util.STM (WithFingerprint, + blockUntilChanged) import Ouroboros.Network.AnchoredFragment (AnchoredFragment) -import Ouroboros.Network.Block (MaxSlotNo) +import Ouroboros.Network.Block (MaxSlotNo (..)) import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation (..)) @@ -254,6 +259,17 @@ data ChainDbEnv m blk = CDB , cdbCheckInFuture :: !(CheckInFuture m blk) , cdbChainSelQueue :: !(ChainSelQueue m blk) -- ^ Queue of blocks that still have to be added. + -- + -- NOTE: the set of blocks in this queue are /not/ disjoint from the set of + -- blocks in the VolatileDB. When processing the next block in the queue, we + -- do not remove the block from the queue /until/ it has been added to the + -- VolatileDB and processed by chain selection. This means the block + -- currently being added will be both in the queue and the VolatileDB for a + -- short while. + -- + -- If we would remove the block from the queue before adding it to the + -- VolatileDB, then it would be in /neither/ for a short time, and + -- 'getIsFetched' would incorrectly return 'False'. , cdbFutureBlocks :: !(StrictTVar m (FutureBlocks m blk)) -- ^ Blocks from the future -- @@ -449,8 +465,21 @@ type FutureBlocks m blk = Map (HeaderHash blk) (Header blk, InvalidBlockPunishme -- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are -- read from this queue by a background thread, which processes the blocks -- synchronously. -newtype ChainSelQueue m blk = ChainSelQueue (TBQueue m (ChainSelMessage m blk)) - deriving NoThunks via OnlyCheckWhnfNamed "ChainSelQueue" (ChainSelQueue m blk) +data ChainSelQueue m blk = ChainSelQueue { + -- TODO use a better data structure, e.g., a heap from the @heaps@ + -- package. Wish list: + -- + O(1) pop min value + -- + O(log n) insert + -- + O(n) get all + -- + Bounded in size + -- + -- TODO join consecutive blocks into a fragment that can be added at + -- once. + varChainSelQueue :: !(StrictTVar m (Map (RealPoint blk) (BlockToAdd m blk))) + , chainSelQueueCapacity :: !Word + , varChainSelReprocessLoEBlocks :: !(StrictTVar m Bool) + } + deriving (NoThunks) via OnlyCheckWhnfNamed "ChainSelQueue" (ChainSelQueue m blk) -- | Entry in the 'ChainSelQueue' queue: a block together with the 'TMVar's used -- to implement 'AddBlockPromise'. @@ -464,6 +493,7 @@ data BlockToAdd m blk = BlockToAdd , varBlockProcessed :: !(StrictTMVar m (AddBlockResult blk)) -- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'. } + deriving NoThunks via OnlyCheckWhnfNamed "BlockToAdd" (BlockToAdd m blk) -- | Different async tasks for triggering ChainSel data ChainSelMessage m blk @@ -473,9 +503,11 @@ data ChainSelMessage m blk | ChainSelReprocessLoEBlocks -- | Create a new 'ChainSelQueue' with the given size. -newChainSelQueue :: IOLike m => Word -> m (ChainSelQueue m blk) -newChainSelQueue queueSize = ChainSelQueue <$> - atomically (newTBQueue (fromIntegral queueSize)) +newChainSelQueue :: (IOLike m, StandardHash blk, Typeable blk) => Word -> m (ChainSelQueue m blk) +newChainSelQueue chainSelQueueCapacity = do + varChainSelQueue <- newTVarIO mempty + varChainSelReprocessLoEBlocks <- newTVarIO False + return $ ChainSelQueue {varChainSelQueue, chainSelQueueCapacity, varChainSelReprocessLoEBlocks} -- | Add a block to the 'ChainSelQueue' queue. Can block when the queue is full. addBlockToAdd :: @@ -485,7 +517,7 @@ addBlockToAdd :: -> InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk) -addBlockToAdd tracer (ChainSelQueue queue) punish blk = do +addBlockToAdd tracer (ChainSelQueue {varChainSelQueue, chainSelQueueCapacity}) punish blk = do varBlockWrittenToDisk <- newEmptyTMVarIO varBlockProcessed <- newEmptyTMVarIO let !toAdd = BlockToAdd @@ -496,8 +528,12 @@ addBlockToAdd tracer (ChainSelQueue queue) punish blk = do } traceWith tracer $ AddedBlockToQueue (blockRealPoint blk) RisingEdge queueSize <- atomically $ do - writeTBQueue queue (ChainSelAddBlock toAdd) - lengthTBQueue queue + chainSelQueue <- readTVar varChainSelQueue + let chainSelQueue' = Map.insert (blockRealPoint blk) toAdd chainSelQueue + chainSelQueueSize = Map.size chainSelQueue' + check (fromIntegral chainSelQueueSize <= chainSelQueueCapacity) + writeTVar varChainSelQueue chainSelQueue' + return chainSelQueueSize traceWith tracer $ AddedBlockToQueue (blockRealPoint blk) (FallingEdgeWith (fromIntegral queueSize)) return AddBlockPromise @@ -511,51 +547,82 @@ addReprocessLoEBlocks => Tracer m (TraceAddBlockEvent blk) -> ChainSelQueue m blk -> m () -addReprocessLoEBlocks tracer (ChainSelQueue queue) = do +addReprocessLoEBlocks tracer (ChainSelQueue {varChainSelReprocessLoEBlocks}) = do traceWith tracer $ AddedReprocessLoEBlocksToQueue - atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks + atomically $ writeTVar varChainSelReprocessLoEBlocks True -- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the -- queue is empty; in that case, reports the starvation (and its end) to the -- callback. getChainSelMessage - :: (IOLike m, HasHeader blk) + :: IOLike m => Tracer m (TraceChainSelStarvationEvent blk) -> StrictTVar m ChainSelStarvation -> ChainSelQueue m blk -> m (ChainSelMessage m blk) -getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) = do - -- NOTE: The test of emptiness and the blocking read are in different STM - -- transactions on purpose. - whenM (atomically $ isEmptyTBQueue queue) $ do - writeTVarIO starvationVar ChainSelStarvationOngoing - traceWith starvationTracer . ChainSelStarvationStarted =<< getMonotonicTime - message <- atomically $ readTBQueue queue - -- If there was a starvation ongoing, we need to report that it is done. - whenM ((== ChainSelStarvationOngoing) <$> readTVarIO starvationVar) $ - case message of - ChainSelAddBlock BlockToAdd {blockToAdd} -> do - time <- getMonotonicTime - traceWith starvationTracer $ ChainSelStarvationEnded time $ blockRealPoint blockToAdd - writeTVarIO starvationVar $ ChainSelStarvationEndedAt time - ChainSelReprocessLoEBlocks -> pure () - return message +getChainSelMessage starvationTracer starvationVar queue = go where + go = do + (reprocessLoEBlocks, chainSelQueue) <- atomically readBoth + case reprocessLoEBlocks of + True -> do + writeTVarIO varChainSelReprocessLoEBlocks False + return ChainSelReprocessLoEBlocks + False -> + case Map.minView chainSelQueue of + Just (blockToAdd, chainSelQueue') -> do + writeTVarIO varChainSelQueue chainSelQueue' + return $ ChainSelAddBlock blockToAdd + Nothing -> do + writeTVarIO starvationVar ChainSelStarvationOngoing + traceWith starvationTracer . ChainSelStarvationStarted =<< getMonotonicTime -- FIXME: only trace if first time + void $ atomically $ blockUntilChanged (second Map.null) (False, True) readBoth + go + ChainSelQueue {varChainSelQueue, varChainSelReprocessLoEBlocks} = queue writeTVarIO v x = atomically $ writeTVar v x + readBoth = (,) <$> readTVar varChainSelReprocessLoEBlocks <*> readTVar varChainSelQueue -- | Flush the 'ChainSelQueue' queue and notify the waiting threads. -- +-- REVIEW: What about all the threads that are waiting to write in the queue and +-- will write after the flush?! closeChainSelQueue :: IOLike m => ChainSelQueue m blk -> STM m () -closeChainSelQueue (ChainSelQueue queue) = do - as <- mapMaybe blockAdd <$> flushTBQueue queue - traverse_ (\a -> tryPutTMVar (varBlockProcessed a) - (FailedToAddBlock "Queue flushed")) - as - where - blockAdd = \case - ChainSelAddBlock ab -> Just ab - ChainSelReprocessLoEBlocks -> Nothing +closeChainSelQueue ChainSelQueue {varChainSelQueue} = do + chainSelQueue <- readTVar varChainSelQueue + for_ chainSelQueue $ \BlockToAdd {varBlockProcessed} -> + putTMVar varBlockProcessed $ FailedToAddBlock "Queue flushed" +-- | Delete the given 'BlockToAdd' from the 'ChainSelQueue'. +-- +-- PRECONDITION: the given 'BlockToAdd' is in 'ChainSelQueue'. +deleteBlockToAdd :: + (IOLike m, HasHeader blk) + => BlockToAdd m blk + -> ChainSelQueue m blk + -> m () +deleteBlockToAdd (BlockToAdd _ blk _ _) (ChainSelQueue {varChainSelQueue}) = + atomically $ modifyTVar varChainSelQueue $ Map.delete (blockRealPoint blk) + +-- | Return a function to test the membership for the given 'BlocksToAdd'. +memberBlocksToAdd :: + (IOLike m, HasHeader blk) + => ChainSelQueue m blk + -> STM m (RealPoint blk -> Bool) +memberBlocksToAdd (ChainSelQueue {varChainSelQueue}) = + flip Map.member <$> readTVar varChainSelQueue + +getBlocksToAddMaxSlotNo :: + IOLike m + => ChainSelQueue m blk + -> STM m MaxSlotNo +getBlocksToAddMaxSlotNo (ChainSelQueue {varChainSelQueue}) = aux <$> readTVar varChainSelQueue + where + -- | The 'Ord' instance of 'RealPoint' orders by 'SlotNo' first, so the + -- maximal key of the map has the greatest 'SlotNo'. + aux :: Map (RealPoint blk) (BlockToAdd m blk) -> MaxSlotNo + aux queue = case Map.lookupMax queue of + Nothing -> NoMaxSlotNo + Just (RealPoint s _, _) -> MaxSlotNo s {------------------------------------------------------------------------------- Trace types