Skip to content

Commit

Permalink
TxSubmission2: use in node-to-node v6 protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed Dec 11, 2020
1 parent 5bdf2e1 commit 6f4abf2
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 22 deletions.
8 changes: 7 additions & 1 deletion ouroboros-consensus-test/src/Test/ThreadNet/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..))
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.Limits (waitForever)
import Ouroboros.Network.Protocol.TxSubmission.Type
import Ouroboros.Network.Protocol.TxSubmission2.Type

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.BlockchainTime
Expand Down Expand Up @@ -1059,6 +1060,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
Lazy.ByteString
Lazy.ByteString
(AnyMessage (TxSubmission (GenTxId blk) (GenTx blk)))
(AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk)))
(AnyMessage KeepAlive)
customNodeToNodeCodecs cfg = NTN.Codecs
{ cChainSyncCodec =
Expand All @@ -1076,6 +1078,9 @@ runThreadNetwork systemTime ThreadNetworkArgs
, cTxSubmissionCodec =
mapFailureCodec CodecIdFailure $
NTN.cTxSubmissionCodec NTN.identityCodecs
, cTxSubmission2Codec =
mapFailureCodec CodecIdFailure $
NTN.cTxSubmission2Codec NTN.identityCodecs
, cKeepAliveCodec =
mapFailureCodec CodecIdFailure $
NTN.cKeepAliveCodec NTN.identityCodecs
Expand Down Expand Up @@ -1607,7 +1612,8 @@ type LimitedApp' m peer blk =
-- channel with the same type on both ends, i.e., 'Lazy.ByteString'.
Lazy.ByteString
Lazy.ByteString
(AnyMessage (TxSubmission (GenTxId blk) (GenTx blk)))
(AnyMessage (TxSubmission (GenTxId blk) (GenTx blk)))
(AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk)))
(AnyMessage KeepAlive)
()

Expand Down
98 changes: 79 additions & 19 deletions ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ import Ouroboros.Network.Protocol.KeepAlive.Client
import Ouroboros.Network.Protocol.KeepAlive.Codec
import Ouroboros.Network.Protocol.KeepAlive.Server
import Ouroboros.Network.Protocol.KeepAlive.Type
import qualified Ouroboros.Network.Protocol.Trans.Hello.Util as Hello
import Ouroboros.Network.Protocol.TxSubmission.Client
import Ouroboros.Network.Protocol.TxSubmission.Codec
import Ouroboros.Network.Protocol.TxSubmission.Server
import Ouroboros.Network.Protocol.TxSubmission.Type
import Ouroboros.Network.Protocol.TxSubmission2.Codec
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound
import Ouroboros.Network.TxSubmission.Outbound

Expand Down Expand Up @@ -207,21 +210,23 @@ mkHandlers
-------------------------------------------------------------------------------}

