Skip to content

Commit

Permalink
feat: add a counter for opened connections to Endpoint
Browse files Browse the repository at this point in the history
This is behind an `unstable-opened-connection-count` feature flag, so
that it can be removed again in future without a breaking change. The
intention is to use it to get a measure of the current number of
connections that we open in downstream tests, in order to see the effect
of removing the `ConnectionPool` and track down extraneous connections.
  • Loading branch information
Chris Connelly authored and lionel-faber committed Oct 13, 2021
1 parent d79a01a commit c6a9a42
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 2 deletions.
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,16 @@ authors = [ "MaidSafe Developers <dev@maidsafe.net>" ]
keywords = [ "quic" ]
edition = "2018"

[[example]]
name = "p2p_node"
required-features = ["unstable-opened-connection-count"]

[features]
default = [ "igd" ]

# `unstable-` features may be removed at any time without a breaking change release
unstable-opened-connection-count = []

[dependencies]
backoff = { version = "0.3.0", features = ["tokio"] }
bincode = "1.2.1"
Expand All @@ -25,6 +32,7 @@ quinn-proto = "0.7.3"
rcgen = "~0.8.4"
rustls = { version = "0.19.0", features = ["dangerous_configuration"] }
serde = { version = "1.0.117", features = ["derive"] }
stability = "0.1.0"
thiserror = "1.0.23"
tiny-keccak = "2.0.2"
tokio = { version = "1.2.0", features = ["sync"] }
Expand Down
5 changes: 5 additions & 0 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ async fn main() -> Result<()> {
println!("Sending to {:?} --> {:?}\n", peer, msg);
node.connect_to(&peer).await?.send(msg.clone()).await?;
}

println!(
"Done sending, opened {} connections",
node.opened_connection_count()
);
}

println!("\n---");
Expand Down
25 changes: 24 additions & 1 deletion src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ use super::{
},
};
use bytes::Bytes;
use std::net::SocketAddr;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tokio::sync::broadcast::{self, Sender};
use tokio::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
use tokio::time::{timeout, Duration};
Expand Down Expand Up @@ -80,6 +86,9 @@ pub struct Endpoint<I: ConnId> {
termination_tx: Sender<()>,
connection_pool: ConnectionPool<I>,
connection_deduplicator: ConnectionDeduplicator,

// counts fully opened connections, excluding incoming and connections dropped by connect_to_any
opened_connection_count: Arc<AtomicUsize>,
}

impl<I: ConnId> std::fmt::Debug for Endpoint<I> {
Expand Down Expand Up @@ -232,6 +241,7 @@ impl<I: ConnId> Endpoint<I> {
termination_tx: channels.termination.0.clone(),
connection_pool: ConnectionPool::new(),
connection_deduplicator: ConnectionDeduplicator::new(),
opened_connection_count: Arc::new(AtomicUsize::new(0)),
};

Ok((endpoint, quic_incoming, channels))
Expand All @@ -247,6 +257,16 @@ impl<I: ConnId> Endpoint<I> {
self.public_addr.unwrap_or(self.local_addr)
}

/// Get the count of opened connections.
///
/// `Endpoint`s keep a count of connections they have fully opened and returned to callers.
/// Notably this excludes any incoming connections, and connections that were dropped by
/// [`connect_to_any`](Self::connect_to_any).
#[stability::unstable(feature = "opened-connection-count")]
pub fn opened_connection_count(&self) -> usize {
self.opened_connection_count.load(Ordering::Relaxed)
}

/// Removes all existing connections to a given peer
pub async fn disconnect_from(&self, peer_addr: &SocketAddr) {
self.connection_pool
Expand Down Expand Up @@ -450,6 +470,9 @@ impl<I: ConnId> Endpoint<I> {
);

let _ = completion.complete(Ok(()));

let _ = self.opened_connection_count.fetch_add(1, Ordering::Relaxed);

Ok(connection)
}
Err(error) => {
Expand Down
53 changes: 52 additions & 1 deletion src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ async fn successful_connection() -> Result<()> {
bail!("No incoming connection");
}

assert_eq!(peer1.opened_connection_count(), 0);
assert_eq!(peer2.opened_connection_count(), 1);

Ok(())
}

Expand Down Expand Up @@ -62,6 +65,10 @@ async fn single_message() -> Result<()> {
} else {
bail!("No incoming message");
}

assert_eq!(peer1.opened_connection_count(), 0);
assert_eq!(peer2.opened_connection_count(), 1);

Ok(())
}

