Skip to content

Commit

Permalink
Merge #2355
Browse files Browse the repository at this point in the history
2355: CAD-2572 Direct submission of the ChainSync headers served metric to EKG r=deepfire a=jutaro

1. Bump `iohk-monitoring-framework` to the latest `master`, which includes the EKG server extraction.
2. Define `EKGDirect` -- a direct handle onto the EKG server/store used by the EKG backend of the logging framework.
3. Stop submitting `cardano.node.metrics.served.header.counter.int` to the switchboard.
4. Submit it via `EKGDirect`, instead.

Co-authored-by: Yupanqui <jnf@arcor.de>
  • Loading branch information
iohk-bors[bot] and jutaro authored Feb 18, 2021
2 parents f1b27a5 + 1b32b5f commit 400f6d7
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 28 deletions.
4 changes: 2 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/input-output-hk/iohk-monitoring-framework
tag: a89c38ed5825ba17ca79fddb85651007753d699d
--sha256: 0i4p3jbr9pxhklgbky2g7rfqhccvkqzph0ak5x8bb6kwp7c7b8wf
tag: 60b13d80afa266f02f363672950e896ed735e807
--sha256: 0gci6r4c6ldrgracbr4fni4hbrl62lmm5p70cafkwk21a0kqs8cz
subdir:
contra-tracer
iohk-monitoring
Expand Down
2 changes: 2 additions & 0 deletions cardano-node/cardano-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ library
, containers
, directory
, dns
, ekg
, ekg-core
, filepath
, generic-data
, hedgehog-extras
Expand Down
36 changes: 33 additions & 3 deletions cardano-node/src/Cardano/Node/Configuration/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

module Cardano.Node.Configuration.Logging
( LoggingLayer (..)
, EKGDirect(..)
, createLoggingLayer
, nodeBasicInfo
, shutdownLoggingLayer
Expand All @@ -28,9 +29,14 @@ import Control.Exception.Safe (MonadCatch)
import Control.Monad.Trans.Except.Extra (catchIOExceptT)
import Control.Tracer
import Data.List (nub)
import qualified Data.Map as Map
import Data.Text (pack)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Data.Version (showVersion)
import qualified System.Remote.Monitoring as EKG
import System.Metrics.Gauge (Gauge)
import System.Metrics.Label (Label)
import System.Metrics.Counter (Counter)

import Cardano.BM.Backend.Aggregation (plugin)
import Cardano.BM.Backend.EKGView (plugin)
Expand Down Expand Up @@ -109,6 +115,14 @@ data LoggingLayer = LoggingLayer
, llConfiguration :: Configuration
, llAddBackend :: Backend Text -> BackendKind -> IO ()
, llSwitchboard :: Switchboard Text
, llEKGDirect :: Maybe EKGDirect
}

-- | A direct handle onto an EKG server, bundled with one mutable map per
-- metric kind (gauges, labels, counters). Each map is keyed by 'Text' and is
-- guarded by its own 'MVar'.
data EKGDirect = EKGDirect
{ ekgServer :: EKG.Server
-- ^ The EKG server this handle refers to.
, ekgGauges :: MVar (Map.Map Text Gauge)
-- ^ Gauges known to this handle, keyed by 'Text'
-- (presumably the metric name -- confirm against the users of this field).
, ekgLabels :: MVar (Map.Map Text Label)
-- ^ Labels known to this handle, keyed by 'Text'.
, ekgCounters :: MVar (Map.Map Text Counter)
-- ^ Counters known to this handle, keyed by 'Text'.
}

--------------------------------
Expand Down Expand Up @@ -163,7 +177,22 @@ createLoggingLayer ver nodeConfig' p = do
when loggingEnabled $ liftIO $
loggingPreInit nodeConfig' logConfig switchBoard trace

pure $ mkLogLayer logConfig switchBoard trace
mEKGServer <- liftIO $ Switchboard.getSbEKGServer switchBoard

mbEkgDirect <- case mEKGServer of
Nothing -> pure Nothing
Just sv -> do
refGauge <- liftIO $ newMVar Map.empty
refLabel <- liftIO $ newMVar Map.empty
refCounter <- liftIO $ newMVar Map.empty
pure $ Just EKGDirect {
ekgServer = sv
, ekgGauges = refGauge
, ekgLabels = refLabel
, ekgCounters = refCounter
}