-- | Node-to-node protocol codecs needed to run 'Handlers'.
data Codecs blk e m bCS bSCS bBF bSBF bTX bKA = Codecs {
data Codecs blk e m bCS bSCS bBF bSBF bTX bTX2 bKA = Codecs {
cChainSyncCodec :: Codec (ChainSync (Header blk) (Point blk) (Tip blk)) e m bCS
, cChainSyncCodecSerialised :: Codec (ChainSync (SerialisedHeader blk) (Point blk) (Tip blk)) e m bSCS
, cBlockFetchCodec :: Codec (BlockFetch blk (Point blk)) e m bBF
, cBlockFetchCodecSerialised :: Codec (BlockFetch (Serialised blk) (Point blk)) e m bSBF
, cTxSubmissionCodec :: Codec (TxSubmission (GenTxId blk) (GenTx blk)) e m bTX
, cTxSubmission2Codec :: Codec (TxSubmission2 (GenTxId blk) (GenTx blk)) e m bTX2
, cKeepAliveCodec :: Codec KeepAlive e m bKA
}

-- | Protocol codecs for the node-to-node protocols
defaultCodecs :: forall m blk. (IOLike m, SerialiseNodeToNodeConstraints blk)
defaultCodecs :: forall m blk. (IOLike m, SerialiseNodeToNodeConstraints blk,
ShowProxy (GenTxId blk), ShowProxy (GenTx blk))
=> CodecConfig blk
-> BlockNodeToNodeVersion blk
-> Codecs blk DeserialiseFailure m
ByteString ByteString ByteString ByteString ByteString ByteString
ByteString ByteString ByteString ByteString ByteString ByteString ByteString
defaultCodecs ccfg version = Codecs {
cChainSyncCodec =
codecChainSync
Expand Down Expand Up @@ -262,6 +267,13 @@ defaultCodecs ccfg version = Codecs {
enc
dec

, cTxSubmission2Codec =
codecTxSubmission2
enc
dec
enc
dec

, cKeepAliveCodec =
codecKeepAlive
}
Expand All @@ -283,13 +295,15 @@ identityCodecs :: Monad m
(AnyMessage (BlockFetch blk (Point blk)))
(AnyMessage (BlockFetch (Serialised blk) (Point blk)))
(AnyMessage (TxSubmission (GenTxId blk) (GenTx blk)))
(AnyMessage (TxSubmission2 (GenTxId blk) (GenTx blk)))
(AnyMessage KeepAlive)
identityCodecs = Codecs {
cChainSyncCodec = codecChainSyncId
, cChainSyncCodecSerialised = codecChainSyncId
, cBlockFetchCodec = codecBlockFetchId
, cBlockFetchCodecSerialised = codecBlockFetchId
, cTxSubmissionCodec = codecTxSubmissionId
, cTxSubmission2Codec = codecTxSubmission2Id
, cKeepAliveCodec = codecKeepAliveId
}

Expand All @@ -307,6 +321,7 @@ data Tracers' peer blk e f = Tracers {
, tBlockFetchTracer :: f (TraceLabelPeer peer (TraceSendRecv (BlockFetch blk (Point blk))))
, tBlockFetchSerialisedTracer :: f (TraceLabelPeer peer (TraceSendRecv (BlockFetch (Serialised blk) (Point blk))))
, tTxSubmissionTracer :: f (TraceLabelPeer peer (TraceSendRecv (TxSubmission (GenTxId blk) (GenTx blk))))
, tTxSubmission2Tracer :: f (TraceLabelPeer peer (TraceSendRecv (TxSubmission2 (GenTxId blk) (GenTx blk))))
}

instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer blk e f) where
Expand All @@ -316,6 +331,7 @@ instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer blk e f) where
, tBlockFetchTracer = f tBlockFetchTracer
, tBlockFetchSerialisedTracer = f tBlockFetchSerialisedTracer
, tTxSubmissionTracer = f tTxSubmissionTracer
, tTxSubmission2Tracer = f tTxSubmission2Tracer
}
where
f :: forall a. Semigroup a
Expand All @@ -331,6 +347,7 @@ nullTracers = Tracers {
, tBlockFetchTracer = nullTracer
, tBlockFetchSerialisedTracer = nullTracer
, tTxSubmissionTracer = nullTracer
, tTxSubmission2Tracer = nullTracer
}