Expand Down Expand Up @@ -114,6 +121,10 @@ async fn reuse_outgoing_connection() -> Result<()> {
} else {
bail!("No incoming message");
}

assert_eq!(alice.opened_connection_count(), 1);
assert_eq!(bob.opened_connection_count(), 0);

Ok(())
}

Expand Down Expand Up @@ -168,6 +179,10 @@ async fn reuse_incoming_connection() -> Result<()> {
} else {
bail!("No incoming message");
}

assert_eq!(alice.opened_connection_count(), 1);
assert_eq!(bob.opened_connection_count(), 0);

Ok(())
}

Expand Down Expand Up @@ -213,6 +228,9 @@ async fn disconnection() -> Result<()> {
bail!("Missing incoming connection");
}

assert_eq!(alice.opened_connection_count(), 1);
assert_eq!(bob.opened_connection_count(), 1);

Ok(())
}

Expand Down Expand Up @@ -284,7 +302,7 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
let b_to_a = bob.connect_to(&alice_addr).await?;

if let Ok(Some(connection)) =
timeout(Duration::from_secs(2), alice_incoming_connections.next()).await
timeout(Duration::from_secs(60), alice_incoming_connections.next()).await
{
assert_eq!(connection.remote_address(), bob_addr);
} else {
Expand All @@ -299,6 +317,9 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
assert_eq!(message, msg2);
}

assert_eq!(alice.opened_connection_count(), 1);
assert_eq!(bob.opened_connection_count(), 2);

Ok(())
}

Expand Down Expand Up @@ -363,6 +384,9 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
bail!("No message from alice");
}

assert_eq!(alice.opened_connection_count(), 0);
assert_eq!(bob.opened_connection_count(), 1);

Ok(())
}

Expand Down Expand Up @@ -459,12 +483,17 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
}
}

assert_eq!(send_endpoint.opened_connection_count(), 1);

Ok::<_, Report>(())
}
}));
}

let _ = future::try_join_all(tasks).await?;

assert_eq!(server_endpoint.opened_connection_count(), 0);

Ok(())
}

Expand Down Expand Up @@ -567,6 +596,8 @@ async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<(
}
}

assert_eq!(send_endpoint.opened_connection_count(), 1);

Ok::<_, Report>(())
}
}));
Expand All @@ -578,6 +609,9 @@ async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<(
other => bail!("Error from test threads: {:?}", other??),
}
}

assert_eq!(server_endpoint.opened_connection_count(), 0);

Ok(())
}

Expand Down Expand Up @@ -635,6 +669,10 @@ async fn many_messages() -> Result<()> {
}));

let _ = future::try_join_all(tasks).await?;

assert_eq!(send_endpoint.opened_connection_count(), 1);
assert_eq!(recv_endpoint.opened_connection_count(), 0);

Ok(())
}

Expand Down Expand Up @@ -669,6 +707,12 @@ async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<()
ep.connect_to(&peer).await.map(drop)?;
}
}

assert_eq!(ep1.opened_connection_count(), 0);
assert_eq!(ep2.opened_connection_count(), 0);
assert_eq!(ep3.opened_connection_count(), 0);
assert_eq!(ep.opened_connection_count(), 3);

Ok(())
}

Expand All @@ -682,6 +726,10 @@ async fn reachability() -> Result<()> {
};
let reachable_addr = ep2.public_addr();
ep1.is_reachable(&reachable_addr).await?;

assert_eq!(ep1.opened_connection_count(), 0);
assert_eq!(ep2.opened_connection_count(), 0);

Ok(())
}

Expand Down Expand Up @@ -731,5 +779,8 @@ async fn client() -> Result<()> {
.ok_or_else(|| eyre!("Did not receive expected disconnection"))?;
assert_eq!(disconnector, server.public_addr());

assert_eq!(server.opened_connection_count(), 0);
assert_eq!(client.opened_connection_count(), 1);

Ok(())
}

0 comments on commit c6a9a42

Please sign in to comment.