Skip to content

Commit

Permalink
Reimplement ChainDB init to unmask exceptions
Browse files Browse the repository at this point in the history
Prior to this change, ChainDB was being initialized in an `allocate`
call which mask exceptions, and because of this the initialization
process was uninterruptible.

This change reimplements the logic so that initialization is no longer
masked. The ChainDB initialization now runs with a temporary resource
registry that will ensure deallocation of resources in presence of an
exception. Refer to the additions to the report for an explanation of
the new structure of registries and how they are related.

Co-authored-by: nfrisby <nick.frisby@iohk.io>
  • Loading branch information
jasagredo and nfrisby committed Nov 17, 2021
1 parent 6568bfe commit 4f4b05a
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 151 deletions.
2 changes: 1 addition & 1 deletion ouroboros-consensus-cardano/tools/db-analyser/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ analyse CmdLine {..} args =
Nothing -> pure genesisLedger
Just snapshot -> readSnapshot ledgerDbFS (decodeExtLedgerState' cfg) decode snapshot
initLedger <- either (error . show) pure initLedgerErr
ImmutableDB.withDB (ImmutableDB.openDB immutableDbArgs) $ \immutableDB -> do
ImmutableDB.withDB (ImmutableDB.openDB immutableDbArgs runWithTempRegistry) $ \immutableDB -> do
runAnalysis analysis $ AnalysisEnv {
cfg
, initLedger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ closeOpenIterators varIters = do

open :: ImmutableDbArgs Identity IO TestBlock -> IO ImmutableDBState
open args = do
(db, internal) <- openDBInternal args
(db, internal) <- openDBInternal args runWithTempRegistry
return ImmutableDBState { db, internal }

-- | Opens a new ImmutableDB and stores it in 'varDB'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import Ouroboros.Network.Block (MaxSlotNo)

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry

import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.FS.API (SomeHasFS (..), hPutAll,
Expand Down Expand Up @@ -483,7 +484,7 @@ data VolatileDBEnv = VolatileDBEnv
-- Does not close the current VolatileDB stored in 'varDB'.
reopenDB :: VolatileDBEnv -> IO ()
reopenDB VolatileDBEnv { varDB, args } = do
db <- openDB args
db <- openDB args runWithTempRegistry
void $ swapMVar varDB db

semanticsImpl :: VolatileDBEnv -> At CmdErr Concrete -> IO (At Resp Concrete)
Expand Down Expand Up @@ -587,7 +588,7 @@ test cmds = do
}

(hist, res, trace) <- bracket
(openDB args >>= newMVar)
(openDB args runWithTempRegistry >>= newMVar)
-- Note: we might be closing a different VolatileDB than the one we
-- opened, as we can reopen it the VolatileDB, swapping the VolatileDB
-- in the MVar.
Expand Down
7 changes: 2 additions & 5 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,8 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
, ChainDB.cdbVolatileDbValidation = ValidateAll
}

(_, chainDB) <- allocate registry
(\_ -> openChainDB
registry inFuture cfg initLedger
llrnChainDbArgsDefaults customiseChainDbArgs')
ChainDB.closeDB
chainDB <- openChainDB registry inFuture cfg initLedger
llrnChainDbArgsDefaults customiseChainDbArgs'

btime <-
hardForkBlockchainTime $
Expand Down
218 changes: 121 additions & 97 deletions ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl (
) where

import Control.Monad (when)
import Control.Monad.Trans.Class (lift)
import Control.Tracer
import Data.Functor ((<&>))
import Data.Functor.Identity (Identity)
Expand All @@ -55,6 +56,8 @@ import Ouroboros.Consensus.Util.STM (Fingerprint (..),

import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import qualified Ouroboros.Consensus.Storage.ChainDB.API as API
import Ouroboros.Consensus.Util.ResourceRegistry (WithTempRegistry,
allocate, runInnerWithTempRegistry, runWithTempRegistry)

import Ouroboros.Consensus.Storage.ChainDB.Impl.Args (ChainDbArgs,
defaultArgs)
Expand Down Expand Up @@ -112,115 +115,136 @@ openDBInternal
=> ChainDbArgs Identity m blk
-> Bool -- ^ 'True' = Launch background tasks
-> m (ChainDB m blk, Internal m blk)
openDBInternal args launchBgTasks = do
immutableDB <- ImmutableDB.openDB argsImmutableDb
immutableDbTipPoint <- atomically $ ImmutableDB.getTipPoint immutableDB
openDBInternal args launchBgTasks = runWithTempRegistry $ do
immutableDB <- ImmutableDB.openDB argsImmutableDb $ innerOpenCont ImmutableDB.closeDB
immutableDbTipPoint <- lift $ atomically $ ImmutableDB.getTipPoint immutableDB
let immutableDbTipChunk =
chunkIndexOfPoint (Args.cdbChunkInfo args) immutableDbTipPoint
traceWith tracer $
lift $ traceWith tracer $
TraceOpenEvent $
OpenedImmutableDB immutableDbTipPoint immutableDbTipChunk

volatileDB <- VolatileDB.openDB argsVolatileDb
traceWith tracer $ TraceOpenEvent OpenedVolatileDB
let lgrReplayTracer =
LgrDB.decorateReplayTracer
immutableDbTipPoint
(contramap TraceLedgerReplayEvent tracer)
(lgrDB, replayed) <- LgrDB.openDB argsLgrDb
volatileDB <- VolatileDB.openDB argsVolatileDb $ innerOpenCont VolatileDB.closeDB
(chainDB, testing, env) <- lift $ do
traceWith tracer $ TraceOpenEvent OpenedVolatileDB
let lgrReplayTracer =
LgrDB.decorateReplayTracer
immutableDbTipPoint
(contramap TraceLedgerReplayEvent tracer)
(lgrDB, replayed) <- LgrDB.openDB argsLgrDb
lgrReplayTracer
immutableDB
(Query.getAnyKnownBlock immutableDB volatileDB)
traceWith tracer $ TraceOpenEvent OpenedLgrDB

varInvalid <- newTVarIO (WithFingerprint Map.empty (Fingerprint 0))
varFutureBlocks <- newTVarIO Map.empty


chainAndLedger <- ChainSel.initialChainSelection
immutableDB
volatileDB
lgrDB
tracer
(Args.cdbTopLevelConfig args)
varInvalid
varFutureBlocks
(Args.cdbCheckInFuture args)

let chain = VF.validatedFragment chainAndLedger
ledger = VF.validatedLedger chainAndLedger
cfg = Args.cdbTopLevelConfig args

atomically $ LgrDB.setCurrent lgrDB ledger
varChain <- newTVarIO chain
varIterators <- newTVarIO Map.empty
varFollowers <- newTVarIO Map.empty
varNextIteratorKey <- newTVarIO (IteratorKey 0)
varNextFollowerKey <- newTVarIO (FollowerKey 0)
varCopyLock <- newMVar ()
varKillBgThreads <- newTVarIO $ return ()
blocksToAdd <- newBlocksToAdd (Args.cdbBlocksToAddSize args)

let env = CDB { cdbImmutableDB = immutableDB
, cdbVolatileDB = volatileDB
, cdbLgrDB = lgrDB
, cdbChain = varChain
, cdbIterators = varIterators
, cdbFollowers = varFollowers
, cdbTopLevelConfig = cfg
, cdbInvalid = varInvalid
, cdbNextIteratorKey = varNextIteratorKey
, cdbNextFollowerKey = varNextFollowerKey
, cdbCopyLock = varCopyLock
, cdbTracer = tracer
, cdbTraceLedger = Args.cdbTraceLedger args
, cdbRegistry = Args.cdbRegistry args
, cdbGcDelay = Args.cdbGcDelay args
, cdbGcInterval = Args.cdbGcInterval args
, cdbKillBgThreads = varKillBgThreads
, cdbChunkInfo = Args.cdbChunkInfo args
, cdbCheckIntegrity = Args.cdbCheckIntegrity args
, cdbCheckInFuture = Args.cdbCheckInFuture args
, cdbBlocksToAdd = blocksToAdd
, cdbFutureBlocks = varFutureBlocks
}
h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env
let chainDB = API.ChainDB
{ addBlockAsync = getEnv1 h ChainSel.addBlockAsync
, getCurrentChain = getEnvSTM h Query.getCurrentChain
, getLedgerDB = getEnvSTM h Query.getLedgerDB
, getTipBlock = getEnv h Query.getTipBlock
, getTipHeader = getEnv h Query.getTipHeader
, getTipPoint = getEnvSTM h Query.getTipPoint
, getBlockComponent = getEnv2 h Query.getBlockComponent
, getIsFetched = getEnvSTM h Query.getIsFetched
, getIsValid = getEnvSTM h Query.getIsValid
, getMaxSlotNo = getEnvSTM h Query.getMaxSlotNo
, stream = Iterator.stream h
, newFollower = Follower.newFollower h
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
, closeDB = closeDB h
, isOpen = isOpen h
}
testing = Internal
{ intCopyToImmutableDB = getEnv h Background.copyToImmutableDB
, intGarbageCollect = getEnv1 h Background.garbageCollect
, intUpdateLedgerSnapshots = getEnv h Background.updateLedgerSnapshots
, intAddBlockRunner = getEnv h Background.addBlockRunner
, intKillBgThreads = varKillBgThreads
}

traceWith tracer $ TraceOpenEvent $ OpenedDB
(castPoint $ AF.anchorPoint chain)
(castPoint $ AF.headPoint chain)

when launchBgTasks $ Background.launchBgTasks env replayed

return (chainDB, testing)
traceWith tracer $ TraceOpenEvent OpenedLgrDB

varInvalid <- newTVarIO (WithFingerprint Map.empty (Fingerprint 0))
varFutureBlocks <- newTVarIO Map.empty


chainAndLedger <- ChainSel.initialChainSelection
immutableDB
volatileDB
lgrDB
tracer
(Args.cdbTopLevelConfig args)
varInvalid
varFutureBlocks
(Args.cdbCheckInFuture args)

let chain = VF.validatedFragment chainAndLedger
ledger = VF.validatedLedger chainAndLedger
cfg = Args.cdbTopLevelConfig args

atomically $ LgrDB.setCurrent lgrDB ledger
varChain <- newTVarIO chain
varIterators <- newTVarIO Map.empty
varFollowers <- newTVarIO Map.empty
varNextIteratorKey <- newTVarIO (IteratorKey 0)
varNextFollowerKey <- newTVarIO (FollowerKey 0)
varCopyLock <- newMVar ()
varKillBgThreads <- newTVarIO $ return ()
blocksToAdd <- newBlocksToAdd (Args.cdbBlocksToAddSize args)

let env = CDB { cdbImmutableDB = immutableDB
, cdbVolatileDB = volatileDB
, cdbLgrDB = lgrDB
, cdbChain = varChain
, cdbIterators = varIterators
, cdbFollowers = varFollowers
, cdbTopLevelConfig = cfg
, cdbInvalid = varInvalid
, cdbNextIteratorKey = varNextIteratorKey
, cdbNextFollowerKey = varNextFollowerKey
, cdbCopyLock = varCopyLock
, cdbTracer = tracer
, cdbTraceLedger = Args.cdbTraceLedger args
, cdbRegistry = Args.cdbRegistry args
, cdbGcDelay = Args.cdbGcDelay args
, cdbGcInterval = Args.cdbGcInterval args
, cdbKillBgThreads = varKillBgThreads
, cdbChunkInfo = Args.cdbChunkInfo args
, cdbCheckIntegrity = Args.cdbCheckIntegrity args
, cdbCheckInFuture = Args.cdbCheckInFuture args
, cdbBlocksToAdd = blocksToAdd
, cdbFutureBlocks = varFutureBlocks
}
h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env
let chainDB = API.ChainDB
{ addBlockAsync = getEnv1 h ChainSel.addBlockAsync
, getCurrentChain = getEnvSTM h Query.getCurrentChain
, getLedgerDB = getEnvSTM h Query.getLedgerDB
, getTipBlock = getEnv h Query.getTipBlock
, getTipHeader = getEnv h Query.getTipHeader
, getTipPoint = getEnvSTM h Query.getTipPoint
, getBlockComponent = getEnv2 h Query.getBlockComponent
, getIsFetched = getEnvSTM h Query.getIsFetched
, getIsValid = getEnvSTM h Query.getIsValid
, getMaxSlotNo = getEnvSTM h Query.getMaxSlotNo
, stream = Iterator.stream h
, newFollower = Follower.newFollower h
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
, closeDB = closeDB h
, isOpen = isOpen h
}
testing = Internal
{ intCopyToImmutableDB = getEnv h Background.copyToImmutableDB
, intGarbageCollect = getEnv1 h Background.garbageCollect
, intUpdateLedgerSnapshots = getEnv h Background.updateLedgerSnapshots
, intAddBlockRunner = getEnv h Background.addBlockRunner
, intKillBgThreads = varKillBgThreads
}

traceWith tracer $ TraceOpenEvent $ OpenedDB
(castPoint $ AF.anchorPoint chain)
(castPoint $ AF.headPoint chain)

when launchBgTasks $ Background.launchBgTasks env replayed

return (chainDB, testing, env)

_ <- lift $ allocate (Args.cdbRegistry args) (\_ -> return $ chainDB) API.closeDB

return ((chainDB, testing), env)
where
tracer = Args.cdbTracer args
(argsImmutableDb, argsVolatileDb, argsLgrDb, _) = Args.fromChainDbArgs args

-- | We use 'runInnerWithTempRegistry' for the component databases.
innerOpenCont ::
IOLike m
=> (innerDB -> m ())
-> WithTempRegistry st m (innerDB, st)
-> WithTempRegistry (ChainDbEnv m blk) m innerDB
innerOpenCont closer m =
runInnerWithTempRegistry
(fmap (\(innerDB, st) -> (innerDB, st, innerDB)) m)
((True <$) . closer)
(\_env _innerDB -> True)
-- This check is degenerate because handles in @_env@ and the
-- @_innerDB@ handle do not support an equality check; all of the
-- identifying data is only in the handle's closure, not
-- accessible because of our intentional encapsulation choices.

isOpen :: IOLike m => ChainDbHandle m blk -> STM m Bool
isOpen (CDBHandle varState) = readTVar varState <&> \case
ChainDbClosed -> False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ data ChainDbSpecificArgs f m blk = ChainDbSpecificArgs {
-- ^ Batch all scheduled GCs so that at most one GC happens every
-- 'cdbsGcInterval'.
, cdbsRegistry :: HKD f (ResourceRegistry m)
-- ^ TODO: the ImmutableDB takes a 'ResourceRegistry' too, but we're
-- using it for ChainDB-specific things. Revisit these arguments.
, cdbsTracer :: Tracer m (TraceEvent blk)
}

Expand Down Expand Up @@ -230,7 +228,7 @@ toChainDbArgs ImmutableDB.ImmutableDbArgs {..}
-- Misc
, cdbTracer = cdbsTracer
, cdbTraceLedger = lgrTraceLedger
, cdbRegistry = immRegistry
, cdbRegistry = cdbsRegistry
, cdbGcDelay = cdbsGcDelay
, cdbGcInterval = cdbsGcInterval
, cdbBlocksToAddSize = cdbsBlocksToAddSize
Expand Down
Loading

0 comments on commit 4f4b05a

Please sign in to comment.