Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the Cached index using TVars #1197

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ouroboros-consensus/changelog.d/js-tvar-cache.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Non-Breaking

- Cached index is now based on `TVar`s instead of `MVar`s ensuring proper updates
to the current chunk cached information.
Original file line number Diff line number Diff line change
Expand Up @@ -263,24 +263,25 @@ addPastChunkInfo chunk lastUsed pastChunkInfo cached =
evictIfNecessary ::
Word32 -- ^ Maximum number of past chunks to cache
-> Cached blk
-> (Cached blk, Maybe ChunkNo)
-> (Maybe ChunkNo, Cached blk)
evictIfNecessary maxNbPastChunks cached
jasagredo marked this conversation as resolved.
Show resolved Hide resolved
| nbPastChunks > maxNbPastChunks
= assert (nbPastChunks == maxNbPastChunks + 1) $
case PSQ.minView pastChunksInfo of
Nothing -> error
"nbPastChunks > maxNbPastChunks but pastChunksInfo was empty"
Just (chunkNo, _p, _v, pastChunksInfo') ->
(cached', Just $ chunkNoFromInt chunkNo)
(Just $ chunkNoFromInt chunkNo, cached')
where
cached' = cached
{ nbPastChunks = maxNbPastChunks
, pastChunksInfo = pastChunksInfo'
}
| otherwise
= (cached, Nothing)
= (Nothing, cached)
where
Cached { nbPastChunks, pastChunksInfo } = cached

-- NOTE: we must inline 'evictIfNecessary' otherwise we get unexplained thunks
-- in 'Cached' and thus a space leak. Alternatively, we could disable the
-- @-fstrictness@ optimisation (enabled by default for -O1).
Expand Down Expand Up @@ -354,7 +355,7 @@ data CacheEnv m blk h = CacheEnv
{ hasFS :: HasFS m h
, registry :: ResourceRegistry m
, tracer :: Tracer m TraceCacheEvent
, cacheVar :: StrictMVar m (Cached blk)
, cacheVar :: StrictTVar m (Cached blk)
, cacheConfig :: CacheConfig
, bgThreadVar :: StrictMVar m (Maybe (Thread m Void))
-- ^ Nothing if no thread running
Expand Down Expand Up @@ -382,9 +383,8 @@ newEnv ::
newEnv hasFS registry tracer cacheConfig chunkInfo chunk = do
when (pastChunksToCache == 0) $
error "pastChunksToCache must be > 0"

currentChunkInfo <- loadCurrentChunkInfo hasFS chunkInfo chunk
cacheVar <- newMVarWithInvariants $ emptyCached chunk currentChunkInfo
cacheVar <- newTVarWithInvariants $ emptyCached chunk currentChunkInfo
bgThreadVar <- newMVar Nothing
let cacheEnv = CacheEnv {..}
mask_ $ modifyMVar_ bgThreadVar $ \_mustBeNothing -> do
Expand All @@ -397,8 +397,8 @@ newEnv hasFS registry tracer cacheConfig chunkInfo chunk = do

-- When checking invariants, check both our invariants and for thunks.
-- Note that this is only done when the corresponding flag is enabled.
newMVarWithInvariants =
newMVarWithInvariant $ checkInvariants pastChunksToCache
newTVarWithInvariants =
atomically . newTVarWithInvariant (checkInvariants pastChunksToCache)

{------------------------------------------------------------------------------
Background thread
Expand All @@ -409,13 +409,14 @@ newEnv hasFS registry tracer cacheConfig chunkInfo chunk = do
-- Will expire past chunks that haven't been used for 'expireUnusedAfter' from
-- the cache.
expireUnusedChunks ::
forall h m blk.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary tyvar bindings?

(HasCallStack, IOLike m)
=> CacheEnv m blk h
-> m Void
expireUnusedChunks CacheEnv { cacheVar, cacheConfig, tracer } =
forever $ do
now <- getMonotonicTime
mbTraceMsg <- modifyMVar cacheVar $ pure . garbageCollect now
mbTraceMsg <- atomically $ stateTVar cacheVar $ garbageCollect now
mapM_ (traceWith tracer) mbTraceMsg
threadDelay expireUnusedAfter
where
Expand All @@ -430,11 +431,11 @@ expireUnusedChunks CacheEnv { cacheVar, cacheConfig, tracer } =
garbageCollect
:: Time
-> Cached blk
-> (Cached blk, Maybe TraceCacheEvent)
-> (Maybe TraceCacheEvent, Cached blk)
garbageCollect now cached@Cached { pastChunksInfo, nbPastChunks } =
case expiredPastChunks of
[] -> (cached, Nothing)
_ -> (cached', Just traceMsg)
case expiredPastChunks of
[] -> (Nothing, cached)
_ -> (Just traceMsg, cached')
where
-- Every past chunk last used before (or at) this time, must be
-- expired.
Expand Down Expand Up @@ -573,27 +574,21 @@ getChunkInfo ::
-> m (Either (CurrentChunkInfo blk) (PastChunkInfo blk))
getChunkInfo cacheEnv chunk = do
lastUsed <- LastUsed <$> getMonotonicTime
-- Make sure we don't leave an empty MVar in case of an exception.
mbCacheHit <- bracketOnError (takeMVar cacheVar) (tryPutMVar cacheVar) $
(mbCacheHit, tr) <- atomically $ stateTVar cacheVar $
\cached@Cached { currentChunk, currentChunkInfo, nbPastChunks } -> if
| chunk == currentChunk -> do
| chunk == currentChunk ->
-- Cache hit for the current chunk
putMVar cacheVar cached
traceWith tracer $ TraceCurrentChunkHit chunk nbPastChunks
return $ Just $ Left currentChunkInfo
| Just (pastChunkInfo, cached') <- lookupPastChunkInfo chunk lastUsed cached -> do
((Just $ Left currentChunkInfo, TraceCurrentChunkHit chunk nbPastChunks), cached)
| Just (pastChunkInfo, cached') <- lookupPastChunkInfo chunk lastUsed cached ->
-- Cache hit for an chunk in the past
putMVar cacheVar cached'
traceWith tracer $ TracePastChunkHit chunk nbPastChunks
return $ Just $ Right pastChunkInfo
| otherwise -> do
((Just $ Right pastChunkInfo, TracePastChunkHit chunk nbPastChunks), cached')
| otherwise ->
-- Cache miss for an chunk in the past. We don't want to hold on to
-- the 'cacheVar' MVar, blocking all other access to the cace, while
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- the 'cacheVar' MVar, blocking all other access to the cace, while
-- the 'cacheVar' TVar, blocking all other access to the cace, while

-- we're reading things from disk, so put it back now and update the
-- cache afterwards.
putMVar cacheVar cached
traceWith tracer $ TracePastChunkMiss chunk nbPastChunks
return Nothing
((Nothing, TracePastChunkMiss chunk nbPastChunks), cached)
traceWith tracer tr
case mbCacheHit of
Just hit -> return hit
Nothing -> do
Expand All @@ -602,10 +597,9 @@ getChunkInfo cacheEnv chunk = do
-- Loading the chunk might have taken some time, so obtain the time
-- again.
lastUsed' <- LastUsed <$> getMonotonicTime
mbEvicted <- modifyMVar cacheVar $
pure .
evictIfNecessary pastChunksToCache .
addPastChunkInfo chunk lastUsed' pastChunkInfo
mbEvicted <- atomically $ stateTVar cacheVar $
evictIfNecessary pastChunksToCache
. addPastChunkInfo chunk lastUsed' pastChunkInfo
whenJust mbEvicted $ \evicted ->
-- If we had to evict, we are at 'pastChunksToCache'
traceWith tracer $ TracePastChunkEvict evicted pastChunksToCache
Expand Down Expand Up @@ -639,7 +633,7 @@ restart ::
-> m ()
restart cacheEnv chunk = do
currentChunkInfo <- loadCurrentChunkInfo hasFS chunkInfo chunk
void $ swapMVar cacheVar $ emptyCached chunk currentChunkInfo
void $ atomically $ swapTVar cacheVar $ emptyCached chunk currentChunkInfo
mask_ $ modifyMVar_ bgThreadVar $ \mbBgThread ->
case mbBgThread of
Just _ -> throwIO $ userError "background thread still running"
Expand Down Expand Up @@ -744,10 +738,9 @@ openPrimaryIndex cacheEnv chunk allowExisting = do
newCurrentChunkInfo <- case allowExisting of
MustBeNew -> return $ emptyCurrentChunkInfo chunk
AllowExisting -> loadCurrentChunkInfo hasFS chunkInfo chunk
mbEvicted <- modifyMVar cacheVar $
pure .
evictIfNecessary pastChunksToCache .
openChunk chunk lastUsed newCurrentChunkInfo
mbEvicted <- atomically $ stateTVar cacheVar $
evictIfNecessary pastChunksToCache
. openChunk chunk lastUsed newCurrentChunkInfo
whenJust mbEvicted $ \evicted ->
-- If we had to evict, we are at 'pastChunksToCache'
traceWith tracer $ TracePastChunkEvict evicted pastChunksToCache
Expand All @@ -765,7 +758,7 @@ appendOffsets ::
-> m ()
appendOffsets CacheEnv { hasFS, cacheVar } pHnd offsets = do
Primary.appendOffsets hasFS pHnd offsets
modifyMVar_ cacheVar $ pure . addCurrentChunkOffsets
atomically $ modifyTVar cacheVar addCurrentChunkOffsets
where
-- Lenses would be nice here
addCurrentChunkOffsets :: Cached blk -> Cached blk
Expand Down Expand Up @@ -859,7 +852,7 @@ appendEntry ::
-> m Word64
appendEntry CacheEnv { hasFS, cacheVar } chunk sHnd entry = do
nbBytes <- Secondary.appendEntry hasFS sHnd (withoutBlockSize entry)
modifyMVar_ cacheVar $ pure . addCurrentChunkEntry
atomically $ modifyTVar cacheVar addCurrentChunkEntry
return nbBytes
where
-- Lenses would be nice here
Expand Down