Skip to content

Commit

Permalink
Merge #3337
Browse files Browse the repository at this point in the history
3337: CAD-3628: remove NodeInfo from trace-forward. r=denisshevchenko a=denisshevchenko

Since the new `datapoint-forward` library (#3303) will be used to forward structured data from the node, `NodeInfo` type should be removed from `trace-forward` library.

Co-authored-by: Denis Shevchenko <denis.shevchenko@iohk.io>
  • Loading branch information
iohk-bors[bot] and Denis Shevchenko authored Nov 2, 2021
2 parents b6c7928 + 40de6b0 commit ef6b90a
Show file tree
Hide file tree
Showing 15 changed files with 34 additions and 195 deletions.
5 changes: 1 addition & 4 deletions trace-dispatcher/src/Cardano/Logging/Tracer/Forward.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import qualified System.Metrics.Configuration as EKGF
import System.Metrics.Network.Forwarder (forwardEKGMetricsResp)
import qualified Trace.Forward.Configuration as TF
import Trace.Forward.Network.Forwarder (forwardTraceObjectsResp)
import Trace.Forward.Protocol.Type (NodeInfo (..))
import Trace.Forward.Utils

import Cardano.Logging.DocuGenerator
Expand All @@ -60,9 +59,8 @@ import Cardano.Logging.Utils(uncurry3)
forwardTracer :: forall m. (MonadIO m)
=> IOManager
-> TraceConfig
-> NodeInfo
-> m (Trace m FormattedMessage)
forwardTracer iomgr config nodeInfo = liftIO $ do
forwardTracer iomgr config = liftIO $ do
forwardSink <- initForwardSink tfConfig
store <- EKG.newStore
EKG.registerGcMetrics store
Expand Down Expand Up @@ -99,7 +97,6 @@ forwardTracer iomgr config nodeInfo = liftIO $ do
TF.ForwarderConfiguration
{ TF.forwarderTracer = contramap show stdoutTracer
, TF.acceptorEndpoint = TF.LocalPipe p
, TF.getNodeInfo = pure nodeInfo
, TF.disconnectedQueueSize = 200000
, TF.connectedQueueSize = 2000
}
Expand Down
6 changes: 2 additions & 4 deletions trace-forward/src/Trace/Forward/Acceptor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import Ouroboros.Network.Util.ShowProxy (ShowProxy(..))

import Trace.Forward.Network.Acceptor (listenToForwarder)
import Trace.Forward.Configuration (AcceptorConfiguration (..))
import Trace.Forward.Protocol.Type (NodeInfo)
import Trace.Forward.Utils (runActionInLoop)

runTraceAcceptor
Expand All @@ -24,7 +23,6 @@ runTraceAcceptor
=> IOManager -- ^ 'IOManager' from the external application.
-> AcceptorConfiguration lo -- ^ Acceptor configuration.
-> ([lo] -> IO ()) -- ^ The handler for 'TraceObject's received from the node.
-> (NodeInfo -> IO ()) -- ^ The handler for node's info received from the node.
-> IO ()
runTraceAcceptor iomgr config@AcceptorConfiguration{forwarderEndpoint} loHandler niHandler =
runActionInLoop (listenToForwarder iomgr config loHandler niHandler) forwarderEndpoint 1
runTraceAcceptor iomgr config@AcceptorConfiguration{forwarderEndpoint} loHandler =
runActionInLoop (listenToForwarder iomgr config loHandler) forwarderEndpoint 1
2 changes: 0 additions & 2 deletions trace-forward/src/Trace/Forward/Configuration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ data ForwarderConfiguration lo = ForwarderConfiguration
forwarderTracer :: !(Tracer IO (TraceSendRecv (TraceForward lo)))
-- | The endpoint that will be used to connect to the acceptor.
, acceptorEndpoint :: !HowToConnect
-- | An action that returns node's information.
, getNodeInfo :: !(IO NodeInfo)
-- | The big size of internal queue for tracing items. We use it in
-- the beginning of the session, to avoid queue overflow, because
-- initially there is no connection with acceptor yet, and the
Expand Down
36 changes: 10 additions & 26 deletions trace-forward/src/Trace/Forward/Network/Acceptor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ listenToForwarder
=> IOManager
-> AcceptorConfiguration lo
-> ([lo] -> IO ())
-> (NodeInfo -> IO ())
-> IO ()
listenToForwarder iomgr config@AcceptorConfiguration{forwarderEndpoint} loHandler niHandler = do
listenToForwarder iomgr config@AcceptorConfiguration{forwarderEndpoint} loHandler = do
let (LocalPipe localPipe) = forwarderEndpoint
snocket = localSnocket iomgr
address = localAddressFromPath localPipe
Expand All @@ -76,7 +75,7 @@ listenToForwarder iomgr config@AcceptorConfiguration{forwarderEndpoint} loHandle
[ MiniProtocol
{ miniProtocolNum = MiniProtocolNum 1
, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = maxBound }
, miniProtocolRun = acceptTraceObjects config loHandler niHandler
, miniProtocolRun = acceptTraceObjects config loHandler
}
]

