Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NodeToNode and NodeToClient handshake protocol timeouts #2990

Merged
merged 2 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ouroboros-network-framework/demo/ping-pong.hs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ clientPingPong pipelined =
connectToNode
(localSnocket iomgr defaultLocalSocketAddrPath)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -150,6 +151,7 @@ serverPingPong =
(AcceptedConnectionsLimit maxBound maxBound 0)
defaultLocalSocketAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication app))
Expand Down Expand Up @@ -205,6 +207,7 @@ clientPingPong2 =
connectToNode
(localSnocket iomgr defaultLocalSocketAddrPath)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -259,6 +262,7 @@ serverPingPong2 =
(AcceptedConnectionsLimit maxBound maxBound 0)
defaultLocalSocketAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication app))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ data HandshakeArguments connectionId vNumber vData m application = HandshakeArgu

-- | accept version, first argument is our version data the second
-- argument is the remote version data.
haAcceptVersion :: vData -> vData -> Accept vData
haAcceptVersion :: vData -> vData -> Accept vData,

-- | 'Driver' timeouts for 'Handshake' protocol.
--
haTimeLimits
:: ProtocolTimeLimits (Handshake vNumber CBOR.Term)
}


Expand All @@ -125,15 +130,16 @@ runHandshakeClient bearer
haHandshakeCodec,
haVersionDataCodec,
haVersions,
haAcceptVersion
haAcceptVersion,
haTimeLimits
} =
tryHandshake
(fst <$>
runPeerWithLimits
(WithMuxBearer connectionId `contramap` haHandshakeTracer)
haHandshakeCodec
byteLimitsHandshake
timeLimitsHandshake
haTimeLimits
(fromChannel (muxBearerAsChannel bearer handshakeProtocolNum InitiatorDir))
(handshakeClientPeer haVersionDataCodec haAcceptVersion haVersions))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Ouroboros.Network.Protocol.Handshake.Codec

, byteLimitsHandshake
, timeLimitsHandshake
, noTimeLimitsHandshake

, encodeRefuseReason
, decodeRefuseReason
Expand Down Expand Up @@ -96,6 +97,15 @@ timeLimitsHandshake = ProtocolTimeLimits stateToLimit
stateToLimit (ServerAgency TokConfirm) = shortWait


noTimeLimitsHandshake :: forall vNumber. ProtocolTimeLimits (Handshake vNumber CBOR.Term)
noTimeLimitsHandshake = ProtocolTimeLimits stateToLimit
where
stateToLimit :: forall (pr :: PeerRole) (st :: Handshake vNumber CBOR.Term).
PeerHasAgency pr st -> Maybe DiffTime
stateToLimit (ClientAgency TokPropose) = Nothing
stateToLimit (ServerAgency TokConfirm) = Nothing


-- |
-- @'Handshake'@ codec. The @'MsgProposeVersions'@ encodes proposed map in
-- ascending order and it expects to receive them in this order. This allows
Expand Down
33 changes: 24 additions & 9 deletions ouroboros-network-framework/src/Ouroboros/Network/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ import Network.Mux.DeltaQ.TraceTransformer

import Ouroboros.Network.ConnectionId
import Ouroboros.Network.Codec hiding (encode, decode)
import Ouroboros.Network.Driver (TraceSendRecv)
import Ouroboros.Network.Driver.Limits
import Ouroboros.Network.Mux
import Ouroboros.Network.ErrorPolicy
import Ouroboros.Network.Subscription.PeerState
Expand Down Expand Up @@ -193,6 +193,7 @@ connectToNode
)
=> Snocket IO fd addr
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> NetworkConnectTracers addr vNumber
-> (vData -> vData -> Accept vData)
Expand All @@ -203,7 +204,7 @@ connectToNode
-> addr
-- ^ remote address
-> IO ()
connectToNode sn handshakeCodec versionDataCodec tracers acceptVersion versions localAddr remoteAddr =
connectToNode sn handshakeCodec handshakeTimeLimits versionDataCodec tracers acceptVersion versions localAddr remoteAddr =
bracket
(Snocket.openToConnect sn remoteAddr)
(Snocket.close sn)
Expand All @@ -212,7 +213,7 @@ connectToNode sn handshakeCodec versionDataCodec tracers acceptVersion versions
Just addr -> Snocket.bind sn sd addr
Nothing -> return ()
Snocket.connect sn sd remoteAddr
connectToNode' sn handshakeCodec versionDataCodec tracers acceptVersion versions sd
connectToNode' sn handshakeCodec handshakeTimeLimits versionDataCodec tracers acceptVersion versions sd
)

