Skip to content

Commit

Permalink
transport: Fix waking up on filtered events from poll_next (#287)
Browse files Browse the repository at this point in the history
This PR ensures that the transport (TCP / WebSocket) implementation
wakes up the waker `FuturesUnordered` on adding futures to the set.

This is one of the async rough edges in Rust, to properly illustrate the
issue I've compiled this rust example (comment and uncomment like 43 to
see the issue in action):

### Example 1
The future is not polled again: 

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=1e801c4864599c54a98bb9f30be1fdd5

### Example 2
The future is delayed (from 2s to 3s) because it relies on some other
implementation to wake up the context waker:

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=951080892763ca7d02ae9c5617da6c5d

### Particular implementation

The `poll_next` implementation polls 3 objects:
- `listener`: accepts incoming socket connections
- `pending_raw_connections`: dialing futures to establish socket
connection
- `pending_connections`: negotiating futures to establish libp2p
protocols

When adding futures to either `pending_raw_connections` or
`pending_connections`, the futures are not polled immediately. Instead,
they will get polled the first time an already present future finishes
execution.

This effectively leads to delays in processing both raw connections and
pending connections.
This PR in combination with
#286 has improved the
connection stability of litep2p for a high number of connections (500 in
and 500 out). The decreasing line represents release 0.8 configured with
500 in and out peers, while the stable line from 11/12 represents
litep2p with incoming PR patches. (I've restarted the node 3/4 times for
more debug logs):

![Screenshot 2024-11-12 at 14 49
43](https://github.com/user-attachments/assets/01d7f864-e219-4bed-b798-bd84b4356522)



### Next Steps

- There are a few other places were this may be happening, I'll have a
look at them after we sort out connection limits.
- @dmitry-markin suggested an even more optimized way of handling this
by polling each future individually to feed them a context waker before
adding to the inner FuturesUnordered 🙏

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv authored Nov 13, 2024
1 parent cbc657d commit e7cb35e
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub mod yamux;

mod bandwidth;
mod multistream_select;
mod utils;

#[cfg(test)]
mod mock;
Expand Down
5 changes: 2 additions & 3 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
protocol::libp2p::kademlia::{futures_stream::FuturesStream, query::QueryId},
substream::Substream,
PeerId,
protocol::libp2p::kademlia::query::QueryId, substream::Substream,
utils::futures_stream::FuturesStream, PeerId,
};

use bytes::{Bytes, BytesMut};
Expand Down
1 change: 0 additions & 1 deletion src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ const PARALLELISM_FACTOR: usize = 3;
mod bucket;
mod config;
mod executor;
mod futures_stream;
mod handle;
mod message;
mod query;
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/libp2p/kademlia/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
use crate::{
protocol::libp2p::kademlia::{
config::{DEFAULT_PROVIDER_REFRESH_INTERVAL, DEFAULT_PROVIDER_TTL},
futures_stream::FuturesStream,
record::{ContentProvider, Key, ProviderRecord, Record},
types::Key as KademliaKey,
},
utils::futures_stream::FuturesStream,
PeerId,
};

Expand Down
12 changes: 6 additions & 6 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
Transport, TransportBuilder, TransportEvent,
},
types::ConnectionId,
utils::futures_stream::FuturesStream,
};

use futures::{
Expand Down Expand Up @@ -111,12 +112,11 @@ pub(crate) struct TcpTransport {
pending_inbound_connections: HashMap<ConnectionId, PendingInboundConnection>,

/// Pending opening connections.
pending_connections: FuturesUnordered<
BoxFuture<'static, Result<NegotiatedConnection, (ConnectionId, DialError)>>,
>,
pending_connections:
FuturesStream<BoxFuture<'static, Result<NegotiatedConnection, (ConnectionId, DialError)>>>,

/// Pending raw, unnegotiated connections.
pending_raw_connections: FuturesUnordered<BoxFuture<'static, RawConnectionResult>>,
pending_raw_connections: FuturesStream<BoxFuture<'static, RawConnectionResult>>,

/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
opened_raw: HashMap<ConnectionId, (TcpStream, Multiaddr)>,
Expand Down Expand Up @@ -295,8 +295,8 @@ impl TransportBuilder for TcpTransport {
pending_open: HashMap::new(),
pending_dials: HashMap::new(),
pending_inbound_connections: HashMap::new(),
pending_connections: FuturesUnordered::new(),
pending_raw_connections: FuturesUnordered::new(),
pending_connections: FuturesStream::new(),
pending_raw_connections: FuturesStream::new(),
cancel_futures: HashMap::new(),
},
listen_addresses,
Expand Down
12 changes: 6 additions & 6 deletions src/transport/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::{
Transport, TransportBuilder, TransportEvent,
},
types::ConnectionId,
utils::futures_stream::FuturesStream,
DialError, PeerId,
};

Expand Down Expand Up @@ -115,12 +116,11 @@ pub(crate) struct WebSocketTransport {
pending_inbound_connections: HashMap<ConnectionId, PendingInboundConnection>,

/// Pending connections.
pending_connections: FuturesUnordered<
BoxFuture<'static, Result<NegotiatedConnection, (ConnectionId, DialError)>>,
>,
pending_connections:
FuturesStream<BoxFuture<'static, Result<NegotiatedConnection, (ConnectionId, DialError)>>>,

/// Pending raw, unnegotiated connections.
pending_raw_connections: FuturesUnordered<BoxFuture<'static, RawConnectionResult>>,
pending_raw_connections: FuturesStream<BoxFuture<'static, RawConnectionResult>>,

/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
opened_raw: HashMap<ConnectionId, (WebSocketStream<MaybeTlsStream<TcpStream>>, Multiaddr)>,
Expand Down Expand Up @@ -325,8 +325,8 @@ impl TransportBuilder for WebSocketTransport {
pending_open: HashMap::new(),
pending_dials: HashMap::new(),
pending_inbound_connections: HashMap::new(),
pending_connections: FuturesUnordered::new(),
pending_raw_connections: FuturesUnordered::new(),
pending_connections: FuturesStream::new(),
pending_raw_connections: FuturesStream::new(),
cancel_futures: HashMap::new(),
},
listen_addresses,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<F> FuturesStream<F> {
}
}

/// Number of futeres in the stream.
/// Number of futures in the stream.
#[cfg(test)]
pub fn len(&self) -> usize {
self.futures.len()
Expand Down
21 changes: 21 additions & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 litep2p developers
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

pub mod futures_stream;

0 comments on commit e7cb35e

Please sign in to comment.