Skip to content

Commit

Permalink
Split openDB in alloc and init steps.
Browse files Browse the repository at this point in the history
This also creates a child registry specific to the ChainDB where everything will
be allocated and that will keep track of every resource in the ChainDB.
  • Loading branch information
jasagredo committed Nov 1, 2021
1 parent 5743fdf commit 89ed0fe
Show file tree
Hide file tree
Showing 12 changed files with 326 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ type TestFollower m blk = WithEq (Follower m blk (AllComponents blk))
data ChainDBState m blk = ChainDBState
{ chainDB :: ChainDB m blk
, internal :: ChainDB.Internal m blk
, addBlockAsync :: Async m Void
, addBlockAsync :: Thread m Void
-- ^ Background thread that adds blocks to the ChainDB
}
deriving NoThunks via AllowThunk (ChainDBState m blk)
Expand All @@ -319,21 +319,21 @@ open
=> ChainDbArgs Identity m blk -> m (ChainDBState m blk)
open args = do
(chainDB, internal) <- openDBInternal args False
addBlockAsync <- async (intAddBlockRunner internal)
link addBlockAsync
addBlockAsync <- forkThreadAndLink (cdbRegistry args) "Add block runner" (intAddBlockRunner internal)
return ChainDBState { chainDB, internal, addBlockAsync }

-- PRECONDITION: the ChainDB is closed
reopen
:: (IOLike m, TestConstraints blk)
=> ChainDBEnv m blk -> m ()
reopen ChainDBEnv { varDB, args } = do
chainDBState <- open args
registry <- unsafeNewRegistry
chainDBState <- open args { cdbRegistry = registry }
void $ swapMVar varDB chainDBState

close :: IOLike m => ChainDBState m blk -> m ()
close ChainDBState { chainDB, addBlockAsync } = do
cancel addBlockAsync
cancelThread addBlockAsync
closeDB chainDB

run :: forall m blk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import Test.Util.QuickCheck
import Test.Util.SOP
import Test.Util.Tracer (recordingTracerIORef)

import Ouroboros.Consensus.Util.ResourceRegistry (unsafeNewRegistry)
import Test.Ouroboros.Storage.Orphans ()
import Test.Ouroboros.Storage.TestBlock
import Test.Ouroboros.Storage.VolatileDB.Model
Expand Down Expand Up @@ -575,6 +576,7 @@ test cmds = do
varErrors <- uncheckedNewTVarM mempty
varFs <- uncheckedNewTVarM Mock.empty
(tracer, getTrace) <- recordingTracerIORef
registry <- unsafeNewRegistry

let hasFS = mkSimErrorHasFS varFs varErrors
args = VolatileDbArgs {
Expand All @@ -584,6 +586,7 @@ test cmds = do
, volMaxBlocksPerFile = testMaxBlocksPerFile
, volTracer = tracer
, volValidationPolicy = ValidateAll
, volRegistry = registry
}

(hist, res, trace) <- bracket
Expand Down
7 changes: 2 additions & 5 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,8 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
, ChainDB.cdbVolatileDbValidation = ValidateAll
}

(_, chainDB) <- allocate registry
(\_ -> openChainDB
registry inFuture cfg initLedger
llrnChainDbArgsDefaults customiseChainDbArgs')
ChainDB.closeDB
(_, chainDBRegistry) <- createChildRegistry registry
chainDB <- openChainDB chainDBRegistry inFuture cfg initLedger llrnChainDbArgsDefaults customiseChainDbArgs'

btime <-
hardForkBlockchainTime $
Expand Down
27 changes: 16 additions & 11 deletions ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Query as Query
import Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry,
allocate, closeRegistry)