Expand Down Expand Up @@ -113,9 +112,8 @@ acceptTraceObjects
Typeable lo)
=> AcceptorConfiguration lo
-> ([lo] -> IO ())
-> (NodeInfo -> IO ())
-> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void ()
acceptTraceObjects config loHandler niHandler =
acceptTraceObjects config loHandler =
ResponderProtocolOnly $
MuxPeerRaw $ \channel ->
timeoutWhenStopped
Expand All @@ -124,60 +122,46 @@ acceptTraceObjects config loHandler niHandler =
$ runPeer
(acceptorTracer config)
(Acceptor.codecTraceForward CBOR.encode CBOR.decode
CBOR.encode CBOR.decode
CBOR.encode CBOR.decode)
channel
(Acceptor.traceAcceptorPeer $
acceptorActions config loHandler niHandler True)
acceptorActions config loHandler)

acceptTraceObjectsInit
:: (CBOR.Serialise lo,
ShowProxy lo,
Typeable lo)
=> AcceptorConfiguration lo
-> ([lo] -> IO ())
-> (NodeInfo -> IO ())
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
acceptTraceObjectsInit config loHandler niHandler =
acceptTraceObjectsInit config loHandler =
InitiatorProtocolOnly $
MuxPeerRaw $ \channel ->
runPeer
(acceptorTracer config)
(Acceptor.codecTraceForward CBOR.encode CBOR.decode
CBOR.encode CBOR.decode
CBOR.encode CBOR.decode)
channel
(Acceptor.traceAcceptorPeer $
acceptorActions config loHandler niHandler True)
acceptorActions config loHandler)

acceptorActions
:: (CBOR.Serialise lo,
ShowProxy lo,
Typeable lo)
=> AcceptorConfiguration lo -- ^ Acceptor's configuration.
-> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's.
-> (NodeInfo -> IO ()) -- ^ The handler for accepted info about the node.
-> Bool -- ^ The flag for node's info request: only once in the beginning.
-> Acceptor.TraceAcceptor lo IO ()
acceptorActions config@AcceptorConfiguration{whatToRequest, shouldWeStop} loHandler niHandler askForNI =
acceptorActions config@AcceptorConfiguration{whatToRequest, shouldWeStop} loHandler =
-- We are able to send request for:
-- 1. node's info,
-- 2. new 'TraceObject's.
-- But request for node's info should be sent only once (in the beginning of session).
if askForNI
then
Acceptor.SendMsgNodeInfoRequest $ \replyWithNI -> do
niHandler replyWithNI
checkIfWeShouldStop
else
Acceptor.SendMsgTraceObjectsRequest TokBlocking whatToRequest $ \replyWithTraceObjects -> do
loHandler $ getTraceObjects replyWithTraceObjects
checkIfWeShouldStop
where
checkIfWeShouldStop =
Acceptor.SendMsgTraceObjectsRequest TokBlocking whatToRequest $ \replyWithTraceObjects -> do
loHandler $ getTraceObjects replyWithTraceObjects
ifM (readTVarIO shouldWeStop)
(return $ Acceptor.SendMsgDone $ return ())
(return $ acceptorActions config loHandler niHandler False)
(return $ acceptorActions config loHandler)

