Skip to content

Commit

Permalink
[CAD-2547] Add Prometheus metrics for slot_no.
Browse files Browse the repository at this point in the history
  • Loading branch information
ksaric committed Mar 2, 2021
1 parent 6a2ef8c commit 8a3a55e
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 91 deletions.
10 changes: 9 additions & 1 deletion cardano-db-sync-extended/app/cardano-db-sync-extended.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import Cardano.Config.Git.Rev (gitRev)

import Cardano.DbSync (runDbSyncNode)
import Cardano.DbSync.Plugin.Extended (extendedDbSyncNodePlugin)
import Cardano.DbSync.Metrics (withMetricsLayer)

import Cardano.Sync.Config
import Cardano.Sync.Config.Types
Expand All @@ -29,7 +30,14 @@ main = do
cmd <- Opt.execParser opts
case cmd of
CmdVersion -> runVersionCommand
CmdRun params -> runDbSyncNode extendedDbSyncNodePlugin params
CmdRun params -> do
let configFile = enpConfigFile params
enc <- readSyncNodeConfig configFile
let prometheusPort = dncPrometheusPort enc

-- This enables us to be much more flexibile with what we actually measure.
withMetricsLayer prometheusPort $ \metricsLayer ->
runDbSyncNode metricsLayer extendedDbSyncNodePlugin params

-- -------------------------------------------------------------------------------------------------

Expand Down
10 changes: 9 additions & 1 deletion cardano-db-sync/app/cardano-db-sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Cardano.Prelude
import Cardano.Config.Git.Rev (gitRev)

import Cardano.DbSync (defDbSyncNodePlugin, runDbSyncNode)
import Cardano.DbSync.Metrics (withMetricsLayer)

import Cardano.Sync.Config
import Cardano.Sync.Config.Types
Expand All @@ -29,7 +30,14 @@ main = do
cmd <- Opt.execParser opts
case cmd of
CmdVersion -> runVersionCommand
CmdRun params -> runDbSyncNode defDbSyncNodePlugin params
CmdRun params -> do
let configFile = enpConfigFile params
enc <- readSyncNodeConfig configFile
let prometheusPort = dncPrometheusPort enc

-- This enables us to be much more flexibile with what we actually measure.
withMetricsLayer prometheusPort $ \metricsLayer ->
runDbSyncNode metricsLayer defDbSyncNodePlugin params

-- -------------------------------------------------------------------------------------------------

Expand Down
8 changes: 4 additions & 4 deletions cardano-db-sync/src/Cardano/DbSync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import Cardano.DbSync.Plugin.Default (defDbSyncNodePlugin)
import Cardano.DbSync.Rollback (unsafeRollback)
import Cardano.Sync.Database (runDbThread)

