diff --git a/Cargo.lock b/Cargo.lock
index 05742c9a961..aeb7fb33319 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3269,7 +3269,7 @@ dependencies = [
 
 [[package]]
 name = "libp2p-request-response"
-version = "0.26.2"
+version = "0.26.3"
 dependencies = [
  "anyhow",
  "async-std",
diff --git a/Cargo.toml b/Cargo.toml
index 805661b26d7..4c83081726d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -99,7 +99,7 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" }
 libp2p-quic = { version = "0.10.3", path = "transports/quic" }
 libp2p-relay = { version = "0.17.2", path = "protocols/relay" }
 libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
-libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" }
+libp2p-request-response = { version = "0.26.3", path = "protocols/request-response" }
 libp2p-server = { version = "0.12.7", path = "misc/server" }
 libp2p-stream = { version = "0.1.0-alpha.1", path = "protocols/stream" }
 libp2p-swarm = { version = "0.44.2", path = "swarm" }
diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md
index 92417508786..aee09cc8f4b 100644
--- a/protocols/request-response/CHANGELOG.md
+++ b/protocols/request-response/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 0.26.3
+
+- Avoid hanging at capacity and dial IO errors.
+  See [PR 5419](https://github.com/libp2p/rust-libp2p/pull/5419).
+
 ## 0.26.2
 
 - Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`.
diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml
index 1eb8c1ae95f..d621e477bfb 100644
--- a/protocols/request-response/Cargo.toml
+++ b/protocols/request-response/Cargo.toml
@@ -3,7 +3,7 @@ name = "libp2p-request-response"
 edition = "2021"
 rust-version = { workspace = true }
 description = "Generic Request/Response Protocols"
-version = "0.26.2"
+version = "0.26.3"
 authors = ["Parity Technologies <admin@parity.io>"]
 license = "MIT"
 repository = "https://github.com/libp2p/rust-libp2p"
diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs
index 2d45e0d7dc3..6986c22147e 100644
--- a/protocols/request-response/src/handler.rs
+++ b/protocols/request-response/src/handler.rs
@@ -28,6 +28,7 @@ use crate::{InboundRequestId, OutboundRequestId, EMPTY_QUEUE_SHRINK_THRESHOLD};
 
 use futures::channel::mpsc;
 use futures::{channel::oneshot, prelude::*};
+use instant::Instant;
 use libp2p_swarm::handler::{
     ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
     ListenUpgradeError,
@@ -57,6 +58,10 @@ where
     inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
     /// The request/response message codec.
     codec: TCodec,
+
+    request_timeout: Duration,
+    max_concurrent_streams: usize,
+
     /// Queue of events to emit in `poll()`.
     pending_events: VecDeque<Event<TCodec>>,
     /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
@@ -94,7 +99,7 @@ where
     pub(super) fn new(
         inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
         codec: TCodec,
-        substream_timeout: Duration,
+        request_timeout: Duration,
         inbound_request_id: Arc<AtomicU64>,
         max_concurrent_streams: usize,
     ) -> Self {
@@ -102,6 +107,8 @@ where
         Self {
             inbound_protocols,
             codec,
+            request_timeout,
+            max_concurrent_streams,
             pending_outbound: VecDeque::new(),
             requested_outbound: Default::default(),
             inbound_receiver,
@@ -109,7 +116,7 @@ where
             pending_events: VecDeque::new(),
             inbound_request_id,
             worker_streams: futures_bounded::FuturesMap::new(
-                substream_timeout,
+                request_timeout,
                 max_concurrent_streams,
             ),
         }
@@ -159,6 +166,9 @@ where
             }
         };
 
+        // Inbound connections are reported to the upper layer from within the worker task,
+        // so failing to schedule the worker means the upper layer will never know
+        // about the inbound request. Because of that, we do not report any inbound failure.
         if self
             .worker_streams
             .try_push(RequestId::Inbound(request_id), recv.boxed())
@@ -183,6 +193,21 @@ where
             .pop_front()
             .expect("negotiated a stream without a pending message");
 
+        // If the timeout has already been reached, there is no need to proceed further.
+        if message.time.elapsed() >= self.request_timeout {
+            self.pending_events
+                .push_back(Event::OutboundTimeout(message.request_id));
+            return;
+        }
+
+        // If we are at capacity, reschedule the request for later.
+        //
+        // TODO(oblique): Implement `futures_bounded::FuturesMap::is_full`
+        if self.worker_streams.len() == self.max_concurrent_streams {
+            self.requested_outbound.push_back(message);
+            return;
+        }
+
         let mut codec = self.codec.clone();
         let request_id = message.request_id;
 
@@ -199,13 +224,10 @@ where
             })
         };
 
-        if self
-            .worker_streams
+        self.worker_streams
             .try_push(RequestId::Outbound(request_id), send.boxed())
-            .is_err()
-        {
-            tracing::warn!("Dropping outbound stream because we are at capacity")
-        }
+            .ok()
+            .expect("worker_streams at capacity");
     }
 
     fn on_dial_upgrade_error(
         &mut self,
@@ -350,6 +372,7 @@ pub struct OutboundMessage<TCodec: Codec> {
     pub(crate) request_id: OutboundRequestId,
     pub(crate) request: TCodec::Request,
     pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
+    pub(crate) time: Instant,
 }
 
 impl<TCodec> fmt::Debug for OutboundMessage<TCodec>
 where
@@ -441,20 +464,29 @@ where
             }));
         }
 
-        // Emit outbound requests.
-        if let Some(request) = self.pending_outbound.pop_front() {
-            let protocols = request.protocols.clone();
-            self.requested_outbound.push_back(request);
-
-            return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
-                protocol: SubstreamProtocol::new(Protocol { protocols }, ()),
-            });
-        }
+        // Emit outbound requests if we are not at capacity.
+        if self.worker_streams.len() < self.max_concurrent_streams {
+            if let Some(request) = self.pending_outbound.pop_front() {
+                // If the timeout has already been reached, there is no need to proceed further.
+                if request.time.elapsed() >= self.request_timeout {
+                    return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
+                        Event::OutboundTimeout(request.request_id),
+                    ));
+                }
+
+                let protocols = request.protocols.clone();
+                self.requested_outbound.push_back(request);
+
+                return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
+                    protocol: SubstreamProtocol::new(Protocol { protocols }, ()),
+                });
+            }
 
-        debug_assert!(self.pending_outbound.is_empty());
+            debug_assert!(self.pending_outbound.is_empty());
 
-        if self.pending_outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
-            self.pending_outbound.shrink_to_fit();
+            if self.pending_outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
+                self.pending_outbound.shrink_to_fit();
+            }
         }
 
         Poll::Pending
diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs
index 4362b3255ad..911840185bd 100644
--- a/protocols/request-response/src/lib.rs
+++ b/protocols/request-response/src/lib.rs
@@ -79,6 +79,7 @@ pub use handler::ProtocolSupport;
 use crate::handler::OutboundMessage;
 use futures::channel::oneshot;
 use handler::Handler;
+use instant::Instant;
 use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
 use libp2p_identity::PeerId;
 use libp2p_swarm::{
@@ -428,6 +429,7 @@ where
             request_id,
             request,
             protocols: self.outbound_protocols.clone(),
+            time: Instant::now(),
         };
 
         if let Some(request) = self.try_send_request(peer, request) {
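Taken together, the handler changes implement one scheduling rule: every queued outbound request is stamped with the `Instant` at which it was created, a request that has outlived `request_timeout` is failed with `Event::OutboundTimeout` instead of being dropped silently, and a request that arrives while all `max_concurrent_streams` worker slots are busy is kept queued and retried on the next poll. The sketch below is a minimal, self-contained illustration of that rule using only `std` types; `QueuedRequest`, `Action`, and `schedule` are names invented for the example and are not part of the libp2p-request-response API.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// A queued outbound request, stamped with its creation time
/// (mirrors the new `time: Instant` field on `OutboundMessage`).
struct QueuedRequest {
    id: u64,
    queued_at: Instant,
}

/// What the scheduler decides to do with the request at the front of the queue.
#[derive(Debug, PartialEq)]
enum Action {
    /// Hand the request to a worker stream.
    Dispatch(u64),
    /// Report a timeout to the caller instead of dispatching.
    Timeout(u64),
    /// All worker slots are busy; keep the request queued and retry later.
    KeepQueued,
}

fn schedule(
    queue: &mut VecDeque<QueuedRequest>,
    busy_workers: usize,
    max_workers: usize,
    request_timeout: Duration,
) -> Option<Action> {
    // Never start new work while every worker slot is occupied;
    // the request stays in the queue instead of being dropped.
    if busy_workers >= max_workers {
        return queue.front().map(|_| Action::KeepQueued);
    }

    let request = queue.pop_front()?;

    // A request that has already outlived the timeout is failed immediately.
    if request.queued_at.elapsed() >= request_timeout {
        return Some(Action::Timeout(request.id));
    }

    Some(Action::Dispatch(request.id))
}

fn main() {
    let mut queue = VecDeque::from([QueuedRequest { id: 1, queued_at: Instant::now() }]);

    // With a free worker slot and a fresh request, the request is dispatched.
    let action = schedule(&mut queue, 0, 100, Duration::from_secs(10));
    assert_eq!(action, Some(Action::Dispatch(1)));
}
```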