data Timeout = Timeout
deriving (Typeable, Show)
Expand Down
6 changes: 2 additions & 4 deletions trace-forward/src/Trace/Forward/Network/Forwarder.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,9 @@ forwardTraceObjects config sink =
runPeer
(forwarderTracer config)
(Forwarder.codecTraceForward CBOR.encode CBOR.decode
CBOR.encode CBOR.decode
CBOR.encode CBOR.decode)
channel
(Forwarder.traceForwarderPeer $ readItems config sink)
(Forwarder.traceForwarderPeer $ readItems sink)

forwardTraceObjectsResp
:: (CBOR.Serialise lo,
Expand All @@ -110,7 +109,6 @@ forwardTraceObjectsResp config sink =
runPeer
(forwarderTracer config)
(Forwarder.codecTraceForward CBOR.encode CBOR.decode
CBOR.encode CBOR.decode
CBOR.encode CBOR.decode)
channel
(Forwarder.traceForwarderPeer $ readItems config sink)
(Forwarder.traceForwarderPeer $ readItems sink)
13 changes: 0 additions & 13 deletions trace-forward/src/Trace/Forward/Protocol/Acceptor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..),
import Trace.Forward.Protocol.Type

data TraceAcceptor lo m a where
SendMsgNodeInfoRequest
:: (NodeInfo -> m (TraceAcceptor lo m a))
-> TraceAcceptor lo m a

SendMsgTraceObjectsRequest
:: TokBlockingStyle blocking
-> NumberOfTraceObjects
Expand All @@ -41,15 +37,6 @@ traceAcceptorPeer
=> TraceAcceptor lo m a
-> Peer (TraceForward lo) 'AsClient 'StIdle m a
traceAcceptorPeer = \case
SendMsgNodeInfoRequest next ->
-- Send our message (request for node's info from the forwarder).
Yield (ClientAgency TokIdle) MsgNodeInfoRequest $
-- We're now into the 'StNodeInfoBusy' state, and now we'll wait for
-- a reply from the forwarder.
Await (ServerAgency TokNodeInfoBusy) $ \(MsgNodeInfoReply reply) ->
Effect $
traceAcceptorPeer <$> next reply

SendMsgTraceObjectsRequest TokBlocking request next ->
-- Send our message (request for new 'TraceObject's from the forwarder).
Yield (ClientAgency TokIdle) (MsgTraceObjectsRequest TokBlocking request) $
Expand Down
25 changes: 3 additions & 22 deletions trace-forward/src/Trace/Forward/Protocol/Codec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import Control.Monad.Class.MonadST (MonadST)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.List.NonEmpty as NE
import Text.Printf (printf)
import Network.TypedProtocol.Codec (Codec, PeerHasAgency (..), PeerRole (..),
SomeMessage (..))

import Network.TypedProtocol.Codec (Codec, PeerHasAgency (..),
PeerRole (..), SomeMessage (..))
import Network.TypedProtocol.Codec.CBOR (mkCodecCborLazyBS)

import Trace.Forward.Protocol.Type
Expand All @@ -26,14 +27,11 @@ codecTraceForward
MonadST m
=> (NumberOfTraceObjects -> CBOR.Encoding) -- ^ Encoder for 'Request'.
-> (forall s . CBOR.Decoder s NumberOfTraceObjects) -- ^ Decoder for 'Request'.
-> (NodeInfo -> CBOR.Encoding) -- ^ Encoder for reply with node's info.
-> (forall s . CBOR.Decoder s NodeInfo) -- ^ Decoder for reply with node's info.
-> ([lo] -> CBOR.Encoding) -- ^ Encoder for reply with list of 'TraceObject's.
-> (forall s . CBOR.Decoder s [lo]) -- ^ Decoder for reply with list of 'TraceObject's.
-> Codec (TraceForward lo)
DeserialiseFailure m LBS.ByteString
codecTraceForward encodeRequest decodeRequest
encodeNIReply decodeNIReply
encodeReplyList decodeReplyList =
mkCodecCborLazyBS encode decode
where
Expand All @@ -46,10 +44,6 @@ codecTraceForward encodeRequest decodeRequest
-> Message (TraceForward lo) st st'
-> CBOR.Encoding

encode (ClientAgency TokIdle) MsgNodeInfoRequest =
CBOR.encodeListLen 1
<> CBOR.encodeWord 0

encode (ClientAgency TokIdle) (MsgTraceObjectsRequest blocking request) =
CBOR.encodeListLen 3
<> CBOR.encodeWord 1
Expand All @@ -62,11 +56,6 @@ codecTraceForward encodeRequest decodeRequest
CBOR.encodeListLen 1
<> CBOR.encodeWord 2

encode (ServerAgency TokNodeInfoBusy) (MsgNodeInfoReply reply) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 3
<> encodeNIReply reply

encode (ServerAgency (TokBusy _)) (MsgTraceObjectsReply reply) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 4
Expand All @@ -87,9 +76,6 @@ codecTraceForward encodeRequest decodeRequest
len <- CBOR.decodeListLen
key <- CBOR.decodeWord
case (key, len, stok) of
(0, 1, ClientAgency TokIdle) ->
return $ SomeMessage MsgNodeInfoRequest

(1, 3, ClientAgency TokIdle) -> do
blocking <- CBOR.decodeBool
request <- decodeRequest
Expand All @@ -102,9 +88,6 @@ codecTraceForward encodeRequest decodeRequest
(2, 1, ClientAgency TokIdle) ->
return $ SomeMessage MsgDone

(3, 2, ServerAgency TokNodeInfoBusy) ->
SomeMessage . MsgNodeInfoReply <$> decodeNIReply

(4, 2, ServerAgency (TokBusy blocking)) -> do
replyList <- decodeReplyList
case (blocking, replyList) of
Expand All @@ -120,8 +103,6 @@ codecTraceForward encodeRequest decodeRequest
-- Failures per protocol state
(_, _, ClientAgency TokIdle) ->
fail (printf "codecTraceForward (%s) unexpected key (%d, %d)" (show stok) key len)
(_, _, ServerAgency TokNodeInfoBusy) ->
fail (printf "codecTraceForward (%s) unexpected key (%d, %d)" (show stok) key len)
(_, _, ServerAgency (TokBusy TokBlocking)) ->
fail (printf "codecTraceForward (%s) unexpected key (%d, %d)" (show stok) key len)
(_, _, ServerAgency (TokBusy TokNonBlocking)) ->
Expand Down
19 changes: 3 additions & 16 deletions trace-forward/src/Trace/Forward/Protocol/Forwarder.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,8 @@ import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..),
import Trace.Forward.Protocol.Type