{-------------------------------------------------------------------------------
Initialization
Expand Down Expand Up @@ -123,6 +125,7 @@ openDBInternal args launchBgTasks = do

volatileDB <- VolatileDB.openDB argsVolatileDb
traceWith tracer $ TraceOpenEvent OpenedVolatileDB

let lgrReplayTracer =
LgrDB.decorateReplayTracer
immutableDbTipPoint
Expand All @@ -136,7 +139,6 @@ openDBInternal args launchBgTasks = do
varInvalid <- newTVarIO (WithFingerprint Map.empty (Fingerprint 0))
varFutureBlocks <- newTVarIO Map.empty


chainAndLedger <- ChainSel.initialChainSelection
immutableDB
volatileDB
Expand All @@ -156,7 +158,7 @@ openDBInternal args launchBgTasks = do
varIterators <- newTVarIO Map.empty
varFollowers <- newTVarIO Map.empty
varNextIteratorKey <- newTVarIO (IteratorKey 0)
varNextFollowerKey <- newTVarIO (FollowerKey 0)
varNextFollowerKey <- newTVarIO (FollowerKey 0)
varCopyLock <- newMVar ()
varKillBgThreads <- newTVarIO $ return ()
blocksToAdd <- newBlocksToAdd (Args.cdbBlocksToAddSize args)
Expand All @@ -174,7 +176,7 @@ openDBInternal args launchBgTasks = do
, cdbCopyLock = varCopyLock
, cdbTracer = tracer
, cdbTraceLedger = Args.cdbTraceLedger args
, cdbRegistry = Args.cdbRegistry args
, cdbRegistry = Args.cdbsRegistry argsChainDb
, cdbGcDelay = Args.cdbGcDelay args
, cdbGcInterval = Args.cdbGcInterval args
, cdbKillBgThreads = varKillBgThreads
Expand All @@ -184,7 +186,9 @@ openDBInternal args launchBgTasks = do
, cdbBlocksToAdd = blocksToAdd
, cdbFutureBlocks = varFutureBlocks
}

h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env

let chainDB = API.ChainDB
{ addBlockAsync = getEnv1 h ChainSel.addBlockAsync
, getCurrentChain = getEnvSTM h Query.getCurrentChain
Expand All @@ -199,7 +203,7 @@ openDBInternal args launchBgTasks = do
, stream = Iterator.stream h
, newFollower = Follower.newFollower h
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
, closeDB = closeDB h
, closeDB = closeDB h $ Args.cdbsRegistry argsChainDb
, isOpen = isOpen h
}
testing = Internal
Expand All @@ -216,10 +220,12 @@ openDBInternal args launchBgTasks = do

when launchBgTasks $ Background.launchBgTasks env replayed

_ <- allocate (Args.cdbsRegistry argsChainDb) (const $ return h) (`closeDB` Args.cdbsRegistry argsChainDb)

return (chainDB, testing)
where
tracer = Args.cdbTracer args
(argsImmutableDb, argsVolatileDb, argsLgrDb, _) = Args.fromChainDbArgs args
(argsImmutableDb, argsVolatileDb, argsLgrDb, argsChainDb) = Args.fromChainDbArgs args

isOpen :: IOLike m => ChainDbHandle m blk -> STM m Bool
isOpen (CDBHandle varState) = readTVar varState <&> \case
Expand All @@ -232,8 +238,8 @@ closeDB
, HasHeader (Header blk)
, HasCallStack
)
=> ChainDbHandle m blk -> m ()
closeDB (CDBHandle varState) = do
=> ChainDbHandle m blk -> ResourceRegistry m -> m ()
closeDB (CDBHandle varState) registry = do
mbOpenEnv <- atomically $ readTVar varState >>= \case
-- Idempotent
ChainDbClosed -> return Nothing
Expand All @@ -247,11 +253,10 @@ closeDB (CDBHandle varState) = do
Follower.closeAllFollowers cdb
Iterator.closeAllIterators cdb

killBgThreads <- atomically $ readTVar cdbKillBgThreads
killBgThreads

ImmutableDB.closeDB cdbImmutableDB
VolatileDB.closeDB cdbVolatileDB
ImmutableDB.closeDB cdbImmutableDB

closeRegistry registry

chain <- atomically $ readTVar cdbChain

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ data ChainDbSpecificArgs f m blk = ChainDbSpecificArgs {
-- ^ Batch all scheduled GCs so that at most one GC happens every
-- 'cdbsGcInterval'.
, cdbsRegistry :: HKD f (ResourceRegistry m)
-- ^ TODO: the ImmutableDB takes a 'ResourceRegistry' too, but we're
-- using it for ChainDB-specific things. Revisit these arguments.
, cdbsTracer :: Tracer m (TraceEvent blk)
}

