Skip to content

Commit

Permalink
Merge #3342
Browse files Browse the repository at this point in the history
3342: CAD-3638: extended trace-forward library. r=denisshevchenko a=denisshevchenko

Since both `trace-forward` and `datapoint-forward` specify node-specific protocols, it's better to combine them into one package. This package will be used by `trace-dispatcher` library for forwarding `TraceObject`s and `DataPoint`s from the node to external acceptor app (for example, `cardano-tracer` or `cardano-wallet`).

Co-authored-by: Denis Shevchenko <denis.shevchenko@iohk.io>
  • Loading branch information
iohk-bors[bot] and Denis Shevchenko committed Nov 15, 2021
2 parents 37d86a6 + 7551c6b commit 69c752e
Show file tree
Hide file tree
Showing 40 changed files with 1,424 additions and 847 deletions.
2 changes: 1 addition & 1 deletion cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ packages:
bench/tx-generator
plutus-example/plutus-example
trace-dispatcher
trace-forward
trace-resources
trace-forward

package cardano-api
ghc-options: -Werror
Expand Down
106 changes: 8 additions & 98 deletions trace-dispatcher/src/Cardano/Logging/Tracer/Forward.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,25 @@
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- IMPORTANT: please note that the latest changes in this module are temporary ones.
-- It will be replaced by new dispatcher's code, in "integration PR".

module Cardano.Logging.Tracer.Forward
(
forwardTracer
) where

import Codec.CBOR.Term (Term)
import Control.Concurrent.Async (race_, wait, withAsync)
import Control.Monad.IO.Class

import qualified Control.Tracer as T
import "contra-tracer" Control.Tracer (contramap, stdoutTracer)
import qualified Data.ByteString.Lazy as LBS
import Data.Void (Void)
import Data.Word (Word16)

import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits)
import Ouroboros.Network.ErrorPolicy (nullErrorPolicies)
import Ouroboros.Network.IOManager (IOManager)
import Ouroboros.Network.Mux (MiniProtocol (..),
MiniProtocolLimits (..), MiniProtocolNum (..),
MuxMode (..), OuroborosApplication (..),
RunMiniProtocol (..), miniProtocolLimits, miniProtocolNum,
miniProtocolRun)
import Ouroboros.Network.Protocol.Handshake.Codec
(cborTermVersionDataCodec, noTimeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake)
import Ouroboros.Network.Protocol.Handshake.Unversioned
(UnversionedProtocol (..), UnversionedProtocolData (..),
unversionedHandshakeCodec, unversionedProtocolDataCodec)
import Ouroboros.Network.Protocol.Handshake.Version
(acceptableVersion, simpleSingletonVersions)
import Ouroboros.Network.Snocket (Snocket, localAddressFromPath,
localSnocket)
import Ouroboros.Network.Socket (AcceptedConnectionsLimit (..),
SomeResponderApplication (..), cleanNetworkMutableState,
newNetworkMutableState, nullNetworkServerTracers,
withServerNode)

import qualified System.Metrics as EKG
import qualified System.Metrics.Configuration as EKGF
import System.Metrics.Network.Forwarder (forwardEKGMetricsResp)
import qualified Trace.Forward.Configuration as TF
import Trace.Forward.Network.Forwarder (forwardTraceObjectsResp)
import Trace.Forward.Utils
import qualified Trace.Forward.Configuration.TraceObject as TF
import Trace.Forward.Utils.TraceObject

import Cardano.Logging.DocuGenerator
import Cardano.Logging.Types
Expand Down Expand Up @@ -96,7 +70,7 @@ forwardTracer iomgr config = liftIO $ do
tfConfig =
TF.ForwarderConfiguration
{ TF.forwarderTracer = contramap show stdoutTracer
, TF.acceptorEndpoint = TF.LocalPipe p
, TF.acceptorEndpoint = p
, TF.disconnectedQueueSize = 200000
, TF.connectedQueueSize = 2000
}
Expand All @@ -109,70 +83,6 @@ launchForwarders
-> TF.ForwarderConfiguration TraceObject
-> ForwardSink TraceObject
-> IO ()
launchForwarders iomgr ep@(LocalSocket p) store ekgConfig tfConfig sink = flip
withAsync
wait
$ runActionInLoop
(launchForwardersViaLocalSocket iomgr ep (ekgConfig, tfConfig) sink store)
(TF.LocalPipe p)
1

