From e9bc8a3b5b0c3056f2619ac0224c66e23c7edfcb Mon Sep 17 00:00:00 2001 From: Denis Shevchenko Date: Tue, 7 Sep 2021 11:35:56 +0400 Subject: [PATCH] Update trace-forward. --- cabal.project | 1 - trace-forward-demo/README.md | 137 -------- trace-forward-demo/acceptor-mux.hs | 317 ------------------ trace-forward-demo/acceptor.hs | 47 --- trace-forward-demo/forwarder-mux.hs | 242 ------------- trace-forward-demo/forwarder.hs | 75 ----- .../scripts/runDisconnectAcceptor.sh | 7 - .../scripts/runDisconnectForwarder.sh | 7 - trace-forward-demo/trace-forward-demo.cabal | 93 ----- 9 files changed, 926 deletions(-) delete mode 100644 trace-forward-demo/README.md delete mode 100644 trace-forward-demo/acceptor-mux.hs delete mode 100644 trace-forward-demo/acceptor.hs delete mode 100644 trace-forward-demo/forwarder-mux.hs delete mode 100644 trace-forward-demo/forwarder.hs delete mode 100755 trace-forward-demo/scripts/runDisconnectAcceptor.sh delete mode 100755 trace-forward-demo/scripts/runDisconnectForwarder.sh delete mode 100644 trace-forward-demo/trace-forward-demo.cabal diff --git a/cabal.project b/cabal.project index 7ba5fbb2c61..62e96a3ed24 100644 --- a/cabal.project +++ b/cabal.project @@ -16,7 +16,6 @@ packages: plutus-example/plutus-example trace-dispatcher trace-forward - trace-forward-demo trace-resources package cardano-api diff --git a/trace-forward-demo/README.md b/trace-forward-demo/README.md deleted file mode 100644 index 9f2b9697b1c..00000000000 --- a/trace-forward-demo/README.md +++ /dev/null @@ -1,137 +0,0 @@ -# Demo Programs - -There are two demo programs, `demo-forwarder` and `demo-acceptor`. You can run these programs in different terminals and see an interaction between them. - -Please see `forwarder.hs` module as an example of how to use `trace-forward` library in the node and `acceptor.hs` module as an example of how to use it in acceptor application (for example, tracer or RTView). - -## How To Build It - -As a result of `cabal build all` command, two demo programs will be built: `demo-forwarder` and `demo-acceptor`. - -## How To Run It - -### Connection Via Local Pipe - -The example command to run `demo-acceptor` program: - -``` -./demo-acceptor /path/to/demo.sock -``` - -where `/path/to/demo.sock` is the path to the pipe file that will be created and used for connection with the forwarder. - -The example command to run `demo-forwarder` program: - -``` -./demo-forwarder /path/to/demo.sock -``` - -where `/path/to/demo.sock` is the path to the pipe file that will be created (if needed) and used for connection with the acceptor. - -### Connection Via Remote Socket - -The example command to run `demo-acceptor` program: - -``` -./demo-acceptor 127.0.0.1 3010 -``` - -where `127.0.0.1` and `3010` are the host and port; the acceptor will listen to them to accept the connection from the forwarder. - -The example command to run `demo-forwarder` program: - -``` -./demo-forwarder 127.0.0.1 3010 -``` - -where `127.0.0.1` and `3010` are the host and port; the forwarder will use them to establish the connection with the acceptor. - -# Mux Demo Programs - -There are two demo programs, `demo-forwarder-mux` and `demo-acceptor-mux`. You can run these programs in different terminals and see an interaction between them. - -These demo-programs use two libraries: `trace-forward` and [`ekg-forward`](https://github.com/input-output-hk/ekg-forward). The purpose of these demo-programs is demonstration of `Mux`-ing of two `typed-protocol`s using one single connection. - -You can use it as a practical example of how to integrate these two libraries in the forwarder application (for example, `cardano-node`) and in the acceptor application (for example, tracer or [RTView](https://github.com/input-output-hk/cardano-rt-view)). - -Demo-programs can be launched in different modes. - -## Simple Mode - -Run demo-programs like this: - -``` -$ ./demo-acceptor-mux ./demo-mux.sock 1000 -``` - -``` -$ ./demo-forwarder-mux ./demo-mux.sock -``` - -or like this: - -``` -$ ./demo-acceptor-mux 127.0.0.1 3010 1000 -``` - -``` -$ ./demo-forwarder-mux 127.0.0.1 3010 -``` - -In these examples, `demo-mux.sock` is a local pipe, and `127.0.0.1 3010` is a host and a port. - -Next value for the acceptor is the number of requested `LogObject`s, in this example the acceptor will ask `1000` `LogObject`s. - -## Benchmark Mode - -Run demo-programs like this: - -``` -$ ./demo-acceptor-mux ./demo-mux.sock 1000 -b 2 -``` - -``` -$ ./demo-forwarder-mux ./demo-mux.sock 1 -b 0.000001 -``` - -Flags before `-b` are the same as in the simple mode. Flag `-b` means `"benchmark"`. - -The value after `-b` flag for the acceptor is a speed frequency: how often the acceptor will print the speed of accepting `LogObject`s. This speed is a number of `LogObject`s received in 1 second. In this example the acceptor will print this speed every `2` seconds. - -The value after `-b` flag for the forwarder is a fill frequency: how often the forwarder will fill its local queue of `LogObject`s. In this example the forwarder will write the new `LogObject` in its local queue every `0.000001` seconds. - -Since benchmark mode should work as fast as possible, it allows the connection via local pipe only (in this example it's `demo-mux.sock`). - -## Benchmark Mode (Limited) - -Run demo-programs like this: - -``` -$ ./demo-acceptor-mux ./demo-mux.sock 0.001 1000 -b 2 -t 1000000 -``` - -``` -$ ./demo-forwarder-mux ./demo-mux.sock 1 -b 0.000001 -``` - -The new flag here is `-t` which means `"total"`. The value after `-t` is a total number of `LogObject`s that will be requested by acceptor. After the acceptor will receive such a number of `LogObject`s, the test will be stopped. In this example, `1000000` `LogObject`s will be requested. - -## Disconnect Mode - -Run demo-programs like this: - -``` -$ ./demo-acceptor-mux 127.0.0.1 3010 --dc 30 -``` - -``` -$ ./demo-forwarder-mux 127.0.0.1 3010 --dc 25 -``` - -Flag `--dc` means `"DisConnect"`. The number is a disconnect frequency, in seconds: how often the program will break the connection. In this example, the acceptor will break it every 30 seconds, and the forwarder - every 25 seconds. - -In disconnect mode, the acceptor asks for `LogObject`s as usually, but both the acceptor and the forwarder periodically break the connection and re-establish it again. The purpose of disconnect mode is to check if network resources clean up correctly. This is why disconnect mode allows the connection via `host` and `port` only. - -### Disconnect Mode: Scripts - -For simplicity, it is possible to run the forwarder and the acceptor using these scripts: `./scripts/runDisconnectForwarder.sh` and `./scripts/runDisconnectAcceptor.sh`. These scripts will launch demo-programs and will `watch` for active TCP-connecions using `lsof` command. diff --git a/trace-forward-demo/acceptor-mux.hs b/trace-forward-demo/acceptor-mux.hs deleted file mode 100644 index 44c72ecbcf0..00000000000 --- a/trace-forward-demo/acceptor-mux.hs +++ /dev/null @@ -1,317 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE PackageImports #-} -{-# LANGUAGE ScopedTypeVariables #-} - -import Codec.CBOR.Term (Term) -import Control.Concurrent (ThreadId, killThread, myThreadId, threadDelay) -import Control.Concurrent.Async (async, asyncThreadId, wait, withAsync) -import Control.Concurrent.STM (atomically) -import Control.Concurrent.STM.TBQueue (newTBQueueIO) -import Control.Concurrent.STM.TVar -import "contra-tracer" Control.Tracer (contramap, nullTracer, stdoutTracer) -import Control.Monad (forever, void, when) -import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef) -import Data.Fixed (Pico) -import Data.Maybe (isJust) -import Data.Text (pack) -import Data.Time.Clock (NominalDiffTime, getCurrentTime, - diffUTCTime, secondsToNominalDiffTime) -import Data.Void (Void) -import Data.Word (Word16, Word64) -import System.Environment (getArgs) -import System.Exit (die) - -import Control.Exception (SomeException, try) -import qualified Data.ByteString.Lazy as LBS -import qualified Network.Socket as Socket -import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..), - MiniProtocolNum (..), MuxMode (..), - OuroborosApplication (..), - RunMiniProtocol (..), - miniProtocolLimits, miniProtocolNum, miniProtocolRun) -import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) -import Ouroboros.Network.ErrorPolicy (nullErrorPolicies) -import Ouroboros.Network.IOManager (withIOManager) -import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, localSnocket, socketSnocket) -import Ouroboros.Network.Socket (AcceptedConnectionsLimit (..), - SomeResponderApplication (..), - cleanNetworkMutableState, newNetworkMutableState, - nullNetworkServerTracers, withServerNode) -import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, - noTimeLimitsHandshake, - timeLimitsHandshake) -import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), - UnversionedProtocolData (..), - unversionedHandshakeCodec, - unversionedProtocolDataCodec) -import Ouroboros.Network.Protocol.Handshake.Type (Handshake) -import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, simpleSingletonVersions) -import qualified System.Metrics as EKG - -import Cardano.Logging (TraceObject) - -import qualified Trace.Forward.Configuration as TF -import qualified Trace.Forward.Protocol.Type as TF -import Trace.Forward.Network.Acceptor (acceptTraceObjects) - -import System.Metrics.Network.Acceptor (acceptEKGMetrics) -import System.Metrics.Store.Acceptor (emptyMetricsLocalStore) -import qualified System.Metrics.Configuration as EKGF -import qualified System.Metrics.ReqResp as EKGF - -main :: IO () -main = do - (listenIt, freq, itemsNum, benchSpeedFreq, totalObjs, reConnectTest) <- do - args <- getArgs - if "--dc" `elem` args - then - case args of - [host, port, "--dc", freq] -> - return ( RemoteSocket host port - , 1 -- This is disconnect test, so the frequency of requests doesn't matter. - , 100 -- This is disconnect test, so the number of requested TraceObjects doesn't matter. - , Nothing - , Nothing - , Just (read freq :: Pico) -- This is how often the server will be shut down. - ) - _ -> die "Usage: demo-acceptor-mux host port --dc freqInSecs" - else - case args of - [path, freq, n] -> - return ( LocalPipe path - , read freq :: Pico - , read n :: Word16 - , Nothing - , Nothing - , Nothing - ) - [host, port, freq, n] -> - return ( RemoteSocket host port - , read freq :: Pico - , read n :: Word16 - , Nothing - , Nothing - , Nothing - ) - [path, freq, n, "-b", sp] -> - return ( LocalPipe path - , read freq :: Pico - , read n :: Word16 - , Just (read sp :: Pico) - , Nothing - , Nothing - ) - [path, freq, n, "-b", sp, "-t", tn] -> - return ( LocalPipe path - , read freq :: Pico - , read n :: Word16 - , Just (read sp :: Pico) - , Just (read tn :: Word64) - , Nothing - ) - _ -> - die "Usage: demo-acceptor-mux (pathToPipe | host port) freqInSecs itemsNum [-b freqInSecs] [-t totalObjs]" - - configs <- mkConfigs listenIt freq itemsNum benchSpeedFreq totalObjs - - tidVar <- newTVarIO =<< myThreadId -- Just for filling TVar, it will be replaced anyway. - - case reConnectTest of - Nothing -> launchAcceptors listenIt configs tidVar - Just rcFreq -> runReConnector (launchAcceptors listenIt configs tidVar) rcFreq tidVar - -mkConfigs - :: HowToConnect - -> Pico - -> Word16 - -> Maybe Pico - -> Maybe Word64 - -> IO (EKGF.AcceptorConfiguration, TF.AcceptorConfiguration TraceObject) -mkConfigs listenIt freq itemsNum benchSpeedFreq totalObjs = do - stopEKGF <- newIORef False - stopTF <- newIORef False - loCounter <- newIORef (0 :: Word64) - - when benchMode $ - void . async $ runSpeedPrinter loCounter 0 stopTF - - case totalObjs of - Nothing -> return () - Just tn -> do - startTime <- getCurrentTime - putStrLn $ "Start time: " <> show startTime - void . async $ stopWhenTotalReached startTime loCounter tn stopTF stopEKGF - - let ekgConfig = - EKGF.AcceptorConfiguration - { EKGF.acceptorTracer = if benchMode then nullTracer else contramap show stdoutTracer - , EKGF.forwarderEndpoint = forEKGF listenIt - , EKGF.requestFrequency = secondsToNominalDiffTime freq - , EKGF.whatToRequest = EKGF.GetAllMetrics - -- Currently, only TF works in bench mode. - , EKGF.actionOnResponse = if benchMode then (\_ -> return ()) else print - , EKGF.shouldWeStop = stopEKGF - , EKGF.actionOnDone = putStrLn "EKGF: we are done!" - } - tfConfig :: TF.AcceptorConfiguration TraceObject - tfConfig = - TF.AcceptorConfiguration - { TF.acceptorTracer = if benchMode then nullTracer else contramap show stdoutTracer - , TF.forwarderEndpoint = forTF listenIt - , TF.whatToRequest = TF.GetTraceObjects itemsNum - -- Currently, only TF works in bench mode. - , TF.actionOnReply = if benchMode then count loCounter else print - , TF.shouldWeStop = stopTF - , TF.actionOnDone = putStrLn "TF: we are done!" - } - return (ekgConfig, tfConfig) - where - forTF (LocalPipe p) = TF.LocalPipe p - forTF (RemoteSocket h p) = TF.RemoteSocket (pack h) (read p :: TF.Port) - - forEKGF (LocalPipe p) = EKGF.LocalPipe p - forEKGF (RemoteSocket h p) = EKGF.RemoteSocket (pack h) (read p :: EKGF.Port) - - benchMode = isJust benchSpeedFreq - - count :: IORef Word64 -> [TraceObject] -> IO () - count loCounter los = - atomicModifyIORef' loCounter $ \cnt -> (cnt + fromIntegral (length los), ()) - - runSpeedPrinter loCounter diff stopTF = - case benchSpeedFreq of - Nothing -> return () - Just sp -> do - let waitInMicroSecs = toMicroSecs . secondsToNominalDiffTime $ sp - threadDelay waitInMicroSecs - --Check should we stop... - shouldIStop <- readIORef stopTF - if shouldIStop - then return () - else do - n <- readIORef loCounter - let newObjsNum = n - diff - putStrLn $ "Bench mode: " <> show newObjsNum - <> " new TraceObjects were received during last " - <> show waitInMicroSecs <> " mks." - runSpeedPrinter loCounter n stopTF - - stopWhenTotalReached startTime loCounter totalObjsNum stopTF stopEKGF = do - n <- readIORef loCounter - if n < totalObjsNum - then do - threadDelay 1000 - stopWhenTotalReached startTime loCounter totalObjsNum stopTF stopEKGF - else do - stopTime <- getCurrentTime - let timeDiff = stopTime `diffUTCTime` startTime - putStrLn $ "Stop time: " <> show stopTime - putStrLn $ show n <> " TraceObjects were received during " - <> show timeDiff - atomicModifyIORef' stopTF $ const (True, ()) - atomicModifyIORef' stopEKGF $ const (True, ()) - -toMicroSecs :: NominalDiffTime -> Int -toMicroSecs dt = fromEnum dt `div` 1000000 - -runReConnector :: IO () -> Pico -> TVar ThreadId -> IO () -runReConnector acceptor rcFreq tidVar = forever $ do - putStrLn "ReConnect test, start acceptor..." - withAsync acceptor $ \_ -> do - threadDelay . toMicroSecs . secondsToNominalDiffTime $ rcFreq - putStrLn "ReConnect test, stop acceptor..." - tid <- readTVarIO tidVar - putStrLn $ "KILL TID: " <> show tid - killThread tid - --- Network part - -data HowToConnect - = LocalPipe !FilePath - | RemoteSocket !String !String - -launchAcceptors - :: HowToConnect - -> (EKGF.AcceptorConfiguration, TF.AcceptorConfiguration TraceObject) - -> TVar ThreadId - -> IO () -launchAcceptors endpoint configs tidVar = - try (launchAcceptors' endpoint configs tidVar) >>= \case - Left (_e :: SomeException) -> - launchAcceptors endpoint configs tidVar - Right _ -> return () - -launchAcceptors' - :: HowToConnect - -> (EKGF.AcceptorConfiguration, TF.AcceptorConfiguration TraceObject) - -> TVar ThreadId - -> IO () -launchAcceptors' endpoint configs tidVar = withIOManager $ \iocp -> do - case endpoint of - LocalPipe localPipe -> do - let snocket = localSnocket iocp localPipe - address = localAddressFromPath localPipe - void $ doListenToForwarder snocket address noTimeLimitsHandshake configs tidVar - RemoteSocket host port -> do - listenAddress:_ <- Socket.getAddrInfo Nothing (Just host) (Just port) - let snocket = socketSnocket iocp - address = Socket.addrAddress listenAddress - void $ doListenToForwarder snocket address timeLimitsHandshake configs tidVar - -doListenToForwarder - :: Ord addr - => Snocket IO fd addr - -> addr - -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) - -> (EKGF.AcceptorConfiguration, TF.AcceptorConfiguration TraceObject) - -> TVar ThreadId - -> IO Void -doListenToForwarder snocket address timeLimits (ekgConfig, tfConfig) tidVar = do - store <- EKG.newStore - metricsStore <- newIORef emptyMetricsLocalStore - loQueue <- newTBQueueIO 1000000 - niStore <- newIORef [] - - networkState <- newNetworkMutableState - _ <- async $ cleanNetworkMutableState networkState - withServerNode - snocket - nullNetworkServerTracers - networkState - (AcceptedConnectionsLimit maxBound maxBound 0) - address - unversionedHandshakeCodec - timeLimits - (cborTermVersionDataCodec unversionedProtocolDataCodec) - acceptableVersion - (simpleSingletonVersions - UnversionedProtocol - UnversionedProtocolData - (SomeResponderApplication $ - acceptorApp [ (acceptEKGMetrics ekgConfig store metricsStore, 1) - , (acceptTraceObjects tfConfig loQueue niStore, 2) - ] - ) - ) - nullErrorPolicies - $ \_ serverAsync -> do - let tid = asyncThreadId serverAsync - -- Store it to will be able to kill it later. - putStrLn $ "STORE TID: " <> show tid - atomically $ modifyTVar' tidVar (const tid) - wait serverAsync -- Block until async exception. - where - acceptorApp - :: [(RunMiniProtocol 'ResponderMode LBS.ByteString IO Void (), Word16)] - -> OuroborosApplication 'ResponderMode addr LBS.ByteString IO Void () - acceptorApp protocols = - OuroborosApplication $ \_connectionId _shouldStopSTM -> - [ MiniProtocol - { miniProtocolNum = MiniProtocolNum num - , miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound } - , miniProtocolRun = prot - } - | (prot, num) <- protocols - ] diff --git a/trace-forward-demo/acceptor.hs b/trace-forward-demo/acceptor.hs deleted file mode 100644 index cbdaa882414..00000000000 --- a/trace-forward-demo/acceptor.hs +++ /dev/null @@ -1,47 +0,0 @@ -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE PackageImports #-} - -import Control.Concurrent.STM.TBQueue (newTBQueueIO) -import "contra-tracer" Control.Tracer (contramap, stdoutTracer) -import Data.IORef (newIORef) -import Data.Text (pack) -import System.Environment (getArgs) -import System.Exit (die) - -import Cardano.Logging (TraceObject) - -import Trace.Forward.Acceptor (runTraceAcceptor) -import Trace.Forward.Configuration (AcceptorConfiguration (..), - HowToConnect (..), Port) -import Trace.Forward.Protocol.Type (Request (..)) - -main :: IO () -main = do - -- Prepare the acceptor's configuration. - listenIt <- getArgs >>= \case - [path] -> return $ LocalPipe path - [host, port] -> return $ RemoteSocket (pack host) (read port :: Port) - _ -> die "Usage: demo-acceptor (pathToLocalPipe | host port)" - weAreDone <- newIORef False - let config :: AcceptorConfiguration TraceObject - config = - AcceptorConfiguration - { acceptorTracer = contramap show stdoutTracer - , forwarderEndpoint = listenIt - , whatToRequest = GetTraceObjects 10 - , actionOnReply = print - , shouldWeStop = weAreDone - , actionOnDone = putStrLn "We are done!" - } - - -- Create an empty TBQueue where received 'LogObject's will be stored. - queue <- newTBQueueIO 100 - - -- Create an empty store where received node's info will be stored. - niStore <- newIORef [] - - -- Run the acceptor. It will listen to the forwarder, and after the connection - -- will be established, the acceptor will ask for N 'LogObject's from the forwarder. - -- After these 'LogObject's will be received, the acceptor will write them in the 'queue'. - runTraceAcceptor config queue niStore diff --git a/trace-forward-demo/forwarder-mux.hs b/trace-forward-demo/forwarder-mux.hs deleted file mode 100644 index 5c918f02b84..00000000000 --- a/trace-forward-demo/forwarder-mux.hs +++ /dev/null @@ -1,242 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE PackageImports #-} -{-# LANGUAGE ScopedTypeVariables #-} - -import Codec.CBOR.Term (Term) -import Control.Concurrent (threadDelay) -import Control.Concurrent.Async (async, withAsync) -import Control.Monad (forever) -import "contra-tracer" Control.Tracer (contramap, nullTracer, stdoutTracer) -import Data.Fixed (Pico) -import Data.Maybe (isJust) -import Data.Text (pack) -import Data.Time.Clock (NominalDiffTime, UTCTime, getCurrentTime, secondsToNominalDiffTime) -import Data.Void (Void) -import Data.Word (Word16) -import System.Environment (getArgs) -import System.Exit (die) - -import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, writeTBQueue) -import Control.Exception (SomeException, try) -import Control.Monad.STM (atomically) -import qualified Data.ByteString.Lazy as LBS -import qualified Network.Socket as Socket -import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) -import Ouroboros.Network.IOManager (withIOManager) -import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolLimits (..), - MiniProtocolNum (..), MuxMode (..), - OuroborosApplication (..), RunMiniProtocol (..), - miniProtocolLimits, miniProtocolNum, miniProtocolRun) -import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, - noTimeLimitsHandshake, - timeLimitsHandshake) -import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), - UnversionedProtocolData (..), - unversionedHandshakeCodec, - unversionedProtocolDataCodec) -import Ouroboros.Network.Protocol.Handshake.Type (Handshake) -import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, simpleSingletonVersions) -import Ouroboros.Network.Snocket (Snocket, localAddressFromPath, localSnocket, socketSnocket) -import Ouroboros.Network.Socket (connectToNode, nullNetworkConnectTracers) -import qualified System.Metrics as EKG - -import Cardano.Logging (DetailLevel (..), SeverityS (..), TraceObject (..)) - -import qualified Trace.Forward.Configuration as TF -import Trace.Forward.Network.Forwarder (forwardTraceObjects) -import Trace.Forward.Protocol.Type (NodeInfo (..)) - -import qualified System.Metrics.Configuration as EKGF -import System.Metrics.Network.Forwarder (forwardEKGMetrics) - -main :: IO () -main = do - (howToConnect, freq, benchFillFreq, reConnectTest) <- do - args <- getArgs - if "--dc" `elem` args - then - case args of - [host, port, "--dc", freq] -> - return ( RemoteSocket host port - , 0.5 - , Nothing - , Just (read freq :: Pico) -- This is how often the client will be shut down. - ) - _ -> die "Usage: demo-forwarder-mux host port --dc freqInSecs" - else - case args of - [path, freq] -> - return ( LocalPipe path - , read freq :: Pico - , Nothing - , Nothing - ) - [host, port, freq] -> - return ( RemoteSocket host port - , read freq :: Pico - , Nothing - , Nothing - ) - [path, freq, "-b", ff] -> - return ( LocalPipe path - , read freq :: Pico - , Just (read ff :: Pico) - , Nothing - ) - _ -> - die "Usage: demo-forwarder-mux (pathToLocalPipe | host port) freqInSecs [-b fillFreqInSecs]" - - configs <- mkConfigs howToConnect freq benchFillFreq <$> getCurrentTime - - case reConnectTest of - Nothing -> launchForwarders howToConnect benchFillFreq configs - Just rcFreq -> runReConnector (launchForwarders howToConnect benchFillFreq configs) rcFreq - -mkConfigs - :: HowToConnect - -> Pico - -> Maybe Pico - -> UTCTime - -> IO (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject) -mkConfigs howToConnect freq benchFillFreq now = (ekgConfig, tfConfig) - where - ekgConfig = - EKGF.ForwarderConfiguration - { EKGF.forwarderTracer = if benchMode then nullTracer else contramap show stdoutTracer - , EKGF.acceptorEndpoint = forEKGF howToConnect - , EKGF.reConnectFrequency = secondsToNominalDiffTime freq - , EKGF.actionOnRequest = const (return ()) - } - tfConfig = - TF.ForwarderConfiguration - { TF.forwarderTracer = if benchMode then nullTracer else contramap show stdoutTracer - , TF.acceptorEndpoint = forTF howToConnect - , TF.nodeBasicInfo = pure - NodeInfo - { niName = "core-1" - , niProtocol = "Shelley" - , niVersion = "1.28.0" - , niCommit = "abcdefg" - , niStartTime = now - , niSystemStartTime = now - } - } - - forTF (LocalPipe p) = TF.LocalPipe p - forTF (RemoteSocket h p) = TF.RemoteSocket (pack h) (read p :: TF.Port) - - forEKGF (LocalPipe p) = EKGF.LocalPipe p - forEKGF (RemoteSocket h p) = EKGF.RemoteSocket (pack h) (read p :: EKGF.Port) - - benchMode = isJust benchFillFreq - -toMicroSecs :: NominalDiffTime -> Int -toMicroSecs dt = fromEnum dt `div` 1000000 - -runReConnector :: IO () -> Pico -> IO () -runReConnector forwarder rcFreq = forever $ do - putStrLn "ReConnect test, start forwarder..." - withAsync forwarder $ \_ -> do - threadDelay . toMicroSecs . secondsToNominalDiffTime $ rcFreq - putStrLn "ReConnect test, stop forwarder..." - --- Network part - -data HowToConnect - = LocalPipe !FilePath - | RemoteSocket !String !String - -launchForwarders - :: HowToConnect - -> Maybe Pico - -> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject) - -> IO () -launchForwarders endpoint benchFillFreq configs = - try (launchForwarders' endpoint benchFillFreq configs) >>= \case - Left (_e :: SomeException) -> - launchForwarders endpoint benchFillFreq configs - Right _ -> return () - -launchForwarders' - :: HowToConnect - -> Maybe Pico - -> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject) - -> IO () -launchForwarders' endpoint benchFillFreq configs = withIOManager $ \iocp -> do - case endpoint of - LocalPipe localPipe -> do - let snocket = localSnocket iocp localPipe - address = localAddressFromPath localPipe - doConnectToAcceptor snocket address noTimeLimitsHandshake benchFillFreq configs - RemoteSocket host port -> do - acceptorAddr:_ <- Socket.getAddrInfo Nothing (Just host) (Just port) - let snocket = socketSnocket iocp - address = Socket.addrAddress acceptorAddr - doConnectToAcceptor snocket address timeLimitsHandshake benchFillFreq configs - -doConnectToAcceptor - :: Snocket IO fd addr - -> addr - -> ProtocolTimeLimits (Handshake UnversionedProtocol Term) - -> Maybe Pico - -> (EKGF.ForwarderConfiguration, TF.ForwarderConfiguration TraceObject) - -> IO () -doConnectToAcceptor snocket address timeLimits benchFillFreq (ekgConfig, tfConfig) = do - tfQueue <- newTBQueueIO 1000000 - _ <- async $ traceObjectsWriter tfQueue benchFillFreq - store <- EKG.newStore - EKG.registerGcMetrics store - - connectToNode - snocket - unversionedHandshakeCodec - timeLimits - (cborTermVersionDataCodec unversionedProtocolDataCodec) - nullNetworkConnectTracers - acceptableVersion - (simpleSingletonVersions - UnversionedProtocol - UnversionedProtocolData $ - forwarderApp [ (forwardEKGMetrics ekgConfig store, 1) - , (forwardTraceObjects tfConfig tfQueue, 2) - ] - ) - Nothing - address - where - forwarderApp - :: [(RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void, Word16)] - -> OuroborosApplication 'InitiatorMode addr LBS.ByteString IO () Void - forwarderApp protocols = - OuroborosApplication $ \_connectionId _shouldStopSTM -> - [ MiniProtocol - { miniProtocolNum = MiniProtocolNum num - , miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound } - , miniProtocolRun = prot - } - | (prot, num) <- protocols - ] - -traceObjectsWriter :: TBQueue TraceObject -> Maybe Pico -> IO () -traceObjectsWriter queue benchFillFreq = forever $ do - now <- getCurrentTime - atomically $ writeTBQueue queue (mkTraceObject now) - threadDelay fillPause - where - mkTraceObject now' = TraceObject - { toHuman = Just "Human Message 1" - , toMachine = Nothing - , toNamespace = ["demoNamespace"] - , toSeverity = Info - , toDetails = DNormal - , toTimestamp = now' - , toHostname = "linux" - , toThreadId = "1" - } - - fillPause = case benchFillFreq of - Just ff -> toMicroSecs . secondsToNominalDiffTime $ ff - Nothing -> 500000 diff --git a/trace-forward-demo/forwarder.hs b/trace-forward-demo/forwarder.hs deleted file mode 100644 index 013df9da4f3..00000000000 --- a/trace-forward-demo/forwarder.hs +++ /dev/null @@ -1,75 +0,0 @@ -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE PackageImports #-} - -import Control.Concurrent (threadDelay) -import Control.Concurrent.Async (async) -import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, writeTBQueue) -import Control.Monad (forever) -import Control.Monad.STM (atomically) -import "contra-tracer" Control.Tracer (contramap, stdoutTracer) -import Data.Text (pack) -import Data.Time.Clock (getCurrentTime) -import System.Environment (getArgs) -import System.Exit (die) - -import Cardano.Logging (DetailLevel (..), SeverityS (..), TraceObject (..)) - -import Trace.Forward.Forwarder (runTraceForwarder) -import Trace.Forward.Configuration (ForwarderConfiguration (..), - HowToConnect (..), Port) -import Trace.Forward.Protocol.Type (NodeInfo (..)) - -main :: IO () -main = do - -- Prepare the forwarder's configuration. - howToConnect <- getArgs >>= \case - [path] -> return $ LocalPipe path - [host, port] -> return $ RemoteSocket (pack host) (read port :: Port) - _ -> die "Usage: demo-forwarder (pathToLocalPipe | host port)" - now <- getCurrentTime - let config :: ForwarderConfiguration TraceObject - config = - ForwarderConfiguration - { forwarderTracer = contramap show stdoutTracer - , acceptorEndpoint = howToConnect - , nodeBasicInfo = return - NodeInfo - { niName = "core-1" - , niProtocol = "Shelley" - , niVersion = "1.28.0" - , niCommit = "abcdefg" - , niStartTime = now - , niSystemStartTime = now - } - } - - -- Create a queue for 'TraceObject's: when the acceptor will ask for N 'TraceObject's - -- they will be taken from this queue. - queue <- newTBQueueIO 1000 - - -- This thread will write 'TraceObject's to the queue. - _ <- async $ traceObjectsWriter queue - - -- Run the forwarder. It will establish the connection with the acceptor, - -- then the acceptor will periodically ask for 'TraceObject's, the forwarder - -- will take them from the 'queue' and send them back. - runTraceForwarder config queue - -traceObjectsWriter :: TBQueue TraceObject -> IO () -traceObjectsWriter queue = forever $ do - now <- getCurrentTime - atomically $ writeTBQueue queue (mkTraceObject now) - threadDelay 500000 - where - mkTraceObject now' = TraceObject - { toHuman = Just "Human Message 1" - , toMachine = Nothing - , toNamespace = ["demoNamespace"] - , toSeverity = Info - , toDetails = DNormal - , toTimestamp = now' - , toHostname = "linux" - , toThreadId = "1" - } diff --git a/trace-forward-demo/scripts/runDisconnectAcceptor.sh b/trace-forward-demo/scripts/runDisconnectAcceptor.sh deleted file mode 100755 index 5f15f44691f..00000000000 --- a/trace-forward-demo/scripts/runDisconnectAcceptor.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh - -./demo-acceptor-mux 127.0.0.1 3010 --dc 30 > /dev/null 2>&1 & - -PID=$(ps aux | grep demo-acceptor-mux | grep -v 'grep' | awk '{ print $2 }') - -watch -- "lsof -p ${PID} | grep TCP | wc -l" diff --git a/trace-forward-demo/scripts/runDisconnectForwarder.sh b/trace-forward-demo/scripts/runDisconnectForwarder.sh deleted file mode 100755 index 6c745dee92a..00000000000 --- a/trace-forward-demo/scripts/runDisconnectForwarder.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh - -./demo-forwarder-mux 127.0.0.1 3010 --dc 25 > /dev/null 2>&1 & - -PID=$(ps aux | grep demo-forwarder-mux | grep -v 'grep' | awk '{ print $2 }') - -watch -- "lsof -p ${PID} | grep TCP | wc -l" diff --git a/trace-forward-demo/trace-forward-demo.cabal b/trace-forward-demo/trace-forward-demo.cabal deleted file mode 100644 index 95d6c2e9847..00000000000 --- a/trace-forward-demo/trace-forward-demo.cabal +++ /dev/null @@ -1,93 +0,0 @@ -cabal-version: 2.4 -name: trace-forward-demo -version: 0.1.0 -synopsis: See README for more info -description: See README for more info -license: Apache-2.0 -license-file: LICENSE -copyright: 2021 Input Output (Hong Kong) Ltd. -author: IOHK -maintainer: operations@iohk.io -build-type: Simple -extra-doc-files: README.md - -executable demo-acceptor - main-is: acceptor.hs - build-depends: base - , contra-tracer - , ouroboros-network-framework - , stm - , text - , time - , trace-dispatcher - , trace-forward - - default-language: Haskell2010 - ghc-options: -Wall - -threaded - -rtsopts - -with-rtsopts=-T - -executable demo-forwarder - main-is: forwarder.hs - build-depends: async - , base - , contra-tracer - , ouroboros-network-framework - , stm - , text - , time - , trace-dispatcher - , trace-forward - - default-language: Haskell2010 - ghc-options: -Wall - -threaded - -rtsopts - -with-rtsopts=-T - -executable demo-acceptor-mux - main-is: acceptor-mux.hs - build-depends: async - , base - , bytestring - , cborg - , contra-tracer - , ekg-core - , ekg-forward - , network - , ouroboros-network-framework - , stm - , text - , time - , trace-dispatcher - , trace-forward - - default-language: Haskell2010 - ghc-options: -Wall - -threaded - -rtsopts - -with-rtsopts=-T - -executable demo-forwarder-mux - main-is: forwarder-mux.hs - build-depends: async - , base - , bytestring - , cborg - , contra-tracer - , ekg-core - , ekg-forward - , network - , ouroboros-network-framework - , stm - , text - , time - , trace-dispatcher - , trace-forward - - default-language: Haskell2010 - ghc-options: -Wall - -threaded - -rtsopts - -with-rtsopts=-T