From c2d62455e459896146bde41756b1ba682445efbc Mon Sep 17 00:00:00 2001 From: Marcin Szamotulski Date: Fri, 11 Dec 2020 17:28:37 +0100 Subject: [PATCH] tx-submission-2: use in node-to-node v6 protocol --- .../src/Test/ThreadNet/Network.hs | 8 +- .../Ouroboros/Consensus/Network/NodeToNode.hs | 98 +++++++++++++++---- .../src/Ouroboros/Consensus/Node.hs | 4 +- .../Ouroboros/Network/NodeToNode/Version.hs | 6 ++ 4 files changed, 94 insertions(+), 22 deletions(-) diff --git a/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs b/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs index 319d45aac1a..b0e10df1be6 100644 --- a/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs @@ -69,6 +69,7 @@ import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..)) import Ouroboros.Network.Protocol.KeepAlive.Type import Ouroboros.Network.Protocol.Limits (waitForever) import Ouroboros.Network.Protocol.TxSubmission.Type +import Ouroboros.Network.Protocol.TxSubmission2.Type import Ouroboros.Consensus.Block import Ouroboros.Consensus.BlockchainTime @@ -1057,6 +1058,7 @@ runThreadNetwork systemTime ThreadNetworkArgs Lazy.ByteString Lazy.ByteString (AnyMessage (TxSubmission (GenTxId blk) (GenTx blk))) + (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) (AnyMessage KeepAlive) customNodeToNodeCodecs cfg = NTN.Codecs { cChainSyncCodec = @@ -1074,6 +1076,9 @@ runThreadNetwork systemTime ThreadNetworkArgs , cTxSubmissionCodec = mapFailureCodec CodecIdFailure $ NTN.cTxSubmissionCodec NTN.identityCodecs + , cTxSubmission2Codec = + mapFailureCodec CodecIdFailure $ + NTN.cTxSubmission2Codec NTN.identityCodecs , cKeepAliveCodec = mapFailureCodec CodecIdFailure $ NTN.cKeepAliveCodec NTN.identityCodecs @@ -1605,7 +1610,8 @@ type LimitedApp' m peer blk = -- channel with the same type on both ends, i.e., 'Lazy.ByteString'. Lazy.ByteString Lazy.ByteString - (AnyMessage (TxSubmission (GenTxId blk) (GenTx blk))) + (AnyMessage (TxSubmission (GenTxId blk) (GenTx blk))) + (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) (AnyMessage KeepAlive) () diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs index bbe3e007ac2..3e465eeb5bb 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs @@ -67,10 +67,13 @@ import Ouroboros.Network.Protocol.KeepAlive.Client import Ouroboros.Network.Protocol.KeepAlive.Codec import Ouroboros.Network.Protocol.KeepAlive.Server import Ouroboros.Network.Protocol.KeepAlive.Type +import qualified Ouroboros.Network.Protocol.Trans.Hello.Util as Hello import Ouroboros.Network.Protocol.TxSubmission.Client import Ouroboros.Network.Protocol.TxSubmission.Codec import Ouroboros.Network.Protocol.TxSubmission.Server import Ouroboros.Network.Protocol.TxSubmission.Type +import Ouroboros.Network.Protocol.TxSubmission2.Codec +import Ouroboros.Network.Protocol.TxSubmission2.Type import Ouroboros.Network.TxSubmission.Inbound import Ouroboros.Network.TxSubmission.Outbound @@ -207,21 +210,23 @@ mkHandlers -------------------------------------------------------------------------------} -- | Node-to-node protocol codecs needed to run 'Handlers'. -data Codecs blk e m bCS bSCS bBF bSBF bTX bKA = Codecs { +data Codecs blk e m bCS bSCS bBF bSBF bTX bTX2 bKA = Codecs { cChainSyncCodec :: Codec (ChainSync (Header blk) (Point blk) (Tip blk)) e m bCS , cChainSyncCodecSerialised :: Codec (ChainSync (SerialisedHeader blk) (Point blk) (Tip blk)) e m bSCS , cBlockFetchCodec :: Codec (BlockFetch blk (Point blk)) e m bBF , cBlockFetchCodecSerialised :: Codec (BlockFetch (Serialised blk) (Point blk)) e m bSBF , cTxSubmissionCodec :: Codec (TxSubmission (GenTxId blk) (GenTx blk)) e m bTX + , cTxSubmission2Codec :: Codec (TxSubmission2 (GenTxId blk) (GenTx blk)) e m bTX2 , cKeepAliveCodec :: Codec KeepAlive e m bKA } -- | Protocol codecs for the node-to-node protocols -defaultCodecs :: forall m blk. (IOLike m, SerialiseNodeToNodeConstraints blk) +defaultCodecs :: forall m blk. (IOLike m, SerialiseNodeToNodeConstraints blk, + ShowProxy (GenTxId blk), ShowProxy (GenTx blk)) => CodecConfig blk -> BlockNodeToNodeVersion blk -> Codecs blk DeserialiseFailure m - ByteString ByteString ByteString ByteString ByteString ByteString + ByteString ByteString ByteString ByteString ByteString ByteString ByteString defaultCodecs ccfg version = Codecs { cChainSyncCodec = codecChainSync @@ -262,6 +267,13 @@ defaultCodecs ccfg version = Codecs { enc dec + , cTxSubmission2Codec = + codecTxSubmission2 + enc + dec + enc + dec + , cKeepAliveCodec = codecKeepAlive } @@ -283,6 +295,7 @@ identityCodecs :: Monad m (AnyMessage (BlockFetch blk (Point blk))) (AnyMessage (BlockFetch (Serialised blk) (Point blk))) (AnyMessage (TxSubmission (GenTxId blk) (GenTx blk))) + (AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk))) (AnyMessage KeepAlive) identityCodecs = Codecs { cChainSyncCodec = codecChainSyncId @@ -290,6 +303,7 @@ identityCodecs = Codecs { , cBlockFetchCodec = codecBlockFetchId , cBlockFetchCodecSerialised = codecBlockFetchId , cTxSubmissionCodec = codecTxSubmissionId + , cTxSubmission2Codec = codecTxSubmission2Id , cKeepAliveCodec = codecKeepAliveId } @@ -307,6 +321,7 @@ data Tracers' peer blk e f = Tracers { , tBlockFetchTracer :: f (TraceLabelPeer peer (TraceSendRecv (BlockFetch blk (Point blk)))) , tBlockFetchSerialisedTracer :: f (TraceLabelPeer peer (TraceSendRecv (BlockFetch (Serialised blk) (Point blk)))) , tTxSubmissionTracer :: f (TraceLabelPeer peer (TraceSendRecv (TxSubmission (GenTxId blk) (GenTx blk)))) + , tTxSubmission2Tracer :: f (TraceLabelPeer peer (TraceSendRecv (TxSubmission2 (GenTxId blk) (GenTx blk)))) } instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer blk e f) where @@ -316,6 +331,7 @@ instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer blk e f) where , tBlockFetchTracer = f tBlockFetchTracer , tBlockFetchSerialisedTracer = f tBlockFetchSerialisedTracer , tTxSubmissionTracer = f tTxSubmissionTracer + , tTxSubmission2Tracer = f tTxSubmission2Tracer } where f :: forall a. Semigroup a @@ -331,6 +347,7 @@ nullTracers = Tracers { , tBlockFetchTracer = nullTracer , tBlockFetchSerialisedTracer = nullTracer , tTxSubmissionTracer = nullTracer + , tTxSubmission2Tracer = nullTracer } showTracers :: ( Show blk @@ -348,6 +365,7 @@ showTracers tr = Tracers { , tBlockFetchTracer = showTracing tr , tBlockFetchSerialisedTracer = showTracing tr , tTxSubmissionTracer = showTracing tr + , tTxSubmission2Tracer = showTracing tr } {------------------------------------------------------------------------------- @@ -371,38 +389,45 @@ type ServerApp m peer bytes a = -- | Applications for the node-to-node protocols -- -- See 'Network.Mux.Types.MuxApplication' -data Apps m peer bCS bBF bTX bKA a = Apps { +data Apps m peer bCS bBF bTX bTX2 bKA a = Apps { -- | Start a chain sync client that communicates with the given upstream -- node. - aChainSyncClient :: ClientApp m peer bCS a + aChainSyncClient :: ClientApp m peer bCS a -- | Start a chain sync server. - , aChainSyncServer :: ServerApp m peer bCS a + , aChainSyncServer :: ServerApp m peer bCS a -- | Start a block fetch client that communicates with the given -- upstream node. - , aBlockFetchClient :: ClientApp m peer bBF a + , aBlockFetchClient :: ClientApp m peer bBF a -- | Start a block fetch server. - , aBlockFetchServer :: ServerApp m peer bBF a + , aBlockFetchServer :: ServerApp m peer bBF a -- | Start a transaction submission client that communicates with the -- given upstream node. - , aTxSubmissionClient :: ClientApp m peer bTX a + , aTxSubmissionClient :: ClientApp m peer bTX a -- | Start a transaction submission server. - , aTxSubmissionServer :: ServerApp m peer bTX a + , aTxSubmissionServer :: ServerApp m peer bTX a + + -- | Start a transaction submission v2 client that communicates with the + -- given upstream node. + , aTxSubmission2Client :: ClientApp m peer bTX2 a + + -- | Start a transaction submission v2 server. + , aTxSubmission2Server :: ServerApp m peer bTX2 a -- | Start a keep-alive client. - , aKeepAliveClient :: ClientApp m peer bKA a + , aKeepAliveClient :: ClientApp m peer bKA a -- | Start a keep-alive server. - , aKeepAliveServer :: ServerApp m peer bKA a + , aKeepAliveServer :: ServerApp m peer bKA a } -- | Construct the 'NetworkApplication' for the node-to-node protocols mkApps - :: forall m remotePeer localPeer blk e bCS bBF bTX bKA. + :: forall m remotePeer localPeer blk e bCS bBF bTX bTX2 bKA. ( IOLike m , MonadTimer m , Ord remotePeer @@ -415,10 +440,10 @@ mkApps ) => NodeKernel m remotePeer localPeer blk -- ^ Needed for bracketing only -> Tracers m remotePeer blk e - -> Codecs blk e m bCS bCS bBF bBF bTX bKA + -> Codecs blk e m bCS bCS bBF bBF bTX bTX2 bKA -> m ChainSyncTimeout -> Handlers m remotePeer blk - -> Apps m remotePeer bCS bBF bTX bKA () + -> Apps m remotePeer bCS bBF bTX bTX2 bKA () mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = Apps {..} where @@ -537,6 +562,37 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = channel (txSubmissionServerPeerPipelined (hTxSubmissionServer version them)) + aTxSubmission2Client + :: NodeToNodeVersion + -> ControlMessageSTM m + -> remotePeer + -> Channel m bTX2 + -> m ((), Maybe bTX2) + aTxSubmission2Client version controlMessageSTM them channel = do + labelThisThread "TxSubmissionClient" + runPeerWithLimits + (contramap (TraceLabelPeer them) tTxSubmission2Tracer) + cTxSubmission2Codec + (byteLimitsTxSubmission2 (const 0)) -- TODO: Real Bytelimits, see #1727 + timeLimitsTxSubmission2 + channel + (Hello.wrapClientPeer (txSubmissionClientPeer (hTxSubmissionClient version controlMessageSTM them))) + + aTxSubmission2Server + :: NodeToNodeVersion + -> remotePeer + -> Channel m bTX2 + -> m ((), Maybe bTX2) + aTxSubmission2Server version them channel = do + labelThisThread "TxSubmissionServer" + runPipelinedPeerWithLimits + (contramap (TraceLabelPeer them) tTxSubmission2Tracer) + cTxSubmission2Codec + (byteLimitsTxSubmission2 (const 0)) -- TODO: Real Bytelimits, see #1727 + timeLimitsTxSubmission2 + channel + (Hello.wrapServerPeerPipelined (txSubmissionServerPeerPipelined (hTxSubmissionServer version them))) + aKeepAliveClient :: NodeToNodeVersion -> ControlMessageSTM m @@ -592,7 +648,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = initiator :: MiniProtocolParameters -> NodeToNodeVersion - -> Apps m (ConnectionId peer) b b b b a + -> Apps m (ConnectionId peer) b b b b b a -> OuroborosApplication 'InitiatorMode peer b m a Void initiator miniProtocolParameters version Apps {..} = nodeToNodeProtocols @@ -609,7 +665,9 @@ initiator miniProtocolParameters version Apps {..} = blockFetchProtocol = (InitiatorProtocolOnly (MuxPeerRaw (aBlockFetchClient version controlMessageSTM them))), txSubmissionProtocol = - (InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version controlMessageSTM them))), + if version >= NodeToNodeV_6 + then InitiatorProtocolOnly (MuxPeerRaw (aTxSubmission2Client version controlMessageSTM them)) + else InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version controlMessageSTM them)), keepAliveProtocol = (InitiatorProtocolOnly (MuxPeerRaw (aKeepAliveClient version controlMessageSTM them))) }) @@ -622,7 +680,7 @@ initiator miniProtocolParameters version Apps {..} = responder :: MiniProtocolParameters -> NodeToNodeVersion - -> Apps m (ConnectionId peer) b b b b a + -> Apps m (ConnectionId peer) b b b b b a -> OuroborosApplication 'ResponderMode peer b m Void a responder miniProtocolParameters version Apps {..} = nodeToNodeProtocols @@ -633,7 +691,9 @@ responder miniProtocolParameters version Apps {..} = blockFetchProtocol = (ResponderProtocolOnly (MuxPeerRaw (aBlockFetchServer version them))), txSubmissionProtocol = - (ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version them))), + if version >= NodeToNodeV_6 + then ResponderProtocolOnly (MuxPeerRaw (aTxSubmission2Server version them)) + else ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version them)), keepAliveProtocol = (ResponderProtocolOnly (MuxPeerRaw (aKeepAliveServer version them))) }) diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs index 3ea5db88d2e..dc4ae514cfd 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Node.hs @@ -313,7 +313,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} = :: NodeKernelArgs m (ConnectionId addrNTN) (ConnectionId addrNTC) blk -> NodeKernel m (ConnectionId addrNTN) (ConnectionId addrNTC) blk -> BlockNodeToNodeVersion blk - -> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString () + -> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString ByteString () mkNodeToNodeApps nodeKernelArgs nodeKernel version = NTN.mkApps nodeKernel @@ -336,7 +336,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} = mkDiffusionApplications :: MiniProtocolParameters -> ( BlockNodeToNodeVersion blk - -> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString () + -> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString ByteString () ) -> ( BlockNodeToClientVersion blk -> NTC.Apps m (ConnectionId addrNTC) ByteString ByteString ByteString () diff --git a/ouroboros-network/src/Ouroboros/Network/NodeToNode/Version.hs b/ouroboros-network/src/Ouroboros/Network/NodeToNode/Version.hs index e1f90f71575..8ea42076955 100644 --- a/ouroboros-network/src/Ouroboros/Network/NodeToNode/Version.hs +++ b/ouroboros-network/src/Ouroboros/Network/NodeToNode/Version.hs @@ -45,6 +45,10 @@ data NodeToNodeVersion -- ^ Changes: -- -- * Enable @CardanoNodeToNodeVersion4@, i.e., Mary + | NodeToNodeV_6 + -- ^ Changes: + -- + -- * Replace 'TxSubmision' with 'Txsubmission2' protocol. deriving (Eq, Ord, Enum, Bounded, Show, Typeable) nodeToNodeVersionCodec :: CodecCBORTerm (Text, Maybe Int) NodeToNodeVersion @@ -55,12 +59,14 @@ nodeToNodeVersionCodec = CodecCBORTerm { encodeTerm, decodeTerm } encodeTerm NodeToNodeV_3 = CBOR.TInt 3 encodeTerm NodeToNodeV_4 = CBOR.TInt 4 encodeTerm NodeToNodeV_5 = CBOR.TInt 5 + encodeTerm NodeToNodeV_6 = CBOR.TInt 6 decodeTerm (CBOR.TInt 1) = Right NodeToNodeV_1 decodeTerm (CBOR.TInt 2) = Right NodeToNodeV_2 decodeTerm (CBOR.TInt 3) = Right NodeToNodeV_3 decodeTerm (CBOR.TInt 4) = Right NodeToNodeV_4 decodeTerm (CBOR.TInt 5) = Right NodeToNodeV_5 + decodeTerm (CBOR.TInt 6) = Right NodeToNodeV_6 decodeTerm (CBOR.TInt n) = Left ( T.pack "decode NodeToNodeVersion: unknonw tag: " <> T.pack (show n) , Just n