Skip to content

Commit

Permalink
Reimplement ChainDB init to unmask exceptions
Browse files Browse the repository at this point in the history
Prior to this change, ChainDB was being initialized in an `allocate`
call which mask exceptions, and because of this the initialization
process was uninterruptible.

This change reimplements the logic so that initialization is no longer
masked. The ChainDB initialization now runs with a temporary resource
registry that will ensure deallocation of resources in presence of an
exception. Refer to the additions to the report for an explanation of
the new structure of registries and how they are related.
  • Loading branch information
jasagredo committed Nov 11, 2021
1 parent 5743fdf commit 8d8d8cb
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 75 deletions.
2 changes: 1 addition & 1 deletion ouroboros-consensus-cardano/tools/db-analyser/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ analyse CmdLine {..} args =
Nothing -> pure genesisLedger
Just snapshot -> readSnapshot ledgerDbFS (decodeExtLedgerState' cfg) decode snapshot
initLedger <- either (error . show) pure initLedgerErr
ImmutableDB.withDB (ImmutableDB.openDB immutableDbArgs) $ \immutableDB -> do
ImmutableDB.withDB (ImmutableDB.openDB immutableDbArgs (runWithTempRegistry . runnableInnerWithTempRegistry)) $ \immutableDB -> do
runAnalysis analysis $ AnalysisEnv {
cfg
, initLedger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ closeOpenIterators varIters = do

open :: ImmutableDbArgs Identity IO TestBlock -> IO ImmutableDBState
open args = do
(db, internal) <- openDBInternal args
(db, internal) <- openDBInternal args (runWithTempRegistry . runnableInnerWithTempRegistry)
return ImmutableDBState { db, internal }

-- | Opens a new ImmutableDB and stores it in 'varDB'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import Ouroboros.Network.Block (MaxSlotNo)

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry

import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.FS.API (SomeHasFS (..), hPutAll,
Expand Down Expand Up @@ -483,7 +484,7 @@ data VolatileDBEnv = VolatileDBEnv
-- Does not close the current VolatileDB stored in 'varDB'.
reopenDB :: VolatileDBEnv -> IO ()
reopenDB VolatileDBEnv { varDB, args } = do
db <- openDB args
db <- openDB args (runWithTempRegistry . runnableInnerWithTempRegistry)
void $ swapMVar varDB db

semanticsImpl :: VolatileDBEnv -> At CmdErr Concrete -> IO (At Resp Concrete)
Expand Down Expand Up @@ -587,7 +588,7 @@ test cmds = do
}

(hist, res, trace) <- bracket
(openDB args >>= newMVar)
(openDB args (runWithTempRegistry . runnableInnerWithTempRegistry) >>= newMVar)
-- Note: we might be closing a different VolatileDB than the one we
-- opened, as we can reopen it the VolatileDB, swapping the VolatileDB
-- in the MVar.
Expand Down
7 changes: 2 additions & 5 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,8 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
, ChainDB.cdbVolatileDbValidation = ValidateAll
}

(_, chainDB) <- allocate registry
(\_ -> openChainDB
registry inFuture cfg initLedger
llrnChainDbArgsDefaults customiseChainDbArgs')
ChainDB.closeDB
chainDB <- openChainDB registry inFuture cfg initLedger
llrnChainDbArgsDefaults customiseChainDbArgs'

btime <-
hardForkBlockchainTime $
Expand Down
53 changes: 29 additions & 24 deletions ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl (
) where

import Control.Monad (when)
import Control.Monad.Trans.Class (lift)
import Control.Tracer
import Data.Functor ((<&>))
import Data.Functor.Identity (Identity)
Expand All @@ -55,6 +56,8 @@ import Ouroboros.Consensus.Util.STM (Fingerprint (..),

import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import qualified Ouroboros.Consensus.Storage.ChainDB.API as API
import Ouroboros.Consensus.Util.ResourceRegistry (allocate,
runInnerWithTempRegistry, runWithTempRegistry)

import Ouroboros.Consensus.Storage.ChainDB.Impl.Args (ChainDbArgs,
defaultArgs)
Expand Down Expand Up @@ -112,32 +115,32 @@ openDBInternal
=> ChainDbArgs Identity m blk
-> Bool -- ^ 'True' = Launch background tasks
-> m (ChainDB m blk, Internal m blk)
openDBInternal args launchBgTasks = do
immutableDB <- ImmutableDB.openDB argsImmutableDb
immutableDbTipPoint <- atomically $ ImmutableDB.getTipPoint immutableDB
openDBInternal args launchBgTasks = runWithTempRegistry $ do
immutableDB <- ImmutableDB.openDB argsImmutableDb (`runInnerWithTempRegistry` (const $ const True))
immutableDbTipPoint <- lift $ atomically $ ImmutableDB.getTipPoint immutableDB
let immutableDbTipChunk =
chunkIndexOfPoint (Args.cdbChunkInfo args) immutableDbTipPoint
traceWith tracer $
lift $ traceWith tracer $
TraceOpenEvent $
OpenedImmutableDB immutableDbTipPoint immutableDbTipChunk

volatileDB <- VolatileDB.openDB argsVolatileDb
traceWith tracer $ TraceOpenEvent OpenedVolatileDB
volatileDB <- VolatileDB.openDB argsVolatileDb (`runInnerWithTempRegistry` (const $ const True))
lift $ traceWith tracer $ TraceOpenEvent OpenedVolatileDB
let lgrReplayTracer =
LgrDB.decorateReplayTracer
immutableDbTipPoint
(contramap TraceLedgerReplayEvent tracer)
(lgrDB, replayed) <- LgrDB.openDB argsLgrDb
(lgrDB, replayed) <- lift $ LgrDB.openDB argsLgrDb
lgrReplayTracer
immutableDB
(Query.getAnyKnownBlock immutableDB volatileDB)
traceWith tracer $ TraceOpenEvent OpenedLgrDB
lift $ traceWith tracer $ TraceOpenEvent OpenedLgrDB

varInvalid <- newTVarIO (WithFingerprint Map.empty (Fingerprint 0))
varFutureBlocks <- newTVarIO Map.empty
varInvalid <- lift $ newTVarIO (WithFingerprint Map.empty (Fingerprint 0))
varFutureBlocks <- lift $ newTVarIO Map.empty


chainAndLedger <- ChainSel.initialChainSelection
chainAndLedger <- lift $ ChainSel.initialChainSelection
immutableDB
volatileDB
lgrDB
Expand All @@ -151,15 +154,15 @@ openDBInternal args launchBgTasks = do
ledger = VF.validatedLedger chainAndLedger
cfg = Args.cdbTopLevelConfig args

atomically $ LgrDB.setCurrent lgrDB ledger
varChain <- newTVarIO chain
varIterators <- newTVarIO Map.empty
varFollowers <- newTVarIO Map.empty
varNextIteratorKey <- newTVarIO (IteratorKey 0)
varNextFollowerKey <- newTVarIO (FollowerKey 0)
varCopyLock <- newMVar ()
varKillBgThreads <- newTVarIO $ return ()
blocksToAdd <- newBlocksToAdd (Args.cdbBlocksToAddSize args)
lift $ atomically $ LgrDB.setCurrent lgrDB ledger
varChain <- lift $ newTVarIO chain
varIterators <- lift $ newTVarIO Map.empty
varFollowers <- lift $ newTVarIO Map.empty
varNextIteratorKey <- lift $ newTVarIO (IteratorKey 0)
varNextFollowerKey <- lift $ newTVarIO (FollowerKey 0)
varCopyLock <- lift $ newMVar ()
varKillBgThreads <- lift $ newTVarIO $ return ()
blocksToAdd <- lift $ newBlocksToAdd (Args.cdbBlocksToAddSize args)

let env = CDB { cdbImmutableDB = immutableDB
, cdbVolatileDB = volatileDB
Expand All @@ -184,7 +187,7 @@ openDBInternal args launchBgTasks = do
, cdbBlocksToAdd = blocksToAdd
, cdbFutureBlocks = varFutureBlocks
}
h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env
h <- lift $ fmap CDBHandle $ newTVarIO $ ChainDbOpen env
let chainDB = API.ChainDB
{ addBlockAsync = getEnv1 h ChainSel.addBlockAsync
, getCurrentChain = getEnvSTM h Query.getCurrentChain
Expand All @@ -210,13 +213,15 @@ openDBInternal args launchBgTasks = do
, intKillBgThreads = varKillBgThreads
}

traceWith tracer $ TraceOpenEvent $ OpenedDB
lift $ traceWith tracer $ TraceOpenEvent $ OpenedDB
(castPoint $ AF.anchorPoint chain)
(castPoint $ AF.headPoint chain)

when launchBgTasks $ Background.launchBgTasks env replayed
when launchBgTasks $ lift $ Background.launchBgTasks env replayed

return (chainDB, testing)
_ <- lift $ allocate (Args.cdbRegistry args) (\_ -> return $ chainDB) API.closeDB

return ((chainDB, testing), ())
where
tracer = Args.cdbTracer args
(argsImmutableDb, argsVolatileDb, argsLgrDb, _) = Args.fromChainDbArgs args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ data ChainDbSpecificArgs f m blk = ChainDbSpecificArgs {
-- ^ Batch all scheduled GCs so that at most one GC happens every
-- 'cdbsGcInterval'.
, cdbsRegistry :: HKD f (ResourceRegistry m)
-- ^ TODO: the ImmutableDB takes a 'ResourceRegistry' too, but we're
-- using it for ChainDB-specific things. Revisit these arguments.
, cdbsTracer :: Tracer m (TraceEvent blk)
}

Expand Down Expand Up @@ -230,7 +228,7 @@ toChainDbArgs ImmutableDB.ImmutableDbArgs {..}
-- Misc
, cdbTracer = cdbsTracer
, cdbTraceLedger = lgrTraceLedger
, cdbRegistry = immRegistry
, cdbRegistry = cdbsRegistry
, cdbGcDelay = cdbsGcDelay
, cdbGcInterval = cdbsGcInterval
, cdbBlocksToAddSize = cdbsBlocksToAddSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}

-- | Immutable on-disk database of binary blobs
Expand Down Expand Up @@ -111,8 +112,7 @@ import Ouroboros.Consensus.Block hiding (headerHash)
import Ouroboros.Consensus.Util (SomePair (..))
import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry,
runWithTempRegistry)
import Ouroboros.Consensus.Util.ResourceRegistry

import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.FS.API
Expand Down Expand Up @@ -212,31 +212,40 @@ deleteAfter = deleteAfter_
------------------------------------------------------------------------------}