-- |
Expand All @@ -232,14 +233,15 @@ connectToNode'
)
=> Snocket IO fd addr
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> NetworkConnectTracers addr vNumber
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (OuroborosApplication appType addr BL.ByteString IO a b)
-- ^ application to run over the connection
-> fd
-> IO ()
connectToNode' sn handshakeCodec versionDataCodec NetworkConnectTracers {nctMuxTracer, nctHandshakeTracer } acceptVersion versions sd = do
connectToNode' sn handshakeCodec handshakeTimeLimits versionDataCodec NetworkConnectTracers {nctMuxTracer, nctHandshakeTracer } acceptVersion versions sd = do
connectionId <- ConnectionId <$> Snocket.getLocalAddr sn sd <*> Snocket.getRemoteAddr sn sd
muxTracer <- initDeltaQTracer' $ Mx.WithMuxBearer connectionId `contramap` nctMuxTracer
ts_start <- getMonotonicTime
Expand All @@ -254,7 +256,8 @@ connectToNode' sn handshakeCodec versionDataCodec NetworkConnectTracers {nctMuxT
haHandshakeCodec = handshakeCodec,
haVersionDataCodec = versionDataCodec,
haVersions = versions,
haAcceptVersion = acceptVersion
haAcceptVersion = acceptVersion,
haTimeLimits = handshakeTimeLimits
}
ts_end <- getMonotonicTime
case app_e of
Expand Down Expand Up @@ -284,17 +287,19 @@ connectToNodeSocket
)
=> IOManager
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> NetworkConnectTracers Socket.SockAddr vNumber
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (OuroborosApplication appType Socket.SockAddr BL.ByteString IO a b)
-- ^ application to run over the connection
-> Socket.Socket
-> IO ()
connectToNodeSocket iocp handshakeCodec versionDataCodec tracers acceptVersion versions sd =
connectToNodeSocket iocp handshakeCodec handshakeTimeLimits versionDataCodec tracers acceptVersion versions sd =
connectToNode'
(Snocket.socketSnocket iocp)
handshakeCodec
handshakeTimeLimits
versionDataCodec
tracers
acceptVersion
Expand Down Expand Up @@ -351,12 +356,13 @@ beginConnection
-> Tracer IO (Mx.WithMuxBearer (ConnectionId addr) Mx.MuxTrace)
-> Tracer IO (Mx.WithMuxBearer (ConnectionId addr) (TraceSendRecv (Handshake vNumber CBOR.Term)))
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> (vData -> vData -> Accept vData)
-> (Time -> addr -> st -> STM.STM (AcceptConnection st vNumber vData addr IO BL.ByteString))
-- ^ either accept or reject a connection.
-> Server.BeginConnection addr fd st ()
beginConnection sn muxTracer handshakeTracer handshakeCodec versionDataCodec acceptVersion fn t addr st = do
beginConnection sn muxTracer handshakeTracer handshakeCodec handshakeTimeLimits versionDataCodec acceptVersion fn t addr st = do
accept <- fn t addr st
case accept of
AcceptConnection st' connectionId versions -> pure $ Server.Accept st' $ \sd -> do
Expand All @@ -373,7 +379,8 @@ beginConnection sn muxTracer handshakeTracer handshakeCodec versionDataCodec acc
haHandshakeCodec = handshakeCodec,
haVersionDataCodec = versionDataCodec,
haVersions = versions,
haAcceptVersion = acceptVersion
haAcceptVersion = acceptVersion,
haTimeLimits = handshakeTimeLimits
}

