Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(request-response): Avoid hanging at capacity and on dial IO errors #5419

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" }
libp2p-quic = { version = "0.10.3", path = "transports/quic" }
libp2p-relay = { version = "0.17.2", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" }
libp2p-request-response = { version = "0.26.3", path = "protocols/request-response" }
libp2p-server = { version = "0.12.7", path = "misc/server" }
libp2p-stream = { version = "0.1.0-alpha.1", path = "protocols/stream" }
libp2p-swarm = { version = "0.44.2", path = "swarm" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.26.3

- Avoid hanging when at capacity and on dial IO errors.
See [PR 5419](https://github.com/libp2p/rust-libp2p/pull/5419).

## 0.26.2

- Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`.
Expand Down
2 changes: 1 addition & 1 deletion protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-request-response"
edition = "2021"
rust-version = { workspace = true }
description = "Generic Request/Response Protocols"
version = "0.26.2"
version = "0.26.3"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
72 changes: 52 additions & 20 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{InboundRequestId, OutboundRequestId, EMPTY_QUEUE_SHRINK_THRESHOLD};

use futures::channel::mpsc;
use futures::{channel::oneshot, prelude::*};
use instant::Instant;
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
Expand Down Expand Up @@ -57,6 +58,10 @@ where
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
/// The request/response message codec.
codec: TCodec,

request_timeout: Duration,
max_concurrent_streams: usize,

/// Queue of events to emit in `poll()`.
pending_events: VecDeque<Event<TCodec>>,
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
Expand Down Expand Up @@ -94,22 +99,24 @@ where
pub(super) fn new(
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
codec: TCodec,
substream_timeout: Duration,
request_timeout: Duration,
inbound_request_id: Arc<AtomicU64>,
max_concurrent_streams: usize,
) -> Self {
let (inbound_sender, inbound_receiver) = mpsc::channel(0);
Self {
inbound_protocols,
codec,
request_timeout,
max_concurrent_streams,
pending_outbound: VecDeque::new(),
requested_outbound: Default::default(),
inbound_receiver,
inbound_sender,
pending_events: VecDeque::new(),
inbound_request_id,
worker_streams: futures_bounded::FuturesMap::new(
substream_timeout,
request_timeout,
max_concurrent_streams,
),
}
Expand Down Expand Up @@ -159,6 +166,9 @@ where
}
};

// Inbound connections are reported to the upper layer from within the worker task,
// so failing to schedule the worker means the upper layer will never know
// about the inbound request. Because of that, we do not report any inbound failure.
if self
.worker_streams
.try_push(RequestId::Inbound(request_id), recv.boxed())
Expand All @@ -183,6 +193,21 @@ where
.pop_front()
.expect("negotiated a stream without a pending message");

// If timeout is already reached then there is no need to proceed further.
if message.time.elapsed() >= self.request_timeout {
self.pending_events
.push_back(Event::OutboundTimeout(message.request_id));
return;
}

// If we are at capacity, reschedule request later on.
//
// TODO(oblique): Implement `futures_bounded::FuturesMap::is_full`
if self.worker_streams.len() == self.max_concurrent_streams {
self.requested_outbound.push_back(message);
return;
}

let mut codec = self.codec.clone();
let request_id = message.request_id;

Expand All @@ -199,13 +224,10 @@ where
})
};

if self
.worker_streams
self.worker_streams
.try_push(RequestId::Outbound(request_id), send.boxed())
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
}
.ok()
.expect("worker_streams at capacity");
}

fn on_dial_upgrade_error(
Expand Down Expand Up @@ -350,6 +372,7 @@ pub struct OutboundMessage<TCodec: Codec> {
pub(crate) request_id: OutboundRequestId,
pub(crate) request: TCodec::Request,
pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
pub(crate) time: Instant,
}

impl<TCodec> fmt::Debug for OutboundMessage<TCodec>
Expand Down Expand Up @@ -441,20 +464,29 @@ where
}));
}

// Emit outbound requests.
if let Some(request) = self.pending_outbound.pop_front() {
let protocols = request.protocols.clone();
self.requested_outbound.push_back(request);

return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Protocol { protocols }, ()),
});
}
// Emit outbound requests if we are not at capacity.
if self.worker_streams.len() < self.max_concurrent_streams {
if let Some(request) = self.pending_outbound.pop_front() {
// If timeout is already reached then there is no need to proceed further.
if request.time.elapsed() >= self.request_timeout {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::OutboundTimeout(request.request_id),
));
}

let protocols = request.protocols.clone();
self.requested_outbound.push_back(request);

return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Protocol { protocols }, ()),
});
}

debug_assert!(self.pending_outbound.is_empty());
debug_assert!(self.pending_outbound.is_empty());

if self.pending_outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.pending_outbound.shrink_to_fit();
if self.pending_outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.pending_outbound.shrink_to_fit();
}
}

Poll::Pending
Expand Down
2 changes: 2 additions & 0 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub use handler::ProtocolSupport;
use crate::handler::OutboundMessage;
use futures::channel::oneshot;
use handler::Handler;
use instant::Instant;
use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
Expand Down Expand Up @@ -428,6 +429,7 @@ where
request_id,
request,
protocols: self.outbound_protocols.clone(),
time: Instant::now(),
};

if let Some(request) = self.try_send_request(peer, request) {
Expand Down
Loading