openDB ::
forall m blk.
forall m blk ans.
( IOLike m
, GetPrevHash blk
, ConvertRawHash blk
, ImmutableDbSerialiseConstraints blk
, HasCallStack
)
=> ImmutableDbArgs Identity m blk
-> m (ImmutableDB m blk)
openDB args = fst <$> openDBInternal args
-> (
(forall h.
WithTempRegistry (OpenState m blk h) m (ImmutableDB m blk, OpenState m blk h, OpenState m blk h, OpenState m blk h -> m Bool)
-> ans)
-> ans)
openDB args cont = openDBInternal args (cont . swizzle)
where swizzle w = w >>= \((a, _), c, d, e) -> return (a, c, d, e)

-- | For testing purposes: exposes internals via 'Internal'
--
--
openDBInternal ::
forall m blk.
forall m blk ans.
( IOLike m
, GetPrevHash blk
, ConvertRawHash blk
, ImmutableDbSerialiseConstraints blk
, HasCallStack
)
=> ImmutableDbArgs Identity m blk
-> m (ImmutableDB m blk, Internal m blk)
openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } = runWithTempRegistry $ do
-> (
(forall h.
WithTempRegistry (OpenState m blk h) m ((ImmutableDB m blk, Internal m blk), OpenState m blk h, OpenState m blk h, OpenState m blk h -> m Bool)
-> ans)
-> ans)
openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } cont = cont $ do
lift $ createDirectoryIfMissing hasFS True (mkFsPath [])
let validateEnv = ValidateEnv {
hasFS = hasFS
Expand All @@ -256,7 +265,6 @@ openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } = runWithTempR
, checkIntegrity = immCheckIntegrity
, chunkInfo = immChunkInfo
, tracer = immTracer
, registry = immRegistry
, cacheConfig = immCacheConfig
, codecConfig = immCodecConfig
}
Expand All @@ -275,7 +283,7 @@ openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } = runWithTempR
-- Note that we can still leak resources if the caller of
-- 'openDBInternal' doesn't bracket his call with 'closeDB' or doesn't
-- use a 'ResourceRegistry'.
return ((db, internal), ost)
return ((db, internal), ost, ost, (>> return True) . cleanUp hasFS)

