Skip to content

Commit

Permalink
Further revamp II
Browse files Browse the repository at this point in the history
  • Loading branch information
jasagredo committed Nov 10, 2021
1 parent 850cd3c commit a794cea
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 76 deletions.
2 changes: 1 addition & 1 deletion ouroboros-consensus-cardano/tools/db-analyser/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ analyse CmdLine {..} args =
Nothing -> pure genesisLedger
Just snapshot -> readSnapshot ledgerDbFS (decodeExtLedgerState' cfg) decode snapshot
initLedger <- either (error . show) pure initLedgerErr
ImmutableDB.withDB (runWithTempRegistry $ ImmutableDB.openDB immutableDbArgs) $ \immutableDB -> do
ImmutableDB.withDB (ImmutableDB.openDB immutableDbArgs (runWithTempRegistry . completeComputation)) $ \immutableDB -> do
runAnalysis analysis $ AnalysisEnv {
cfg
, initLedger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ closeOpenIterators varIters = do

open :: ImmutableDbArgs Identity IO TestBlock -> IO ImmutableDBState
open args = do
(db, internal) <- runWithTempRegistry $ openDBInternal args
(db, internal) <- openDBInternal args (runWithTempRegistry . runnableInnerWithTempRegistry)
return ImmutableDBState { db, internal }

-- | Opens a new ImmutableDB and stores it in 'varDB'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import Ouroboros.Network.Block (MaxSlotNo)

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (runWithTempRegistry)
import Ouroboros.Consensus.Util.ResourceRegistry

import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.FS.API (SomeHasFS (..), hPutAll,
Expand Down Expand Up @@ -484,7 +484,7 @@ data VolatileDBEnv = VolatileDBEnv
-- Does not close the current VolatileDB stored in 'varDB'.
reopenDB :: VolatileDBEnv -> IO ()
reopenDB VolatileDBEnv { varDB, args } = do
db <- runWithTempRegistry $ openDB args
db <- openDB args (runWithTempRegistry . runnableInnerWithTempRegistry)
void $ swapMVar varDB db

semanticsImpl :: VolatileDBEnv -> At CmdErr Concrete -> IO (At Resp Concrete)
Expand Down Expand Up @@ -588,7 +588,7 @@ test cmds = do
}

(hist, res, trace) <- bracket
(runWithTempRegistry (openDB args) >>= newMVar)
(openDB args (runWithTempRegistry . runnableInnerWithTempRegistry) >>= newMVar)
-- Note: we might be closing a different VolatileDB than the one we
-- opened, as we can reopen it the VolatileDB, swapping the VolatileDB
-- in the MVar.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ import Ouroboros.Consensus.Util.STM (Fingerprint (..),

import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import qualified Ouroboros.Consensus.Storage.ChainDB.API as API
import Ouroboros.Consensus.Util.ResourceRegistry (runWithTempRegistry, allocate)
import Ouroboros.Consensus.Util.ResourceRegistry (allocate,
runInnerWithTempRegistry, runWithTempRegistry)

import Ouroboros.Consensus.Storage.ChainDB.Impl.Args (ChainDbArgs,
defaultArgs)
Expand All @@ -68,7 +69,9 @@ import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Iterator as Iterator
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB as LgrDB
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Query as Query
import Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import Ouroboros.Consensus.Storage.FS.API
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import Ouroboros.Consensus.Storage.ImmutableDB.Impl.State (cleanUp)
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -115,15 +118,15 @@ openDBInternal
-> Bool -- ^ 'True' = Launch background tasks
-> m (ChainDB m blk, Internal m blk)
openDBInternal args launchBgTasks = runWithTempRegistry $ do
immutableDB <- fst <$> ImmutableDB.openDB argsImmutableDb
immutableDB <- ImmutableDB.openDB argsImmutableDb (`runInnerWithTempRegistry` (const $ const True))
immutableDbTipPoint <- lift $ atomically $ ImmutableDB.getTipPoint immutableDB
let immutableDbTipChunk =
chunkIndexOfPoint (Args.cdbChunkInfo args) immutableDbTipPoint
lift $ traceWith tracer $
TraceOpenEvent $
OpenedImmutableDB immutableDbTipPoint immutableDbTipChunk

volatileDB <- fst <$> VolatileDB.openDB argsVolatileDb
volatileDB <- VolatileDB.openDB argsVolatileDb (`runInnerWithTempRegistry` (const $ const True))
lift $ traceWith tracer $ TraceOpenEvent OpenedVolatileDB
let lgrReplayTracer =
LgrDB.decorateReplayTracer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,31 +212,40 @@ deleteAfter = deleteAfter_
------------------------------------------------------------------------------}

openDB ::
forall m blk.
forall m blk ans.
( IOLike m
, GetPrevHash blk
, ConvertRawHash blk
, ImmutableDbSerialiseConstraints blk
, HasCallStack
)
=> ImmutableDbArgs Identity m blk
-> WithTempRegistry () m (ImmutableDB m blk, ())
openDB args = (,()) . fst . fst <$> openDBInternal args
-> (
(forall h.
WithTempRegistry (OpenState m blk h) m (ImmutableDB m blk, OpenState m blk h, OpenState m blk h, OpenState m blk h -> m Bool)
-> ans)
-> ans)
openDB args cont = openDBInternal args (cont . swizzle)
where swizzle w = w >>= \((a, _), c, d, e) -> return (a, c, d, e)

-- | For testing purposes: exposes internals via 'Internal'
--
--
openDBInternal ::
forall m blk.
forall m blk ans.
( IOLike m
, GetPrevHash blk
, ConvertRawHash blk
, ImmutableDbSerialiseConstraints blk
, HasCallStack
)
=> ImmutableDbArgs Identity m blk
-> WithTempRegistry () m ((ImmutableDB m blk, Internal m blk), ())
openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } = do
-> (
(forall h.
WithTempRegistry (OpenState m blk h) m ((ImmutableDB m blk, Internal m blk), OpenState m blk h, OpenState m blk h, OpenState m blk h -> m Bool)
-> ans)
-> ans)
openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } cont = cont $ do
lift $ createDirectoryIfMissing hasFS True (mkFsPath [])
let validateEnv = ValidateEnv {
hasFS = hasFS
Expand Down Expand Up @@ -274,7 +283,7 @@ openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } = do
-- Note that we can still leak resources if the caller of
-- 'openDBInternal' doesn't bracket his call with 'closeDB' or doesn't
-- use a 'ResourceRegistry'.
return ((db, internal), ())
return ((db, internal), ost, ost, (>> return True) . cleanUp hasFS)