pure $ mkLogLayer logConfig switchBoard mbEkgDirect trace
where
loggingPreInit
:: NodeConfiguration
Expand Down Expand Up @@ -217,8 +246,8 @@ createLoggingLayer ver nodeConfig' p = do
-- Record node metrics, if configured
startCapturingMetrics trace

mkLogLayer :: Configuration -> Switchboard Text -> Trace IO Text -> LoggingLayer
mkLogLayer logConfig switchBoard trace =
mkLogLayer :: Configuration -> Switchboard Text -> Maybe EKGDirect -> Trace IO Text -> LoggingLayer
mkLogLayer logConfig switchBoard mbEkgDirect trace =
LoggingLayer
{ llBasicTrace = Trace.natTrace liftIO trace
, llLogDebug = Trace.logDebug
Expand All @@ -235,6 +264,7 @@ createLoggingLayer ver nodeConfig' p = do
, llConfiguration = logConfig
, llAddBackend = Switchboard.addExternalBackend switchBoard
, llSwitchboard = switchBoard
, llEKGDirect = mbEkgDirect
}

startCapturingMetrics :: Trace IO Text -> IO ()
Expand Down
8 changes: 6 additions & 2 deletions cardano-node/src/Cardano/Node/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ runNode cmdPc = do
p

loggingLayer <- case eLoggingLayer of
Left err -> putTextLn (show err) >> exitFailure
Left err -> putTextLn (show err) >> exitFailure
Right res -> return res

!trace <- setupTrace loggingLayer
Expand All @@ -125,7 +125,11 @@ runNode cmdPc = do
-- Used for ledger queries and peer connection status.
nodeKernelData :: NodeKernelData blk <- mkNodeKernelData

tracers <- mkTracers (ncTraceConfig nc) trace nodeKernelData
tracers <- mkTracers
(ncTraceConfig nc)
trace
nodeKernelData
(llEKGDirect loggingLayer)

Async.withAsync (handlePeersListSimple trace nodeKernelData)
$ \_peerLogingThread ->
Expand Down
89 changes: 68 additions & 21 deletions cardano-node/src/Cardano/Tracing/Tracers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ import GHC.Clock (getMonotonicTimeNSec)
import Codec.CBOR.Read (DeserialiseFailure)
import Data.Aeson (ToJSON (..), Value (..))
import qualified Data.HashMap.Strict as Map
import qualified Data.Map as SMap
import qualified Data.Text as Text
import Data.Time (UTCTime)
import qualified System.Remote.Monitoring as EKG
import qualified System.Metrics.Gauge as Gauge
import qualified System.Metrics.Label as Label
import qualified System.Metrics.Counter as Counter

import Network.Mux (MuxTrace, WithMuxBearer)
import qualified Network.Socket as Socket (SockAddr)
Expand Down Expand Up @@ -274,16 +279,20 @@ mkTracers
=> TraceOptions
-> Trace IO Text
-> NodeKernelData blk
-> Maybe EKGDirect
-> IO (Tracers peer localPeer blk)
mkTracers tOpts@(TracingOn trSel) tr nodeKern = do
mkTracers tOpts@(TracingOn trSel) tr nodeKern ekgDirect = do
fStats <- mkForgingStats
consensusTracers <- mkConsensusTracers trSel verb tr nodeKern fStats
consensusTracers <- mkConsensusTracers ekgDirect trSel verb tr nodeKern fStats
elidedChainDB <- newstate -- for eliding messages in ChainDB tracer

