From a3bfbc3b316d4907e315350efd19a49bdf06fb55 Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Thu, 21 Jan 2021 18:23:09 -0800 Subject: [PATCH 1/4] consensus: introduce Watcher, use instead of registry in ChainSync client The ChainSync client was using the registry to manage a thread that could simply be managed instead with a bracket. I cleaned up the `Ouroboros.Consensus.Util.STM` module by adding this `Watcher` abstraction in order to do just that. It is now up to the callee to decide if they want to spawn the `Watcher` via `ResourceRegistry` or via a bracket. --- .../src/Test/Util/LogicalClock.hs | 13 +- .../Ouroboros/Consensus/BlockchainTime/API.hs | 11 +- .../src/Ouroboros/Consensus/Mempool/Impl.hs | 14 +- .../MiniProtocol/ChainSync/Client.hs | 53 ++++---- .../src/Ouroboros/Consensus/Util/STM.hs | 122 ++++++++++++------ 5 files changed, 133 insertions(+), 80 deletions(-) diff --git a/ouroboros-consensus-test/src/Test/Util/LogicalClock.hs b/ouroboros-consensus-test/src/Test/Util/LogicalClock.hs index 529dfe9d1de..6d8eb6be698 100644 --- a/ouroboros-consensus-test/src/Test/Util/LogicalClock.hs +++ b/ouroboros-consensus-test/src/Test/Util/LogicalClock.hs @@ -93,13 +93,12 @@ onEachTick :: (IOLike m, HasCallStack) -> m (m ()) onEachTick registry clock threadLabel action = cancelThread <$> - onEachChange - registry - threadLabel - id - Nothing - (getCurrentTick clock) - action + forkLinkedWatcher registry threadLabel Watcher { + wFingerprint = id + , wInitial = Nothing + , wNotify = action + , wReader = getCurrentTick clock + } -- | Execute action once at the specified tick onTick :: (IOLike m, HasCallStack) diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime/API.hs b/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime/API.hs index 7944697b1e9..d8efd0682a9 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime/API.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime/API.hs @@ -19,7 +19,7 @@ import NoThunks.Class (OnlyCheckWhnfNamed (..)) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.ResourceRegistry -import Ouroboros.Consensus.Util.STM (onEachChange) +import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher) {------------------------------------------------------------------------------- API @@ -67,9 +67,14 @@ onKnownSlotChange :: forall m. (IOLike m, HasCallStack) -> String -- ^ Label for the thread -> (SlotNo -> m ()) -- ^ Action to execute -> m (m ()) -onKnownSlotChange registry btime label = +onKnownSlotChange registry btime label notify = fmap cancelThread - . onEachChange registry label id Nothing getCurrentSlot' + $ forkLinkedWatcher registry label Watcher { + wFingerprint = id + , wInitial = Nothing + , wNotify = notify + , wReader = getCurrentSlot' + } where getCurrentSlot' :: STM m SlotNo getCurrentSlot' = do diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/Impl.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/Impl.hs index ca733b3e525..2ec62123aa5 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/Impl.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Mempool/Impl.hs @@ -44,7 +44,7 @@ import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq import Ouroboros.Consensus.Util (repeatedly) import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.ResourceRegistry -import Ouroboros.Consensus.Util.STM (onEachChange) +import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher) {------------------------------------------------------------------------------- Top-level API @@ -280,13 +280,15 @@ forkSyncStateOnTipPointChange :: forall m blk. ( -> MempoolEnv m blk -> m () forkSyncStateOnTipPointChange registry menv = - void $ onEachChange + void $ forkLinkedWatcher registry "Mempool.syncStateOnTipPointChange" - id - Nothing - getCurrentTip - action + Watcher { + wFingerprint = id + , wInitial = Nothing + , wNotify = action + , wReader = getCurrentTip + } where action :: Point blk -> m () action _tipPoint = void $ implSyncWithLedger menv diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index 98d58647f52..31a82a11245 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -73,9 +73,8 @@ import Ouroboros.Consensus.Protocol.Abstract import Ouroboros.Consensus.Util import Ouroboros.Consensus.Util.Assert (assertWithMsg) import Ouroboros.Consensus.Util.IOLike -import Ouroboros.Consensus.Util.ResourceRegistry -import Ouroboros.Consensus.Util.STM (WithFingerprint (..), - onEachChange) +import Ouroboros.Consensus.Util.STM (Fingerprint, Watcher (..), + WithFingerprint (..), withWatcher) import Ouroboros.Consensus.Storage.ChainDB (ChainDB, InvalidBlockReason) @@ -129,23 +128,27 @@ bracketChainSyncClient -> m a bracketChainSyncClient tracer ChainDbView { getIsInvalidBlock } varCandidates peer body = - withRegistry $ \registry -> - bracket register unregister $ \varCandidate -> do - rejectInvalidBlocks - tracer - registry - getIsInvalidBlock - (readTVar varCandidate) - body varCandidate + bracket newCandidateVar releaseCandidateVar + $ \varCandidate -> + withWatcher + "ChainSync.Client.rejectInvalidBlocks" + (invalidBlockWatcher varCandidate) + $ body varCandidate where - register = do + newCandidateVar = do varCandidate <- newTVarIO $ AF.Empty AF.AnchorGenesis atomically $ modifyTVar varCandidates $ Map.insert peer varCandidate return varCandidate - unregister _ = do + releaseCandidateVar _ = do atomically $ modifyTVar varCandidates $ Map.delete peer + invalidBlockWatcher varCandidate = + invalidBlockRejector + tracer + getIsInvalidBlock + (readTVar varCandidate) + -- Our task: after connecting to an upstream node, try to maintain an -- up-to-date header-only fragment representing their chain. We maintain -- such candidate chains in a map with upstream nodes as keys. @@ -960,32 +963,30 @@ attemptRollback rollBackPoint (frag, state) = do -- node could have rolled back such that its candidate chain no longer -- contains the invalid block, in which case we do not disconnect from it. -- --- This function spawns a background thread using the given 'ResourceRegistry'. --- -- The cost of this check is \( O(cand * check) \) where /cand/ is the size of -- the candidate fragment and /check/ is the cost of checking whether a block -- is invalid (typically \( O(\log(invalid)) \) where /invalid/ is the number -- of invalid blocks). -rejectInvalidBlocks +invalidBlockRejector :: forall m blk. ( IOLike m , BlockSupportsProtocol blk , LedgerSupportsProtocol blk ) => Tracer m (TraceChainSyncClientEvent blk) - -> ResourceRegistry m -> STM m (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk))) -- ^ Get the invalid block checker -> STM m (AnchoredFragment (Header blk)) - -> m () -rejectInvalidBlocks tracer registry getIsInvalidBlock getCandidate = - void $ onEachChange - registry - "ChainSync.Client.rejectInvalidBlocks" - getFingerprint - Nothing - getIsInvalidBlock - (checkInvalid . forgetFingerprint) + -> Watcher m + (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk))) + Fingerprint +invalidBlockRejector tracer getIsInvalidBlock getCandidate = + Watcher { + wFingerprint = getFingerprint + , wInitial = Nothing + , wNotify = checkInvalid . forgetFingerprint + , wReader = getIsInvalidBlock + } where checkInvalid :: (HeaderHash blk -> Maybe (InvalidBlockReason blk)) -> m () checkInvalid isInvalidBlock = do diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Util/STM.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Util/STM.hs index bd3e78cf12e..f3371e969f7 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Util/STM.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Util/STM.hs @@ -8,9 +8,12 @@ {-# LANGUAGE ScopedTypeVariables #-} module Ouroboros.Consensus.Util.STM ( + -- * 'Watcher' + Watcher (..) + , forkLinkedWatcher + , withWatcher -- * Misc - blockUntilChanged - , onEachChange + , blockUntilChanged , runWhenJust , blockUntilJust , blockUntilAllJust @@ -45,42 +48,6 @@ blockUntilChanged f b getA = do then retry else return (a, b') --- | Spawn a new thread that runs an action each time an STM value changes. --- --- NOTE: STM does not guarantee that 'onEachChange' will /literally/ be called --- on /every/ change: when the system is under heavy load, some updates may --- be missed. --- --- The thread will be linked to the registry. -onEachChange :: forall m a b. (IOLike m, Eq b, HasCallStack) - => ResourceRegistry m - -> String -- ^ Label for the thread - -> (a -> b) -- ^ Obtain a fingerprint - -> Maybe b -- ^ Optional initial fingerprint - -- If 'Nothing', the action is executed once - -- immediately to obtain the initial fingerprint. - -> STM m a - -> (a -> m ()) - -> m (Thread m Void) -onEachChange registry label f mbInitB getA notify = - forkLinkedThread registry label body - where - body :: m Void - body = do - initB <- case mbInitB of - Just initB -> return initB - Nothing -> do - a <- atomically getA - notify a - return $ f a - loop initB - - loop :: b -> m Void - loop b = do - (a, b') <- atomically $ blockUntilChanged f b getA - notify a - loop b' - -- | Spawn a new thread that waits for an STM value to become 'Just' -- -- The thread will be linked to the registry. @@ -132,3 +99,82 @@ simStateT stVar (Sim k) = Sim $ \(StateT f) -> do (a, st') <- k (f st) writeTVar stVar st' return a + +{------------------------------------------------------------------------------- + Watchers +-------------------------------------------------------------------------------} + +-- | Specification for a thread that watches a variable, and reports interesting +-- changes. +-- +-- NOTE: STM does not guarantee that 'wNotify' will /literally/ be called on +-- /every/ change: when the system is under heavy load, some updates may be +-- missed. +data Watcher m a fp = Watcher { + -- | Obtain a fingerprint from a value of the monitored variable. + wFingerprint :: a -> fp + -- | The initial fingerprint + -- + -- If 'Nothing', the action is executed once immediately to obtain the + -- initial fingerprint. + , wInitial :: Maybe fp + -- | An action executed each time the fingerprint changes. + , wNotify :: a -> m () + -- | The variable to monitor. + , wReader :: STM m a + } + +-- | Execute a 'Watcher' +-- +-- NOT EXPORTED +runWatcher :: forall m a fp. (IOLike m, Eq fp, HasCallStack) + => Watcher m a fp + -> m Void +runWatcher watcher = do + initB <- case mbInitFP of + Just fp -> return fp + Nothing -> do + a <- atomically getA + notify a + return $ f a + loop initB + where + Watcher { + wFingerprint = f + , wInitial = mbInitFP + , wNotify = notify + , wReader = getA + } = watcher + + loop :: fp -> m Void + loop fp = do + (a, fp') <- atomically $ blockUntilChanged f fp getA + notify a + loop fp' + +-- | Spawn a new thread that runs a 'Watcher' +-- +-- The thread will be linked to the registry. +forkLinkedWatcher :: forall m a fp. (IOLike m, Eq fp, HasCallStack) + => ResourceRegistry m + -> String -- ^ Label for the thread + -> Watcher m a fp + -> m (Thread m Void) +forkLinkedWatcher registry label watcher = + forkLinkedThread registry label $ runWatcher watcher + +-- | Spawn a new thread that runs a 'Watcher' +-- +-- The thread is bracketed via 'withAsync' and 'link'ed. +-- +-- We do not provide the 'Async' handle only because our anticipated use cases +-- don't need it. +withWatcher :: forall m a fp r. (IOLike m, Eq fp, HasCallStack) + => String -- ^ Label for the thread + -> Watcher m a fp + -> m r + -> m r +withWatcher label watcher k = + withAsync + (do labelThisThread label; runWatcher watcher) + (\h -> do link h; k) From e26d1f5fab23c39e3ebcada80d9e121ecc4f8950 Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Mon, 25 Jan 2021 18:52:13 -0800 Subject: [PATCH 2/4] consensus: change onKnownSlotChange to knownSlotWatcher This commit is propagating the new Watcher abstraction through an abbreviation. --- .../Test/Consensus/BlockchainTime/Simple.hs | 24 ++++++------ .../Ouroboros/Consensus/BlockchainTime/API.hs | 37 ++++++++----------- .../src/Ouroboros/Consensus/NodeKernel.hs | 6 ++- 3 files changed, 31 insertions(+), 36 deletions(-) diff --git a/ouroboros-consensus-test/test-consensus/Test/Consensus/BlockchainTime/Simple.hs b/ouroboros-consensus-test/test-consensus/Test/Consensus/BlockchainTime/Simple.hs index 1d74b97b663..cccb4650d23 100644 --- a/ouroboros-consensus-test/test-consensus/Test/Consensus/BlockchainTime/Simple.hs +++ b/ouroboros-consensus-test/test-consensus/Test/Consensus/BlockchainTime/Simple.hs @@ -30,6 +30,7 @@ import Ouroboros.Consensus.Block import Ouroboros.Consensus.BlockchainTime import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.ResourceRegistry +import Ouroboros.Consensus.Util.STM (withWatcher) import Ouroboros.Consensus.Util.Time import Test.Util.Orphans.Arbitrary () @@ -293,24 +294,23 @@ testOverrideDelay :: forall m. (IOLike m, MonadTime m, MonadDelay (OverrideDelay -> Int -- ^ Number of slots to collect -> OverrideDelay m [SlotNo] testOverrideDelay systemStart slotLength maxClockRewind numSlots = do - result <- withRegistry $ \registry -> do + withRegistry $ \registry -> do time <- simpleBlockchainTime registry (defaultSystemTime systemStart nullTracer) slotLength maxClockRewind slotsVar <- uncheckedNewTVarM [] - cancelCollection <- - onKnownSlotChange registry time "testOverrideDelay" $ \slotNo -> - atomically $ modifyTVar slotsVar (slotNo :) - -- Wait to collect the required number of slots - slots <- atomically $ do - slots <- readTVar slotsVar - when (length slots < numSlots) $ retry - return slots - cancelCollection - return $ reverse slots - return result + withWatcher + "testOverrideDelay" + ( knownSlotWatcher time $ \slotNo -> do + atomically $ modifyTVar slotsVar (slotNo :) + ) $ do + -- Wait to collect the required number of slots + atomically $ do + slots <- readTVar slotsVar + when (length slots < numSlots) $ retry + return $ reverse slots {------------------------------------------------------------------------------- Test-programmable time diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime/API.hs b/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime/API.hs index d8efd0682a9..6a26a6af696 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime/API.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime/API.hs @@ -9,17 +9,15 @@ module Ouroboros.Consensus.BlockchainTime.API ( BlockchainTime(..) , CurrentSlot(..) - , onKnownSlotChange + , knownSlotWatcher ) where import GHC.Generics (Generic) -import GHC.Stack import NoThunks.Class (OnlyCheckWhnfNamed (..)) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Util.IOLike -import Ouroboros.Consensus.Util.ResourceRegistry -import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher) +import Ouroboros.Consensus.Util.STM (Watcher (..)) {------------------------------------------------------------------------------- API @@ -54,27 +52,22 @@ data CurrentSlot = Derived functionality -------------------------------------------------------------------------------} --- | Spawn a thread to run an action each time the slot changes +-- | Watches for changes in the current slot -- -- The action will not be called until the current slot becomes known -- (if the tip of our ledger is too far away from the current wallclock time, --- we may not know what the current 'SlotId' is). --- --- Returns a handle to kill the thread. -onKnownSlotChange :: forall m. (IOLike m, HasCallStack) - => ResourceRegistry m - -> BlockchainTime m - -> String -- ^ Label for the thread - -> (SlotNo -> m ()) -- ^ Action to execute - -> m (m ()) -onKnownSlotChange registry btime label notify = - fmap cancelThread - $ forkLinkedWatcher registry label Watcher { - wFingerprint = id - , wInitial = Nothing - , wNotify = notify - , wReader = getCurrentSlot' - } +-- we may not know what the current 'SlotNo' is). +knownSlotWatcher :: forall m. IOLike m + => BlockchainTime m + -> (SlotNo -> m ()) -- ^ Action to execute + -> Watcher m SlotNo SlotNo +knownSlotWatcher btime notify = + Watcher { + wFingerprint = id + , wInitial = Nothing + , wNotify = notify + , wReader = getCurrentSlot' + } where getCurrentSlot' :: STM m SlotNo getCurrentSlot' = do diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs index 63e13caaf31..6a8fcd65391 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs @@ -349,8 +349,10 @@ forkBlockForging -> BlockForging m blk -> m () forkBlockForging maxTxCapacityOverride IS{..} blockForging = - void $ onKnownSlotChange registry btime threadLabel $ - withEarlyExit_ . go + void + $ forkLinkedWatcher registry threadLabel + $ knownSlotWatcher btime + $ withEarlyExit_ . go where threadLabel :: String threadLabel = From ca301935f32d1d515732664b7b71e20c04232f52 Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Mon, 25 Jan 2021 18:56:54 -0800 Subject: [PATCH 3/4] consensus: change onEachTick to tickWatcher This commit is propagating the new Watcher abstraction through an abbreviation. --- .../src/Test/Util/LogicalClock.hs | 28 ++++++++----------- .../MiniProtocol/ChainSync/Client.hs | 9 ++++-- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/ouroboros-consensus-test/src/Test/Util/LogicalClock.hs b/ouroboros-consensus-test/src/Test/Util/LogicalClock.hs index 6d8eb6be698..3fb90c7edfa 100644 --- a/ouroboros-consensus-test/src/Test/Util/LogicalClock.hs +++ b/ouroboros-consensus-test/src/Test/Util/LogicalClock.hs @@ -16,7 +16,7 @@ module Test.Util.LogicalClock ( , new , sufficientTimeFor -- * Scheduling actions - , onEachTick + , tickWatcher , onTick , blockUntilTick ) where @@ -83,22 +83,16 @@ tickDelay = 0.5 -------------------------------------------------------------------------------} -- | Execute action on every clock tick --- --- Returns a handle to cancel the thread. -onEachTick :: (IOLike m, HasCallStack) - => ResourceRegistry m - -> LogicalClock m - -> String - -> (Tick -> m ()) - -> m (m ()) -onEachTick registry clock threadLabel action = - cancelThread <$> - forkLinkedWatcher registry threadLabel Watcher { - wFingerprint = id - , wInitial = Nothing - , wNotify = action - , wReader = getCurrentTick clock - } +tickWatcher :: LogicalClock m + -> (Tick -> m ()) + -> Watcher m Tick Tick +tickWatcher clock action = + Watcher { + wFingerprint = id + , wInitial = Nothing + , wNotify = action + , wReader = getCurrentTick clock + } -- | Execute action once at the specified tick onTick :: (IOLike m, HasCallStack) diff --git a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs index b78c74bf469..36ae2d9027d 100644 --- a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs @@ -62,7 +62,7 @@ import Ouroboros.Consensus.Util.Condense import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.ResourceRegistry import Ouroboros.Consensus.Util.STM (Fingerprint (..), - WithFingerprint (..)) + WithFingerprint (..), forkLinkedWatcher) import Test.Util.LogicalClock (LogicalClock, NumTicks (..), Tick (..)) import qualified Test.Util.LogicalClock as LogicalClock @@ -305,7 +305,12 @@ runChainSync securityParam (ClientUpdates clientUpdates) -- Schedule updates of the client and server chains varLastUpdate <- uncheckedNewTVarM 0 - void $ LogicalClock.onEachTick registry clock "scheduled updates" $ \tick -> do + let forkLinkedTickWatcher :: (Tick -> m ()) -> m () + forkLinkedTickWatcher = + void + . forkLinkedWatcher registry "scheduled updates" + . LogicalClock.tickWatcher clock + forkLinkedTickWatcher $ \tick -> do -- Stop updating the client and server chains when the chain sync client -- has thrown an exception or has gracefully terminated, so that at the -- end, we can read the chains in the states they were in when the From bcf8af8932bc25f45a7668a4525716a3b08d97fb Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Thu, 21 Jan 2021 18:25:46 -0800 Subject: [PATCH 4/4] consensus: add and use bracketWithPrivateRegistry See the new declaration's docstring. This is intended to prevent the confusion that lead to the recently fixed resource link the ChainSync server. --- .../Test/Consensus/BlockchainTime/Simple.hs | 16 ++++--- .../Consensus/Network/NodeToClient.hs | 11 ++--- .../Ouroboros/Consensus/Network/NodeToNode.hs | 13 +++--- .../Consensus/Util/ResourceRegistry.hs | 46 +++++++++++++++++++ 4 files changed, 66 insertions(+), 20 deletions(-) diff --git a/ouroboros-consensus-test/test-consensus/Test/Consensus/BlockchainTime/Simple.hs b/ouroboros-consensus-test/test-consensus/Test/Consensus/BlockchainTime/Simple.hs index cccb4650d23..453ce3fa90d 100644 --- a/ouroboros-consensus-test/test-consensus/Test/Consensus/BlockchainTime/Simple.hs +++ b/ouroboros-consensus-test/test-consensus/Test/Consensus/BlockchainTime/Simple.hs @@ -294,16 +294,18 @@ testOverrideDelay :: forall m. (IOLike m, MonadTime m, MonadDelay (OverrideDelay -> Int -- ^ Number of slots to collect -> OverrideDelay m [SlotNo] testOverrideDelay systemStart slotLength maxClockRewind numSlots = do - withRegistry $ \registry -> do - time <- simpleBlockchainTime - registry - (defaultSystemTime systemStart nullTracer) - slotLength - maxClockRewind + bracketWithPrivateRegistry + (\registry -> simpleBlockchainTime + registry + (defaultSystemTime systemStart nullTracer) + slotLength + maxClockRewind) + (\_btime -> pure ()) + $ \btime -> do slotsVar <- uncheckedNewTVarM [] withWatcher "testOverrideDelay" - ( knownSlotWatcher time $ \slotNo -> do + ( knownSlotWatcher btime $ \slotNo -> do atomically $ modifyTVar slotsVar (slotNo :) ) $ do -- Wait to collect the required number of slots diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToClient.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToClient.hs index b55603ab7ea..c8b3534182c 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToClient.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToClient.hs @@ -359,17 +359,16 @@ mkApps kernel Tracers {..} Codecs {..} Handlers {..} = -> m ((), Maybe bCS) aChainSyncServer them channel = do labelThisThread "LocalChainSyncServer" - withRegistry $ \registry -> - bracket - (chainSyncBlockServerFollower (getChainDB kernel) registry) - ChainDB.followerClose - (\flr -> runPeer + bracketWithPrivateRegistry + (chainSyncBlockServerFollower (getChainDB kernel)) + ChainDB.followerClose + $ \flr -> + runPeer (contramap (TraceLabelPeer them) tChainSyncTracer) cChainSyncCodec channel $ chainSyncServerPeer $ hChainSyncServer flr - ) aTxSubmissionServer :: localPeer diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs index aa793730e7d..5688de016f8 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs @@ -487,12 +487,12 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = -> m ((), Maybe bCS) aChainSyncServer version them channel = do labelThisThread "ChainSyncServer" - withRegistry $ \registry -> do - chainSyncTimeout <- genChainSyncTimeout - bracket - (chainSyncHeaderServerFollower (getChainDB kernel) registry) - ChainDB.followerClose - (\flr -> runPeerWithLimits + chainSyncTimeout <- genChainSyncTimeout + bracketWithPrivateRegistry + (chainSyncHeaderServerFollower (getChainDB kernel)) + ChainDB.followerClose + $ \flr -> + runPeerWithLimits (contramap (TraceLabelPeer them) tChainSyncSerialisedTracer) cChainSyncCodecSerialised (byteLimitsChainSync (const 0)) -- TODO: Real Bytelimits, see #1727 @@ -500,7 +500,6 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = channel $ chainSyncServerPeer $ hChainSyncServer flr version - ) aBlockFetchClient :: NodeToNodeVersion diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Util/ResourceRegistry.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Util/ResourceRegistry.hs index d42b47847dd..dd947532e7e 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Util/ResourceRegistry.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Util/ResourceRegistry.hs @@ -18,6 +18,7 @@ module Ouroboros.Consensus.Util.ResourceRegistry ( , ResourceRegistryThreadException -- * Creating and releasing the registry itself , withRegistry + , bracketWithPrivateRegistry , registryThread -- * Allocating and releasing regular resources , ResourceKey @@ -587,6 +588,51 @@ releaseResources rr keys releaser = do withRegistry :: (IOLike m, HasCallStack) => (ResourceRegistry m -> m a) -> m a withRegistry = bracket unsafeNewRegistry closeRegistry +-- | Create a new private registry for use by a bracketed resource +-- +-- Use this combinator as a more specific and easier-to-maintain alternative to +-- the following. +-- +-- > 'withRegistry' $ \rr -> +-- > 'bracket' (newFoo rr) closeFoo $ \foo -> +-- > (... rr does not occur in this scope ...) +-- +-- NB The scoped body can use `withRegistry` if it also needs its own, separate +-- registry. +-- +-- Use this combinator to emphasize that the registry is private to (ie only +-- used by and/or via) the bracketed resource and that it thus has nearly the +-- same lifetime. This combinator ensures the following specific invariants +-- regarding lifetimes and order of releases. +-- +-- o The registry itself is older than the bracketed resource. +-- +-- o The only registered resources older than the bracketed resource were +-- allocated in the registry by the function that allocated the bracketed +-- resource. +-- +-- o Because of the older resources, the bracketed resource is itself also +-- registered in the registry; that's the only way we can be sure to release +-- all resources in the right order. +-- +-- NB Because the registry is private to the resource, the @a@ type could save +-- the handle to @registry@ and safely close the registry if the scoped body +-- calls @closeA@ before the bracket ends. Though we have not used the type +-- system to guarantee that the interface of the @a@ type cannot leak the +-- registry to the body, this combinator does its part to keep the registry +-- private to the bracketed resource. +-- +-- See documentation of 'ResourceRegistry' for a more general discussion. +bracketWithPrivateRegistry :: (IOLike m, HasCallStack) + => (ResourceRegistry m -> m a) + -> (a -> m ()) -- ^ Release the resource + -> (a -> m r) + -> m r +bracketWithPrivateRegistry newA closeA body = + withRegistry $ \registry -> do + (_key, a) <- allocate registry (\_key -> newA registry) closeA + body a + {------------------------------------------------------------------------------- Temporary registry -------------------------------------------------------------------------------}