closeDBImpl ::
forall m blk. (HasCallStack, IOLike m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ validateAndReopen ::
=> ValidateEnv m blk h
-> ResourceRegistry m
-> ValidationPolicy
-> WithTempRegistry () m (OpenState m blk h)
-> WithTempRegistry (OpenState m blk h) m (OpenState m blk h)
validateAndReopen validateEnv registry valPol = wrapFsError (Proxy @blk) $ do
(chunk, tip) <- lift $ validate validateEnv valPol
index <- lift $ cachedIndex
Expand All @@ -98,14 +98,10 @@ validateAndReopen validateEnv registry valPol = wrapFsError (Proxy @blk) $ do
case tip of
Origin -> assert (chunk == firstChunkNo) $ do
lift $ traceWith tracer NoValidLastLocation
allocateInRegistryBeforeReturning (mkOpenState hasFS index chunk Origin MustBeNew)
(\o -> cleanUp hasFS o >> return True)
(const $ const True)
mkOpenState hasFS index chunk Origin MustBeNew
NotOrigin tip' -> do
lift $ traceWith tracer $ ValidatedLastLocation chunk tip'
allocateInRegistryBeforeReturning (mkOpenState hasFS index chunk tip AllowExisting)
(\o -> cleanUp hasFS o >> return True)
(const $ const True)
mkOpenState hasFS index chunk tip AllowExisting
where
ValidateEnv { hasFS, tracer, cacheConfig, chunkInfo } = validateEnv
cacheTracer = contramap TraceCacheEvent tracer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,27 +187,28 @@ type VolatileDbSerialiseConstraints blk =
)