launchForwardersViaLocalSocket
:: IOManager
-> ForwarderAddr
-> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject)
-> ForwardSink TraceObject
-> EKG.Store
-> IO ()
launchForwardersViaLocalSocket iomgr (LocalSocket p) configs sink store = do
let snocket = localSnocket iomgr
address = localAddressFromPath p
doListenToAcceptor snocket address noTimeLimitsHandshake configs sink store

doListenToAcceptor
:: Ord addr
=> Snocket IO fd addr
-> addr
-> ProtocolTimeLimits (Handshake UnversionedProtocol Term)
-> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject)
-> ForwardSink TraceObject
-> EKG.Store
-> IO ()
doListenToAcceptor snocket address timeLimits (ekgConfig, tfConfig) sink store = do
networkState <- newNetworkMutableState
race_ (cleanNetworkMutableState networkState)
$ withServerNode
snocket
nullNetworkServerTracers
networkState
(AcceptedConnectionsLimit maxBound maxBound 0)
address
unversionedHandshakeCodec
timeLimits
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(simpleSingletonVersions
UnversionedProtocol
UnversionedProtocolData
(SomeResponderApplication $
forwarderApp [ (forwardEKGMetricsResp ekgConfig store, 1)
, (forwardTraceObjectsResp tfConfig sink, 2)
]
)
)
nullErrorPolicies
$ \_ serverAsync ->
wait serverAsync -- Block until async exception.
where
forwarderApp
:: [(RunMiniProtocol 'ResponderMode LBS.ByteString IO Void (), Word16)]
-> OuroborosApplication 'ResponderMode addr LBS.ByteString IO Void ()
forwarderApp protocols =
OuroborosApplication $ \_connectionId _shouldStopSTM ->
[ MiniProtocol
{ miniProtocolNum = MiniProtocolNum num
, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound }
, miniProtocolRun = prot
}
| (prot, num) <- protocols
]
launchForwarders _iomgr _ep _store _ekgConfig _tfConfig _sink =
-- Temp code, will be replaced by new dispatcher's code.
return ()
12 changes: 10 additions & 2 deletions trace-forward/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
# trace-forward

`trace-forward` is a library allowing to forward tracing items from one process to another one. It is built upon [`typed-protocols`](https://github.com/input-output-hk/ouroboros-network/tree/master/typed-protocols).
This library specifies two protocols allowing to forward different information from the node to external applications (for example, `cardano-tracer` or `cardano-wallet`). These protocols are built upon [`typed-protocols`](https://github.com/input-output-hk/ouroboros-network/tree/master/typed-protocols).

The `trace-dispatcher` is using `trace-forward` to forward `TraceObject`s from the node to exernal acceptors (for example, `cardano-tracer`).
The first one allows forwarding `TraceObject`s from the node to external applications. You can think of `TraceObject` as a log item, which will be saved in log files.

The second one allows forwarding `DataPoint`s, arbitrary structured data that provides `ToJSON` instance.

Please note that the node doesn't use this library directly. Instead, `trace-dispatcher` library is using it to forward mentioned data via different tracers.

# Demo

These protocols are `typed-protocols`-based, so they can be `Mux`-ed for use via the same network connection. The corresponding demo is provided by `cardano-tracer` project.

## Developers

Expand Down
28 changes: 0 additions & 28 deletions trace-forward/src/Trace/Forward/Acceptor.hs

This file was deleted.

31 changes: 31 additions & 0 deletions trace-forward/src/Trace/Forward/Configuration/DataPoint.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module Trace.Forward.Configuration.DataPoint
( AcceptorConfiguration (..)
, ForwarderConfiguration (..)
) where

import Control.Concurrent.STM.TVar (TVar)
import Control.Tracer (Tracer)

import Ouroboros.Network.Driver (TraceSendRecv)

import Trace.Forward.Protocol.DataPoint.Type

-- | Acceptor configuration, parameterized by trace item's type.
data AcceptorConfiguration = AcceptorConfiguration
{ -- | The tracer that will be used by the acceptor in its network layer.
acceptorTracer :: !(Tracer IO (TraceSendRecv DataPointForward))
-- | The endpoint that will be used to listen to the forwarder.
, forwarderEndpoint :: !FilePath
-- | 'TVar' that can be used as a brake: if an external thread sets
-- it to 'True', the acceptor will send 'MsgDone' message to the
-- forwarder and their session will be closed.
, shouldWeStop :: !(TVar Bool)
}

-- | Forwarder configuration, parameterized by trace item's type.
data ForwarderConfiguration = ForwarderConfiguration
{ -- | The tracer that will be used by the forwarder in its network layer.
forwarderTracer :: !(Tracer IO (TraceSendRecv DataPointForward))
-- | The endpoint that will be used to connect to the acceptor.
, acceptorEndpoint :: !FilePath
}
Original file line number Diff line number Diff line change
@@ -1,40 +1,37 @@
module Trace.Forward.Configuration
module Trace.Forward.Configuration.TraceObject
( AcceptorConfiguration (..)
, ForwarderConfiguration (..)
, HowToConnect (..)
) where

import Control.Concurrent.STM.TVar (TVar)
import Control.Tracer (Tracer)
import GHC.Conc (TVar)
import Ouroboros.Network.Driver (TraceSendRecv)

import Trace.Forward.Protocol.Type
import Ouroboros.Network.Driver (TraceSendRecv)

-- | Specifies how to connect to the peer.
-- Currently, only local socket/pipe is used.
newtype HowToConnect = LocalPipe FilePath
deriving Show
import Trace.Forward.Protocol.TraceObject.Type

-- | Acceptor configuration, parameterized by trace item's type.
data AcceptorConfiguration lo = AcceptorConfiguration
{ -- | The tracer that will be used by the acceptor in its network layer.
acceptorTracer :: !(Tracer IO (TraceSendRecv (TraceForward lo)))
acceptorTracer :: !(Tracer IO (TraceSendRecv (TraceObjectForward lo)))
-- | The endpoint that will be used to listen to the forwarder.
, forwarderEndpoint :: !HowToConnect
-- Only local socket/pipe is supported.
, forwarderEndpoint :: !FilePath
-- | The request specifies how many 'TraceObject's will be requested.
, whatToRequest :: !NumberOfTraceObjects
, whatToRequest :: !NumberOfTraceObjects
-- | 'TVar' that can be used as a brake: if an external thread sets
-- it to 'True', the acceptor will send 'MsgDone' message to the
-- forwarder and their session will be closed.
, shouldWeStop :: !(TVar Bool)
, shouldWeStop :: !(TVar Bool)
}

-- | Forwarder configuration, parameterized by trace item's type.
data ForwarderConfiguration lo = ForwarderConfiguration
{ -- | The tracer that will be used by the forwarder in its network layer.
forwarderTracer :: !(Tracer IO (TraceSendRecv (TraceForward lo)))
forwarderTracer :: !(Tracer IO (TraceSendRecv (TraceObjectForward lo)))
-- | The endpoint that will be used to connect to the acceptor.
, acceptorEndpoint :: !HowToConnect
-- Only local socket/pipe is supported.
, acceptorEndpoint :: !FilePath
-- | The big size of internal queue for tracing items. We use it in
-- the beginning of the session, to avoid queue overflow, because
-- initially there is no connection with acceptor yet, and the
Expand Down
27 changes: 0 additions & 27 deletions trace-forward/src/Trace/Forward/Forwarder.hs

This file was deleted.

Loading

0 comments on commit 69c752e

Please sign in to comment.