import Cardano.Sync (Block (..), SyncDataLayer (..), SyncNodePlugin (..),
import Cardano.Sync (Block (..), SyncDataLayer (..), SyncNodePlugin (..), MetricsLayer,
configureLogging, runSyncNode)
import Cardano.Sync.Config.Types (ConfigFile (..), GenesisFile (..), LedgerStateDir (..),
MigrationDir (..), NetworkName (..), SocketPath (..), SyncCommand (..),
Expand All @@ -49,8 +49,8 @@ import Database.Persist.Postgresql (withPostgresqlConn)
import Database.Persist.Sql (SqlBackend)


runDbSyncNode :: (SqlBackend -> SyncNodePlugin) -> SyncNodeParams -> IO ()
runDbSyncNode mkPlugin params = do
runDbSyncNode :: MetricsLayer -> (SqlBackend -> SyncNodePlugin) -> SyncNodeParams -> IO ()
runDbSyncNode metricsLayer mkPlugin params = do

-- Read the PG connection info
pgConfig <- DB.readPGPassFileEnv Nothing
Expand All @@ -69,7 +69,7 @@ runDbSyncNode mkPlugin params = do
Just slotNo -> void $ unsafeRollback trce slotNo
Nothing -> pure ()

runSyncNode (mkSyncDataLayer trce backend) trce (mkPlugin backend)
runSyncNode (mkSyncDataLayer trce backend) metricsLayer trce (mkPlugin backend)
params (insertValidateGenesisDist backend) runDbThread

-- -------------------------------------------------------------------------------------------------
Expand Down
42 changes: 32 additions & 10 deletions cardano-db-sync/src/Cardano/DbSync/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,48 @@
module Cardano.DbSync.Metrics
( Metrics (..)
, makeMetrics
, withMetricsLayer
, withMetricsServer
) where

import Cardano.Prelude

import Cardano.Sync.Types (MetricsLayer (..))

import System.Metrics.Prometheus.Concurrent.RegistryT (RegistryT (..), registerGauge,
runRegistryT, unRegistryT)
import System.Metrics.Prometheus.Http.Scrape (serveMetricsT)
import System.Metrics.Prometheus.Metric.Gauge (Gauge)
import qualified System.Metrics.Prometheus.Metric.Gauge as Gauge


data Metrics = Metrics
{ mDbHeight :: !Gauge
, mNodeHeight :: !Gauge
, mQueuePre :: !Gauge
, mQueuePost :: !Gauge
, mQueuePostWrite :: !Gauge
{ mNodeBlockHeight :: !Gauge
, mDbQueueHeight :: !Gauge
, mLatestBlockNum :: !Gauge
, mLatestSlotNum :: !Gauge
}

-- This enables us to be much more flexibile with what we actually measure.
withMetricsLayer :: Int -> (MetricsLayer -> IO a) -> IO a
withMetricsLayer prometheusPort action =
withMetricsServer prometheusPort $ \metrics -> do

-- Metrics layer.
let metricsLayer =
MetricsLayer
{ mlSetNodeBlockHeight = \nodeHeight ->
Gauge.set (fromIntegral nodeHeight) $ mNodeBlockHeight metrics
, mlSetQueuePostWrite = \queuePostWrite ->
Gauge.set (fromIntegral queuePostWrite) $ mDbQueueHeight metrics
, mlSetLatestBlockNo = \blockNo ->
Gauge.set (fromIntegral blockNo) $ mLatestBlockNum metrics
, mlSetLatestSlotNo = \slotNo ->
Gauge.set (fromIntegral slotNo) $ mLatestSlotNum metrics
}

action metricsLayer

withMetricsServer :: Int -> (Metrics -> IO a) -> IO a
withMetricsServer port action = do
(metrics, registry) <- runRegistryT $ (,) <$> makeMetrics <*> RegistryT ask
Expand All @@ -34,9 +57,8 @@ withMetricsServer port action = do
makeMetrics :: RegistryT IO Metrics
makeMetrics =
Metrics
<$> registerGauge "db_block_height" mempty
<*> registerGauge "remote_tip_height" mempty
<*> registerGauge "action_queue_length_pre" mempty
<*> registerGauge "action_queue_length_post" mempty
<*> registerGauge "action_queue_length_post_write" mempty
<$> registerGauge "remote_block_tip_height" mempty
<*> registerGauge "database_queue_height" mempty
<*> registerGauge "local_block_number" mempty
<*> registerGauge "local_slot_number" mempty

2 changes: 0 additions & 2 deletions cardano-sync/cardano-sync.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ library
Cardano.Sync.Error

Cardano.Sync.LedgerState
Cardano.Sync.Metrics

Cardano.Sync.Era.Byron.Util
Cardano.Sync.Era.Cardano.Util
Expand Down Expand Up @@ -97,7 +96,6 @@ library
, ouroboros-consensus-shelley
, ouroboros-network
, ouroboros-network-framework
, prometheus
, shelley-spec-ledger
, stm
, text
Expand Down
58 changes: 33 additions & 25 deletions cardano-sync/src/Cardano/Sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ module Cardano.Sync
, SocketPath (..)

, SyncDataLayer (..)
, MetricsLayer (..)
, emptyMetricsLayer
, Block (..)
, Meta (..)
, SyncEnv (..)
Expand All @@ -44,7 +46,6 @@ import Cardano.Sync.Config
import Cardano.Sync.Database (DbAction (..), DbActionQueue, lengthDbActionQueue,
mkDbApply, mkDbRollback, newDbActionQueue, runDbStartup, writeDbActionQueue)
import Cardano.Sync.Error
import Cardano.Sync.Metrics
import Cardano.Sync.Plugin (SyncNodePlugin (..))
import Cardano.Sync.StateQuery (StateQueryTMVar, getSlotDetails, localStateQueryHandler,
newStateQueryTMVar)
Expand Down Expand Up @@ -103,7 +104,6 @@ import qualified Ouroboros.Network.Snocket as Snocket
import Ouroboros.Network.Subscription (SubscriptionTrace)

import System.Directory (createDirectoryIfMissing)
import qualified System.Metrics.Prometheus.Metric.Gauge as Gauge


type InsertValidateGenesisFunction
Expand All @@ -113,22 +113,23 @@ type InsertValidateGenesisFunction
-> ExceptT SyncNodeError IO ()

type RunDBThreadFunction
= Trace IO Text
= MetricsLayer
-> Trace IO Text
-> SyncEnv
-> SyncNodePlugin
-> Metrics
-> DbActionQueue
-> IO ()

runSyncNode
:: SyncDataLayer
-> MetricsLayer
-> Trace IO Text
-> SyncNodePlugin
-> SyncNodeParams
-> InsertValidateGenesisFunction
-> RunDBThreadFunction
-> IO ()
runSyncNode dataLayer trce plugin enp insertValidateGenesisDist runDBThreadFunction =
runSyncNode dataLayer metricsLayer trce plugin enp insertValidateGenesisDist runDBThreadFunction =
withIOManager $ \iomgr -> do

let configFile = enpConfigFile enp
Expand All @@ -152,7 +153,7 @@ runSyncNode dataLayer trce plugin enp insertValidateGenesisDist runDBThreadFunct
case genCfg of
GenesisCardano _ bCfg _sCfg -> do
syncEnv <- ExceptT $ mkSyncEnvFromConfig dataLayer (enpLedgerStateDir enp) genCfg
liftIO $ runSyncNodeNodeClient (dncPrometheusPort enc) syncEnv iomgr trce plugin
liftIO $ runSyncNodeNodeClient metricsLayer syncEnv iomgr trce plugin
runDBThreadFunction (cardanoCodecConfig bCfg) (enpSocketPath enp)
where
cardanoCodecConfig :: Byron.Config -> CodecConfig CardanoBlock
Expand All @@ -166,7 +167,7 @@ runSyncNode dataLayer trce plugin enp insertValidateGenesisDist runDBThreadFunct
-- -------------------------------------------------------------------------------------------------

runSyncNodeNodeClient
:: Int
:: MetricsLayer
-> SyncEnv
-> IOManager
-> Trace IO Text
Expand All @@ -175,17 +176,16 @@ runSyncNodeNodeClient
-> CodecConfig CardanoBlock
-> SocketPath
-> IO ()
runSyncNodeNodeClient port env iomgr trce plugin runDBThreadFunction codecConfig (SocketPath socketPath) = do
runSyncNodeNodeClient metricsLayer env iomgr trce plugin runDBThreadFunction codecConfig (SocketPath socketPath) = do
queryVar <- newStateQueryTMVar
logInfo trce $ "localInitiatorNetworkApplication: connecting to node via " <> textShow socketPath
withMetricsServer port $ \ metrics ->
void $ subscribe
(localSnocket iomgr socketPath)
codecConfig
(envNetworkMagic env)
networkSubscriptionTracers
clientSubscriptionParams
(dbSyncProtocols trce env plugin metrics queryVar runDBThreadFunction)
void $ subscribe
(localSnocket iomgr socketPath)
codecConfig
(envNetworkMagic env)
networkSubscriptionTracers
clientSubscriptionParams
(dbSyncProtocols metricsLayer trce env plugin queryVar runDBThreadFunction)
where
clientSubscriptionParams =
ClientSubscriptionParams
Expand Down Expand Up @@ -217,17 +217,17 @@ runSyncNodeNodeClient port env iomgr trce plugin runDBThreadFunction codecConfig
handshakeTracer = toLogObject $ appendName "Handshake" trce

dbSyncProtocols
:: Trace IO Text
:: MetricsLayer
-> Trace IO Text
-> SyncEnv
-> SyncNodePlugin
-> Metrics
-> StateQueryTMVar CardanoBlock (Interpreter (CardanoEras StandardCrypto))
-> RunDBThreadFunction
-> Network.NodeToClientVersion
-> ClientCodecs CardanoBlock IO
-> ConnectionId LocalAddress
-> NodeToClientProtocols 'InitiatorMode BSL.ByteString IO () Void
dbSyncProtocols trce env plugin metrics queryVar runDBThreadFunction version codecs _connectionId =
dbSyncProtocols metricsLayer trce env plugin queryVar runDBThreadFunction version codecs _connectionId =
NodeToClientProtocols
{ localChainSyncProtocol = localChainSyncPtcl
, localTxSubmissionProtocol = dummylocalTxSubmit
Expand All @@ -252,13 +252,13 @@ dbSyncProtocols trce env plugin metrics queryVar runDBThreadFunction version cod
actionQueue <- newDbActionQueue

race_
(runDBThreadFunction trce env plugin metrics actionQueue)
(runDBThreadFunction metricsLayer trce env plugin actionQueue)
(runPipelinedPeer
localChainSyncTracer
(cChainSyncCodec codecs)
channel
(chainSyncClientPeerPipelined
$ chainSyncClient (envDataLayer env) trce env queryVar metrics latestPoints currentTip actionQueue)
$ chainSyncClient (envDataLayer env) metricsLayer trce env queryVar latestPoints currentTip actionQueue)
)

atomically $ writeDbActionQueue actionQueue DbFinish
Expand Down Expand Up @@ -337,15 +337,15 @@ getCurrentTipBlockNo dataLayer = do
--
chainSyncClient
:: SyncDataLayer
-> MetricsLayer
-> Trace IO Text
-> SyncEnv
-> StateQueryTMVar CardanoBlock (Interpreter (CardanoEras StandardCrypto))
-> Metrics
-> [Point CardanoBlock]
-> WithOrigin BlockNo
-> DbActionQueue
-> ChainSyncClientPipelined CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO ()
chainSyncClient dataLayer trce env queryVar metrics latestPoints currentTip actionQueue = do
chainSyncClient dataLayer metricsLayer trce env queryVar latestPoints currentTip actionQueue = do
ChainSyncClientPipelined $ pure $
-- Notify the core node about the our latest points at which we are
-- synchronised. This client is not persistent and thus it just
Expand Down Expand Up @@ -387,12 +387,20 @@ chainSyncClient dataLayer trce env queryVar metrics latestPoints currentTip acti
ClientStNext
{ recvMsgRollForward = \blk tip ->
logException trce "recvMsgRollForward: " $ do
Gauge.set (withOrigin 0 (fromIntegral . unBlockNo) (getTipBlockNo tip)) (mNodeHeight metrics)

-- Setting the value from network/node
let setNodeBlockHeight = mlSetNodeBlockHeight metricsLayer
setNodeBlockHeight (withOrigin 0 unBlockNo (getTipBlockNo tip))

details <- getSlotDetails trce env queryVar (cardanoBlockSlotNo blk)
newSize <- atomically $ do
writeDbActionQueue actionQueue $ mkDbApply blk details
lengthDbActionQueue actionQueue
Gauge.set (fromIntegral newSize) $ mQueuePostWrite metrics

-- The action queue size
let setQueuePostWrite = mlSetQueuePostWrite metricsLayer
setQueuePostWrite newSize

pure $ finish (At (blockNo blk)) tip
, recvMsgRollBackward = \point tip ->
logException trce "recvMsgRollBackward: " $ do
Expand Down
22 changes: 16 additions & 6 deletions cardano-sync/src/Cardano/Sync/Database.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@ import Cardano.Sync.Api
import Cardano.Sync.DbAction
import Cardano.Sync.Error
import Cardano.Sync.LedgerState
import Cardano.Sync.Metrics
import Cardano.Sync.Plugin
import Cardano.Sync.Types
import Cardano.Sync.Util

import qualified System.Metrics.Prometheus.Metric.Gauge as Gauge

data NextState
= Continue
| Done
Expand All @@ -41,10 +38,10 @@ runDbStartup trce plugin =
mapM_ (\action -> action trce) $ plugOnStartup plugin

runDbThread
:: Trace IO Text -> SyncEnv -> SyncNodePlugin -> Metrics
:: MetricsLayer -> Trace IO Text -> SyncEnv -> SyncNodePlugin
-> DbActionQueue
-> IO ()
runDbThread trce env plugin metrics queue = do
runDbThread metricsLayer trce env plugin queue = do
logInfo trce "Running DB thread"
logException trce "runDBThread: " loop
logInfo trce "Shutting down DB thread"
Expand All @@ -61,9 +58,22 @@ runDbThread trce env plugin metrics queue = do
mBlkNo <- getLatestBlock

-- Chain Maybe's.
-- Here we are extracting information from the database and writing
-- block number into the metrics so we have that available.
case bBlockNo =<< mBlkNo of
Nothing -> pure ()
Just blkNo -> Gauge.set (fromIntegral blkNo) $ mDbHeight metrics
Just blkNo -> do
let setLatestBlockNo = mlSetLatestBlockNo metricsLayer
setLatestBlockNo blkNo

-- Chain Maybe's.
-- Here we are extracting information from the database and writing
-- slot number into the metrics so we have that available.
case bSlotNo =<< mBlkNo of
Nothing -> pure ()
Just slotNo -> do
let setLatestSlotNo = mlSetLatestSlotNo metricsLayer
setLatestSlotNo slotNo

case eNextState of
Left err -> logError trce $ renderSyncNodeError err
Expand Down
Loading

0 comments on commit 8a3a55e

Please sign in to comment.