diff --git a/cabal.project b/cabal.project index 86c76f87600..115fbce9046 100644 --- a/cabal.project +++ b/cabal.project @@ -13,8 +13,8 @@ packages: bench/tx-generator plutus-example/plutus-example trace-dispatcher - trace-forward trace-resources + trace-forward package cardano-api ghc-options: -Werror diff --git a/trace-dispatcher/src/Cardano/Logging/Tracer/Forward.hs b/trace-dispatcher/src/Cardano/Logging/Tracer/Forward.hs index 42d2f8ecadc..68abed59d55 100644 --- a/trace-dispatcher/src/Cardano/Logging/Tracer/Forward.hs +++ b/trace-dispatcher/src/Cardano/Logging/Tracer/Forward.hs @@ -4,51 +4,25 @@ {-# LANGUAGE PackageImports #-} {-# LANGUAGE ScopedTypeVariables #-} +-- IMPORTANT: please note that the latest changes in this module are temporary ones. +-- It will be replaced by new dispatcher's code, in "integration PR". module Cardano.Logging.Tracer.Forward ( forwardTracer ) where -import Codec.CBOR.Term (Term) -import Control.Concurrent.Async (race_, wait, withAsync) import Control.Monad.IO.Class import qualified Control.Tracer as T import "contra-tracer" Control.Tracer (contramap, stdoutTracer) -import qualified Data.ByteString.Lazy as LBS -import Data.Void (Void) -import Data.Word (Word16) -import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) -import Ouroboros.Network.ErrorPolicy (nullErrorPolicies) import Ouroboros.Network.IOManager (IOManager) -import Ouroboros.Network.Mux (MiniProtocol (..), - MiniProtocolLimits (..), MiniProtocolNum (..), - MuxMode (..), OuroborosApplication (..), - RunMiniProtocol (..), miniProtocolLimits, miniProtocolNum, - miniProtocolRun) -import Ouroboros.Network.Protocol.Handshake.Codec - (cborTermVersionDataCodec, noTimeLimitsHandshake) -import Ouroboros.Network.Protocol.Handshake.Type (Handshake) -import Ouroboros.Network.Protocol.Handshake.Unversioned - (UnversionedProtocol (..), UnversionedProtocolData (..), - unversionedHandshakeCodec, unversionedProtocolDataCodec) -import Ouroboros.Network.Protocol.Handshake.Version - (acceptableVersion, simpleSingletonVersions) -import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, - localSnocket) -import Ouroboros.Network.Socket (AcceptedConnectionsLimit (..), - SomeResponderApplication (..), cleanNetworkMutableState, - newNetworkMutableState, nullNetworkServerTracers, - withServerNode) import qualified System.Metrics as EKG import qualified System.Metrics.Configuration as EKGF -import System.Metrics.Network.Forwarder (forwardEKGMetricsResp) -import qualified Trace.Forward.Configuration as TF -import Trace.Forward.Network.Forwarder (forwardTraceObjectsResp) -import Trace.Forward.Utils +import qualified Trace.Forward.Configuration.TraceObject as TF +import Trace.Forward.Utils.TraceObject import Cardano.Logging.DocuGenerator import Cardano.Logging.Types @@ -96,7 +70,7 @@ forwardTracer iomgr config = liftIO $ do tfConfig = TF.ForwarderConfiguration { TF.forwarderTracer = contramap show stdoutTracer - , TF.acceptorEndpoint = TF.LocalPipe p + , TF.acceptorEndpoint = p , TF.disconnectedQueueSize = 200000 , TF.connectedQueueSize = 2000 } @@ -109,70 +83,6 @@ launchForwarders -> TF.ForwarderConfiguration TraceObject -> ForwardSink TraceObject -> IO () -launchForwarders iomgr ep@(LocalSocket p) store ekgConfig tfConfig sink = flip - withAsync - wait - $ runActionInLoop - (launchForwardersViaLocalSocket iomgr ep (ekgConfig, tfConfig) sink store) - (TF.LocalPipe p) - 1 - -launchForwardersViaLocalSocket - :: IOManager - -> ForwarderAddr - -> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject) - -> ForwardSink TraceObject - -> EKG.Store - -> IO () -launchForwardersViaLocalSocket iomgr (LocalSocket p) configs sink store = do - let snocket = localSnocket iomgr - address = localAddressFromPath p - doListenToAcceptor snocket address noTimeLimitsHandshake configs sink store - -doListenToAcceptor - :: Ord addr - => Snocket IO fd addr - -> addr - -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) - -> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject) - -> ForwardSink TraceObject - -> EKG.Store - -> IO () -doListenToAcceptor snocket address timeLimits (ekgConfig, tfConfig) sink store = do - networkState <- newNetworkMutableState - race_ (cleanNetworkMutableState networkState) - $ withServerNode - snocket - nullNetworkServerTracers - networkState - (AcceptedConnectionsLimit maxBound maxBound 0) - address - unversionedHandshakeCodec - timeLimits - (cborTermVersionDataCodec unversionedProtocolDataCodec) - acceptableVersion - (simpleSingletonVersions - UnversionedProtocol - UnversionedProtocolData - (SomeResponderApplication $ - forwarderApp [ (forwardEKGMetricsResp ekgConfig store, 1) - , (forwardTraceObjectsResp tfConfig sink, 2) - ] - ) - ) - nullErrorPolicies - $ \_ serverAsync -> - wait serverAsync -- Block until async exception. - where - forwarderApp - :: [(RunMiniProtocol 'ResponderMode LBS.ByteString IO Void (), Word16)] - -> OuroborosApplication 'ResponderMode addr LBS.ByteString IO Void () - forwarderApp protocols = - OuroborosApplication $ \_connectionId _shouldStopSTM -> - [ MiniProtocol - { miniProtocolNum = MiniProtocolNum num - , miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound } - , miniProtocolRun = prot - } - | (prot, num) <- protocols - ] +launchForwarders _iomgr _ep _store _ekgConfig _tfConfig _sink = + -- Temp code, will be replaced by new dispatcher's code. + return () diff --git a/trace-forward/README.md b/trace-forward/README.md index 435f147f835..6c6a49ca5d4 100644 --- a/trace-forward/README.md +++ b/trace-forward/README.md @@ -1,8 +1,16 @@ # trace-forward -`trace-forward` is a library allowing to forward tracing items from one process to another one. It is built upon [`typed-protocols`](https://github.com/input-output-hk/ouroboros-network/tree/master/typed-protocols). +This library specifies two protocols allowing to forward different information from the node to external applications (for example, `cardano-tracer`). These protocols are built upon [`typed-protocols`](https://github.com/input-output-hk/ouroboros-network/tree/master/typed-protocols). -The `trace-dispatcher` is using `trace-forward` to forward `TraceObject`s from the node to exernal acceptors (for example, `cardano-tracer`). +The first one allows forwarding `TraceObject`s from the node to external applications. You can think of `TraceObject` as a log item, which will be saved in log files. + +The second one allows forwarding `DataPoint`s, arbitrary structured data that provides `ToJSON` instance. + +Please note that the node doesn't use this library directly. Instead, `trace-dispatcher` library is using it to forward mentioned data via different tracers. + +# Demo + +These protocols are `typed-protocols`-based, so they can be `Mux`-ed for use via the same network connection. The corresponding demo is provided by `cardano-tracer` project. ## Developers diff --git a/trace-forward/src/Trace/Forward/Acceptor.hs b/trace-forward/src/Trace/Forward/Acceptor.hs deleted file mode 100644 index 5addbc0e048..00000000000 --- a/trace-forward/src/Trace/Forward/Acceptor.hs +++ /dev/null @@ -1,28 +0,0 @@ -{-# LANGUAGE NamedFieldPuns #-} - --- | This top-level module will be used by the acceptor application. --- Acceptor application asks 'TraceObject's from the forwarder application. -module Trace.Forward.Acceptor - ( runTraceAcceptor - ) where - -import qualified Codec.Serialise as CBOR -import Data.Typeable (Typeable) - -import Ouroboros.Network.IOManager (IOManager) -import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) - -import Trace.Forward.Network.Acceptor (listenToForwarder) -import Trace.Forward.Configuration (AcceptorConfiguration (..)) -import Trace.Forward.Utils (runActionInLoop) - -runTraceAcceptor - :: (CBOR.Serialise lo, - ShowProxy lo, - Typeable lo) - => IOManager -- ^ 'IOManager' from the external application. - -> AcceptorConfiguration lo -- ^ Acceptor configuration. - -> ([lo] -> IO ()) -- ^ The handler for 'TraceObject's received from the node. - -> IO () -runTraceAcceptor iomgr config@AcceptorConfiguration{forwarderEndpoint} loHandler = - runActionInLoop (listenToForwarder iomgr config loHandler) forwarderEndpoint 1 diff --git a/trace-forward/src/Trace/Forward/Configuration/DataPoint.hs b/trace-forward/src/Trace/Forward/Configuration/DataPoint.hs new file mode 100644 index 00000000000..bc41de0f322 --- /dev/null +++ b/trace-forward/src/Trace/Forward/Configuration/DataPoint.hs @@ -0,0 +1,31 @@ +module Trace.Forward.Configuration.DataPoint + ( AcceptorConfiguration (..) + , ForwarderConfiguration (..) + ) where + +import Control.Concurrent.STM.TVar (TVar) +import Control.Tracer (Tracer) + +import Ouroboros.Network.Driver (TraceSendRecv) + +import Trace.Forward.Protocol.DataPoint.Type + +-- | Acceptor configuration, parameterized by trace item's type. +data AcceptorConfiguration = AcceptorConfiguration + { -- | The tracer that will be used by the acceptor in its network layer. + acceptorTracer :: !(Tracer IO (TraceSendRecv DataPointForward)) + -- | The endpoint that will be used to listen to the forwarder. + , forwarderEndpoint :: !FilePath + -- | 'TVar' that can be used as a brake: if an external thread sets + -- it to 'True', the acceptor will send 'MsgDone' message to the + -- forwarder and their session will be closed. + , shouldWeStop :: !(TVar Bool) + } + +-- | Forwarder configuration, parameterized by trace item's type. +data ForwarderConfiguration = ForwarderConfiguration + { -- | The tracer that will be used by the forwarder in its network layer. + forwarderTracer :: !(Tracer IO (TraceSendRecv DataPointForward)) + -- | The endpoint that will be used to connect to the acceptor. + , acceptorEndpoint :: !FilePath + } diff --git a/trace-forward/src/Trace/Forward/Configuration.hs b/trace-forward/src/Trace/Forward/Configuration/TraceObject.hs similarity index 73% rename from trace-forward/src/Trace/Forward/Configuration.hs rename to trace-forward/src/Trace/Forward/Configuration/TraceObject.hs index f627972fe18..a1cc05081c0 100644 --- a/trace-forward/src/Trace/Forward/Configuration.hs +++ b/trace-forward/src/Trace/Forward/Configuration/TraceObject.hs @@ -1,40 +1,37 @@ -module Trace.Forward.Configuration +module Trace.Forward.Configuration.TraceObject ( AcceptorConfiguration (..) , ForwarderConfiguration (..) - , HowToConnect (..) ) where +import Control.Concurrent.STM.TVar (TVar) import Control.Tracer (Tracer) -import GHC.Conc (TVar) -import Ouroboros.Network.Driver (TraceSendRecv) -import Trace.Forward.Protocol.Type +import Ouroboros.Network.Driver (TraceSendRecv) --- | Specifies how to connect to the peer. --- Currently, only local socket/pipe is used. -newtype HowToConnect = LocalPipe FilePath - deriving Show +import Trace.Forward.Protocol.TraceObject.Type -- | Acceptor configuration, parameterized by trace item's type. data AcceptorConfiguration lo = AcceptorConfiguration { -- | The tracer that will be used by the acceptor in its network layer. - acceptorTracer :: !(Tracer IO (TraceSendRecv (TraceForward lo))) + acceptorTracer :: !(Tracer IO (TraceSendRecv (TraceObjectForward lo))) -- | The endpoint that will be used to listen to the forwarder. - , forwarderEndpoint :: !HowToConnect + -- Only local socket/pipe is supported. + , forwarderEndpoint :: !FilePath -- | The request specifies how many 'TraceObject's will be requested. - , whatToRequest :: !NumberOfTraceObjects + , whatToRequest :: !NumberOfTraceObjects -- | 'TVar' that can be used as a brake: if an external thread sets -- it to 'True', the acceptor will send 'MsgDone' message to the -- forwarder and their session will be closed. - , shouldWeStop :: !(TVar Bool) + , shouldWeStop :: !(TVar Bool) } -- | Forwarder configuration, parameterized by trace item's type. data ForwarderConfiguration lo = ForwarderConfiguration { -- | The tracer that will be used by the forwarder in its network layer. - forwarderTracer :: !(Tracer IO (TraceSendRecv (TraceForward lo))) + forwarderTracer :: !(Tracer IO (TraceSendRecv (TraceObjectForward lo))) -- | The endpoint that will be used to connect to the acceptor. - , acceptorEndpoint :: !HowToConnect + -- Only local socket/pipe is supported. + , acceptorEndpoint :: !FilePath -- | The big size of internal queue for tracing items. We use it in -- the beginning of the session, to avoid queue overflow, because -- initially there is no connection with acceptor yet, and the diff --git a/trace-forward/src/Trace/Forward/Forwarder.hs b/trace-forward/src/Trace/Forward/Forwarder.hs deleted file mode 100644 index 48c960d8123..00000000000 --- a/trace-forward/src/Trace/Forward/Forwarder.hs +++ /dev/null @@ -1,27 +0,0 @@ -{-# LANGUAGE NamedFieldPuns #-} - --- This top-level module will be used by the forwarder application. --- Forwarder application collects 'TraceObject's and sends them to --- the acceptor application. -module Trace.Forward.Forwarder - ( runTraceForwarder - ) where - -import qualified Codec.Serialise as CBOR - -import Ouroboros.Network.IOManager (IOManager) -import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) - -import Trace.Forward.Configuration (ForwarderConfiguration (..)) -import Trace.Forward.Network.Forwarder (connectToAcceptor) -import Trace.Forward.Utils - -runTraceForwarder - :: (CBOR.Serialise lo, - ShowProxy lo) - => IOManager -- ^ 'IOManager' from the external application. - -> ForwarderConfiguration lo -- ^ Forwarder configuration. - -> ForwardSink lo -- ^ Forward "sink" that will be used to write tracing items. - -> IO () -runTraceForwarder iomgr config@ForwarderConfiguration{acceptorEndpoint} forwardSink = - runActionInLoop (connectToAcceptor iomgr config forwardSink) acceptorEndpoint 1 diff --git a/trace-forward/src/Trace/Forward/Network/Acceptor.hs b/trace-forward/src/Trace/Forward/Network/Acceptor.hs deleted file mode 100644 index ba7a6df42b3..00000000000 --- a/trace-forward/src/Trace/Forward/Network/Acceptor.hs +++ /dev/null @@ -1,185 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE NumericUnderscores #-} - -module Trace.Forward.Network.Acceptor - ( listenToForwarder - -- | Export this function for Mux purpose. - , acceptTraceObjects - , acceptTraceObjectsInit - , Timeout (..) - ) where - -import Codec.CBOR.Term (Term) -import qualified Codec.Serialise as CBOR -import Control.Concurrent.Async (race, race_, wait) -import Control.Monad.Extra (ifM) -import Control.Monad.STM (atomically, check) -import Control.Concurrent.STM.TVar (TVar, readTVar, readTVarIO, registerDelay) -import Control.Exception (Exception, throwIO) -import qualified Data.ByteString.Lazy as LBS -import Data.Typeable (Typeable) -import Data.Void (Void) -import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..), - MiniProtocolNum (..), MuxMode (..), - OuroborosApplication (..), MuxPeer (..), - RunMiniProtocol (..), - miniProtocolLimits, miniProtocolNum, miniProtocolRun) -import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) -import Ouroboros.Network.Driver.Simple (runPeer) -import Ouroboros.Network.ErrorPolicy (nullErrorPolicies) -import Ouroboros.Network.IOManager (IOManager) -import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, localSnocket) -import Ouroboros.Network.Socket (AcceptedConnectionsLimit (..), - SomeResponderApplication (..), - cleanNetworkMutableState, newNetworkMutableState, - nullNetworkServerTracers, withServerNode) -import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, - noTimeLimitsHandshake) -import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), - UnversionedProtocolData (..), - unversionedHandshakeCodec, - unversionedProtocolDataCodec) -import Ouroboros.Network.Protocol.Handshake.Type (Handshake) -import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, - simpleSingletonVersions) -import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) - -import qualified Trace.Forward.Protocol.Acceptor as Acceptor -import qualified Trace.Forward.Protocol.Codec as Acceptor -import Trace.Forward.Protocol.Type -import Trace.Forward.Queue (getTraceObjects) -import Trace.Forward.Configuration (AcceptorConfiguration (..), HowToConnect (..)) - -listenToForwarder - :: (CBOR.Serialise lo, - ShowProxy lo, - Typeable lo) - => IOManager - -> AcceptorConfiguration lo - -> ([lo] -> IO ()) - -> IO () -listenToForwarder iomgr config@AcceptorConfiguration{forwarderEndpoint} loHandler = do - let (LocalPipe localPipe) = forwarderEndpoint - snocket = localSnocket iomgr - address = localAddressFromPath localPipe - doListenToForwarder snocket address noTimeLimitsHandshake app - where - app = - -- TODO: There's _shouldStopSTM and 'shouldWeStop' in - -- 'AcceptorConfiguration'. Currently 'ouroboros-network' does not exposes - -- the write end of `_shouldStopSTM`, if it did we could use it instead of - -- 'shouldWeStop'. - OuroborosApplication $ \_connectionId _shouldStopSTM -> - [ MiniProtocol - { miniProtocolNum = MiniProtocolNum 1 - , miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound } - , miniProtocolRun = acceptTraceObjects config loHandler - } - ] - -doListenToForwarder - :: Ord addr - => Snocket IO fd addr - -> addr - -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) - -> OuroborosApplication 'ResponderMode addr LBS.ByteString IO Void () - -> IO () -doListenToForwarder snocket address timeLimits app = do - networkState <- newNetworkMutableState - race_ (cleanNetworkMutableState networkState) - $ withServerNode - snocket - nullNetworkServerTracers - networkState - (AcceptedConnectionsLimit maxBound maxBound 0) - address - unversionedHandshakeCodec - timeLimits - (cborTermVersionDataCodec unversionedProtocolDataCodec) - acceptableVersion - (simpleSingletonVersions - UnversionedProtocol - UnversionedProtocolData - (SomeResponderApplication app)) - nullErrorPolicies - $ \_ serverAsync -> wait serverAsync -- Block until async exception. - -acceptTraceObjects - :: (CBOR.Serialise lo, - ShowProxy lo, - Typeable lo) - => AcceptorConfiguration lo - -> ([lo] -> IO ()) - -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () -acceptTraceObjects config loHandler = - ResponderProtocolOnly $ - MuxPeerRaw $ \channel -> - timeoutWhenStopped - (shouldWeStop config) - 15_000 -- 15sec - $ runPeer - (acceptorTracer config) - (Acceptor.codecTraceForward CBOR.encode CBOR.decode - CBOR.encode CBOR.decode) - channel - (Acceptor.traceAcceptorPeer $ - acceptorActions config loHandler) - -acceptTraceObjectsInit - :: (CBOR.Serialise lo, - ShowProxy lo, - Typeable lo) - => AcceptorConfiguration lo - -> ([lo] -> IO ()) - -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void -acceptTraceObjectsInit config loHandler = - InitiatorProtocolOnly $ - MuxPeerRaw $ \channel -> - runPeer - (acceptorTracer config) - (Acceptor.codecTraceForward CBOR.encode CBOR.decode - CBOR.encode CBOR.decode) - channel - (Acceptor.traceAcceptorPeer $ - acceptorActions config loHandler) - -acceptorActions - :: (CBOR.Serialise lo, - ShowProxy lo, - Typeable lo) - => AcceptorConfiguration lo -- ^ Acceptor's configuration. - -> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's. - -> Acceptor.TraceAcceptor lo IO () -acceptorActions config@AcceptorConfiguration{whatToRequest, shouldWeStop} loHandler = - -- We are able to send request for: - -- 1. node's info, - -- 2. new 'TraceObject's. - -- But request for node's info should be sent only once (in the beginning of session). - Acceptor.SendMsgTraceObjectsRequest TokBlocking whatToRequest $ \replyWithTraceObjects -> do - loHandler $ getTraceObjects replyWithTraceObjects - ifM (readTVarIO shouldWeStop) - (return $ Acceptor.SendMsgDone $ return ()) - (return $ acceptorActions config loHandler) - -data Timeout = Timeout - deriving (Typeable, Show) - -instance Exception Timeout where - --- | Timeout shutdown of an action. It can run only for specified miliseconds --- once the 'TVar' is set to 'True'. --- -timeoutWhenStopped :: TVar Bool - -> Int -- timeout in miliseconds - -> IO a - -> IO a -timeoutWhenStopped stopVar delay io = - either id id <$> - race io - ( do atomically (readTVar stopVar >>= check) - v <- registerDelay delay - atomically (readTVar v >>= check) - throwIO Timeout - ) diff --git a/trace-forward/src/Trace/Forward/Network/Forwarder.hs b/trace-forward/src/Trace/Forward/Network/Forwarder.hs deleted file mode 100644 index 6a51f4c036e..00000000000 --- a/trace-forward/src/Trace/Forward/Network/Forwarder.hs +++ /dev/null @@ -1,114 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE NamedFieldPuns #-} - -module Trace.Forward.Network.Forwarder - ( connectToAcceptor - -- | Export this function for Mux purpose. - , forwardTraceObjects - , forwardTraceObjectsResp - ) where - -import Codec.CBOR.Term (Term) -import qualified Codec.Serialise as CBOR -import qualified Data.ByteString.Lazy as LBS -import Data.Void (Void) -import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) -import Ouroboros.Network.Driver.Simple (runPeer) -import Ouroboros.Network.IOManager (IOManager) -import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..), - MiniProtocolNum (..), MuxMode (..), - OuroborosApplication (..), MuxPeer (..), - RunMiniProtocol (..), - miniProtocolLimits, miniProtocolNum, miniProtocolRun) -import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, - noTimeLimitsHandshake) -import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), - UnversionedProtocolData (..), - unversionedHandshakeCodec, - unversionedProtocolDataCodec) -import Ouroboros.Network.Protocol.Handshake.Type (Handshake) -import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, simpleSingletonVersions) -import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, localSnocket) -import Ouroboros.Network.Socket (connectToNode, nullNetworkConnectTracers) -import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) - -import Trace.Forward.Configuration (ForwarderConfiguration (..), HowToConnect (..)) -import Trace.Forward.Queue (readItems) -import Trace.Forward.Utils -import qualified Trace.Forward.Protocol.Forwarder as Forwarder -import qualified Trace.Forward.Protocol.Codec as Forwarder - -connectToAcceptor - :: (CBOR.Serialise lo, - ShowProxy lo) - => IOManager - -> ForwarderConfiguration lo - -> ForwardSink lo - -> IO () -connectToAcceptor iomgr config@ForwarderConfiguration{acceptorEndpoint} sink = do - let (LocalPipe localPipe) = acceptorEndpoint - snocket = localSnocket iomgr - address = localAddressFromPath localPipe - doConnectToAcceptor snocket address noTimeLimitsHandshake app - where - app = - OuroborosApplication $ \_connectionId _shouldStopSTM -> - [ MiniProtocol - { miniProtocolNum = MiniProtocolNum 1 - , miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound } - , miniProtocolRun = forwardTraceObjects config sink - } - ] - -doConnectToAcceptor - :: Snocket IO fd addr - -> addr - -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) - -> OuroborosApplication 'InitiatorMode addr LBS.ByteString IO () Void - -> IO () -doConnectToAcceptor snocket address timeLimits app = - connectToNode - snocket - unversionedHandshakeCodec - timeLimits - (cborTermVersionDataCodec unversionedProtocolDataCodec) - nullNetworkConnectTracers - acceptableVersion - (simpleSingletonVersions - UnversionedProtocol - UnversionedProtocolData - app) - Nothing - address - -forwardTraceObjects - :: (CBOR.Serialise lo, - ShowProxy lo) - => ForwarderConfiguration lo - -> ForwardSink lo - -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void -forwardTraceObjects config sink = - InitiatorProtocolOnly $ - MuxPeerRaw $ \channel -> - runPeer - (forwarderTracer config) - (Forwarder.codecTraceForward CBOR.encode CBOR.decode - CBOR.encode CBOR.decode) - channel - (Forwarder.traceForwarderPeer $ readItems sink) - -forwardTraceObjectsResp - :: (CBOR.Serialise lo, - ShowProxy lo) - => ForwarderConfiguration lo - -> ForwardSink lo - -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () -forwardTraceObjectsResp config sink = - ResponderProtocolOnly $ - MuxPeerRaw $ \channel -> - runPeer - (forwarderTracer config) - (Forwarder.codecTraceForward CBOR.encode CBOR.decode - CBOR.encode CBOR.decode) - channel - (Forwarder.traceForwarderPeer $ readItems sink) diff --git a/trace-forward/src/Trace/Forward/Protocol/DataPoint/Acceptor.hs b/trace-forward/src/Trace/Forward/Protocol/DataPoint/Acceptor.hs new file mode 100644 index 00000000000..040b2d510a8 --- /dev/null +++ b/trace-forward/src/Trace/Forward/Protocol/DataPoint/Acceptor.hs @@ -0,0 +1,55 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} + +-- | A view of the trace forwarding/accepting protocol +-- from the point of view of the client. +-- +-- For execution, a conversion into the typed protocol is provided. +-- +module Trace.Forward.Protocol.DataPoint.Acceptor + ( DataPointAcceptor(..) + , dataPointAcceptorPeer + ) where + +import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), + PeerRole (..)) + +import Trace.Forward.Protocol.DataPoint.Type + +data DataPointAcceptor m a where + SendMsgDataPointsRequest + :: [DataPointName] + -> (DataPointValues -> m (DataPointAcceptor m a)) + -> DataPointAcceptor m a + + SendMsgDone + :: m a + -> DataPointAcceptor m a + +-- | Interpret a particular action sequence into the client side of the protocol. +-- +dataPointAcceptorPeer + :: Monad m + => DataPointAcceptor m a + -> Peer DataPointForward 'AsClient 'StIdle m a +dataPointAcceptorPeer = \case + SendMsgDataPointsRequest request next -> + -- Send our message (request for new 'DataPoint's from the forwarder). + Yield (ClientAgency TokIdle) (MsgDataPointsRequest request) $ + -- We're now into the 'StBusy' state, and now we'll wait for a reply + -- from the forwarder. It is assuming that the forwarder will reply + -- immediately (even there are no 'DataPoint's). + Await (ServerAgency TokBusy) $ \(MsgDataPointsReply reply) -> + Effect $ + dataPointAcceptorPeer <$> next reply + + SendMsgDone getResult -> + -- We do an actual transition using 'yield', to go from the 'StIdle' to + -- 'StDone' state. Once in the 'StDone' state we can actually stop using + -- 'done', with a return value. + Effect $ + Yield (ClientAgency TokIdle) MsgDone . Done TokDone + <$> getResult diff --git a/trace-forward/src/Trace/Forward/Protocol/DataPoint/Codec.hs b/trace-forward/src/Trace/Forward/Protocol/DataPoint/Codec.hs new file mode 100644 index 00000000000..39d9e46706a --- /dev/null +++ b/trace-forward/src/Trace/Forward/Protocol/DataPoint/Codec.hs @@ -0,0 +1,83 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Trace.Forward.Protocol.DataPoint.Codec + ( codecDataPointForward + ) where + +import qualified Codec.CBOR.Decoding as CBOR +import qualified Codec.CBOR.Encoding as CBOR +import Codec.CBOR.Read (DeserialiseFailure) +import Control.Monad.Class.MonadST (MonadST) +import qualified Data.ByteString.Lazy as LBS +import Text.Printf (printf) + +import Network.TypedProtocol.Codec (Codec, PeerHasAgency (..), + PeerRole (..), SomeMessage (..)) +import Network.TypedProtocol.Codec.CBOR (mkCodecCborLazyBS) + +import Trace.Forward.Protocol.DataPoint.Type + +codecDataPointForward + :: forall m. + MonadST m + => ([DataPointName] -> CBOR.Encoding) -- ^ Encoder for 'Request'. + -> (forall s . CBOR.Decoder s [DataPointName]) -- ^ Decoder for 'Request'. + -> (DataPointValues -> CBOR.Encoding) -- ^ Encoder for reply with list of 'DataPoint's values. + -> (forall s . CBOR.Decoder s DataPointValues) -- ^ Decoder for reply with list of 'DataPoint's values. + -> Codec DataPointForward + DeserialiseFailure m LBS.ByteString +codecDataPointForward encodeRequest decodeRequest + encodeReplyList decodeReplyList = + mkCodecCborLazyBS encode decode + where + -- Encode messages. + encode + :: forall (pr :: PeerRole) + (st :: DataPointForward) + (st' :: DataPointForward). + PeerHasAgency pr st + -> Message DataPointForward st st' + -> CBOR.Encoding + + encode (ClientAgency TokIdle) (MsgDataPointsRequest request) = + CBOR.encodeListLen 2 + <> CBOR.encodeWord 1 + <> encodeRequest request + + encode (ClientAgency TokIdle) MsgDone = + CBOR.encodeListLen 1 + <> CBOR.encodeWord 2 + + encode (ServerAgency TokBusy) (MsgDataPointsReply reply) = + CBOR.encodeListLen 2 + <> CBOR.encodeWord 3 + <> encodeReplyList reply + + -- Decode messages + decode + :: forall (pr :: PeerRole) + (st :: DataPointForward) s. + PeerHasAgency pr st + -> CBOR.Decoder s (SomeMessage st) + decode stok = do + len <- CBOR.decodeListLen + key <- CBOR.decodeWord + case (key, len, stok) of + (1, 2, ClientAgency TokIdle) -> + SomeMessage . MsgDataPointsRequest <$> decodeRequest + + (2, 1, ClientAgency TokIdle) -> + return $ SomeMessage MsgDone + + (3, 2, ServerAgency TokBusy) -> + SomeMessage . MsgDataPointsReply <$> decodeReplyList + + -- Failures per protocol state + (_, _, ClientAgency TokIdle) -> + fail (printf "codecDataPointForward (%s) unexpected key (%d, %d)" (show stok) key len) + (_, _, ServerAgency TokBusy) -> + fail (printf "codecDataPointForward (%s) unexpected key (%d, %d)" (show stok) key len) diff --git a/trace-forward/src/Trace/Forward/Protocol/DataPoint/Forwarder.hs b/trace-forward/src/Trace/Forward/Protocol/DataPoint/Forwarder.hs new file mode 100644 index 00000000000..34c566a81b7 --- /dev/null +++ b/trace-forward/src/Trace/Forward/Protocol/DataPoint/Forwarder.hs @@ -0,0 +1,49 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE NamedFieldPuns #-} + +module Trace.Forward.Protocol.DataPoint.Forwarder + ( DataPointForwarder (..) + , dataPointForwarderPeer + ) where + +import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), + PeerRole (..)) + +import Trace.Forward.Protocol.DataPoint.Type + +data DataPointForwarder m a = DataPointForwarder + { -- | The acceptor sent us a request for new 'DataPoint's. + recvMsgDataPointsRequest + :: [DataPointName] + -> m (DataPointValues, DataPointForwarder m a) + + -- | The acceptor terminated. Here we have a pure return value, but we + -- could have done another action in 'm' if we wanted to. + , recvMsgDone :: m a + } + +-- | Interpret a particular action sequence into the server side of the protocol. +-- +dataPointForwarderPeer + :: Monad m + => DataPointForwarder m a + -> Peer DataPointForward 'AsServer 'StIdle m a +dataPointForwarderPeer DataPointForwarder{recvMsgDataPointsRequest, recvMsgDone} = + -- In the 'StIdle' state the forwarder is awaiting a request message + -- from the acceptor. + Await (ClientAgency TokIdle) $ \case + -- The acceptor sent us a request for new 'DataPoint's, so now we're + -- in the 'StBusy' state which means it's the forwarder's turn to send + -- a reply. + MsgDataPointsRequest request -> Effect $ do + (reply, next) <- recvMsgDataPointsRequest request + return $ Yield (ServerAgency TokBusy) + (MsgDataPointsReply reply) + (dataPointForwarderPeer next) + + -- The acceptor sent the done transition, so we're in the 'StDone' state + -- so all we can do is stop using 'done', with a return value. + MsgDone -> Effect $ Done TokDone <$> recvMsgDone diff --git a/trace-forward/src/Trace/Forward/Protocol/DataPoint/Type.hs b/trace-forward/src/Trace/Forward/Protocol/DataPoint/Type.hs new file mode 100644 index 00000000000..aa0f84c0e16 --- /dev/null +++ b/trace-forward/src/Trace/Forward/Protocol/DataPoint/Type.hs @@ -0,0 +1,121 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE EmptyCase #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE TypeFamilies #-} + +-- | The type of the 'DataPoint' forwarding/accepting protocol. +-- + +module Trace.Forward.Protocol.DataPoint.Type + ( DataPointName + , DataPointValue + , DataPointValues + , DataPointForward (..) + , Message (..) + , ClientHasAgency (..) + , ServerHasAgency (..) + , NobodyHasAgency (..) + ) where + +import qualified Data.ByteString.Lazy as LBS +import Data.Text (Text) + +import Network.TypedProtocol.Core (Protocol (..)) +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +-- | A kind to identify our protocol, and the types of the states in the state +-- transition diagram of the protocol. +-- +-- IMPORTANT NOTE: the following terminology is used: +-- +-- 1. From the protocol's point of view, two peers talk to each other: +-- the forwarder and the acceptor. +-- 2. The forwarder is an application that collects 'DataPoint's and sends +-- them to the acceptor by request (with 'MsgDataPointsReply'). +-- 3. The acceptor is an application that receives 'DataPoint's from the +-- forwarder. +-- 4. You can think of the acceptor as a client, and the forwarder as a server. +-- After the connection is established, the acceptor asks for 'DataPoint's, +-- the forwarder replies to it. + +type DataPointName = Text +type DataPointValue = LBS.ByteString +type DataPointValues = [(DataPointName, Maybe DataPointValue)] + +data DataPointForward where + + -- | Both acceptor and forwarder are in idle state. The acceptor can send a + -- request for a list of 'DataPoint's ('MsgDataPointsRequest'); + -- the forwarder is waiting for a request, it will replay with 'MsgDataPointsReply'. + StIdle :: DataPointForward + + -- | The acceptor has sent a next request for 'DataPoint's. The acceptor is + -- now waiting for a reply, and the forwarder is busy getting ready to send a + -- reply with new list of 'DataPoint's. + StBusy :: DataPointForward + + -- | Both the acceptor and forwarder are in the terminal state. They're done. + StDone :: DataPointForward + +instance ShowProxy DataPointForward where + showProxy _ = "DataPointForward" + +instance Protocol DataPointForward where + + -- | The messages in the trace forwarding/accepting protocol. + -- + data Message DataPointForward from to where + -- | Request the list of 'DataPoint's from the forwarder. + -- State: Idle -> Busy. + MsgDataPointsRequest + :: [DataPointName] + -> Message DataPointForward 'StIdle 'StBusy + + -- | Reply with a list of 'DataPoint's for the acceptor. + -- State: Busy -> Idle. + MsgDataPointsReply + :: DataPointValues + -> Message DataPointForward 'StBusy 'StIdle + + -- | Terminating message. State: Idle -> Done. + MsgDone + :: Message DataPointForward 'StIdle 'StDone + + -- | This is an explanation of our states, in terms of which party has agency + -- in each state. + -- + -- 1. When both peers are in Idle state, the acceptor can send a message + -- to the forwarder (request for new 'DataPoint's), + -- 2. When both peers are in Busy state, the forwarder is expected to send + -- a reply to the acceptor (list of new 'DataPoint's). + -- + -- So we assume that, from __interaction__ point of view: + -- 1. ClientHasAgency (from 'Network.TypedProtocol.Core') corresponds to acceptor's agency. + -- 3. ServerHasAgency (from 'Network.TypedProtocol.Core') corresponds to forwarder's agency. + -- + data ClientHasAgency st where + TokIdle :: ClientHasAgency 'StIdle + + data ServerHasAgency st where + TokBusy :: ServerHasAgency 'StBusy + + data NobodyHasAgency st where + TokDone :: NobodyHasAgency 'StDone + + -- | Impossible cases. + exclusionLemma_ClientAndServerHaveAgency TokIdle tok = case tok of {} + exclusionLemma_NobodyAndClientHaveAgency TokDone tok = case tok of {} + exclusionLemma_NobodyAndServerHaveAgency TokDone tok = case tok of {} + +instance Show (Message DataPointForward from to) where + show MsgDataPointsRequest{} = "MsgDataPointsRequest" + show MsgDataPointsReply{} = "MsgDataPointsReply" + show MsgDone{} = "MsgDone" + +instance Show (ClientHasAgency (st :: DataPointForward)) where + show TokIdle = "TokIdle" + +instance Show (ServerHasAgency (st :: DataPointForward)) where + show TokBusy{} = "TokBusy" diff --git a/trace-forward/src/Trace/Forward/Protocol/Acceptor.hs b/trace-forward/src/Trace/Forward/Protocol/TraceObject/Acceptor.hs similarity index 76% rename from trace-forward/src/Trace/Forward/Protocol/Acceptor.hs rename to trace-forward/src/Trace/Forward/Protocol/TraceObject/Acceptor.hs index 93d3c18c369..494439d8e01 100644 --- a/trace-forward/src/Trace/Forward/Protocol/Acceptor.hs +++ b/trace-forward/src/Trace/Forward/Protocol/TraceObject/Acceptor.hs @@ -9,34 +9,34 @@ -- -- For execution, a conversion into the typed protocol is provided. -- -module Trace.Forward.Protocol.Acceptor - ( TraceAcceptor(..) - , traceAcceptorPeer +module Trace.Forward.Protocol.TraceObject.Acceptor + ( TraceObjectAcceptor(..) + , traceObjectAcceptorPeer ) where import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), PeerRole (..)) -import Trace.Forward.Protocol.Type +import Trace.Forward.Protocol.TraceObject.Type -data TraceAcceptor lo m a where +data TraceObjectAcceptor lo m a where SendMsgTraceObjectsRequest :: TokBlockingStyle blocking -> NumberOfTraceObjects - -> (BlockingReplyList blocking lo -> m (TraceAcceptor lo m a)) - -> TraceAcceptor lo m a + -> (BlockingReplyList blocking lo -> m (TraceObjectAcceptor lo m a)) + -> TraceObjectAcceptor lo m a SendMsgDone :: m a - -> TraceAcceptor lo m a + -> TraceObjectAcceptor lo m a -- | Interpret a particular action sequence into the client side of the protocol. -- -traceAcceptorPeer +traceObjectAcceptorPeer :: Monad m - => TraceAcceptor lo m a - -> Peer (TraceForward lo) 'AsClient 'StIdle m a -traceAcceptorPeer = \case + => TraceObjectAcceptor lo m a + -> Peer (TraceObjectForward lo) 'AsClient 'StIdle m a +traceObjectAcceptorPeer = \case SendMsgTraceObjectsRequest TokBlocking request next -> -- Send our message (request for new 'TraceObject's from the forwarder). Yield (ClientAgency TokIdle) (MsgTraceObjectsRequest TokBlocking request) $ @@ -44,7 +44,7 @@ traceAcceptorPeer = \case -- from the forwarder. Await (ServerAgency (TokBusy TokBlocking)) $ \(MsgTraceObjectsReply reply) -> Effect $ - traceAcceptorPeer <$> next reply + traceObjectAcceptorPeer <$> next reply SendMsgTraceObjectsRequest TokNonBlocking request next -> -- Send our message (request for new 'TraceObject's from the forwarder). @@ -54,7 +54,7 @@ traceAcceptorPeer = \case -- immediately (even there are no 'TraceObject's). Await (ServerAgency (TokBusy TokNonBlocking)) $ \(MsgTraceObjectsReply reply) -> Effect $ - traceAcceptorPeer <$> next reply + traceObjectAcceptorPeer <$> next reply SendMsgDone getResult -> -- We do an actual transition using 'yield', to go from the 'StIdle' to diff --git a/trace-forward/src/Trace/Forward/Protocol/Codec.hs b/trace-forward/src/Trace/Forward/Protocol/TraceObject/Codec.hs similarity index 68% rename from trace-forward/src/Trace/Forward/Protocol/Codec.hs rename to trace-forward/src/Trace/Forward/Protocol/TraceObject/Codec.hs index 334dea3873e..b1e1925d63c 100644 --- a/trace-forward/src/Trace/Forward/Protocol/Codec.hs +++ b/trace-forward/src/Trace/Forward/Protocol/TraceObject/Codec.hs @@ -4,8 +4,8 @@ {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} -module Trace.Forward.Protocol.Codec ( - codecTraceForward +module Trace.Forward.Protocol.TraceObject.Codec + ( codecTraceObjectForward ) where import qualified Codec.CBOR.Decoding as CBOR @@ -20,46 +20,46 @@ import Network.TypedProtocol.Codec (Codec, PeerHasAgency (..), PeerRole (..), SomeMessage (..)) import Network.TypedProtocol.Codec.CBOR (mkCodecCborLazyBS) -import Trace.Forward.Protocol.Type +import Trace.Forward.Protocol.TraceObject.Type -codecTraceForward +codecTraceObjectForward :: forall lo m. MonadST m => (NumberOfTraceObjects -> CBOR.Encoding) -- ^ Encoder for 'Request'. -> (forall s . CBOR.Decoder s NumberOfTraceObjects) -- ^ Decoder for 'Request'. -> ([lo] -> CBOR.Encoding) -- ^ Encoder for reply with list of 'TraceObject's. -> (forall s . CBOR.Decoder s [lo]) -- ^ Decoder for reply with list of 'TraceObject's. - -> Codec (TraceForward lo) + -> Codec (TraceObjectForward lo) DeserialiseFailure m LBS.ByteString -codecTraceForward encodeRequest decodeRequest - encodeReplyList decodeReplyList = +codecTraceObjectForward encodeRequest decodeRequest + encodeReplyList decodeReplyList = mkCodecCborLazyBS encode decode where -- Encode messages. encode :: forall (pr :: PeerRole) - (st :: TraceForward lo) - (st' :: TraceForward lo). + (st :: TraceObjectForward lo) + (st' :: TraceObjectForward lo). PeerHasAgency pr st - -> Message (TraceForward lo) st st' + -> Message (TraceObjectForward lo) st st' -> CBOR.Encoding encode (ClientAgency TokIdle) (MsgTraceObjectsRequest blocking request) = - CBOR.encodeListLen 3 - <> CBOR.encodeWord 1 - <> CBOR.encodeBool (case blocking of - TokBlocking -> True - TokNonBlocking -> False) - <> encodeRequest request + CBOR.encodeListLen 3 + <> CBOR.encodeWord 1 + <> CBOR.encodeBool (case blocking of + TokBlocking -> True + TokNonBlocking -> False) + <> encodeRequest request encode (ClientAgency TokIdle) MsgDone = - CBOR.encodeListLen 1 - <> CBOR.encodeWord 2 + CBOR.encodeListLen 1 + <> CBOR.encodeWord 2 encode (ServerAgency (TokBusy _)) (MsgTraceObjectsReply reply) = - CBOR.encodeListLen 2 - <> CBOR.encodeWord 4 - <> encodeReplyList replyList + CBOR.encodeListLen 2 + <> CBOR.encodeWord 3 + <> encodeReplyList replyList where replyList = case reply of @@ -69,7 +69,7 @@ codecTraceForward encodeRequest decodeRequest -- Decode messages decode :: forall (pr :: PeerRole) - (st :: TraceForward lo) s. + (st :: TraceObjectForward lo) s. PeerHasAgency pr st -> CBOR.Decoder s (SomeMessage st) decode stok = do @@ -88,7 +88,7 @@ codecTraceForward encodeRequest decodeRequest (2, 1, ClientAgency TokIdle) -> return $ SomeMessage MsgDone - (4, 2, ServerAgency (TokBusy blocking)) -> do + (3, 2, ServerAgency (TokBusy blocking)) -> do replyList <- decodeReplyList case (blocking, replyList) of (TokBlocking, x:xs) -> @@ -98,12 +98,12 @@ codecTraceForward encodeRequest decodeRequest return $ SomeMessage (MsgTraceObjectsReply (NonBlockingReply los)) (TokBlocking, []) -> - fail "codecTraceForward: MsgTraceObjectsReply: empty list not permitted" + fail "codecTraceObjectForward: MsgTraceObjectsReply: empty list not permitted" -- Failures per protocol state (_, _, ClientAgency TokIdle) -> - fail (printf "codecTraceForward (%s) unexpected key (%d, %d)" (show stok) key len) + fail (printf "codecTraceObjectForward (%s) unexpected key (%d, %d)" (show stok) key len) (_, _, ServerAgency (TokBusy TokBlocking)) -> - fail (printf "codecTraceForward (%s) unexpected key (%d, %d)" (show stok) key len) + fail (printf "codecTraceObjectForward (%s) unexpected key (%d, %d)" (show stok) key len) (_, _, ServerAgency (TokBusy TokNonBlocking)) -> - fail (printf "codecTraceForward (%s) unexpected key (%d, %d)" (show stok) key len) + fail (printf "codecTraceObjectForward (%s) unexpected key (%d, %d)" (show stok) key len) diff --git a/trace-forward/src/Trace/Forward/Protocol/Forwarder.hs b/trace-forward/src/Trace/Forward/Protocol/TraceObject/Forwarder.hs similarity index 72% rename from trace-forward/src/Trace/Forward/Protocol/Forwarder.hs rename to trace-forward/src/Trace/Forward/Protocol/TraceObject/Forwarder.hs index b07e594054c..0aa2d8937be 100644 --- a/trace-forward/src/Trace/Forward/Protocol/Forwarder.hs +++ b/trace-forward/src/Trace/Forward/Protocol/TraceObject/Forwarder.hs @@ -4,23 +4,23 @@ {-# LANGUAGE RankNTypes #-} {-# LANGUAGE NamedFieldPuns #-} -module Trace.Forward.Protocol.Forwarder - ( TraceForwarder (..) - , traceForwarderPeer +module Trace.Forward.Protocol.TraceObject.Forwarder + ( TraceObjectForwarder (..) + , traceObjectForwarderPeer ) where import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), PeerRole (..)) -import Trace.Forward.Protocol.Type +import Trace.Forward.Protocol.TraceObject.Type -data TraceForwarder lo m a = TraceForwarder +data TraceObjectForwarder lo m a = TraceObjectForwarder { -- | The acceptor sent us a request for new 'TraceObject's. recvMsgTraceObjectsRequest :: forall blocking. TokBlockingStyle blocking -> NumberOfTraceObjects - -> m (BlockingReplyList blocking lo, TraceForwarder lo m a) + -> m (BlockingReplyList blocking lo, TraceObjectForwarder lo m a) -- | The acceptor terminated. Here we have a pure return value, but we -- could have done another action in 'm' if we wanted to. @@ -29,11 +29,11 @@ data TraceForwarder lo m a = TraceForwarder -- | Interpret a particular action sequence into the server side of the protocol. -- -traceForwarderPeer +traceObjectForwarderPeer :: Monad m - => TraceForwarder lo m a - -> Peer (TraceForward lo) 'AsServer 'StIdle m a -traceForwarderPeer TraceForwarder{recvMsgTraceObjectsRequest, recvMsgDone} = + => TraceObjectForwarder lo m a + -> Peer (TraceObjectForward lo) 'AsServer 'StIdle m a +traceObjectForwarderPeer TraceObjectForwarder{recvMsgTraceObjectsRequest, recvMsgDone} = -- In the 'StIdle' state the forwarder is awaiting a request message -- from the acceptor. Await (ClientAgency TokIdle) $ \case @@ -44,7 +44,7 @@ traceForwarderPeer TraceForwarder{recvMsgTraceObjectsRequest, recvMsgDone} = (reply, next) <- recvMsgTraceObjectsRequest blocking request return $ Yield (ServerAgency (TokBusy blocking)) (MsgTraceObjectsReply reply) - (traceForwarderPeer next) + (traceObjectForwarderPeer next) -- The acceptor sent the done transition, so we're in the 'StDone' state -- so all we can do is stop using 'done', with a return value. diff --git a/trace-forward/src/Trace/Forward/Protocol/Type.hs b/trace-forward/src/Trace/Forward/Protocol/TraceObject/Type.hs similarity index 88% rename from trace-forward/src/Trace/Forward/Protocol/Type.hs rename to trace-forward/src/Trace/Forward/Protocol/TraceObject/Type.hs index f3a1ee99de2..e783e00909d 100644 --- a/trace-forward/src/Trace/Forward/Protocol/Type.hs +++ b/trace-forward/src/Trace/Forward/Protocol/TraceObject/Type.hs @@ -10,11 +10,9 @@ -- | The type of the trace forwarding/accepting protocol. -- --- Since we are using a typed protocol framework this is in some sense /the/ --- definition of the protocol: what is allowed and what is not allowed. -module Trace.Forward.Protocol.Type - ( TraceForward (..) +module Trace.Forward.Protocol.TraceObject.Type + ( TraceObjectForward (..) , TokBlockingStyle (..) , Message (..) , ClientHasAgency (..) @@ -29,6 +27,7 @@ import Data.List.NonEmpty (NonEmpty) import Data.Proxy (Proxy(..)) import Data.Word (Word16) import GHC.Generics (Generic) + import Network.TypedProtocol.Core (Protocol (..)) import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) @@ -55,7 +54,7 @@ newtype NumberOfTraceObjects = NumberOfTraceObjects instance ShowProxy NumberOfTraceObjects instance Serialise NumberOfTraceObjects -data TraceForward lo where +data TraceObjectForward lo where -- | Both acceptor and forwarder are in idle state. The acceptor can send a -- request for a list of 'TraceObject's ('MsgTraceObjectsRequest'); @@ -65,22 +64,22 @@ data TraceForward lo where -- Node's info is an important information about the node, such as -- its protocol, version, start time, etc. It is assuming that the node -- must provide this information. - StIdle :: TraceForward lo + StIdle :: TraceObjectForward lo -- | The acceptor has sent a next request for 'TraceObject's. The acceptor is -- now waiting for a reply, and the forwarder is busy getting ready to send a -- reply with new list of 'TraceObject's. -- -- There are two sub-states for this, for blocking and non-blocking cases. - StBusy :: StBlockingStyle -> TraceForward lo + StBusy :: StBlockingStyle -> TraceObjectForward lo -- | Both the acceptor and forwarder are in the terminal state. They're done. - StDone :: TraceForward lo + StDone :: TraceObjectForward lo instance (ShowProxy lo) - => ShowProxy (TraceForward lo) where + => ShowProxy (TraceObjectForward lo) where showProxy _ = concat - [ "TraceForward (" + [ "TraceObjectForward (" , showProxy (Proxy :: Proxy lo) , ")" ] @@ -113,11 +112,11 @@ data BlockingReplyList (blocking :: StBlockingStyle) lo where deriving instance Eq lo => Eq (BlockingReplyList blocking lo) deriving instance Show lo => Show (BlockingReplyList blocking lo) -instance Protocol (TraceForward lo) where +instance Protocol (TraceObjectForward lo) where -- | The messages in the trace forwarding/accepting protocol. -- - data Message (TraceForward lo) from to where + data Message (TraceObjectForward lo) from to where -- | Request the list of 'TraceObject's from the forwarder. -- State: Idle -> Busy. -- @@ -131,17 +130,17 @@ instance Protocol (TraceForward lo) where MsgTraceObjectsRequest :: TokBlockingStyle blocking -> NumberOfTraceObjects - -> Message (TraceForward lo) 'StIdle ('StBusy blocking) + -> Message (TraceObjectForward lo) 'StIdle ('StBusy blocking) -- | Reply with a list of 'TraceObject's for the acceptor. -- State: Busy -> Idle. MsgTraceObjectsReply :: BlockingReplyList blocking lo - -> Message (TraceForward lo) ('StBusy blocking) 'StIdle + -> Message (TraceObjectForward lo) ('StBusy blocking) 'StIdle -- | Terminating message. State: Idle -> Done. MsgDone - :: Message (TraceForward lo) 'StIdle 'StDone + :: Message (TraceObjectForward lo) 'StIdle 'StDone -- | This is an explanation of our states, in terms of which party has agency -- in each state. @@ -170,13 +169,13 @@ instance Protocol (TraceForward lo) where exclusionLemma_NobodyAndServerHaveAgency TokDone tok = case tok of {} instance Show lo - => Show (Message (TraceForward lo) from to) where + => Show (Message (TraceObjectForward lo) from to) where show MsgTraceObjectsRequest{} = "MsgTraceObjectsRequest" show MsgTraceObjectsReply{} = "MsgTraceObjectsReply" show MsgDone{} = "MsgDone" -instance Show (ClientHasAgency (st :: TraceForward lo)) where +instance Show (ClientHasAgency (st :: TraceObjectForward lo)) where show TokIdle = "TokIdle" -instance Show (ServerHasAgency (st :: TraceForward lo)) where +instance Show (ServerHasAgency (st :: TraceObjectForward lo)) where show TokBusy{} = "TokBusy" diff --git a/trace-forward/src/Trace/Forward/Queue.hs b/trace-forward/src/Trace/Forward/Queue.hs deleted file mode 100644 index b3a7896a227..00000000000 --- a/trace-forward/src/Trace/Forward/Queue.hs +++ /dev/null @@ -1,59 +0,0 @@ -{-# LANGUAGE GADTs #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE NamedFieldPuns #-} - -module Trace.Forward.Queue - ( readItems - , getTraceObjects - ) where - -import Control.Concurrent.STM (STM, atomically, retry) -import Control.Concurrent.STM.TBQueue -import Control.Concurrent.STM.TVar -import Control.Monad (unless) -import qualified Data.List.NonEmpty as NE -import Data.Word (Word16) - -import qualified Trace.Forward.Protocol.Forwarder as Forwarder -import Trace.Forward.Protocol.Type -import Trace.Forward.Utils - -readItems - :: ForwardSink lo -- ^ The sink contains the queue we read 'TraceObject's from. - -> Forwarder.TraceForwarder lo IO () -readItems sink@ForwardSink{forwardQueue, wasUsed} = - Forwarder.TraceForwarder - { Forwarder.recvMsgTraceObjectsRequest = \blocking (NumberOfTraceObjects n) -> do - replyList <- - case blocking of - TokBlocking -> do - objs <- atomically $ getNTraceObjects n forwardQueue >>= \case - [] -> retry -- No 'TraceObject's yet, just wait... - (x:xs) -> return $ x NE.:| xs - atomically . modifyTVar' wasUsed . const $ True - return $ BlockingReply objs - TokNonBlocking -> do - objs <- atomically $ getNTraceObjects n forwardQueue - unless (null objs) $ - atomically . modifyTVar' wasUsed . const $ True - return $ NonBlockingReply objs - return (replyList, readItems sink) - , Forwarder.recvMsgDone = return () - } - --- | Returns at most N 'TraceObject's from the queue. -getNTraceObjects - :: Word16 - -> TVar (TBQueue lo) - -> STM [lo] -getNTraceObjects 0 _ = return [] -getNTraceObjects n q = - readTVar q >>= tryReadTBQueue >>= \case - Just lo' -> (lo' :) <$> getNTraceObjects (n - 1) q - Nothing -> return [] - -getTraceObjects - :: BlockingReplyList blocking lo -- ^ The reply with list of 'TraceObject's. - -> [lo] -getTraceObjects (BlockingReply neList) = NE.toList neList -getTraceObjects (NonBlockingReply list) = list diff --git a/trace-forward/src/Trace/Forward/Run/DataPoint/Acceptor.hs b/trace-forward/src/Trace/Forward/Run/DataPoint/Acceptor.hs new file mode 100644 index 00000000000..c33cab8729b --- /dev/null +++ b/trace-forward/src/Trace/Forward/Run/DataPoint/Acceptor.hs @@ -0,0 +1,75 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NamedFieldPuns #-} + +module Trace.Forward.Run.DataPoint.Acceptor + ( acceptDataPointsInit + , acceptDataPointsResp + ) where + +import qualified Codec.Serialise as CBOR +import Control.Monad.Extra (ifM) +import Control.Monad.STM (atomically, check) +import Control.Concurrent.STM.TVar (modifyTVar', readTVar, readTVarIO) +import Control.Concurrent.STM.TMVar (putTMVar) +import qualified Data.ByteString.Lazy as LBS +import Data.Void (Void) +import Ouroboros.Network.Mux (MuxMode (..), MuxPeer (..), RunMiniProtocol (..)) +import Ouroboros.Network.Driver.Simple (runPeer) + +import qualified Trace.Forward.Protocol.DataPoint.Acceptor as Acceptor +import qualified Trace.Forward.Protocol.DataPoint.Codec as Acceptor +import Trace.Forward.Protocol.DataPoint.Type (DataPointName) +import Trace.Forward.Configuration.DataPoint (AcceptorConfiguration (..)) +import Trace.Forward.Utils.DataPoint (DataPointAsker (..)) + +acceptDataPointsInit + :: AcceptorConfiguration + -> DataPointAsker + -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void +acceptDataPointsInit config dpAsker = + InitiatorProtocolOnly $ runPeerWithAsker config dpAsker + +acceptDataPointsResp + :: AcceptorConfiguration + -> DataPointAsker + -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () +acceptDataPointsResp config dpAsker = + ResponderProtocolOnly $ runPeerWithAsker config dpAsker + +runPeerWithAsker + :: AcceptorConfiguration + -> DataPointAsker + -> MuxPeer LBS.ByteString IO () +runPeerWithAsker config dpAsker = + MuxPeerRaw $ \channel -> + runPeer + (acceptorTracer config) + (Acceptor.codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Acceptor.dataPointAcceptorPeer $ acceptorActions config dpAsker []) + +acceptorActions + :: AcceptorConfiguration + -> DataPointAsker + -> [DataPointName] + -> Acceptor.DataPointAcceptor IO () +acceptorActions config@AcceptorConfiguration{shouldWeStop} + dpAsker@DataPointAsker{askDataPoints, dataPointsNames, dataPointsReply} + dpNames = + Acceptor.SendMsgDataPointsRequest dpNames $ \replyWithDataPoints -> do + -- Ok, reply with 'DataPoint's is already here, update the asker. + atomically $ do + putTMVar dataPointsReply replyWithDataPoints + -- To prevent new automatic request. + modifyTVar' askDataPoints $ const False + ifM (readTVarIO shouldWeStop) + (return $ Acceptor.SendMsgDone $ return ()) + $ do + -- Block here until external context explicitly ask for 'DataPoint's again. + atomically $ readTVar askDataPoints >>= check + -- Ok, external context asked for 'DataPoint's, take their names. + dpNames' <- readTVarIO dataPointsNames + -- Ask. + return $ acceptorActions config dpAsker dpNames' diff --git a/trace-forward/src/Trace/Forward/Run/DataPoint/Forwarder.hs b/trace-forward/src/Trace/Forward/Run/DataPoint/Forwarder.hs new file mode 100644 index 00000000000..89e6721825e --- /dev/null +++ b/trace-forward/src/Trace/Forward/Run/DataPoint/Forwarder.hs @@ -0,0 +1,44 @@ +{-# LANGUAGE DataKinds #-} + +module Trace.Forward.Run.DataPoint.Forwarder + ( forwardDataPointsInit + , forwardDataPointsResp + ) where + +import qualified Codec.Serialise as CBOR +import qualified Data.ByteString.Lazy as LBS +import Data.Void (Void) +import Ouroboros.Network.Driver.Simple (runPeer) +import Ouroboros.Network.Mux (MuxMode (..), MuxPeer (..), RunMiniProtocol (..)) + +import Trace.Forward.Configuration.DataPoint (ForwarderConfiguration (..)) +import Trace.Forward.Utils.DataPoint +import qualified Trace.Forward.Protocol.DataPoint.Forwarder as Forwarder +import qualified Trace.Forward.Protocol.DataPoint.Codec as Forwarder + +forwardDataPointsInit + :: ForwarderConfiguration + -> DataPointStore + -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void +forwardDataPointsInit config dpStore = + InitiatorProtocolOnly $ runPeerWithDPStore config dpStore + +forwardDataPointsResp + :: ForwarderConfiguration + -> DataPointStore + -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () +forwardDataPointsResp config dpStore = + ResponderProtocolOnly $ runPeerWithDPStore config dpStore + +runPeerWithDPStore + :: ForwarderConfiguration + -> DataPointStore + -> MuxPeer LBS.ByteString IO () +runPeerWithDPStore config dpStore = + MuxPeerRaw $ \channel -> + runPeer + (forwarderTracer config) + (Forwarder.codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Forwarder.dataPointForwarderPeer $ readFromStore dpStore) diff --git a/trace-forward/src/Trace/Forward/Run/TraceObject/Acceptor.hs b/trace-forward/src/Trace/Forward/Run/TraceObject/Acceptor.hs new file mode 100644 index 00000000000..89249fb40f7 --- /dev/null +++ b/trace-forward/src/Trace/Forward/Run/TraceObject/Acceptor.hs @@ -0,0 +1,102 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} + +module Trace.Forward.Run.TraceObject.Acceptor + ( acceptTraceObjectsInit + , acceptTraceObjectsResp + ) where + +import qualified Codec.Serialise as CBOR +import Control.Concurrent.Async (race) +import Control.Monad.Extra (ifM) +import Control.Monad.STM (atomically, check) +import Control.Concurrent.STM.TVar (TVar, readTVar, readTVarIO, registerDelay) +import Control.Exception (Exception, throwIO) +import qualified Data.ByteString.Lazy as LBS +import Data.Typeable (Typeable) +import Data.Void (Void) + +import Ouroboros.Network.Mux (MuxMode (..), MuxPeer (..), RunMiniProtocol (..)) +import Ouroboros.Network.Driver.Simple (runPeer) +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +import qualified Trace.Forward.Protocol.TraceObject.Acceptor as Acceptor +import qualified Trace.Forward.Protocol.TraceObject.Codec as Acceptor +import Trace.Forward.Protocol.TraceObject.Type +import Trace.Forward.Utils.TraceObject (getTraceObjectsFromReply) +import Trace.Forward.Configuration.TraceObject (AcceptorConfiguration (..)) + +acceptTraceObjectsInit + :: (CBOR.Serialise lo, + ShowProxy lo, + Typeable lo) + => AcceptorConfiguration lo -- ^ Acceptor's configuration. + -> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's. + -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void +acceptTraceObjectsInit config loHandler = + InitiatorProtocolOnly $ runPeerWithHandler config loHandler + +acceptTraceObjectsResp + :: (CBOR.Serialise lo, + ShowProxy lo, + Typeable lo) + => AcceptorConfiguration lo -- ^ Acceptor's configuration. + -> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's. + -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () +acceptTraceObjectsResp config loHandler = + ResponderProtocolOnly $ runPeerWithHandler config loHandler + +runPeerWithHandler + :: (CBOR.Serialise lo, + ShowProxy lo, + Typeable lo) + => AcceptorConfiguration lo + -> ([lo] -> IO ()) + -> MuxPeer LBS.ByteString IO () +runPeerWithHandler config@AcceptorConfiguration{acceptorTracer, shouldWeStop} loHandler = + MuxPeerRaw $ \channel -> + timeoutWhenStopped + shouldWeStop + 15_000 -- 15sec + $ runPeer + acceptorTracer + (Acceptor.codecTraceObjectForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Acceptor.traceObjectAcceptorPeer $ acceptorActions config loHandler) + +acceptorActions + :: (CBOR.Serialise lo, + ShowProxy lo, + Typeable lo) + => AcceptorConfiguration lo -- ^ Acceptor's configuration. + -> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's. + -> Acceptor.TraceObjectAcceptor lo IO () +acceptorActions config@AcceptorConfiguration{whatToRequest, shouldWeStop} loHandler = + Acceptor.SendMsgTraceObjectsRequest TokBlocking whatToRequest $ \replyWithTraceObjects -> do + loHandler $ getTraceObjectsFromReply replyWithTraceObjects + ifM (readTVarIO shouldWeStop) + (return $ Acceptor.SendMsgDone $ return ()) + (return $ acceptorActions config loHandler) + +data Timeout = Timeout + deriving (Typeable, Show) + +instance Exception Timeout where + +-- | Timeout shutdown of an action. It can run only for specified miliseconds +-- once the 'TVar' is set to 'True'. +timeoutWhenStopped :: TVar Bool + -> Int -- ^ Timeout, in miliseconds. + -> IO a + -> IO a +timeoutWhenStopped stopVar delay action = + either id id <$> + race action + ( do atomically (readTVar stopVar >>= check) + v <- registerDelay delay + atomically (readTVar v >>= check) + throwIO Timeout + ) diff --git a/trace-forward/src/Trace/Forward/Run/TraceObject/Forwarder.hs b/trace-forward/src/Trace/Forward/Run/TraceObject/Forwarder.hs new file mode 100644 index 00000000000..79e9a72851d --- /dev/null +++ b/trace-forward/src/Trace/Forward/Run/TraceObject/Forwarder.hs @@ -0,0 +1,50 @@ +{-# LANGUAGE DataKinds #-} + +module Trace.Forward.Run.TraceObject.Forwarder + ( forwardTraceObjectsInit + , forwardTraceObjectsResp + ) where + +import qualified Codec.Serialise as CBOR +import qualified Data.ByteString.Lazy as LBS +import Data.Void (Void) +import Ouroboros.Network.Driver.Simple (runPeer) +import Ouroboros.Network.Mux (MuxMode (..), MuxPeer (..), RunMiniProtocol (..)) +import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) + +import qualified Trace.Forward.Protocol.TraceObject.Forwarder as Forwarder +import qualified Trace.Forward.Protocol.TraceObject.Codec as Forwarder +import Trace.Forward.Utils.TraceObject +import Trace.Forward.Configuration.TraceObject (ForwarderConfiguration (..)) + +forwardTraceObjectsInit + :: (CBOR.Serialise lo, + ShowProxy lo) + => ForwarderConfiguration lo + -> ForwardSink lo + -> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void +forwardTraceObjectsInit config sink = + InitiatorProtocolOnly $ runPeerWithSink config sink + +forwardTraceObjectsResp + :: (CBOR.Serialise lo, + ShowProxy lo) + => ForwarderConfiguration lo + -> ForwardSink lo + -> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void () +forwardTraceObjectsResp config sink = + ResponderProtocolOnly $ runPeerWithSink config sink + +runPeerWithSink + :: (ShowProxy lo, CBOR.Serialise lo) + => ForwarderConfiguration lo + -> ForwardSink lo + -> MuxPeer LBS.ByteString IO () +runPeerWithSink config sink = + MuxPeerRaw $ \channel -> + runPeer + (forwarderTracer config) + (Forwarder.codecTraceObjectForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + channel + (Forwarder.traceObjectForwarderPeer $ readFromSink sink) diff --git a/trace-forward/src/Trace/Forward/Utils/DataPoint.hs b/trace-forward/src/Trace/Forward/Utils/DataPoint.hs new file mode 100644 index 00000000000..2ae0920672f --- /dev/null +++ b/trace-forward/src/Trace/Forward/Utils/DataPoint.hs @@ -0,0 +1,117 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE NamedFieldPuns #-} + +module Trace.Forward.Utils.DataPoint + ( DataPoint (..) + , DataPointStore + , DataPointAsker (..) + , initDataPointStore + , initDataPointAsker + , writeToStore + , readFromStore + , askForDataPoints + ) where + +import Control.Concurrent.STM (atomically, check, orElse) +import Control.Concurrent.STM.TVar +import Control.Concurrent.STM.TMVar +import Data.Aeson +import qualified Data.Map.Strict as M + +import Trace.Forward.Protocol.DataPoint.Forwarder +import Trace.Forward.Protocol.DataPoint.Type + +-- | Type wrapper for some value of type 'v'. The only reason we need this +-- wrapper is an ability to store different values in the same 'DataPointStore'. +-- +-- Please note that when the acceptor application will read the value of type 'v' +-- from the store, this value is just as unstructured JSON, but not Haskell +-- value of type 'v'. That's why 'FromJSON' instance for type 'v' should be +-- available for the acceptor application, to decode unstructured JSON. +-- +data DataPoint where + DataPoint :: ToJSON v => v -> DataPoint + +type DataPointStore = TVar (M.Map DataPointName DataPoint) + +initDataPointStore :: IO DataPointStore +initDataPointStore = newTVarIO M.empty + +-- | Write 'DataPoint' to the store. +writeToStore + :: DataPointStore + -> DataPointName + -> DataPoint + -> IO () +writeToStore dpStore dpName dp = atomically $ + modifyTVar' dpStore $ \store -> + if dpName `M.member` store + then M.adjust (const dp) dpName store + else M.insert dpName dp store + +-- | Read 'DataPoint's from the store. Please note that we don't care what's +-- inside of 'DataPoint', we just know it can be encoded to JSON. +readFromStore + :: DataPointStore + -> DataPointForwarder IO () +readFromStore dpStore = + DataPointForwarder + { recvMsgDataPointsRequest = \dpNames -> do + store <- readTVarIO dpStore + let replyList = map (lookupDataPoint store) dpNames + return (replyList, readFromStore dpStore) + , recvMsgDone = return () + } + where + lookupDataPoint store dpName = + ( dpName + , (\(DataPoint v) -> Just $ encode v) =<< M.lookup dpName store + ) + +-- | Since 'DataPointForward' protocol does not assume the stream of requests/replies, +-- we use the 'TVar's to provide to acceptor's side an ability to ask 'DataPoint's +-- explicitly. +data DataPointAsker = DataPointAsker + { -- | The "ask flag": we use it to notify that we want 'DataPoint's. + askDataPoints :: !(TVar Bool) + -- | The names of 'DataPoint's we need. + , dataPointsNames :: !(TVar [DataPointName]) + -- | The list of received 'DataPoint's' values. + -- By default it's empty, but when 'DataPoint's + -- are received they will be stored here. + , dataPointsReply :: !(TMVar DataPointValues) + } + +initDataPointAsker :: IO DataPointAsker +initDataPointAsker = DataPointAsker + <$> newTVarIO False + <*> newTVarIO [] + <*> newEmptyTMVarIO + +askForDataPoints + :: DataPointAsker + -> [DataPointName] + -> IO DataPointValues +askForDataPoints _ [] = return [] +askForDataPoints DataPointAsker{askDataPoints, dataPointsNames, dataPointsReply} dpNames = do + atomically $ do + modifyTVar' dataPointsNames $ const dpNames -- Fill the names of 'DataPoint's we need. + modifyTVar' askDataPoints $ const True -- Ask them! The flag for acceptor's part + -- of the protocol, it's initiate the request. + -- Since the acceptor's part of the protocol already sent the request, + -- we are waiting for reply: currently 'dataPointsReply' is empty, + -- so we are stuck on 'retry'. + -- + -- Unfortunately, it's possible that 'dataPointsReply' won't be filled by 'DataPointValues', + -- because of some error (for example, network problems). To prevent an ifinite 'retry', + -- we start a max timer in parallel. + maxTimer <- registerDelay tenSeconds + atomically $ + takeTMVar dataPointsReply -- If everything is OK, we'll have an answer earlier than 10 seconds. + `orElse` + (readTVar maxTimer >>= check >> return []) -- No later than after 10 seconds we return []. + +-- | If 'retry' takes more than 10 seconds - it's definitely a problem. +tenSeconds :: Int +tenSeconds = 10 * 1000000 diff --git a/trace-forward/src/Trace/Forward/Utils.hs b/trace-forward/src/Trace/Forward/Utils/TraceObject.hs similarity index 56% rename from trace-forward/src/Trace/Forward/Utils.hs rename to trace-forward/src/Trace/Forward/Utils/TraceObject.hs index 0c9c57cb27a..bd02703e366 100644 --- a/trace-forward/src/Trace/Forward/Utils.hs +++ b/trace-forward/src/Trace/Forward/Utils/TraceObject.hs @@ -1,50 +1,28 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} -module Trace.Forward.Utils +module Trace.Forward.Utils.TraceObject ( ForwardSink (..) , initForwardSink , writeToSink - , runActionInLoop + , readFromSink + , getTraceObjectsFromReply ) where -import Control.Concurrent.STM (atomically) +import Control.Concurrent.STM (STM, atomically, retry) import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TVar -import Control.Exception (SomeAsyncException (..), fromException, tryJust) +import Control.Monad (unless) import Control.Monad.Extra (whenM) -import Control.Tracer (showTracing, stdoutTracer, traceWith) +import qualified Data.List.NonEmpty as NE +import Data.Word (Word16) import System.IO -import System.Time.Extra (sleep) -import Trace.Forward.Configuration - --- | Run monadic action in a loop. If there's an exception, it will re-run --- the action again, after pause that grows. -runActionInLoop - :: IO () - -> HowToConnect - -> Word - -> IO () -runActionInLoop action endpoint prevDelay = - tryJust excludeAsyncExceptions action >>= \case - Left e -> do - logTrace $ "trace-forward, connection with " <> show endpoint <> " failed: " <> show e - sleep $ fromIntegral currentDelay - runActionInLoop action endpoint currentDelay - Right _ -> return () - where - excludeAsyncExceptions e = - case fromException e of - Just SomeAsyncException {} -> Nothing - _ -> Just e - - logTrace = traceWith $ showTracing stdoutTracer - - currentDelay = - if prevDelay < 60 - then prevDelay * 2 - else 60 -- After we reached 60+ secs delay, repeat an attempt every minute. +import Trace.Forward.Configuration.TraceObject +import Trace.Forward.Protocol.TraceObject.Type +import qualified Trace.Forward.Protocol.TraceObject.Forwarder as Forwarder data ForwardSink lo = ForwardSink { forwardQueue :: !(TVar (TBQueue lo)) @@ -107,3 +85,43 @@ writeToSink ForwardSink{forwardQueue, disconnectedSize, connectedSize, wasUsed} switchQueue size = newTBQueue (fromIntegral size) >>= modifyTVar' forwardQueue . const + +readFromSink + :: ForwardSink lo -- ^ The sink contains the queue we read 'TraceObject's from. + -> Forwarder.TraceObjectForwarder lo IO () +readFromSink sink@ForwardSink{forwardQueue, wasUsed} = + Forwarder.TraceObjectForwarder + { Forwarder.recvMsgTraceObjectsRequest = \blocking (NumberOfTraceObjects n) -> do + replyList <- + case blocking of + TokBlocking -> do + objs <- atomically $ getNTraceObjects n forwardQueue >>= \case + [] -> retry -- No 'TraceObject's yet, just wait... + (x:xs) -> return $ x NE.:| xs + atomically . modifyTVar' wasUsed . const $ True + return $ BlockingReply objs + TokNonBlocking -> do + objs <- atomically $ getNTraceObjects n forwardQueue + unless (null objs) $ + atomically . modifyTVar' wasUsed . const $ True + return $ NonBlockingReply objs + return (replyList, readFromSink sink) + , Forwarder.recvMsgDone = return () + } + +-- | Returns at most N 'TraceObject's from the queue. +getNTraceObjects + :: Word16 + -> TVar (TBQueue lo) + -> STM [lo] +getNTraceObjects 0 _ = return [] +getNTraceObjects n q = + readTVar q >>= tryReadTBQueue >>= \case + Just lo' -> (lo' :) <$> getNTraceObjects (n - 1) q + Nothing -> return [] + +getTraceObjectsFromReply + :: BlockingReplyList blocking lo -- ^ The reply with list of 'TraceObject's. + -> [lo] +getTraceObjectsFromReply (BlockingReply neList) = NE.toList neList +getTraceObjectsFromReply (NonBlockingReply list) = list diff --git a/trace-forward/test/Main.hs b/trace-forward/test/Main.hs index 72e2e6ef9e5..6d902611096 100644 --- a/trace-forward/test/Main.hs +++ b/trace-forward/test/Main.hs @@ -1,15 +1,15 @@ -module Main (main) where +module Main + ( main + ) where import Test.Tasty -import qualified Test.Trace.Forward.Protocol.Tests as Protocol -import qualified Test.Trace.Forward.Demo.Tests as Demo +import qualified Test.Trace.Forward.Protocol.DataPoint.Tests as DataPoint +import qualified Test.Trace.Forward.Protocol.TraceObject.Tests as TraceObject main :: IO () -main = defaultMain tests - -tests :: TestTree -tests = testGroup "trace-forward" - [ Protocol.tests - , Demo.tests - ] +main = defaultMain $ + testGroup "trace-forward" + [ DataPoint.tests + , TraceObject.tests + ] diff --git a/trace-forward/test/Test/Trace/Forward/Demo/Configs.hs b/trace-forward/test/Test/Trace/Forward/Demo/Configs.hs deleted file mode 100644 index 7cc2cee4efe..00000000000 --- a/trace-forward/test/Test/Trace/Forward/Demo/Configs.hs +++ /dev/null @@ -1,37 +0,0 @@ -module Test.Trace.Forward.Demo.Configs - ( mkAcceptorConfig - , mkForwarderConfig - ) where - -import Control.Tracer (nullTracer) -import GHC.Conc (TVar) - -import Trace.Forward.Configuration -import Trace.Forward.Protocol.Type - -import Test.Trace.Forward.Protocol.TraceItem - -mkAcceptorConfig - :: HowToConnect - -> TVar Bool - -> AcceptorConfiguration TraceItem -mkAcceptorConfig ep weAreDone = - AcceptorConfiguration - { acceptorTracer = nullTracer - , forwarderEndpoint = ep - , whatToRequest = NumberOfTraceObjects 10 - , shouldWeStop = weAreDone - } - -mkForwarderConfig - :: HowToConnect - -> Word - -> Word - -> ForwarderConfiguration TraceItem -mkForwarderConfig ep disconnectedSize connectedSize = - ForwarderConfiguration - { forwarderTracer = nullTracer - , acceptorEndpoint = ep - , disconnectedQueueSize = disconnectedSize - , connectedQueueSize = connectedSize - } diff --git a/trace-forward/test/Test/Trace/Forward/Demo/Tests.hs b/trace-forward/test/Test/Trace/Forward/Demo/Tests.hs deleted file mode 100644 index 99dfdc2515e..00000000000 --- a/trace-forward/test/Test/Trace/Forward/Demo/Tests.hs +++ /dev/null @@ -1,87 +0,0 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE ScopedTypeVariables #-} - -module Test.Trace.Forward.Demo.Tests - ( tests - ) where - -import Control.Concurrent.Async (withAsync) -import Data.Functor ((<&>)) -import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef) -import GHC.Conc -import System.Directory (getTemporaryDirectory) -#if defined(mingw32_HOST_OS) -import System.FilePath ((), dropDrive) -import qualified Data.Text as T -#else -import System.FilePath (()) -#endif -import Test.Tasty -import Test.Tasty.QuickCheck -import System.Time.Extra (sleep) - -import Ouroboros.Network.IOManager (withIOManager) - -import Trace.Forward.Acceptor -import Trace.Forward.Configuration -import Trace.Forward.Forwarder -import Trace.Forward.Utils - -import Test.Trace.Forward.Demo.Configs -import Test.Trace.Forward.Protocol.Codec () -import Test.Trace.Forward.Protocol.TraceItem - -tests :: TestTree -tests = localOption (QuickCheckTests 1) $ testGroup "Trace.Forward.Demo" - [ testProperty "LocalPipe" $ prop_RemoteSocket 200 - ] - -prop_RemoteSocket :: Int -> Property -prop_RemoteSocket n = ioProperty . withIOManager $ \iomgr -> do - ep <- LocalPipe <$> mkLocalPipePath - - acceptedItems :: IORef [TraceItem] <- newIORef [] - weAreDone <- newTVarIO False - let forwarderConfig = mkForwarderConfig ep (fromIntegral n) (fromIntegral n) - sink <- initForwardSink forwarderConfig - - itemsToForward <- generateNTraceItems n - - withAsync (runTraceAcceptor - iomgr - (mkAcceptorConfig ep weAreDone) - (traceItemsHandler acceptedItems)) $ \_ -> do - sleep 0.5 - withAsync (runTraceForwarder iomgr forwarderConfig sink) $ \_ -> do - mapM_ (writeToSink sink) itemsToForward - -- Just wait till the acceptor will ask and receive all 'TraceItem's from the forwarder. - waitForFinish acceptedItems n weAreDone - - -- Take accepted items and compare results. - acceptedItems' <- readIORef acceptedItems - return $ itemsToForward === acceptedItems' - -traceItemsHandler :: IORef [TraceItem] -> [TraceItem] -> IO () -traceItemsHandler acceptedItems' items = do - atomicModifyIORef' acceptedItems' $ \storedItems -> (storedItems ++ items, ()) - -generateNTraceItems :: Int -> IO [TraceItem] -generateNTraceItems n = generate (infiniteListOf arbitrary) <&> take n - -waitForFinish :: IORef [TraceItem] -> Int -> TVar Bool -> IO () -waitForFinish acceptedItems' n weAreDone' = do - items' <- readIORef acceptedItems' - if length items' < n - then sleep 0.001 >> waitForFinish acceptedItems' n weAreDone' - else atomically $ writeTVar weAreDone' True - -mkLocalPipePath :: IO FilePath -mkLocalPipePath = do - tmpDir <- getTemporaryDirectory -#if defined(mingw32_HOST_OS) - return $ "\\\\.\\pipe\\" <> (T.unpack . T.replace "\\" "-" . T.pack) (dropDrive tmpDir) - <> "_" "trace-forward-test" -#else - return $ tmpDir "trace-forward-test.sock" -#endif diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/Common.hs b/trace-forward/test/Test/Trace/Forward/Protocol/Common.hs new file mode 100644 index 00000000000..4470cb21db6 --- /dev/null +++ b/trace-forward/test/Test/Trace/Forward/Protocol/Common.hs @@ -0,0 +1,24 @@ +{-# LANGUAGE FlexibleInstances #-} + +module Test.Trace.Forward.Protocol.Common + ( splits2 + , splits3 + ) where + +import qualified Data.ByteString.Lazy as LBS + +-- | Generate all 2-splits of a string. +splits2 :: LBS.ByteString -> [[LBS.ByteString]] +splits2 bs = zipWith (\a b -> [a,b]) (LBS.inits bs) (LBS.tails bs) + +-- | Generate all 3-splits of a string. +splits3 :: LBS.ByteString -> [[LBS.ByteString]] +splits3 bs = + [ [a,b,c] + | (a,bs') <- zip (LBS.inits bs) (LBS.tails bs) + , (b,c ) <- zip (LBS.inits bs') (LBS.tails bs') + ] + +-- | For 'f' function in Tests. +instance Show (Int -> Int) where + show _ = "" diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Codec.hs b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Codec.hs new file mode 100644 index 00000000000..59b5e0f9b1b --- /dev/null +++ b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Codec.hs @@ -0,0 +1,36 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE OverloadedStrings #-} + +module Test.Trace.Forward.Protocol.DataPoint.Codec () where + +import qualified Data.Aeson as A +import Test.QuickCheck + +import Network.TypedProtocol.Core +import Network.TypedProtocol.Codec + +import Trace.Forward.Protocol.DataPoint.Type + +import Test.Trace.Forward.Protocol.DataPoint.Item + +instance Arbitrary (AnyMessageAndAgency DataPointForward) where + arbitrary = oneof + [ pure $ AnyMessageAndAgency (ClientAgency TokIdle) (MsgDataPointsRequest ["NodeInfo"]) + , pure $ AnyMessageAndAgency (ServerAgency TokBusy) (MsgDataPointsReply [("NodeInfo", Nothing)]) + , pure $ AnyMessageAndAgency (ServerAgency TokBusy) (MsgDataPointsReply [("NodeInfo", Just ni)]) + , pure $ AnyMessageAndAgency (ClientAgency TokIdle) MsgDone + ] + where + ni = A.encode $ TestNodeInfo + { niName = "core-1" + , niVersion = "1.30.1" + , niCommit = "abcdefg" + , niProtocol = "Shelley" + } + +instance Eq (AnyMessage DataPointForward) where + AnyMessage (MsgDataPointsRequest r1) == AnyMessage (MsgDataPointsRequest r2) = r1 == r2 + AnyMessage (MsgDataPointsReply r1) == AnyMessage (MsgDataPointsReply r2) = r1 == r2 + AnyMessage MsgDone == AnyMessage MsgDone = True + _ == _ = False diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Direct.hs b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Direct.hs new file mode 100644 index 00000000000..fd402972bbf --- /dev/null +++ b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Direct.hs @@ -0,0 +1,23 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Test.Trace.Forward.Protocol.DataPoint.Direct + ( direct + ) where + +import Trace.Forward.Protocol.DataPoint.Acceptor +import Trace.Forward.Protocol.DataPoint.Forwarder +import Trace.Forward.Protocol.DataPoint.Type + +direct :: Monad m + => DataPointForwarder m a + -> DataPointAcceptor m b + -> m (a, b) +direct DataPointForwarder { recvMsgDone } + (SendMsgDone mdone) = + (,) <$> recvMsgDone <*> mdone +direct DataPointForwarder { recvMsgDataPointsRequest } + (SendMsgDataPointsRequest (dpNames :: [DataPointName]) mclient) = do + (reply, server) <- recvMsgDataPointsRequest dpNames + client <- mclient reply + direct server client diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Examples.hs b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Examples.hs new file mode 100644 index 00000000000..a7c2b757426 --- /dev/null +++ b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Examples.hs @@ -0,0 +1,45 @@ +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Test.Trace.Forward.Protocol.DataPoint.Examples + ( dataPointAcceptorApply + , dataPointForwarderCount + ) where + +import Trace.Forward.Protocol.DataPoint.Acceptor +import Trace.Forward.Protocol.DataPoint.Forwarder +import Trace.Forward.Protocol.DataPoint.Type + +dataPointAcceptorApply + :: forall m. Monad m + => (Int -> Int) + -> Int + -> Int -- ^ count of number of requests + -> DataPointAcceptor m Int +dataPointAcceptorApply f = go + where + go :: Int -> Int -> DataPointAcceptor m Int + go acc n + | n <= 0 = + SendMsgDone $ return acc + | otherwise = + SendMsgDataPointsRequest + [] + $ \(_reply :: DataPointValues) -> return $ go (f acc) (pred n) + +-- | A server which counts number received of 'MsgDataPointsRequest'. +-- +dataPointForwarderCount + :: forall m. Monad m + => DataPointForwarder m Int +dataPointForwarderCount = go 0 + where + go n = + DataPointForwarder + { recvMsgDone = return n + , recvMsgDataPointsRequest = + \(dpNames :: [DataPointName]) -> + return ( zip dpNames (repeat Nothing) + , go (succ n) + ) + } diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Item.hs b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Item.hs new file mode 100644 index 00000000000..634559b7ec5 --- /dev/null +++ b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Item.hs @@ -0,0 +1,17 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} + +module Test.Trace.Forward.Protocol.DataPoint.Item + ( TestNodeInfo (..) + ) where + +import qualified Data.Aeson as A +import Data.Text (Text) +import GHC.Generics + +data TestNodeInfo = TestNodeInfo + { niName :: !Text + , niVersion :: !Text + , niCommit :: !Text + , niProtocol :: !Text + } deriving (Eq, Generic, A.ToJSON, A.FromJSON, Show) diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Tests.hs b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Tests.hs new file mode 100644 index 00000000000..838b2c7d4d9 --- /dev/null +++ b/trace-forward/test/Test/Trace/Forward/Protocol/DataPoint/Tests.hs @@ -0,0 +1,130 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE RankNTypes #-} + +module Test.Trace.Forward.Protocol.DataPoint.Tests + ( tests + ) where + +import qualified Codec.Serialise as CBOR +import Control.Monad.IOSim (runSimOrThrow) +import Control.Monad.Class.MonadST +import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadThrow +import Control.Monad.ST (runST) +import Control.Tracer (nullTracer) +import Test.Tasty +import Test.Tasty.QuickCheck + +import Network.TypedProtocol.Codec +import Network.TypedProtocol.Proofs +import Ouroboros.Network.Channel +import Ouroboros.Network.Driver.Simple (runConnectedPeers) + +import Trace.Forward.Protocol.DataPoint.Acceptor +import Trace.Forward.Protocol.DataPoint.Forwarder +import Trace.Forward.Protocol.DataPoint.Codec +import Trace.Forward.Protocol.DataPoint.Type + +import Test.Trace.Forward.Protocol.DataPoint.Codec () +import Test.Trace.Forward.Protocol.DataPoint.Direct +import Test.Trace.Forward.Protocol.DataPoint.Examples + +import Test.Trace.Forward.Protocol.Common + +tests :: TestTree +tests = testGroup "Trace.Forward.Protocol.DataPoint" + [ testProperty "codec" prop_codec_DataPointForward + , testProperty "codec 2-splits" prop_codec_splits2_DataPointForward + , testProperty "codec 3-splits" (withMaxSuccess 33 prop_codec_splits3_DataPointForward) + , testProperty "direct" prop_direct_DataPointForward + , testProperty "connect" prop_connect_DataPointForward + , testProperty "channel ST" prop_channel_ST_DataPointForward + , testProperty "channel IO" prop_channel_IO_DataPointForward + ] + +prop_codec_DataPointForward + :: AnyMessageAndAgency DataPointForward + -> Bool +prop_codec_DataPointForward msg = runST $ + prop_codecM + (codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + msg + +prop_codec_splits2_DataPointForward + :: AnyMessageAndAgency DataPointForward + -> Bool +prop_codec_splits2_DataPointForward msg = runST $ + prop_codec_splitsM + splits2 + (codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + msg + + +prop_codec_splits3_DataPointForward + :: AnyMessageAndAgency DataPointForward + -> Bool +prop_codec_splits3_DataPointForward msg = runST $ + prop_codec_splitsM + splits3 + (codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + msg + +prop_direct_DataPointForward + :: (Int -> Int) + -> NonNegative Int + -> Property +prop_direct_DataPointForward f (NonNegative n) = + runSimOrThrow + (direct + dataPointForwarderCount + (dataPointAcceptorApply f 0 n)) + === + (n, foldr (.) id (replicate n f) 0) + +prop_connect_DataPointForward + :: (Int -> Int) + -> NonNegative Int + -> Bool +prop_connect_DataPointForward f (NonNegative n) = + case runSimOrThrow + (connect + (dataPointForwarderPeer dataPointForwarderCount) + (dataPointAcceptorPeer $ dataPointAcceptorApply f 0 n)) of + (s, c, TerminalStates TokDone TokDone) -> (s, c) == (n, foldr (.) id (replicate n f) 0) + +prop_channel + :: ( MonadST m + , MonadAsync m + , MonadCatch m + ) + => (Int -> Int) + -> Int + -> m Property +prop_channel f n = do + (s, c) <- runConnectedPeers createConnectedChannels + nullTracer + (codecDataPointForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + forwarder acceptor + return ((s, c) === (n, foldr (.) id (replicate n f) 0)) + where + forwarder = dataPointForwarderPeer dataPointForwarderCount + acceptor = dataPointAcceptorPeer $ dataPointAcceptorApply f 0 n + +prop_channel_ST_DataPointForward + :: (Int -> Int) + -> NonNegative Int + -> Property +prop_channel_ST_DataPointForward f (NonNegative n) = + runSimOrThrow (prop_channel f n) + +prop_channel_IO_DataPointForward + :: (Int -> Int) + -> NonNegative Int + -> Property +prop_channel_IO_DataPointForward f (NonNegative n) = + ioProperty (prop_channel f n) diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/Tests.hs b/trace-forward/test/Test/Trace/Forward/Protocol/Tests.hs deleted file mode 100644 index 6108ba05113..00000000000 --- a/trace-forward/test/Test/Trace/Forward/Protocol/Tests.hs +++ /dev/null @@ -1,28 +0,0 @@ -module Test.Trace.Forward.Protocol.Tests - ( tests - ) where - -import qualified Codec.Serialise as CBOR -import Control.Monad.ST (runST) -import Test.Tasty -import Test.Tasty.QuickCheck - -import Network.TypedProtocol.Codec - -import Trace.Forward.Protocol.Codec -import Trace.Forward.Protocol.Type - -import Test.Trace.Forward.Protocol.Codec () -import Test.Trace.Forward.Protocol.TraceItem - -tests :: TestTree -tests = testGroup "Trace.Forward.Protocol" - [ testProperty "codec" prop_codec_TraceForward - ] - -prop_codec_TraceForward :: AnyMessageAndAgency (TraceForward TraceItem) -> Bool -prop_codec_TraceForward msg = - runST $ prop_codecM - (codecTraceForward CBOR.encode CBOR.decode - CBOR.encode CBOR.decode) - msg diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/Codec.hs b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Codec.hs similarity index 74% rename from trace-forward/test/Test/Trace/Forward/Protocol/Codec.hs rename to trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Codec.hs index 793ee74fb70..1fa69dca1b1 100644 --- a/trace-forward/test/Test/Trace/Forward/Protocol/Codec.hs +++ b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Codec.hs @@ -1,21 +1,25 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} -module Test.Trace.Forward.Protocol.Codec () where +module Test.Trace.Forward.Protocol.TraceObject.Codec () where import Test.QuickCheck import Network.TypedProtocol.Core import Network.TypedProtocol.Codec -import Trace.Forward.Protocol.Type +import Trace.Forward.Protocol.TraceObject.Type -import Test.Trace.Forward.Protocol.TraceItem +import Test.Trace.Forward.Protocol.TraceObject.Item instance Arbitrary NumberOfTraceObjects where - arbitrary = NumberOfTraceObjects <$> arbitrary + arbitrary = oneof + [ pure $ NumberOfTraceObjects 1 + , pure $ NumberOfTraceObjects 10 + , pure $ NumberOfTraceObjects 100 + ] -instance Arbitrary (AnyMessageAndAgency (TraceForward TraceItem)) where +instance Arbitrary (AnyMessageAndAgency (TraceObjectForward TraceItem)) where arbitrary = oneof [ AnyMessageAndAgency (ClientAgency TokIdle) . MsgTraceObjectsRequest TokBlocking <$> arbitrary , AnyMessageAndAgency (ClientAgency TokIdle) . MsgTraceObjectsRequest TokNonBlocking <$> arbitrary @@ -24,7 +28,7 @@ instance Arbitrary (AnyMessageAndAgency (TraceForward TraceItem)) where , pure $ AnyMessageAndAgency (ClientAgency TokIdle) MsgDone ] -instance Eq (AnyMessage (TraceForward TraceItem)) where +instance Eq (AnyMessage (TraceObjectForward TraceItem)) where AnyMessage (MsgTraceObjectsRequest TokBlocking r1) == AnyMessage (MsgTraceObjectsRequest TokBlocking r2) = r1 == r2 AnyMessage (MsgTraceObjectsRequest TokNonBlocking r1) diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Direct.hs b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Direct.hs new file mode 100644 index 00000000000..507d50183a7 --- /dev/null +++ b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Direct.hs @@ -0,0 +1,22 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Test.Trace.Forward.Protocol.TraceObject.Direct + ( direct + ) where + +import Trace.Forward.Protocol.TraceObject.Acceptor +import Trace.Forward.Protocol.TraceObject.Forwarder + +direct :: Monad m + => TraceObjectForwarder lo m a + -> TraceObjectAcceptor lo m b + -> m (a, b) +direct TraceObjectForwarder { recvMsgDone } + (SendMsgDone mdone) = + (,) <$> recvMsgDone <*> mdone +direct TraceObjectForwarder { recvMsgTraceObjectsRequest } + (SendMsgTraceObjectsRequest blocking numOfTO mclient) = do + (reply, server) <- recvMsgTraceObjectsRequest blocking numOfTO + client <- mclient reply + direct server client diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Examples.hs b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Examples.hs new file mode 100644 index 00000000000..503af34e98f --- /dev/null +++ b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Examples.hs @@ -0,0 +1,52 @@ +{-# LANGUAGE GADTs #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Test.Trace.Forward.Protocol.TraceObject.Examples + ( traceObjectAcceptorApply + , traceObjectForwarderCount + ) where + +import qualified Data.List.NonEmpty as NE + +import Trace.Forward.Protocol.TraceObject.Acceptor +import Trace.Forward.Protocol.TraceObject.Forwarder +import Trace.Forward.Protocol.TraceObject.Type + +traceObjectAcceptorApply + :: forall m. Monad m + => (Int -> Int) + -> Int + -> Int -- ^ count of number of requests + -> TraceObjectAcceptor Int m Int +traceObjectAcceptorApply f = go + where + go :: Int -> Int -> TraceObjectAcceptor Int m Int + go acc n + | n <= 0 = + SendMsgDone $ return acc + | otherwise = + SendMsgTraceObjectsRequest + TokNonBlocking + (NumberOfTraceObjects 1) + $ \_reply -> return $ go (f acc) (pred n) + +-- | A server which counts number received of 'MsgTraceObjectsRequest'. +-- +traceObjectForwarderCount + :: forall m. Monad m + => TraceObjectForwarder Int m Int +traceObjectForwarderCount = go 0 + where + go :: Int -> TraceObjectForwarder Int m Int + go n = + TraceObjectForwarder + { recvMsgDone = return n + , recvMsgTraceObjectsRequest = + \blocking _numOfTO -> + return ( case blocking of + TokBlocking -> BlockingReply (NE.fromList [1, 2, 3]) + TokNonBlocking -> NonBlockingReply [1, 2] + , go (succ n) + ) + } diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/TraceItem.hs b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Item.hs similarity index 60% rename from trace-forward/test/Test/Trace/Forward/Protocol/TraceItem.hs rename to trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Item.hs index cab48a48f99..98ce598ab93 100644 --- a/trace-forward/test/Test/Trace/Forward/Protocol/TraceItem.hs +++ b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Item.hs @@ -2,14 +2,13 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE OverloadedStrings #-} -module Test.Trace.Forward.Protocol.TraceItem +module Test.Trace.Forward.Protocol.TraceObject.Item ( TraceItem (..) ) where import Codec.Serialise (Serialise (..)) import Data.List.NonEmpty (NonEmpty, fromList) import Data.Text (Text) -import Data.Time (UTCTime (..), fromGregorian) import GHC.Generics import Test.QuickCheck @@ -38,23 +37,13 @@ instance Arbitrary DetailLevel where instance Serialise DetailLevel -instance Arbitrary UTCTime where - arbitrary = oneof - [ pure $ UTCTime (fromGregorian 2021 7 24) ((22 * 3600) + (15 * 60) + 1) - , pure $ UTCTime (fromGregorian 2021 7 25) ((12 * 3600) + (4 * 60) + 37) - , pure $ UTCTime (fromGregorian 2021 7 26) ((23 * 3600) + (19 * 60) + 56) - ] - -- | Trace items that will be used during testing. --- This type is similar to the real 'TraceObject' that will be used by the node. +-- This type is an imitation of the real 'TraceObject' that will be used by the node. data TraceItem = TraceItem - { tiHuman :: Maybe String - , tiNamespace :: [String] - , tiSeverity :: Severity - , tiDetails :: DetailLevel - , tiTimestamp :: UTCTime - , tiHostname :: String - , tiThreadId :: Text + { tiSeverity :: Severity + , tiDetails :: DetailLevel + , tiHostname :: String + , tiThreadId :: Text } deriving (Eq, Ord, Show, Generic) instance Serialise TraceItem @@ -64,11 +53,8 @@ instance Arbitrary TraceItem where arbitrary = TraceItem <$> arbitrary <*> arbitrary - <*> arbitrary - <*> arbitrary - <*> arbitrary - <*> arbitrary - <*> oneof [pure "1", pure "10"] + <*> oneof [pure "nixos", pure "Darwin", pure "testHost"] + <*> oneof [pure "1", pure "10", pure "14"] instance Arbitrary (NonEmpty TraceItem) where arbitrary = fromList <$> listOf1 arbitrary diff --git a/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Tests.hs b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Tests.hs new file mode 100644 index 00000000000..1b3f911e919 --- /dev/null +++ b/trace-forward/test/Test/Trace/Forward/Protocol/TraceObject/Tests.hs @@ -0,0 +1,127 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GADTs #-} + +module Test.Trace.Forward.Protocol.TraceObject.Tests + ( tests + ) where + +import qualified Codec.Serialise as CBOR +import Control.Monad.Class.MonadST +import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadThrow +import Control.Monad.ST (runST) +import Control.Monad.IOSim (runSimOrThrow) +import Control.Tracer (nullTracer) +import Test.Tasty +import Test.Tasty.QuickCheck + +import Network.TypedProtocol.Codec +import Network.TypedProtocol.Proofs +import Ouroboros.Network.Channel +import Ouroboros.Network.Driver.Simple (runConnectedPeers) + +import Trace.Forward.Protocol.TraceObject.Acceptor +import Trace.Forward.Protocol.TraceObject.Forwarder +import Trace.Forward.Protocol.TraceObject.Codec +import Trace.Forward.Protocol.TraceObject.Type + +import Test.Trace.Forward.Protocol.TraceObject.Codec () +import Test.Trace.Forward.Protocol.TraceObject.Direct +import Test.Trace.Forward.Protocol.TraceObject.Examples +import Test.Trace.Forward.Protocol.TraceObject.Item + +import Test.Trace.Forward.Protocol.Common + +tests :: TestTree +tests = testGroup "Trace.Forward.Protocol.TraceObject" + [ testProperty "codec" prop_codec_TraceObjectForward + , testProperty "codec 2-splits" prop_codec_splits2_TraceObjectForward + , testProperty "codec 3-splits" (withMaxSuccess 33 prop_codec_splits3_TraceObjectForward) + , testProperty "direct" prop_direct_TraceObjectForward + , testProperty "connect" prop_connect_TraceObjectForward + , testProperty "channel ST" prop_channel_ST_TraceObjectForward + , testProperty "channel IO" prop_channel_IO_TraceObjectForward + ] + +prop_codec_TraceObjectForward :: AnyMessageAndAgency (TraceObjectForward TraceItem) -> Bool +prop_codec_TraceObjectForward msg = runST $ + prop_codecM + (codecTraceObjectForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + msg + +prop_codec_splits2_TraceObjectForward + :: AnyMessageAndAgency (TraceObjectForward TraceItem) + -> Bool +prop_codec_splits2_TraceObjectForward msg = runST $ + prop_codec_splitsM + splits2 + (codecTraceObjectForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + msg + +prop_codec_splits3_TraceObjectForward + :: AnyMessageAndAgency (TraceObjectForward TraceItem) + -> Bool +prop_codec_splits3_TraceObjectForward msg = runST $ + prop_codec_splitsM + splits3 + (codecTraceObjectForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + msg + +prop_direct_TraceObjectForward + :: (Int -> Int) + -> NonNegative Int + -> Property +prop_direct_TraceObjectForward f (NonNegative n) = + runSimOrThrow + (direct + traceObjectForwarderCount + (traceObjectAcceptorApply f 0 n)) + === + (n, foldr (.) id (replicate n f) 0) + +prop_connect_TraceObjectForward + :: (Int -> Int) + -> NonNegative Int + -> Bool +prop_connect_TraceObjectForward f (NonNegative n) = + case runSimOrThrow + (connect + (traceObjectForwarderPeer traceObjectForwarderCount) + (traceObjectAcceptorPeer $ traceObjectAcceptorApply f 0 n)) of + (s, c, TerminalStates TokDone TokDone) -> (s, c) == (n, foldr (.) id (replicate n f) 0) + +prop_channel + :: ( MonadST m + , MonadAsync m + , MonadCatch m + ) + => (Int -> Int) + -> Int + -> m Property +prop_channel f n = do + (s, c) <- runConnectedPeers createConnectedChannels + nullTracer + (codecTraceObjectForward CBOR.encode CBOR.decode + CBOR.encode CBOR.decode) + forwarder acceptor + return ((s, c) === (n, foldr (.) id (replicate n f) 0)) + where + forwarder = traceObjectForwarderPeer traceObjectForwarderCount + acceptor = traceObjectAcceptorPeer $ traceObjectAcceptorApply f 0 n + +prop_channel_ST_TraceObjectForward + :: (Int -> Int) + -> NonNegative Int + -> Property +prop_channel_ST_TraceObjectForward f (NonNegative n) = + runSimOrThrow (prop_channel f n) + +prop_channel_IO_TraceObjectForward + :: (Int -> Int) + -> NonNegative Int + -> Property +prop_channel_IO_TraceObjectForward f (NonNegative n) = + ioProperty (prop_channel f n) diff --git a/trace-forward/trace-forward.cabal b/trace-forward/trace-forward.cabal index 065f3680ea9..06ffe0aedf3 100644 --- a/trace-forward/trace-forward.cabal +++ b/trace-forward/trace-forward.cabal @@ -1,8 +1,9 @@ cabal-version: 2.4 name: trace-forward version: 0.1.0 -synopsis: See README for more info -description: See README for more info +synopsis: The forwarding protocols library for cardano node. +description: The library providing typed protocols for forwarding different + information from the cardano node to an external application. license: Apache-2.0 license-file: LICENSE copyright: 2021 Input Output (Hong Kong) Ltd. @@ -31,29 +32,38 @@ library import: base, project-config hs-source-dirs: src - exposed-modules: Trace.Forward.Acceptor - Trace.Forward.Configuration - Trace.Forward.Forwarder - Trace.Forward.Queue - Trace.Forward.Utils + exposed-modules: Trace.Forward.Protocol.DataPoint.Acceptor + Trace.Forward.Protocol.DataPoint.Codec + Trace.Forward.Protocol.DataPoint.Forwarder + Trace.Forward.Protocol.DataPoint.Type + Trace.Forward.Protocol.TraceObject.Acceptor + Trace.Forward.Protocol.TraceObject.Codec + Trace.Forward.Protocol.TraceObject.Forwarder + Trace.Forward.Protocol.TraceObject.Type - Trace.Forward.Network.Acceptor - Trace.Forward.Network.Forwarder + Trace.Forward.Run.DataPoint.Acceptor + Trace.Forward.Run.DataPoint.Forwarder + Trace.Forward.Run.TraceObject.Acceptor + Trace.Forward.Run.TraceObject.Forwarder - Trace.Forward.Protocol.Acceptor - Trace.Forward.Protocol.Codec - Trace.Forward.Protocol.Forwarder - Trace.Forward.Protocol.Type + Trace.Forward.Configuration.DataPoint + Trace.Forward.Configuration.TraceObject - build-depends: async + Trace.Forward.Utils.DataPoint + Trace.Forward.Utils.TraceObject + + build-depends: aeson + , async , bytestring , cborg + , containers , contra-tracer , extra , io-classes , ouroboros-network-framework , serialise , stm + , text , typed-protocols , typed-protocols-cborg @@ -63,17 +73,25 @@ test-suite test main-is: Main.hs hs-source-dirs: test - other-modules: Test.Trace.Forward.Protocol.Codec - Test.Trace.Forward.Protocol.Tests - Test.Trace.Forward.Protocol.TraceItem - Test.Trace.Forward.Demo.Configs - Test.Trace.Forward.Demo.Tests + other-modules: Test.Trace.Forward.Protocol.TraceObject.Codec + Test.Trace.Forward.Protocol.TraceObject.Direct + Test.Trace.Forward.Protocol.TraceObject.Examples + Test.Trace.Forward.Protocol.TraceObject.Item + Test.Trace.Forward.Protocol.TraceObject.Tests + + Test.Trace.Forward.Protocol.DataPoint.Codec + Test.Trace.Forward.Protocol.DataPoint.Direct + Test.Trace.Forward.Protocol.DataPoint.Examples + Test.Trace.Forward.Protocol.DataPoint.Item + Test.Trace.Forward.Protocol.DataPoint.Tests + + Test.Trace.Forward.Protocol.Common - build-depends: async + build-depends: aeson + , bytestring , contra-tracer - , directory - , extra - , filepath + , io-classes + , io-sim , ouroboros-network-framework , trace-forward , QuickCheck @@ -82,7 +100,6 @@ test-suite test , tasty-quickcheck , typed-protocols , text - , time ghc-options: -rtsopts -threaded