Skip to content

Commit

Permalink
Merge #2990
Browse files Browse the repository at this point in the history
2990: NodeToNode and NodeToClient handshake protocol timeouts r=coot a=coot



Co-authored-by: Marcin Szamotulski <profunctor@pm.me>
  • Loading branch information
iohk-bors[bot] and coot authored Mar 12, 2021
2 parents c36e59c + 749715b commit 3176973
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 13 deletions.
4 changes: 4 additions & 0 deletions ouroboros-network-framework/demo/ping-pong.hs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ clientPingPong pipelined =
connectToNode
(localSnocket iomgr defaultLocalSocketAddrPath)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -150,6 +151,7 @@ serverPingPong =
(AcceptedConnectionsLimit maxBound maxBound 0)
defaultLocalSocketAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication app))
Expand Down Expand Up @@ -205,6 +207,7 @@ clientPingPong2 =
connectToNode
(localSnocket iomgr defaultLocalSocketAddrPath)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -259,6 +262,7 @@ serverPingPong2 =
(AcceptedConnectionsLimit maxBound maxBound 0)
defaultLocalSocketAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication app))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ data HandshakeArguments connectionId vNumber vData m application = HandshakeArgu

-- | accept version, first argument is our version data the second
-- argument is the remote version data.
haAcceptVersion :: vData -> vData -> Accept vData
haAcceptVersion :: vData -> vData -> Accept vData,

-- | 'Driver' timeouts for 'Handshake' protocol.
--
haTimeLimits
:: ProtocolTimeLimits (Handshake vNumber CBOR.Term)
}


Expand All @@ -125,15 +130,16 @@ runHandshakeClient bearer
haHandshakeCodec,
haVersionDataCodec,
haVersions,
haAcceptVersion
haAcceptVersion,
haTimeLimits
} =
tryHandshake
(fst <$>
runPeerWithLimits
(WithMuxBearer connectionId `contramap` haHandshakeTracer)
haHandshakeCodec
byteLimitsHandshake
timeLimitsHandshake
haTimeLimits
(fromChannel (muxBearerAsChannel bearer handshakeProtocolNum InitiatorDir))
(handshakeClientPeer haVersionDataCodec haAcceptVersion haVersions))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Ouroboros.Network.Protocol.Handshake.Codec

, byteLimitsHandshake
, timeLimitsHandshake
, noTimeLimitsHandshake

, encodeRefuseReason
, decodeRefuseReason
Expand Down Expand Up @@ -96,6 +97,15 @@ timeLimitsHandshake = ProtocolTimeLimits stateToLimit
stateToLimit (ServerAgency TokConfirm) = shortWait


noTimeLimitsHandshake :: forall vNumber. ProtocolTimeLimits (Handshake vNumber CBOR.Term)
noTimeLimitsHandshake = ProtocolTimeLimits stateToLimit
where
stateToLimit :: forall (pr :: PeerRole) (st :: Handshake vNumber CBOR.Term).
PeerHasAgency pr st -> Maybe DiffTime
stateToLimit (ClientAgency TokPropose) = Nothing
stateToLimit (ServerAgency TokConfirm) = Nothing


-- |
-- @'Handshake'@ codec. The @'MsgProposeVersions'@ encodes proposed map in
-- ascending order and it expects to receive them in this order. This allows
Expand Down
33 changes: 24 additions & 9 deletions ouroboros-network-framework/src/Ouroboros/Network/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ import Network.Mux.DeltaQ.TraceTransformer