closeDBImpl ::
forall m blk. (HasCallStack, IOLike m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ import GHC.Stack (HasCallStack)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Util (SomePair (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry,
WithTempRegistry, allocateTemp, modifyWithTempRegistry)
import Ouroboros.Consensus.Util.ResourceRegistry

import Ouroboros.Consensus.Storage.FS.API
import Ouroboros.Consensus.Storage.FS.API.Types
Expand All @@ -64,7 +63,6 @@ data ImmutableDBEnv m blk = forall h. Eq h => ImmutableDBEnv {
, checkIntegrity :: !(blk -> Bool)
, chunkInfo :: !ChunkInfo
, tracer :: !(Tracer m (TraceEvent blk))
, registry :: !(ResourceRegistry m)
, cacheConfig :: !Index.CacheConfig
, codecConfig :: !(CodecConfig blk)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ import qualified Streaming.Prelude as S
import Ouroboros.Consensus.Block hiding (hashSize)
import Ouroboros.Consensus.Util (lastMaybe, whenJust)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry,
WithTempRegistry)
import Ouroboros.Consensus.Util.ResourceRegistry

import Ouroboros.Consensus.Storage.FS.API
import Ouroboros.Consensus.Storage.FS.API.Types
Expand Down Expand Up @@ -85,7 +84,6 @@ validateAndReopen ::
)
=> ValidateEnv m blk h
-> ResourceRegistry m
-- ^ Not used for validation, only used to open a new index
-> ValidationPolicy
-> WithTempRegistry (OpenState m blk h) m (OpenState m blk h)
validateAndReopen validateEnv registry valPol = wrapFsError (Proxy @blk) $ do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.IOLike
import qualified Ouroboros.Consensus.Util.MonadSTM.RAWLock as RAWLock
import Ouroboros.Consensus.Util.ResourceRegistry (allocateTemp,
runWithTempRegistry)
import Ouroboros.Consensus.Util.ResourceRegistry

import Ouroboros.Consensus.Storage.Common (BlockComponent (..))
import Ouroboros.Consensus.Storage.FS.API
Expand Down Expand Up @@ -188,25 +187,28 @@ type VolatileDbSerialiseConstraints blk =
)