Expand Down Expand Up @@ -178,6 +176,7 @@ fromChainDbArgs ChainDbArgs{..} = (
, volMaxBlocksPerFile = cdbMaxBlocksPerFile
, volValidationPolicy = cdbVolatileDbValidation
, volTracer = contramap TraceVolatileDBEvent cdbTracer
, volRegistry = cdbRegistry
}
, LgrDB.LgrDbArgs {
lgrTopLevelConfig = cdbTopLevelConfig
Expand Down Expand Up @@ -230,7 +229,7 @@ toChainDbArgs ImmutableDB.ImmutableDbArgs {..}
-- Misc
, cdbTracer = cdbsTracer
, cdbTraceLedger = lgrTraceLedger
, cdbRegistry = immRegistry
, cdbRegistry = cdbsRegistry
, cdbGcDelay = cdbsGcDelay
, cdbGcInterval = cdbsGcInterval
, cdbBlocksToAddSize = cdbsBlocksToAddSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ import Ouroboros.Consensus.Block hiding (headerHash)
import Ouroboros.Consensus.Util (SomePair (..))
import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry,
runWithTempRegistry)
import Ouroboros.Consensus.Util.ResourceRegistry
(HandleInRegistry (hirHandle), ResourceRegistry)

import Ouroboros.Consensus.Storage.Common
import Ouroboros.Consensus.Storage.FS.API
Expand Down Expand Up @@ -236,8 +236,9 @@ openDBInternal ::
)
=> ImmutableDbArgs Identity m blk
-> m (ImmutableDB m blk, Internal m blk)
openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } = runWithTempRegistry $ do
lift $ createDirectoryIfMissing hasFS True (mkFsPath [])
openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } = do
createDirectoryIfMissing hasFS True (mkFsPath [])

let validateEnv = ValidateEnv {
hasFS = hasFS
, chunkInfo = immChunkInfo
Expand All @@ -246,9 +247,7 @@ openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } = runWithTempR
, codecConfig = immCodecConfig
, checkIntegrity = immCheckIntegrity
}
ost <- validateAndReopen validateEnv immRegistry immValidationPolicy

stVar <- lift $ newMVar (DbOpen ost)
stVar <- newMVar . DbOpen =<< validateAndReopen validateEnv immRegistry immValidationPolicy

