From 3e1bff4565a4ff268634b8a96ef17f9fd139cb1e Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Fri, 24 Sep 2021 12:26:13 +0100 Subject: [PATCH] refactor!: remove `Endpoint::try_send_*` The `try_send_*` methods exist to send messages using a connection that already exists in the pool. This is useful for sending messages to clients, since it would be impossible to connect to them in case a connection is not in the pool, so using `connect_to` would retry until failure. This use-case is now covered by the `Endpoint::get_connection_by_addr` method. BREAKING CHANGE: `Endpoint::try_send_message` and `Endpoint::try_send_message_with` have been removed. Use `Endpoint::get_connection_by_addr` and `Connection::send_*` instead. --- src/endpoint.rs | 74 ++------------------------------------------- src/tests/common.rs | 30 +++++++++--------- 2 files changed, 17 insertions(+), 87 deletions(-) diff --git a/src/endpoint.rs b/src/endpoint.rs index e03d1c27..9d72e719 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -11,7 +11,7 @@ use super::igd::{forward_port, IgdError}; use super::wire_msg::WireMsg; use super::{ - config::{Config, InternalConfig, RetryConfig}, + config::{Config, InternalConfig}, connection_deduplicator::{ConnectionDeduplicator, DedupHandle}, connection_pool::{ConnId, ConnectionPool, ConnectionRemover}, connections::{ @@ -19,7 +19,7 @@ use super::{ DisconnectionEvents, RecvStream, SendStream, }, error::{ - ClientEndpointError, ConnectionError, EndpointError, RecvError, RpcError, SendError, + ClientEndpointError, ConnectionError, EndpointError, RecvError, RpcError, SerializationError, }, }; @@ -401,76 +401,6 @@ impl Endpoint { connection.open_bi(priority).await } - /// Send a message to a peer over an existing connection. - /// - /// # Priority - /// - /// Locally buffered data from streams with higher priority will be transmitted before data from - /// streams with lower priority. Changing the priority of a stream with pending data may only - /// take effect after that data has been transmitted. Using many different priority levels per - /// connection may have a negative impact on performance. - /// - /// `0` is a sensible default for 'normal' priority. - /// - /// # Connection pooling - /// - /// Note that unlike most methods on `Endpoint`, this **will not** use the connection pool. This - /// method is intended to be used when it's necessary to send a message on an existing - /// connection only. - /// - /// # Errors - /// - /// If a connection with `dest` exists in the pool but the message fails to send, - /// `Err(Some(_))` will be returned. If there's no connection with `dest` in the pool, then - /// `Err(None)` will be returned. - pub async fn try_send_message( - &self, - msg: Bytes, - dest: &SocketAddr, - priority: i32, - ) -> Result<(), Option> { - self.try_send_message_with(msg, dest, priority, None).await - } - - /// Send a message to a peer over an existing connection using given retry configuration. - /// - /// The given `retries`, if any, will override the [`Config::retry_config`] used to create the - /// endpoint. - /// - /// # Priority - /// - /// Locally buffered data from streams with higher priority will be transmitted before data from - /// streams with lower priority. Changing the priority of a stream with pending data may only - /// take effect after that data has been transmitted. Using many different priority levels per - /// connection may have a negative impact on performance. - /// - /// `0` is a sensible default for 'normal' priority. - /// - /// # Errors - /// - /// If a connection with `dest` exists in the pool but the message fails to send, - /// `Err(Some(_))` will be returned. If there's no connection with `dest` in the pool, then - /// `Err(None)` will be returned. - pub async fn try_send_message_with( - &self, - msg: Bytes, - dest: &SocketAddr, - priority: i32, - retries: Option<&RetryConfig>, - ) -> Result<(), Option> { - if let Some((conn, guard)) = self.connection_pool.get_by_addr(dest).await { - trace!("Connection exists in the connection pool: {}", dest); - let connection = self.wrap_connection(conn, guard); - retries - .unwrap_or(&self.config.retry_config) - .retry(|| async { Ok(connection.send_uni(msg.clone(), priority).await?) }) - .await?; - Ok(()) - } else { - Err(None) - } - } - /// Close all the connections of this endpoint immediately and stop accepting new connections. pub fn close(&self) { let _ = self.termination_tx.send(()); diff --git a/src/tests/common.rs b/src/tests/common.rs index 350f3ad5..c65fda20 100644 --- a/src/tests/common.rs +++ b/src/tests/common.rs @@ -301,7 +301,8 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> { new_endpoint().await?; let alice_addr = alice.public_addr(); - let (bob, _, mut bob_incoming_messages, _, _) = new_endpoint().await?; + let (bob, mut bob_incoming_connections, mut bob_incoming_messages, _, _) = + new_endpoint().await?; let bob_addr = bob.public_addr(); // Try to establish two connections to the same peer at the same time. @@ -324,17 +325,19 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> { // Send two messages, one from each end let msg0 = random_msg(1024); alice - .try_send_message(msg0.clone(), &bob_addr, 0) - .await - .map_err(|error| { - error - .map(|error| eyre!(error)) - .unwrap_or_else(|| eyre!("lost connection from alice to bob")) - })?; + .connect_to(&bob_addr) + .await? + .send(msg0.clone()) + .await?; let msg1 = random_msg(1024); b_to_a.send(msg1.clone()).await?; + // Bob did not get a new incoming connection + if let Ok(Some(_)) = timeout(Duration::from_secs(2), bob_incoming_connections.next()).await { + eyre!("Unexpected incoming connection from alice to bob"); + } + // Both messages are received at the other end if let Some((src, message)) = alice_incoming_messages.next().await { assert_eq!(src, bob_addr); @@ -706,13 +709,10 @@ async fn client() -> Result<()> { assert_eq!(&message[..], b"hello"); server - .try_send_message(b"world"[..].into(), &client.public_addr(), 0) - .await - .map_err(|error| { - error - .map(|error| eyre!(error)) - .unwrap_or_else(|| eyre!("no longer connected to client")) - })?; + .connect_to(&client.public_addr()) + .await? + .send(b"world"[..].into()) + .await?; let (sender, message) = client_messages .next() .await