openDB ::
forall m blk.
forall m blk ans.
( HasCallStack
, IOLike m
, HasHeader blk
, GetPrevHash blk
, VolatileDbSerialiseConstraints blk
)
=> VolatileDbArgs Identity m blk
-> WithTempRegistry () m (VolatileDB m blk, ())
openDB VolatileDbArgs { volHasFS = SomeHasFS hasFS, .. } = do
-> (
(forall h.
WithTempRegistry (OpenState blk h) m (VolatileDB m blk, OpenState blk h, OpenState blk h, OpenState blk h -> m Bool)
-> ans)
-> ans)
openDB VolatileDbArgs { volHasFS = SomeHasFS hasFS, .. } cont = cont $ do
lift $ createDirectoryIfMissing hasFS True (mkFsPath [])
ost <- allocateInRegistryBeforeReturning
(mkOpenState
ost <- mkOpenState
volCodecConfig
hasFS
volCheckIntegrity
volValidationPolicy
volTracer
volMaxBlocksPerFile)
(\o -> closeOpenHandles hasFS o >> return True)
(const $ const True)
volMaxBlocksPerFile
stVar <- lift $ RAWLock.new (DbOpen ost)
let env = VolatileDBEnv {
hasFS = hasFS
Expand All @@ -226,7 +227,7 @@ openDB VolatileDbArgs { volHasFS = SomeHasFS hasFS, .. } = do
, getBlockInfo = getBlockInfoImpl env
, getMaxSlotNo = getMaxSlotNoImpl env
}
return (volatileDB, ())
return (volatileDB, ost, ost, (>> return True) . closeOpenHandles hasFS)