import Ouroboros.Network.ConnectionId
import Ouroboros.Network.Codec hiding (encode, decode)
import Ouroboros.Network.Driver (TraceSendRecv)
import Ouroboros.Network.Driver.Limits
import Ouroboros.Network.Mux
import Ouroboros.Network.ErrorPolicy
import Ouroboros.Network.Subscription.PeerState
Expand Down Expand Up @@ -193,6 +193,7 @@ connectToNode
)
=> Snocket IO fd addr
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> NetworkConnectTracers addr vNumber
-> (vData -> vData -> Accept vData)
Expand All @@ -203,7 +204,7 @@ connectToNode
-> addr
-- ^ remote address
-> IO ()
connectToNode sn handshakeCodec versionDataCodec tracers acceptVersion versions localAddr remoteAddr =
connectToNode sn handshakeCodec handshakeTimeLimits versionDataCodec tracers acceptVersion versions localAddr remoteAddr =
bracket
(Snocket.openToConnect sn remoteAddr)
(Snocket.close sn)
Expand All @@ -212,7 +213,7 @@ connectToNode sn handshakeCodec versionDataCodec tracers acceptVersion versions
Just addr -> Snocket.bind sn sd addr
Nothing -> return ()
Snocket.connect sn sd remoteAddr
connectToNode' sn handshakeCodec versionDataCodec tracers acceptVersion versions sd
connectToNode' sn handshakeCodec handshakeTimeLimits versionDataCodec tracers acceptVersion versions sd
)

-- |
Expand All @@ -232,14 +233,15 @@ connectToNode'
)
=> Snocket IO fd addr
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> NetworkConnectTracers addr vNumber
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (OuroborosApplication appType addr BL.ByteString IO a b)
-- ^ application to run over the connection
-> fd
-> IO ()
connectToNode' sn handshakeCodec versionDataCodec NetworkConnectTracers {nctMuxTracer, nctHandshakeTracer } acceptVersion versions sd = do
connectToNode' sn handshakeCodec handshakeTimeLimits versionDataCodec NetworkConnectTracers {nctMuxTracer, nctHandshakeTracer } acceptVersion versions sd = do
connectionId <- ConnectionId <$> Snocket.getLocalAddr sn sd <*> Snocket.getRemoteAddr sn sd
muxTracer <- initDeltaQTracer' $ Mx.WithMuxBearer connectionId `contramap` nctMuxTracer
ts_start <- getMonotonicTime
Expand All @@ -254,7 +256,8 @@ connectToNode' sn handshakeCodec versionDataCodec NetworkConnectTracers {nctMuxT
haHandshakeCodec = handshakeCodec,
haVersionDataCodec = versionDataCodec,
haVersions = versions,
haAcceptVersion = acceptVersion
haAcceptVersion = acceptVersion,
haTimeLimits = handshakeTimeLimits
}
ts_end <- getMonotonicTime
case app_e of
Expand Down Expand Up @@ -284,17 +287,19 @@ connectToNodeSocket
)
=> IOManager
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> NetworkConnectTracers Socket.SockAddr vNumber
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (OuroborosApplication appType Socket.SockAddr BL.ByteString IO a b)
-- ^ application to run over the connection
-> Socket.Socket
-> IO ()
connectToNodeSocket iocp handshakeCodec versionDataCodec tracers acceptVersion versions sd =
connectToNodeSocket iocp handshakeCodec handshakeTimeLimits versionDataCodec tracers acceptVersion versions sd =
connectToNode'
(Snocket.socketSnocket iocp)
handshakeCodec
handshakeTimeLimits
versionDataCodec
tracers
acceptVersion
Expand Down Expand Up @@ -351,12 +356,13 @@ beginConnection
-> Tracer IO (Mx.WithMuxBearer (ConnectionId addr) Mx.MuxTrace)
-> Tracer IO (Mx.WithMuxBearer (ConnectionId addr) (TraceSendRecv (Handshake vNumber CBOR.Term)))
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> (vData -> vData -> Accept vData)
-> (Time -> addr -> st -> STM.STM (AcceptConnection st vNumber vData addr IO BL.ByteString))
-- ^ either accept or reject a connection.
-> Server.BeginConnection addr fd st ()
beginConnection sn muxTracer handshakeTracer handshakeCodec versionDataCodec acceptVersion fn t addr st = do
beginConnection sn muxTracer handshakeTracer handshakeCodec handshakeTimeLimits versionDataCodec acceptVersion fn t addr st = do
accept <- fn t addr st
case accept of
AcceptConnection st' connectionId versions -> pure $ Server.Accept st' $ \sd -> do
Expand All @@ -373,7 +379,8 @@ beginConnection sn muxTracer handshakeTracer handshakeCodec versionDataCodec acc
haHandshakeCodec = handshakeCodec,
haVersionDataCodec = versionDataCodec,
haVersions = versions,
haAcceptVersion = acceptVersion
haAcceptVersion = acceptVersion,
haTimeLimits = handshakeTimeLimits
}