case app_e of
Expand Down Expand Up @@ -511,6 +518,7 @@ runServerThread
-> fd
-> AcceptedConnectionsLimit
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (SomeResponderApplication addr BL.ByteString IO b)
Expand All @@ -527,6 +535,7 @@ runServerThread NetworkServerTracers { nstMuxTracer
sd
acceptedConnectionsLimit
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand All @@ -538,7 +547,7 @@ runServerThread NetworkServerTracers { nstMuxTracer
(fromSnocket nmsConnectionTable sn sd)
acceptedConnectionsLimit
(acceptException sockAddr)
(beginConnection sn nstMuxTracer nstHandshakeTracer handshakeCodec versionDataCodec acceptVersion (acceptConnectionTx sockAddr))
(beginConnection sn nstMuxTracer nstHandshakeTracer handshakeCodec handshakeTimeLimits versionDataCodec acceptVersion (acceptConnectionTx sockAddr))
-- register producer when application starts, it will be unregistered
-- using 'CompleteConnection'
(\remoteAddr thread st -> pure $ registerProducer remoteAddr thread
Expand Down Expand Up @@ -592,6 +601,7 @@ withServerNode
-> AcceptedConnectionsLimit
-> addr
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (SomeResponderApplication addr BL.ByteString IO b)
Expand All @@ -610,6 +620,7 @@ withServerNode sn
acceptedConnectionsLimit
addr
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand All @@ -623,6 +634,7 @@ withServerNode sn
acceptedConnectionsLimit
sd
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand Down Expand Up @@ -658,6 +670,7 @@ withServerNode'
-> AcceptedConnectionsLimit
-> fd
-> Codec (Handshake vNumber CBOR.Term) CBOR.DeserialiseFailure IO BL.ByteString
-> ProtocolTimeLimits (Handshake vNumber CBOR.Term)
-> VersionDataCodec CBOR.Term vNumber vData
-> (vData -> vData -> Accept vData)
-> Versions vNumber vData (SomeResponderApplication addr BL.ByteString IO b)
Expand All @@ -676,6 +689,7 @@ withServerNode' sn
acceptedConnectionsLimit
sd
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand All @@ -690,6 +704,7 @@ withServerNode' sn
sd
acceptedConnectionsLimit
handshakeCodec
handshakeTimeLimits
versionDataCodec
acceptVersion
versions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ prop_socket_send_recv initiatorAddr responderAddr f xs =
(AcceptedConnectionsLimit maxBound maxBound 0)
responderAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication responderApp))
Expand All @@ -253,6 +254,7 @@ prop_socket_send_recv initiatorAddr responderAddr f xs =
connectToNode
snocket
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
(NetworkConnectTracers activeMuxTracer nullTracer)
acceptableVersion
Expand Down Expand Up @@ -488,6 +490,7 @@ prop_socket_client_connect_error _ xs =
<- try $ False <$ connectToNode
(socketSnocket iomgr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ prop_send_recv f xs _first = ioProperty $ withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
(Socket.addrAddress responderAddr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication responderApp))
Expand All @@ -610,6 +611,7 @@ prop_send_recv f xs _first = ioProperty $ withIOManager $ \iocp -> do
(connectToNodeSocket
iocp
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -730,6 +732,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
responderAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication (appX rrcfg)))
Expand All @@ -750,6 +753,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
responderAddr
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication (appX rrcfg)))
Expand All @@ -775,6 +779,7 @@ prop_send_recv_init_and_rsp f xs = ioProperty $ withIOManager $ \iocp -> do
(connectToNodeSocket
iocp
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -847,6 +852,7 @@ _demo = ioProperty $ withIOManager $ \iocp -> do
(connectToNodeSocket
iocp
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand All @@ -869,6 +875,7 @@ _demo = ioProperty $ withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
(Socket.addrAddress addr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(unversionedProtocol (SomeResponderApplication appRsp))
Expand Down
4 changes: 4 additions & 0 deletions ouroboros-network/demo/chain-sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ clientChainSync sockPaths = withIOManager $ \iocp ->
connectToNode
(localSnocket iocp sockPath)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -187,6 +188,7 @@ serverChainSync sockAddr = withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
(localAddressFromPath sockAddr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(simpleSingletonVersions
Expand Down Expand Up @@ -358,6 +360,7 @@ clientBlockFetch sockAddrs = withIOManager $ \iocp -> do
connectToNode
(localSnocket iocp defaultLocalSocketAddrPath)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
nullNetworkConnectTracers
acceptableVersion
Expand Down Expand Up @@ -413,6 +416,7 @@ serverBlockFetch sockAddr = withIOManager $ \iocp -> do
(AcceptedConnectionsLimit maxBound maxBound 0)
(localAddressFromPath sockAddr)
unversionedHandshakeCodec
noTimeLimitsHandshake
(cborTermVersionDataCodec unversionedProtocolDataCodec)
acceptableVersion
(simpleSingletonVersions
Expand Down
Loading