Read messages before applying quota to avoid mplex backpressure issues #4697

Merged
merged 5 commits into from
Jun 8, 2023
58 changes: 33 additions & 25 deletions beacon_chain/networking/eth2_network.nim
@@ -1105,19 +1105,6 @@ proc handleIncomingStream(network: Eth2Node,

     nbc_reqresp_messages_received.inc(1, [shortProtocolId(protocolId)])

-    # The request quota is shared between all requests - it represents the
-    # cost to perform a service on behalf of a client and is incurred
-    # regardless if the request succeeds or fails - we don't count waiting
-    # for this quota against timeouts so as not to prematurely disconnect
-    # clients that are on the edge - nonetheless, the client will count it.
-    #
-    # When a client exceeds their quota, they will be slowed down without
-    # notification - as long as they don't make parallel requests (which is
-    # limited by libp2p), this will naturally adapt them to the available
-    # quota.
-
-    awaitQuota(peer, libp2pRequestCost, shortProtocolId(protocolId))
-
     # TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end
     let deadline = sleepAsync RESP_TIMEOUT

@@ -1131,19 +1118,20 @@ proc handleIncomingStream(network: Eth2Node,
       else:
         false

-    let msg = when isEmptyMsg:
-      NetRes[MsgRec].ok default(MsgRec)
-    else:
+    let msg =
       try:
-        awaitWithTimeout(
-          readChunkPayload(conn, peer, MsgRec), deadline):
-            # Timeout, e.g., cancellation due to fulfillment by different peer.
-            # Treat this similarly to `UnexpectedEOF`, `PotentiallyExpectedEOF`.
-            nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
-            await sendErrorResponse(
-              peer, conn, InvalidRequest,
-              errorMsgLit "Request full data not sent in time")
-            return
+        when isEmptyMsg:
+          NetRes[MsgRec].ok default(MsgRec)
+        else:
+          awaitWithTimeout(
+            readChunkPayload(conn, peer, MsgRec), deadline):
+              # Timeout, e.g., cancellation due to fulfillment by different peer.
+              # Treat this similarly to `UnexpectedEOF`, `PotentiallyExpectedEOF`.
+              nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
+              await sendErrorResponse(
+                peer, conn, InvalidRequest,
+                errorMsgLit "Request full data not sent in time")
+              return

       except SerializationError as err:
         nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
@@ -1152,6 +1140,26 @@ proc handleIncomingStream(network: Eth2Node,
       except SnappyError as err:
         nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
         returnInvalidRequest err.msg
+      finally:
+        # The request quota is shared between all requests - it represents the
+        # cost to perform a service on behalf of a client and is incurred
+        # regardless if the request succeeds or fails - we don't count waiting
+        # for this quota against timeouts so as not to prematurely disconnect
+        # clients that are on the edge - nonetheless, the client will count it.
+
+        # When a client exceeds their quota, they will be slowed down without
+        # notification - as long as they don't make parallel requests (which is
+        # limited by libp2p), this will naturally adapt them to the available
+        # quota.
+
+        # Note that the `msg` will be stored in memory while we wait for the
+        # quota to be available. The amount of such messages in memory is
+        # bounded by the libp2p limit of parallel streams
+
+        # This quota also applies to invalid requests thanks to the use of
+        # `finally`.
+
+        awaitQuota(peer, libp2pRequestCost, shortProtocolId(protocolId))
Member

Also, we need an explanation here of why it's done in `finally`, which is... unusual.

Contributor Author

Done
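
For readers following this thread, the control flow under discussion can be sketched in isolation. This is only an illustration, not the code from this PR: `quotaCharges`, `chargeQuota` and `handleRequest` are hypothetical stand-ins for the quota accounting, `awaitQuota` and `handleIncomingStream`, and the sketch is synchronous, so it shows the ordering (read first, charge afterwards, charge on error paths too) but not the async waiting.

```nim
import std/strutils

# Hypothetical stand-in for the per-peer quota accounting.
var quotaCharges = 0

proc chargeQuota() =
  # Plays the role of `awaitQuota` in the diff (assumption: no waiting here).
  inc quotaCharges

proc handleRequest(raw: string): string =
  # Hypothetical stand-in for `handleIncomingStream`.
  let msg =
    try:
      # Read and decode the request before touching the quota.
      parseInt(raw)
    except ValueError:
      # Early return for an invalid request; `finally` still runs.
      return "invalid request"
    finally:
      # Charged on every path, whether decoding succeeded or failed.
      chargeQuota()
  "ok: " & $msg
```

Calling `handleRequest("42")` and then `handleRequest("not a number")` charges the quota once per call, which matches the intent stated in the new comment: the quota also applies to invalid requests.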


     if msg.isErr:
       if msg.error.kind in ProtocolViolations: