Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChainDB: let the BlockFetch client add blocks asynchronously #2721

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,10 @@ initBlockFetchConsensusInterface cfg chainDB getCandidates blockFetchSize btime
readFetchedBlocks :: STM m (Point blk -> Bool)
readFetchedBlocks = ChainDB.getIsFetched chainDB

-- Waits until the block has been written to disk, but not until chain
-- selection has processed the block.
-- Hand over the block to the ChainDB, but don't wait until it has been
-- written to disk or processed.
addFetchedBlock :: Point blk -> blk -> m ()
addFetchedBlock _pt = void . ChainDB.addBlockWaitWrittenToDisk chainDB
addFetchedBlock _pt = void . ChainDB.addBlockAsync chainDB

readFetchedMaxSlotNo :: STM m MaxSlotNo
readFetchedMaxSlotNo = ChainDB.getMaxSlotNo chainDB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,3 +522,4 @@ addBlockRunner
-- Loops forever: fetch the next queued block, add it synchronously with
-- 'addBlockSync', and only then delete it from the queue.
addBlockRunner cdb@CDB{..} = forever $ do
blockToAdd <- getBlockToAdd cdbBlocksToAdd
addBlockSync cdb blockToAdd
-- Deletion deliberately comes /after/ 'addBlockSync' has returned.
deleteBlockToAdd blockToAdd cdbBlocksToAdd
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,11 @@ addBlockSync cdb@CDB {..} BlockToAdd { blockToAdd = b, .. } = do
let immBlockNo = AF.anchorBlockNo curChain

-- We follow the steps from section "## Adding a block" in ChainDB.md

-- Note: we call 'chainSelectionForFutureBlocks' in all branches instead
-- of once, before branching, because we want to do it /after/ writing the
-- block to the VolatileDB and delivering the 'varBlockWrittenToDisk'
-- promise, as this is the promise the BlockFetch client waits for.
-- Otherwise, the BlockFetch client would have to wait for
--
-- Note: we call 'chainSelectionForFutureBlocks' in all branches instead of
-- once, before branching, because we want to do it /after/ writing the
-- block to the VolatileDB so that any threads waiting on the
-- 'varBlockWrittenToDisk' promise don't have to wait for the result of
-- 'chainSelectionForFutureBlocks'.

-- ### Ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,15 @@ getBlockComponent ::
getBlockComponent CDB{..} = getAnyBlockComponent cdbImmutableDB cdbVolatileDB

getIsFetched ::
forall m blk. IOLike m
forall m blk. (IOLike m, HasHeader blk)
=> ChainDbEnv m blk -> STM m (Point blk -> Bool)
getIsFetched CDB{..} = basedOnHash <$> VolatileDB.getIsMember cdbVolatileDB
where
-- The volatile DB indexes by hash only, not by points. However, it should
-- not be possible to have two points with the same hash but different
-- slot numbers.
basedOnHash :: (HeaderHash blk -> Bool) -> Point blk -> Bool
basedOnHash f p =
case pointHash p of
BlockHash hash -> f hash
GenesisHash -> False
getIsFetched CDB{..} =
    isFetched
      <$> memberBlocksToAdd cdbBlocksToAdd
      <*> VolatileDB.getIsMember cdbVolatileDB
  where
    -- A point counts as fetched when its block is either a member of the
    -- 'cdbBlocksToAdd' queue (looked up by real point) or a member of the
    -- VolatileDB (looked up by hash). The origin point is never fetched.
    isFetched inQueue inVolDb pt =
      case pointToWithOriginRealPoint pt of
        Origin       -> False
        NotOrigin rp -> inQueue rp || inVolDb (realPointHash rp)
Copy link
Contributor Author

@mrBliss mrBliss Nov 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition: when a block has been removed but not yet written to the VolatileDB, this will return False. Possible solution: in the background thread, peek the block, process it, and then remove it from the queue. Add a comment that these two are not disjoint.

cc: @edsko.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good thing someone is paying attention 🙄


getIsInvalidBlock ::
forall m blk. (IOLike m, HasHeader blk)
Expand Down Expand Up @@ -194,10 +191,13 @@ getMaxSlotNo CDB{..} = do
-- contains block 9'. The ImmutableDB contains blocks 1-10. The max slot
-- of the current chain will be 10 (being the anchor point of the empty
-- current chain), while the max slot of the VolatileDB will be 9.
curChainMaxSlotNo <- maxSlotNoFromWithOrigin . AF.headSlot
<$> readTVar cdbChain
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo
--
-- Moreover, we have to look in 'BlocksToAdd' too.
curChainMaxSlotNo <-
maxSlotNoFromWithOrigin . AF.headSlot <$> readTVar cdbChain
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
blocksToAddMaxSlotNo <- getBlocksToAddMaxSlotNo cdbBlocksToAdd
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo `max` blocksToAddMaxSlotNo