pure Tracers
{ chainDBTracer = tracerOnOff' (traceChainDB trSel) $
annotateSeverity $ teeTraceChainTip tOpts elidedChainDB
(appendName "ChainDB" tr) (appendName "metrics" tr)
annotateSeverity $ teeTraceChainTip
tOpts elidedChainDB
ekgDirect
(appendName "ChainDB" tr)
(appendName "metrics" tr)
, consensusTracers = consensusTracers
, nodeToClientTracers = nodeToClientTracers' trSel verb tr
, nodeToNodeTracers = nodeToNodeTracers' trSel verb tr
Expand All @@ -302,7 +311,7 @@ mkTracers tOpts@(TracingOn trSel) tr nodeKern = do
verb :: TracingVerbosity
verb = traceVerbosity trSel

mkTracers TracingOff _ _ =
mkTracers TracingOff _ _ _ =
pure Tracers
{ chainDBTracer = nullTracer
, consensusTracers = Consensus.Tracers
Expand Down Expand Up @@ -359,14 +368,15 @@ teeTraceChainTip
)
=> TraceOptions
-> MVar (Maybe (WithSeverity (ChainDB.TraceEvent blk)), Integer)
-> Maybe EKGDirect
-> Trace IO Text
-> Trace IO Text
-> Tracer IO (WithSeverity (ChainDB.TraceEvent blk))
teeTraceChainTip TracingOff _ _ _ = nullTracer
teeTraceChainTip (TracingOn trSel) elided trTrc trMet =
teeTraceChainTip TracingOff _ _ _ _ = nullTracer
teeTraceChainTip (TracingOn trSel) elided ekgDirect trTrc trMet =
Tracer $ \ev -> do
traceWith (teeTraceChainTipElide (traceVerbosity trSel) elided trTrc) ev
traceWith (ignoringSeverity (traceChainMetrics trMet)) ev
traceWith (ignoringSeverity (traceChainMetrics ekgDirect trMet)) ev

teeTraceChainTipElide
:: ( ConvertRawHash blk
Expand All @@ -388,10 +398,10 @@ ignoringSeverity tr = Tracer $ \(WithSeverity _ ev) -> traceWith tr ev

traceChainMetrics
:: forall blk. HasHeader (Header blk)
=> Trace IO Text -> Tracer IO (ChainDB.TraceEvent blk)
traceChainMetrics tr = Tracer $ \ev ->
fromMaybe (pure ()) $
doTrace <$> chainTipInformation ev
=> Maybe EKGDirect -> Trace IO Text -> Tracer IO (ChainDB.TraceEvent blk)
traceChainMetrics Nothing _ = nullTracer
traceChainMetrics (Just _ekgDirect) tr = Tracer $ \ev ->
fromMaybe (pure ()) $ doTrace <$> chainTipInformation ev
where
chainTipInformation :: ChainDB.TraceEvent blk -> Maybe ChainInformation
chainTipInformation = \case
Expand Down Expand Up @@ -420,6 +430,42 @@ traceD tr meta msg d = traceNamedObject tr (meta, LogValue msg (PureD d))
-- | Trace an integral value under the given message name: the value is
-- converted with 'fromIntegral', wrapped as a 'PureI' measurement inside a
-- 'LogValue', and handed to 'traceNamedObject' together with the metadata.
traceI :: Integral i => Trace IO a -> LOMeta -> Text -> i -> IO ()
traceI tr meta msg i =
  traceNamedObject tr (meta, payload)
  where
    payload = LogValue msg (PureI (fromIntegral i))

-- | Shared implementation of the @sendEKGDirect*@ functions.
--
-- While holding the given cache 'MVar', look the metric up by name:
--
-- * if it is already cached, apply @update@ to it and leave the cache as is;
-- * otherwise create it on the EKG server with @register@, apply @update@,
--   and store it in the cache under @name@.
--
-- Creation and cache insertion happen under the same 'modifyMVar_', so two
-- threads going through the same cache cannot both create the same name.
-- NOTE(review): the cache presumably exists because ekg's @get*@ functions
-- must not be called twice for the same name on one server -- confirm against
-- the ekg version in use.
withEKGDirectMetric
  :: MVar (SMap.Map Text metric)
  -- ^ cache of already-created metrics of this kind
  -> (Text -> EKG.Server -> IO metric)
  -- ^ how to create a metric of this kind on the server
  -> (metric -> IO ())
  -- ^ the update to apply to the metric
  -> EKGDirect
  -> Text
  -- ^ metric name
  -> IO ()
withEKGDirectMetric cacheVar register update ekgDirect name =
  modifyMVar_ cacheVar $ \registeredMap ->
    case SMap.lookup name registeredMap of
      Just metric -> do
        update metric
        pure registeredMap
      Nothing -> do
        metric <- register name (ekgServer ekgDirect)
        update metric
        pure $ SMap.insert name metric registeredMap

-- | Increment the EKG counter with the given name by one, creating it on the
-- server first if this handle has not seen the name before.
sendEKGDirectCounter :: EKGDirect -> Text -> IO ()
sendEKGDirectCounter ekgDirect name =
  withEKGDirectMetric (ekgCounters ekgDirect) EKG.getCounter Counter.inc ekgDirect name

-- | Set the EKG gauge with the given name to the given integral value
-- (converted with 'fromIntegral'), creating the gauge first if needed.
_sendEKGDirectInt :: Integral a => EKGDirect -> Text -> a -> IO ()
_sendEKGDirectInt ekgDirect name val =
  withEKGDirectMetric (ekgGauges ekgDirect) EKG.getGauge setGauge ekgDirect name
  where
    setGauge gauge = Gauge.set gauge (fromIntegral val)

-- | Publish a 'Double' through an EKG label: the value is rendered with
-- 'show' and stored as the label's text, creating the label first if needed.
_sendEKGDirectDouble :: EKGDirect -> Text -> Double -> IO ()
_sendEKGDirectDouble ekgDirect name val =
  withEKGDirectMetric (ekgLabels ekgDirect) EKG.getLabel setLabel ekgDirect name
  where
    setLabel label = Label.set label (Text.pack (show val))

--------------------------------------------------------------------------------
-- Consensus Tracers
--------------------------------------------------------------------------------
Expand Down Expand Up @@ -448,34 +494,29 @@ mkConsensusTracers
, HasKESMetricsData blk
, Show (Header blk)
)
=> TraceSelection
=> Maybe EKGDirect
-> TraceSelection
-> TracingVerbosity
-> Trace IO Text
-> NodeKernelData blk
-> ForgingStats
-> IO (Consensus.Tracers' peer localPeer blk (Tracer IO))
mkConsensusTracers trSel verb tr nodeKern fStats = do
mkConsensusTracers mbEKGDirect trSel verb tr nodeKern fStats = do
let trmet = appendName "metrics" tr

blockForgeOutcomeExtractor <- mkOutcomeExtractor
elidedFetchDecision <- newstate -- for eliding messages in FetchDecision tr
forgeTracers <- mkForgeTracers
meta <- mkLOMeta Critical Public

tHeadersServed <- STM.newTVarIO @Int 0
tBlocksServed <- STM.newTVarIO @Int 0
tSubmissionsCollected <- STM.newTVarIO 0
tSubmissionsAccepted <- STM.newTVarIO 0
tSubmissionsRejected <- STM.newTVarIO 0

pure Consensus.Tracers
{ Consensus.chainSyncClientTracer = tracerOnOff (traceChainSyncClient trSel) verb "ChainSyncClient" tr
, Consensus.chainSyncServerHeaderTracer = tracerOnOff' (traceChainSyncHeaderServer trSel) $
Tracer $ \ev -> do
traceWith (annotateSeverity . toLogObject' verb $ appendName "ChainSyncHeaderServer" tr) ev
when (isRollForward ev) $
traceI trmet meta "served.header.count" =<<
STM.modifyReadTVarIO tHeadersServed (+1)
, Consensus.chainSyncServerHeaderTracer = Tracer $ \ev -> traceServedCount mbEKGDirect ev
, Consensus.chainSyncServerBlockTracer = tracerOnOff (traceChainSyncBlockServer trSel) verb "ChainSyncBlockServer" tr
, Consensus.blockFetchDecisionTracer = tracerOnOff' (traceBlockFetchDecisions trSel) $
annotateSeverity $ teeTraceBlockFetchDecision verb elidedFetchDecision tr
Expand Down Expand Up @@ -544,6 +585,12 @@ mkConsensusTracers trSel verb tr nodeKern fStats = do
<*> counting (liftCounting staticMeta name "slot-is-immutable" tr)
<*> counting (liftCounting staticMeta name "node-is-leader" tr)

-- Count a served header directly in EKG: when an EKG handle is available and
-- the event is a roll-forward, bump the served-headers counter; in every
-- other case do nothing.
traceServedCount :: Maybe EKGDirect -> TraceChainSyncServerEvent blk -> IO ()
traceServedCount mbEkg ev =
  case mbEkg of
    Just ekgDirect
      | isRollForward ev ->
          sendEKGDirectCounter ekgDirect "cardano.node.metrics.served.header.counter.int"
    _ -> pure ()

traceLeadershipChecks ::
forall blk
. ( Consensus.RunNode blk
Expand Down

0 comments on commit 400f6d7

Please sign in to comment.