case app_e of
Expand Down Expand Up @@ -511,6 +518,7 @@ runServerThread
-> fd
-> AcceptedConnectionsLimit
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (SomeResponderApplication addr BL.ByteString IO b)
Expand All @@ -527,6 +535,7 @@ runServerThread NetworkServerTracers { nstMuxTracer
sd
acceptedConnectionsLimit
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand All @@ -538,7 +547,7 @@ runServerThread NetworkServerTracers { nstMuxTracer
(fromSnocket nmsConnectionTable sn sd)
acceptedConnectionsLimit
(acceptException sockAddr)
(beginConnection sn nstMuxTracer nstHandshakeTracer handshakeCodec versionDataCodec acceptVersion (acceptConnectionTx sockAddr))
(beginConnection sn nstMuxTracer nstHandshakeTracer handshakeCodec handshakeTimeLimits versionDataCodec acceptVersion (acceptConnectionTx sockAddr))
-- register producer when application starts, it will be unregistered
-- using 'CompleteConnection'
(\remoteAddr thread st -> pure $ registerProducer remoteAddr thread
Expand Down Expand Up @@ -592,6 +601,7 @@ withServerNode
-> AcceptedConnectionsLimit
-> addr
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (SomeResponderApplication addr BL.ByteString IO b)
Expand All @@ -610,6 +620,7 @@ withServerNode sn
acceptedConnectionsLimit
addr
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand All @@ -623,6 +634,7 @@ withServerNode sn
acceptedConnectionsLimit
sd
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand Down Expand Up @@ -658,6 +670,7 @@ withServerNode'
-> AcceptedConnectionsLimit
-> fd
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (SomeResponderApplication addr BL.ByteString IO b)
Expand All @@ -676,6 +689,7 @@ withServerNode' sn
acceptedConnectionsLimit
sd
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand All @@ -690,6 +704,7 @@ withServerNode' sn
sd
acceptedConnectionsLimit
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ prop_socket_send_recv initiatorAddr responderAddr f xs =
(AcceptedConnectionsLimit maxBound maxBound 0)
responderAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication responderApp))
Expand All @@ -253,6 +254,7 @@ prop_socket_send_recv initiatorAddr responderAddr f xs =
connectToNode
snocket
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
(NetworkConnectTracers activeMuxTracer nullTracer)
acceptableVersion
Expand Down Expand Up @@ -488,6 +490,7 @@ prop_socket_client_connect_error _ xs =
<- try $ False <$ connectToNode
(socketSnocket iomgr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ prop_send_recv f xs _first = ioProperty $ withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
(Socket.addrAddress responderAddr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication responderApp))
Expand All @@ -610,6 +611,7 @@ prop_send_recv f xs _first = ioProperty $ withIOManager $ \iocp -> do
(connectToNodeSocket
iocp
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -730,6 +732,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
responderAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication (appX rrcfg)))
Expand All @@ -750,6 +753,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
responderAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication (appX rrcfg)))
Expand All @@ -775,6 +779,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ withIOManager $ \iocp -> do
(connectToNodeSocket
iocp
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -847,6 +852,7 @@ _demo = ioProperty $ withIOManager $ \iocp -> do
(connectToNodeSocket
iocp
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand All @@ -869,6 +875,7 @@ _demo = ioProperty $ withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
(Socket.addrAddress addr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication appRsp))
Expand Down
4 changes: 4 additions & 0 deletions ouroboros-network/demo/chain-sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ clientChainSync sockPaths = withIOManager $ \iocp ->
connectToNode
(localSnocket iocp sockPath)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -187,6 +188,7 @@ serverChainSync sockAddr = withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
(localAddressFromPath sockAddr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(simpleSingletonVersions
Expand Down Expand Up @@ -358,6 +360,7 @@ clientBlockFetch sockAddrs = withIOManager $ \iocp -> do
connectToNode
(localSnocket iocp defaultLocalSocketAddrPath)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -413,6 +416,7 @@ serverBlockFetch sockAddr = withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
(localAddressFromPath sockAddr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(simpleSingletonVersions
Expand Down
Loading

0 comments on commit 3176973

Please sign in to comment.