Skip to content

Commit

Permalink
CAD-2572 Tracing: With MVar and EKGCounter
Browse files Browse the repository at this point in the history
  • Loading branch information
jutaro committed Feb 18, 2021
1 parent ece1746 commit 1b32b5f
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 36 deletions.
21 changes: 12 additions & 9 deletions cardano-node/src/Cardano/Node/Configuration/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import qualified Control.Concurrent.Async as Async
import Control.Exception.Safe (MonadCatch)
import Control.Monad.Trans.Except.Extra (catchIOExceptT)
import Control.Tracer
import Data.IORef (IORef, newIORef)
import Data.List (nub)
import qualified Data.Map as Map
import Data.Text (pack)
Expand All @@ -37,6 +36,7 @@ import Data.Version (showVersion)
import qualified System.Remote.Monitoring as EKG
import System.Metrics.Gauge (Gauge)
import System.Metrics.Label (Label)
import System.Metrics.Counter (Counter)

import Cardano.BM.Backend.Aggregation (plugin)
import Cardano.BM.Backend.EKGView (plugin)
Expand Down Expand Up @@ -119,9 +119,10 @@ data LoggingLayer = LoggingLayer
}

data EKGDirect = EKGDirect
{ ekgServer :: EKG.Server
, ekgGauges :: IORef (Map.Map Text Gauge)
, ekgLabels :: IORef (Map.Map Text Label)
{ ekgServer :: EKG.Server
, ekgGauges :: MVar (Map.Map Text Gauge)
, ekgLabels :: MVar (Map.Map Text Label)
, ekgCounters :: MVar (Map.Map Text Counter)
}

--------------------------------
Expand Down Expand Up @@ -181,12 +182,14 @@ createLoggingLayer ver nodeConfig' p = do
mbEkgDirect <- case mEKGServer of
Nothing -> pure Nothing
Just sv -> do
refGauge <- liftIO $ newIORef Map.empty
refLabel <- liftIO $ newIORef Map.empty
refGauge <- liftIO $ newMVar Map.empty
refLabel <- liftIO $ newMVar Map.empty
refCounter <- liftIO $ newMVar Map.empty
pure $ Just EKGDirect {
ekgServer = sv
, ekgGauges = refGauge
, ekgLabels = refLabel
ekgServer = sv
, ekgGauges = refGauge
, ekgLabels = refLabel
, ekgCounters = refCounter
}

pure $ mkLogLayer logConfig switchBoard mbEkgDirect trace
Expand Down
66 changes: 39 additions & 27 deletions cardano-node/src/Cardano/Tracing/Tracers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import Codec.CBOR.Read (DeserialiseFailure)
import Data.Aeson (ToJSON (..), Value (..))
import qualified Data.HashMap.Strict as Map
import qualified Data.Map as SMap
import Data.IORef (readIORef, writeIORef)
import qualified Data.Text as Text
import Data.Time (UTCTime)
import qualified System.Remote.Monitoring as EKG
import qualified System.Metrics.Gauge as Gauge
import qualified System.Metrics.Label as Label
import qualified System.Metrics.Counter as Counter

import Network.Mux (MuxTrace, WithMuxBearer)
import qualified Network.Socket as Socket (SockAddr)
Expand Down Expand Up @@ -430,27 +430,41 @@ traceD tr meta msg d = traceNamedObject tr (meta, LogValue msg (PureD d))
traceI :: Integral i => Trace IO a -> LOMeta -> Text -> i -> IO ()
traceI tr meta msg i = traceNamedObject tr (meta, LogValue msg (PureI (fromIntegral i)))

sendEKGDirectInt :: Integral a => EKGDirect -> Text -> a -> IO ()
sendEKGDirectInt ekgDirect name val = do
registeredMap <- readIORef (ekgGauges ekgDirect)
case SMap.lookup name registeredMap of
Just gauge -> Gauge.set gauge (fromIntegral val)
Nothing -> do
gauge <- EKG.getGauge name (ekgServer ekgDirect)
let registeredMap' = SMap.insert name gauge registeredMap
writeIORef (ekgGauges ekgDirect) registeredMap'
Gauge.set gauge (fromIntegral val)
sendEKGDirectCounter :: EKGDirect -> Text -> IO ()
sendEKGDirectCounter ekgDirect name = do
modifyMVar_ (ekgCounters ekgDirect) $ \registeredMap -> do
case SMap.lookup name registeredMap of
Just counter -> do
Counter.inc counter
pure registeredMap
Nothing -> do
counter <- EKG.getCounter name (ekgServer ekgDirect)
Counter.inc counter
pure $ SMap.insert name counter registeredMap

_sendEKGDirectInt :: Integral a => EKGDirect -> Text -> a -> IO ()
_sendEKGDirectInt ekgDirect name val = do
modifyMVar_ (ekgGauges ekgDirect) $ \registeredMap -> do
case SMap.lookup name registeredMap of
Just gauge -> do
Gauge.set gauge (fromIntegral val)
pure registeredMap
Nothing -> do
gauge <- EKG.getGauge name (ekgServer ekgDirect)
Gauge.set gauge (fromIntegral val)
pure $ SMap.insert name gauge registeredMap

_sendEKGDirectDouble :: EKGDirect -> Text -> Double -> IO ()
_sendEKGDirectDouble ekgDirect name val = do
registeredMap <- readIORef (ekgLabels ekgDirect)
case SMap.lookup name registeredMap of
Just label -> Label.set label (Text.pack (show val))
Nothing -> do
label <- EKG.getLabel name (ekgServer ekgDirect)
let registeredMap' = SMap.insert name label registeredMap
writeIORef (ekgLabels ekgDirect) registeredMap'
Label.set label (Text.pack (show val))
modifyMVar_ (ekgLabels ekgDirect) $ \registeredMap -> do
case SMap.lookup name registeredMap of
Just label -> do
Label.set label (Text.pack (show val))
pure registeredMap
Nothing -> do
label <- EKG.getLabel name (ekgServer ekgDirect)
Label.set label (Text.pack (show val))
pure $ SMap.insert name label registeredMap

--------------------------------------------------------------------------------
-- Consensus Tracers
Expand Down Expand Up @@ -495,15 +509,14 @@ mkConsensusTracers mbEKGDirect trSel verb tr nodeKern fStats = do
forgeTracers <- mkForgeTracers
meta <- mkLOMeta Critical Public

tHeadersServed <- STM.newTVarIO @Int 0
tBlocksServed <- STM.newTVarIO @Int 0
tSubmissionsCollected <- STM.newTVarIO 0
tSubmissionsAccepted <- STM.newTVarIO 0
tSubmissionsRejected <- STM.newTVarIO 0

pure Consensus.Tracers
{ Consensus.chainSyncClientTracer = tracerOnOff (traceChainSyncClient trSel) verb "ChainSyncClient" tr
, Consensus.chainSyncServerHeaderTracer = Tracer $ \ev -> traceServedCount mbEKGDirect tHeadersServed ev
, Consensus.chainSyncServerHeaderTracer = Tracer $ \ev -> traceServedCount mbEKGDirect ev
, Consensus.chainSyncServerBlockTracer = tracerOnOff (traceChainSyncBlockServer trSel) verb "ChainSyncBlockServer" tr
, Consensus.blockFetchDecisionTracer = tracerOnOff' (traceBlockFetchDecisions trSel) $
annotateSeverity $ teeTraceBlockFetchDecision verb elidedFetchDecision tr
Expand Down Expand Up @@ -568,12 +581,11 @@ mkConsensusTracers mbEKGDirect trSel verb tr nodeKern fStats = do
<*> counting (liftCounting staticMeta name "slot-is-immutable" tr)
<*> counting (liftCounting staticMeta name "node-is-leader" tr)

traceServedCount :: Maybe EKGDirect -> STM.TVar Int -> TraceChainSyncServerEvent blk -> IO ()
traceServedCount Nothing _ _ = pure ()
traceServedCount (Just ekgDirect) tHeadersServed ev =
when (isRollForward ev) $ do
count <- STM.modifyReadTVarIO tHeadersServed (+1)
sendEKGDirectInt ekgDirect "cardano.node.metrics.served.header.counter" count
traceServedCount :: Maybe EKGDirect -> TraceChainSyncServerEvent blk -> IO ()
traceServedCount Nothing _ = pure ()
traceServedCount (Just ekgDirect) ev =
when (isRollForward ev) $
sendEKGDirectCounter ekgDirect "cardano.node.metrics.served.header.counter.int"

traceLeadershipChecks ::
forall blk
Expand Down

0 comments on commit 1b32b5f

Please sign in to comment.