let dbEnv = ImmutableDBEnv {
hasFS = hasFS
Expand All @@ -270,29 +269,27 @@ openDBInternal ImmutableDbArgs { immHasFS = SomeHasFS hasFS, .. } = runWithTempR
internal = Internal {
deleteAfter_ = deleteAfterImpl dbEnv
}
-- TODO move 'withTempResourceRegistry' outside of this function?
--
-- Note that we can still leak resources if the caller of
-- 'openDBInternal' doesn't bracket his call with 'closeDB' or doesn't
-- use a 'ResourceRegistry'.
return ((db, internal), ost)

traceWith immTracer DBOpened

return (db, internal)

closeDBImpl ::
forall m blk. (HasCallStack, IOLike m)
=> ImmutableDBEnv m blk
-> m ()
closeDBImpl ImmutableDBEnv { hasFS, tracer, varInternalState } = do
closeDBImpl ImmutableDBEnv { tracer, varInternalState } = do
internalState <- takeMVar varInternalState
case internalState of
-- Already closed
DbClosed -> do
putMVar varInternalState internalState
traceWith tracer $ DBAlreadyClosed
traceWith tracer DBAlreadyClosed
DbOpen openState -> do
-- Close the database before doing the file-system operations so that
-- in case these fail, we don't leave the database open.
putMVar varInternalState DbClosed
cleanUp hasFS openState
cleanUp openState
traceWith tracer DBClosed

deleteAfterImpl ::
Expand All @@ -303,21 +300,21 @@ deleteAfterImpl ::
deleteAfterImpl dbEnv@ImmutableDBEnv { tracer, chunkInfo } newTip =
-- We're not using 'Index' in this function but truncating the index files
-- directly.
modifyOpenState dbEnv $ \hasFS -> do
modifyOpenState dbEnv $ \hasFS registry -> do
st@OpenState { currentIndex, currentTip } <- get

when ((CompareTip <$> newTip) < (CompareTip <$> currentTip)) $ do
lift $ lift $ do
lift $ do
traceWith tracer $ DeletingAfter newTip
-- Release the open handles, as we might have to remove files that are
-- currently opened.
cleanUp hasFS st
cleanUp st
truncateTo hasFS st newTipChunkSlot
-- Reset the index, as it can contain stale information. Also restarts
-- the background thread expiring unused past chunks.
Index.restart currentIndex newChunk

ost <- lift $ mkOpenState hasFS currentIndex newChunk newTip allowExisting
ost <- lift $ mkOpenState hasFS registry currentIndex newChunk newTip allowExisting Nothing
put ost
where
newTipChunkSlot :: WithOrigin ChunkSlot
Expand Down Expand Up @@ -438,7 +435,7 @@ appendBlockImpl ::
-> blk
-> m ()
appendBlockImpl dbEnv blk =
modifyOpenState dbEnv $ \hasFS -> do
modifyOpenState dbEnv $ \hasFS registry -> do
OpenState {
currentTip = initialTip
, currentIndex = index
Expand All @@ -461,7 +458,7 @@ appendBlockImpl dbEnv blk =
let newChunksToStart :: Int
newChunksToStart = fromIntegral $ countChunks chunk initialChunk
replicateM_ newChunksToStart $
startNewChunk hasFS index chunkInfo initialChunk
startNewChunk hasFS index chunkInfo initialChunk registry

-- We may have updated the state with 'startNewChunk', so get the
-- (possibly) updated state.
Expand Down Expand Up @@ -490,11 +487,11 @@ appendBlockImpl dbEnv blk =
chunkSlotForTip chunkInfo tip

-- Append to the end of the chunk file.
(blockSize, entrySize) <- lift $ lift $ do
(blockSize, entrySize) <- lift $ do

-- Write to the chunk file
let bytes = CBOR.toLazyByteString $ encodeDisk codecConfig blk
(blockSize, crc) <- hPutAllCRC hasFS currentChunkHandle bytes
(blockSize, crc) <- hPutAllCRC hasFS (hirHandle currentChunkHandle) bytes

-- Write to the secondary index file
let entry = Secondary.Entry {
Expand All @@ -510,7 +507,7 @@ appendBlockImpl dbEnv blk =
Index.appendEntry
index
chunk
currentSecondaryHandle
(hirHandle currentSecondaryHandle)
(WithBlockSize (fromIntegral blockSize) entry)

-- Write to the primary index file
Expand All @@ -520,7 +517,7 @@ appendBlockImpl dbEnv blk =
nextFreeRelSlot
currentSecondaryOffset
offsets = backfillOffsets <> [currentSecondaryOffset + entrySize]
Index.appendOffsets index currentPrimaryHandle offsets
Index.appendOffsets index (hirHandle currentPrimaryHandle) offsets

return (blockSize, entrySize)

Expand Down Expand Up @@ -548,14 +545,14 @@ appendBlockImpl dbEnv blk =
BinaryBlockInfo {..} = getBinaryBlockInfo blk

startNewChunk ::
forall m h blk. (HasCallStack, IOLike m, Eq h)
forall m h blk. (HasCallStack, IOLike m)
=> HasFS m h
-> Index m blk h
-> ChunkInfo
-> ChunkNo -- ^ Chunk containing the tip
-> ModifyOpenState m blk h ()
startNewChunk hasFS index chunkInfo tipChunk = do
st@OpenState {..} <- get
startNewChunk hasFS index chunkInfo tipChunk registry = do
OpenState {..} <- get

-- We have to take care when starting multiple new chunks in a row. In the
-- first call the tip will be in the current chunk, but in subsequent
Expand All @@ -581,11 +578,11 @@ startNewChunk hasFS index chunkInfo tipChunk = do
nextFreeRelSlot
currentSecondaryOffset

lift $ lift $
Index.appendOffsets index currentPrimaryHandle backfillOffsets
`finally` closeOpenHandles hasFS st
lift $
Index.appendOffsets index (hirHandle currentPrimaryHandle) backfillOffsets

st' <- lift $
mkOpenState hasFS index (nextChunkNo currentChunk) currentTip MustBeNew
mkOpenState hasFS registry index (nextChunkNo currentChunk) currentTip MustBeNew $
Just (currentChunkHandle, currentPrimaryHandle, currentSecondaryHandle)

put st'
Loading

0 comments on commit 89ed0fe

Please sign in to comment.