openDB ::
forall m blk.
forall m blk ans.
( HasCallStack
, IOLike m
, HasHeader blk
, GetPrevHash blk
, VolatileDbSerialiseConstraints blk
)
=> VolatileDbArgs Identity m blk
-> m (VolatileDB m blk)
openDB VolatileDbArgs { volHasFS = SomeHasFS hasFS, .. } = runWithTempRegistry $ do
-> (
(forall h.
WithTempRegistry (OpenState blk h) m (VolatileDB m blk, OpenState blk h, OpenState blk h, OpenState blk h -> m Bool)
-> ans)
-> ans)
openDB VolatileDbArgs { volHasFS = SomeHasFS hasFS, .. } cont = cont $ do
lift $ createDirectoryIfMissing hasFS True (mkFsPath [])
ost <-
mkOpenState
volCodecConfig
hasFS
volCheckIntegrity
volValidationPolicy
volTracer
volMaxBlocksPerFile
ost <- mkOpenState
volCodecConfig
hasFS
volCheckIntegrity
volValidationPolicy
volTracer
volMaxBlocksPerFile
stVar <- lift $ RAWLock.new (DbOpen ost)
let env = VolatileDBEnv {
hasFS = hasFS
Expand All @@ -225,7 +227,7 @@ openDB VolatileDbArgs { volHasFS = SomeHasFS hasFS, .. } = runWithTempRegistry $
, getBlockInfo = getBlockInfoImpl env
, getMaxSlotNo = getMaxSlotNoImpl env
}
return (volatileDB, ost)
return (volatileDB, ost, ost, (>> return True) . closeOpenHandles hasFS)

{------------------------------------------------------------------------------
VolatileDB API
Expand Down
Loading

0 comments on commit 8d8d8cb

Please sign in to comment.