showTracers :: ( Show blk
Expand All @@ -348,6 +365,7 @@ showTracers tr = Tracers {
, tBlockFetchTracer = showTracing tr
, tBlockFetchSerialisedTracer = showTracing tr
, tTxSubmissionTracer = showTracing tr
, tTxSubmission2Tracer = showTracing tr
}

{-------------------------------------------------------------------------------
Expand All @@ -371,38 +389,45 @@ type ServerApp m peer bytes a =
-- | Applications for the node-to-node protocols
--
-- See 'Network.Mux.Types.MuxApplication'
data Apps m peer bCS bBF bTX bKA a = Apps {
data Apps m peer bCS bBF bTX bTX2 bKA a = Apps {
-- | Start a chain sync client that communicates with the given upstream
-- node.
aChainSyncClient :: ClientApp m peer bCS a
aChainSyncClient :: ClientApp m peer bCS a

-- | Start a chain sync server.
, aChainSyncServer :: ServerApp m peer bCS a
, aChainSyncServer :: ServerApp m peer bCS a

-- | Start a block fetch client that communicates with the given
-- upstream node.
, aBlockFetchClient :: ClientApp m peer bBF a
, aBlockFetchClient :: ClientApp m peer bBF a

-- | Start a block fetch server.
, aBlockFetchServer :: ServerApp m peer bBF a
, aBlockFetchServer :: ServerApp m peer bBF a

-- | Start a transaction submission client that communicates with the
-- given upstream node.
, aTxSubmissionClient :: ClientApp m peer bTX a
, aTxSubmissionClient :: ClientApp m peer bTX a

-- | Start a transaction submission server.
, aTxSubmissionServer :: ServerApp m peer bTX a
, aTxSubmissionServer :: ServerApp m peer bTX a

-- | Start a transaction submission v2 client that communicates with the
-- given upstream node.
, aTxSubmission2Client :: ClientApp m peer bTX2 a

-- | Start a transaction submission v2 server.
, aTxSubmission2Server :: ServerApp m peer bTX2 a

-- | Start a keep-alive client.
, aKeepAliveClient :: ClientApp m peer bKA a
, aKeepAliveClient :: ClientApp m peer bKA a

-- | Start a keep-alive server.
, aKeepAliveServer :: ServerApp m peer bKA a
, aKeepAliveServer :: ServerApp m peer bKA a
}

-- | Construct the 'NetworkApplication' for the node-to-node protocols
mkApps
:: forall m remotePeer localPeer blk e bCS bBF bTX bKA.
:: forall m remotePeer localPeer blk e bCS bBF bTX bTX2 bKA.
( IOLike m
, MonadTimer m
, Ord remotePeer
Expand All @@ -415,10 +440,10 @@ mkApps
)
=> NodeKernel m remotePeer localPeer blk -- ^ Needed for bracketing only
-> Tracers m remotePeer blk e
-> Codecs blk e m bCS bCS bBF bBF bTX bKA
-> Codecs blk e m bCS bCS bBF bBF bTX bTX2 bKA
-> m ChainSyncTimeout
-> Handlers m remotePeer blk
-> Apps m remotePeer bCS bBF bTX bKA ()
-> Apps m remotePeer bCS bBF bTX bTX2 bKA ()
mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
Apps {..}
where
Expand Down Expand Up @@ -537,6 +562,37 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
channel
(txSubmissionServerPeerPipelined (hTxSubmissionServer version them))

aTxSubmission2Client
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bTX2
-> m ((), Maybe bTX2)
aTxSubmission2Client version controlMessageSTM them channel = do
labelThisThread "TxSubmissionClient"
runPeerWithLimits
(contramap (TraceLabelPeer them) tTxSubmission2Tracer)
cTxSubmission2Codec
(byteLimitsTxSubmission2 (const 0)) -- TODO: Real Bytelimits, see #1727
timeLimitsTxSubmission2
channel
(Hello.wrapClientPeer (txSubmissionClientPeer (hTxSubmissionClient version controlMessageSTM them)))

aTxSubmission2Server
:: NodeToNodeVersion
-> remotePeer
-> Channel m bTX2
-> m ((), Maybe bTX2)
aTxSubmission2Server version them channel = do
labelThisThread "TxSubmissionServer"
runPipelinedPeerWithLimits
(contramap (TraceLabelPeer them) tTxSubmission2Tracer)
cTxSubmission2Codec
(byteLimitsTxSubmission2 (const 0)) -- TODO: Real Bytelimits, see #1727
timeLimitsTxSubmission2
channel
(Hello.wrapServerPeerPipelined (txSubmissionServerPeerPipelined (hTxSubmissionServer version them)))

aKeepAliveClient
:: NodeToNodeVersion
-> ControlMessageSTM m
Expand Down Expand Up @@ -592,7 +648,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
initiator
:: MiniProtocolParameters
-> NodeToNodeVersion
-> Apps m (ConnectionId peer) b b b b a
-> Apps m (ConnectionId peer) b b b b b a
-> OuroborosApplication 'InitiatorMode peer b m a Void
initiator miniProtocolParameters version Apps {..} =
nodeToNodeProtocols
Expand All @@ -609,7 +665,9 @@ initiator miniProtocolParameters version Apps {..} =
blockFetchProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aBlockFetchClient version controlMessageSTM them))),
txSubmissionProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version controlMessageSTM them))),
if version >= NodeToNodeV_6
then InitiatorProtocolOnly (MuxPeerRaw (aTxSubmission2Client version controlMessageSTM them))
else InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version controlMessageSTM them)),
keepAliveProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aKeepAliveClient version controlMessageSTM them)))
})
Expand All @@ -622,7 +680,7 @@ initiator miniProtocolParameters version Apps {..} =
responder
:: MiniProtocolParameters
-> NodeToNodeVersion
-> Apps m (ConnectionId peer) b b b b a
-> Apps m (ConnectionId peer) b b b b b a
-> OuroborosApplication 'ResponderMode peer b m Void a
responder miniProtocolParameters version Apps {..} =
nodeToNodeProtocols
Expand All @@ -633,7 +691,9 @@ responder miniProtocolParameters version Apps {..} =
blockFetchProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aBlockFetchServer version them))),
txSubmissionProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version them))),
if version >= NodeToNodeV_6
then ResponderProtocolOnly (MuxPeerRaw (aTxSubmission2Server version them))
else ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version them)),
keepAliveProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aKeepAliveServer version them)))
})
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
:: NodeKernelArgs m (ConnectionId addrNTN) (ConnectionId addrNTC) blk
-> NodeKernel m (ConnectionId addrNTN) (ConnectionId addrNTC) blk
-> BlockNodeToNodeVersion blk
-> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString ()
-> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString ByteString ()
mkNodeToNodeApps nodeKernelArgs nodeKernel version =
NTN.mkApps
nodeKernel
Expand All @@ -337,7 +337,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
mkDiffusionApplications
:: MiniProtocolParameters
-> ( BlockNodeToNodeVersion blk
-> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString ()
-> NTN.Apps m (ConnectionId addrNTN) ByteString ByteString ByteString ByteString ByteString ()
)
-> ( BlockNodeToClientVersion blk
-> NTC.Apps m (ConnectionId addrNTC) ByteString ByteString ByteString ()
Expand Down
6 changes: 6 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/NodeToNode/Version.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ data NodeToNodeVersion
-- ^ Changes:
--
-- * Enable @CardanoNodeToNodeVersion4@, i.e., Mary
| NodeToNodeV_6
-- ^ Changes:
--
-- * Replace 'TxSubmision' with 'Txsubmission2' protocol.
deriving (Eq, Ord, Enum, Bounded, Show, Typeable)

nodeToNodeVersionCodec :: CodecCBORTerm (Text, Maybe Int) NodeToNodeVersion
Expand All @@ -55,12 +59,14 @@ nodeToNodeVersionCodec = CodecCBORTerm { encodeTerm, decodeTerm }
encodeTerm NodeToNodeV_3 = CBOR.TInt 3
encodeTerm NodeToNodeV_4 = CBOR.TInt 4
encodeTerm NodeToNodeV_5 = CBOR.TInt 5
encodeTerm NodeToNodeV_6 = CBOR.TInt 6

decodeTerm (CBOR.TInt 1) = Right NodeToNodeV_1
decodeTerm (CBOR.TInt 2) = Right NodeToNodeV_2
decodeTerm (CBOR.TInt 3) = Right NodeToNodeV_3
decodeTerm (CBOR.TInt 4) = Right NodeToNodeV_4
decodeTerm (CBOR.TInt 5) = Right NodeToNodeV_5
decodeTerm (CBOR.TInt 6) = Right NodeToNodeV_6
decodeTerm (CBOR.TInt n) = Left ( T.pack "decode NodeToNodeVersion: unknonw tag: "
<> T.pack (show n)
, Just n
Expand Down

0 comments on commit 6f4abf2

Please sign in to comment.