{------------------------------------------------------------------------------
VolatileDB API
Expand Down
113 changes: 68 additions & 45 deletions ouroboros-consensus/src/Ouroboros/Consensus/Util/ResourceRegistry.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ module Ouroboros.Consensus.Util.ResourceRegistry (
, TempRegistryException (..)
, allocateTemp
, modifyWithTempRegistry
, runInnerWithTempRegistry
, runWithTempRegistry
, allocateInRegistryBeforeReturning
, runnableInnerWithTempRegistry
-- ** opaque
, WithTempRegistry
-- * Combinators primarily for testing
Expand Down Expand Up @@ -765,6 +766,69 @@ runWithTempRegistry m = withRegistry $ \rr -> do
}
return a

-- | Embed a self-contained 'WithTempRegistry' computation into a larger one.
--
-- The internal 'WithTempRegistry' is effectively passed to
-- 'runWithTempRegistry'. It therefore must have no dangling resources, for
-- example. This is the meaning of /self-contained/ above.
--
-- The key difference beyond 'runWithTempRegistry' is that the resulting
-- composite resource is also guaranteed to be registered in the outer
-- 'WithTempRegistry' computation's registry once the inner registry is closed.
-- Combined with the following assumption, this establishes the invariant that
-- all resources are (transitively) in a temporary registry.
--
-- As the resource might require some implementation details to be closed, the
-- function to close it will also be provided by the inner computation.
--
-- ASSUMPTION: closing @res@ closes every resource contained in @inner_st@
--
-- NOTE: In the current implementation, there will be a brief moment where the
-- inner registry still contains the inner computation's resources and also the
-- outer registry simultaneously contains the new composite resource. If an async
-- is received at that time, then the inner resources will be closed and then
-- the composite resource will be later closed. This means there's a risk of
-- /double freeing/, which can be harmless if anticipated.
runInnerWithTempRegistry
:: forall inner_st st m res a. IOLike m
=> WithTempRegistry inner_st m (a, inner_st, res, res -> m Bool)
-- ^ The embedded computation; see ASSUMPTION above
-> (st -> res -> Bool)
-- ^ How to check; same as for 'allocateTemp'
-> WithTempRegistry st m a
runInnerWithTempRegistry inner isTransferred = do
outerTR <- WithTempRegistry ask

lift $ runWithTempRegistry $ do
(a, inner_st, res, free) <- inner

-- allocate in the outer layer
_ <- withFixedTempRegistry outerTR
$ allocateTemp (return res) free isTransferred

-- TODO This point here is where an async exception could cause both the
-- inner resources to be closed and the outer resource to be closed later
--
-- If we want to do better than that, we'll need a variant of
-- 'runWithTempRegistry' that lets us perform some action with async
-- exceptions masked "at the same time" it closes its registry.

pure (a, inner_st)
where
withFixedTempRegistry env (WithTempRegistry (ReaderT f)) =
WithTempRegistry $ ReaderT $ \_ -> f env

-- | Convenience function.
--
-- When a @WithTempRegistry@ computation has the shape expected by
-- @runInnerWithTempRegistry@, but instead it is run directly, some of the
-- returned values have to be omitted.
runnableInnerWithTempRegistry
:: Monad m
=> WithTempRegistry st m (a, st, st, st -> m Bool)
-> WithTempRegistry st m (a, st)
runnableInnerWithTempRegistry = (>>= \(a,b,_,_) -> return (a,b))

-- | When 'runWithTempRegistry' exits successfully while there are still
-- resources remaining in the temporary registry that haven't been transferred
-- to the final state.
Expand Down Expand Up @@ -811,28 +875,6 @@ instance MonadTrans (WithTempRegistry st) where
instance MonadState s m => MonadState s (WithTempRegistry st m) where
state = WithTempRegistry . state

-- | When we make use of temporary registries sometimes the resulting state
-- might not be the final state in which the resources should be allocate, as
-- the context on which we run this subcomputation is part of a broader context
-- where a registry is keeping track of the resources. If there are intermediate
-- steps and the resulting state from this computation will eventually be
-- tracked by the higher registry but this is not the case yet, this combinator
-- allows you to make use of the broader registry. -- TODO @js needs rewording.
allocateInRegistryBeforeReturning
:: IOLike m
=> WithTempRegistry st m st -- ^ The original computation
-> (st -> m Bool) -- ^ The closer function
-> (st' -> st -> Bool) -- ^ Check that the value is stored in the new state
-> WithTempRegistry st' m st
allocateInRegistryBeforeReturning w closer checker =
WithTempRegistry
$ ReaderT
$ \r1 -> runWithTempRegistry
$ do
a <- w
_ <- lift $ allocateTemp' r1 (return a) closer checker
return (a,a)

-- | Untrack all the resources from the registry that have been transferred to
-- the given state.
--
Expand Down Expand Up @@ -866,28 +908,9 @@ allocateTemp
-> (st -> a -> Bool)
-- ^ Check whether the resource is in the given state
-> WithTempRegistry st m a
allocateTemp alloc free isTransferred = WithTempRegistry $
ReaderT $ \treg -> allocateTemp' treg alloc free isTransferred

-- | Allocate a resource in a temporary registry until it has been transferred
-- to the final state @st@, making the temporal registry explicit. See
-- 'runWithTempRegistry' for more details.
allocateTemp'
:: (IOLike m, HasCallStack)
=> TempRegistry st m
-- ^ The TempRegistry made explicit
-> m a
-- ^ Allocate the resource
-> (a -> m Bool)
-- ^ Release the resource, return 'True' when the resource was actually
-- released, return 'False' when the resource was already released.
--
-- Note that it is safe to always return 'True' when unsure.
-> (st -> a -> Bool)
-- ^ Check whether the resource is in the given state
-> m a
allocateTemp' (TempRegistry rr varTransferredTo) alloc free isTransferred = do
(key, a) <- fmap mustBeRight $
allocateTemp alloc free isTransferred = WithTempRegistry $ do
TempRegistry rr varTransferredTo <- ask
(key, a) <- lift $ fmap mustBeRight $
allocateEither rr (fmap Right . const alloc) free
atomically $ modifyTVar varTransferredTo $ mappend $
TransferredTo $ \st ->
Expand Down

0 comments on commit a794cea

Please sign in to comment.