{-------------------------------------------------------------------------------
Unifying interface over the immutable DB and volatile DB, but independent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
-- * Future blocks
, FutureBlocks
-- * Blocks to add
, BlocksToAdd
, BlocksToAdd -- opaque
, BlockToAdd (..)
, newBlocksToAdd
, addBlockToAdd
, getBlockToAdd
, deleteBlockToAdd
, memberBlocksToAdd
, getBlocksToAddMaxSlotNo
-- * Trace types
, TraceEvent (..)
, NewTipInfo (..)
Expand All @@ -61,6 +64,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (

import Control.Tracer
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Typeable
import Data.Void (Void)
import Data.Word (Word64)
Expand All @@ -70,6 +74,7 @@ import NoThunks.Class (OnlyCheckWhnfNamed (..))
import Control.Monad.Class.MonadSTM.Strict (newEmptyTMVarIO)

import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.Block (MaxSlotNo (..))

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
Expand Down Expand Up @@ -241,6 +246,17 @@ data ChainDbEnv m blk = CDB
, cdbCheckInFuture :: !(CheckInFuture m blk)
, cdbBlocksToAdd :: !(BlocksToAdd m blk)
-- ^ Queue of blocks that still have to be added.
--
-- NOTE: the set of blocks in this queue is /not/ disjoint from the set of
-- blocks in the VolatileDB. When processing the next block in the queue, we
-- do not remove the block from the queue /until/ it has been added to the
-- VolatileDB and processed by chain selection. This means the block
-- currently being added will be both in the queue and the VolatileDB for a
-- short while.
--
-- If we removed the block from the queue before adding it to the
-- VolatileDB, then it would be in /neither/ for a short time, and
-- 'getIsFetched' would incorrectly return 'False'.
, cdbFutureBlocks :: !(StrictTVar m (FutureBlocks blk))
-- ^ Blocks from the future
--
Expand Down Expand Up @@ -424,23 +440,40 @@ type FutureBlocks blk = Map (HeaderHash blk) (Header blk)
-- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are
-- read from this queue by a background thread, which processes the blocks
-- synchronously.
newtype BlocksToAdd m blk = BlocksToAdd (TBQueue m (BlockToAdd m blk))
data BlocksToAdd m blk = BlocksToAdd {
-- TODO use a better data structure, e.g., a heap from the @heaps@
-- package. Wish list:
-- + O(1) pop min value
-- + O(log n) insert
-- + O(n) get all
-- + Bounded in size
--
-- TODO join consecutive blocks into a fragment that can be added at
-- once.
blocksToAddQueue :: !(StrictTVar m (Map (RealPoint blk) (BlockToAdd m blk)))
-- ^ The queued blocks, keyed by their 'RealPoint'.
, blocksToAddCapacity :: !Word
-- ^ Upper bound on the number of entries in 'blocksToAddQueue':
-- 'addBlockToAdd' only commits an insertion when the resulting map size
-- does not exceed this value.
}
deriving NoThunks via OnlyCheckWhnfNamed "BlocksToAdd" (BlocksToAdd m blk)

-- | Entry in the 'BlocksToAdd' queue: a block together with the 'TMVar's used
-- to implement 'AddBlockPromise'.
data BlockToAdd m blk = BlockToAdd
{ blockToAdd :: !blk
, varBlockWrittenToDisk :: !(StrictTMVar m Bool)
-- ^ Used for the 'blockWrittenToDisk' field of 'AddBlockPromise'.
, varBlockProcessed :: !(StrictTMVar m (Point blk))
-- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'.
}
data BlockToAdd m blk = BlockToAdd {
blockToAdd :: !blk
-- ^ The block that is to be added.
, varBlockWrittenToDisk :: !(StrictTMVar m Bool)
-- ^ Used for the 'blockWrittenToDisk' field of 'AddBlockPromise'.
, varBlockProcessed :: !(StrictTMVar m (Point blk))
-- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'.
}
deriving NoThunks via OnlyCheckWhnfNamed "BlockToAdd" (BlockToAdd m blk)

-- | Create a new 'BlocksToAdd' with the given size.
newBlocksToAdd :: IOLike m => Word -> m (BlocksToAdd m blk)
newBlocksToAdd queueSize = BlocksToAdd <$>
atomically (newTBQueue (fromIntegral queueSize))
newBlocksToAdd :: (IOLike m, HasHeader blk) => Word -> m (BlocksToAdd m blk)
newBlocksToAdd queueCapacity =
    mkBlocksToAdd <$> newTVarIO mempty
  where
    -- The queue starts out empty; the requested capacity is stored alongside
    -- it.
    mkBlocksToAdd emptyQueue = BlocksToAdd {
        blocksToAddQueue    = emptyQueue
      , blocksToAddCapacity = queueCapacity
      }

-- | Add a block to the 'BlocksToAdd' queue. Can block when the queue is full.
addBlockToAdd
Expand All @@ -449,7 +482,7 @@ addBlockToAdd
-> BlocksToAdd m blk
-> blk
-> m (AddBlockPromise m blk)
addBlockToAdd tracer (BlocksToAdd queue) blk = do
addBlockToAdd tracer (BlocksToAdd varQueue queueCapacity) blk = do
varBlockWrittenToDisk <- newEmptyTMVarIO
varBlockProcessed <- newEmptyTMVarIO
let !toAdd = BlockToAdd
Expand All @@ -458,19 +491,64 @@ addBlockToAdd tracer (BlocksToAdd queue) blk = do
, varBlockProcessed
}
queueSize <- atomically $ do
writeTBQueue queue toAdd
lengthTBQueue queue
queue <- readTVar varQueue
let queue' = Map.insert (blockRealPoint blk) toAdd queue
queueSize = Map.size queue'
check (fromIntegral queueSize <= queueCapacity)
writeTVar varQueue queue'
return queueSize
traceWith tracer $
AddedBlockToQueue (blockRealPoint blk) (fromIntegral queueSize)
return AddBlockPromise
{ blockWrittenToDisk = readTMVar varBlockWrittenToDisk
, blockProcessed = readTMVar varBlockProcessed
}

-- | Get the oldest block from the 'BlocksToAdd' queue. Can block when the
-- queue is empty.
-- | Get the next block from the 'BlocksToAdd' queue. Blocks when the queue is
-- empty.
--
-- NOTE: does not remove the block from the queue, 'deleteBlockToAdd' can be
-- used for that.
getBlockToAdd :: IOLike m => BlocksToAdd m blk -> m (BlockToAdd m blk)
getBlockToAdd (BlocksToAdd queue) = atomically $ readTBQueue queue
getBlockToAdd (BlocksToAdd varQueue _) = atomically $ do
    queue <- readTVar varQueue
    -- Only /peek/ at the entry with the minimal key: the queue itself is left
    -- untouched. The caller removes the entry with 'deleteBlockToAdd' once it
    -- is done with the block, so that membership checks on the queue keep
    -- seeing the block in the meantime.
    case Map.lookupMin queue of
      -- Nothing queued: block until the 'TVar' changes.
      Nothing             -> retry
      Just (_, toProcess) -> return toProcess

-- | Remove the entry for the given 'BlockToAdd' from the 'BlocksToAdd' queue.
--
-- The entry is identified by the 'RealPoint' of the block it carries.
--
-- PRECONDITION: the given 'BlockToAdd' is in 'BlocksToAdd'.
deleteBlockToAdd ::
     (IOLike m, HasHeader blk)
  => BlockToAdd m blk
  -> BlocksToAdd m blk
  -> m ()
deleteBlockToAdd BlockToAdd { blockToAdd = blk } BlocksToAdd { blocksToAddQueue = varQueue } =
    atomically . modifyTVar varQueue . Map.delete $ blockRealPoint blk

-- | Read the current contents of the 'BlocksToAdd' queue and return a
-- predicate that tells whether a given point is a key of that snapshot.
memberBlocksToAdd ::
     (IOLike m, HasHeader blk)
  => BlocksToAdd m blk
  -> STM m (RealPoint blk -> Bool)
memberBlocksToAdd BlocksToAdd { blocksToAddQueue = varQueue } = do
    queue <- readTVar varQueue
    return (`Map.member` queue)

-- | The slot number of the maximal key in the 'BlocksToAdd' queue, or
-- 'NoMaxSlotNo' when the queue is empty.
getBlocksToAddMaxSlotNo ::
     IOLike m
  => BlocksToAdd m blk
  -> STM m MaxSlotNo
getBlocksToAddMaxSlotNo BlocksToAdd { blocksToAddQueue = varQueue } =
    maybe NoMaxSlotNo (slotOfKey . fst) . Map.lookupMax <$> readTVar varQueue
  where
    -- The 'Ord' instance of 'RealPoint' orders by 'SlotNo' first, so the
    -- maximal key of the map has the greatest 'SlotNo'.
    slotOfKey (RealPoint slot _) = MaxSlotNo slot

{-------------------------------------------------------------------------------
Trace types
Expand Down