data TraceForwarder lo m a = TraceForwarder
{ -- | The acceptor sent us a request for node's info.
recvMsgNodeInfoRequest
:: m (NodeInfo, TraceForwarder lo m a)

-- | The acceptor sent us a request for new 'TraceObject's.
, recvMsgTraceObjectsRequest
{ -- | The acceptor sent us a request for new 'TraceObject's.
recvMsgTraceObjectsRequest
:: forall blocking.
TokBlockingStyle blocking
-> NumberOfTraceObjects
Expand All @@ -37,19 +33,10 @@ traceForwarderPeer
:: Monad m
=> TraceForwarder lo m a
-> Peer (TraceForward lo) 'AsServer 'StIdle m a
traceForwarderPeer TraceForwarder{recvMsgNodeInfoRequest, recvMsgTraceObjectsRequest, recvMsgDone} =
traceForwarderPeer TraceForwarder{recvMsgTraceObjectsRequest, recvMsgDone} =
-- In the 'StIdle' state the forwarder is awaiting a request message
-- from the acceptor.
Await (ClientAgency TokIdle) $ \case
-- The acceptor sent us a request for node's info, so now we're
-- in the 'StBusy' state which means it's the forwarder's turn to send
-- a reply.
MsgNodeInfoRequest -> Effect $ do
(reply, next) <- recvMsgNodeInfoRequest
return $ Yield (ServerAgency TokNodeInfoBusy)
(MsgNodeInfoReply reply)
(traceForwarderPeer next)

-- The acceptor sent us a request for new 'TraceObject's, so now we're
-- in the 'StBusy' state which means it's the forwarder's turn to send
-- a reply.
Expand Down
Loading

0 comments on commit ef6b90a

Please sign in to comment.