From 50e5269d1a6155f654c86d379e9180fafe292415 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Wed, 2 Oct 2024 17:52:47 +0200 Subject: [PATCH 1/3] add test to send 1GB on udp --- src/transport/sockets.rs | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/transport/sockets.rs b/src/transport/sockets.rs index a5df47f..0963934 100644 --- a/src/transport/sockets.rs +++ b/src/transport/sockets.rs @@ -102,3 +102,38 @@ impl MultipleOutSocket { unreachable!() } } + +#[cfg(test)] +mod tests { + use tracing::error; + + use super::*; + use crate::peer::PeerNode; + use crate::tests::Result; + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_send_peers() -> Result<()> { + // Generate a subscriber with the desired log level. + let subscriber = tracing_subscriber::fmt::Subscriber::builder() + .with_max_level(tracing::Level::DEBUG) + .finish(); + // Set the subscriber as global. + // so this subscriber will be used as the default in all threads for the + // remainder of the duration of the program, similar to how `loggers` + // work in the `log` crate. + tracing::subscriber::set_global_default(subscriber) + .expect("Failed on subscribe tracing"); + let mut socket = MultipleOutSocket::configure( + &MultipleOutSocket::default_configuration(), + ); + let data = [0u8; 1000]; + let root = PeerNode::generate("192.168.0.1:666", 0)?; + let target = root.as_peer_info().to_socket_address(); + + for _ in 0..1000 * 1000 { + if let Err(e) = socket.send(&data, &target).await { + error!("{e}"); + } + } + Ok(()) + } +} From acbdf13f3586a7b4d76cf4839112cd60bae9e29d Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Wed, 2 Oct 2024 18:03:18 +0200 Subject: [PATCH 2/3] Raise error if udp.send timeout --- src/transport/sockets.rs | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/transport/sockets.rs b/src/transport/sockets.rs index 0963934..20d0e43 100644 --- a/src/transport/sockets.rs +++ b/src/transport/sockets.rs @@ -11,7 +11,7 @@ use std::time::Duration; use tokio::net::UdpSocket; use tokio::runtime::Handle; use tokio::task::block_in_place; -use tokio::time::{self, Interval}; +use tokio::time::{self, timeout, Interval}; use tracing::{info, warn}; use super::encoding::Configurable; @@ -72,26 +72,28 @@ impl MultipleOutSocket { if let Some(sleep) = &mut self.udp_backoff_timeout { sleep.tick().await; } - for i in 0..self.retry_count { - let res = match remote_addr.is_ipv4() { - true => self.ipv4.send_to(data, &remote_addr).await, - false => self.ipv6.send_to(data, &remote_addr).await, + let max_retry = self.retry_count; + + for i in 1..=max_retry { + let send_fn = match remote_addr.is_ipv4() { + true => self.ipv4.send_to(data, *remote_addr), + false => self.ipv6.send_to(data, *remote_addr), }; - match res { - Ok(_) => { - if i > 0 { + + let send = timeout(self.udp_send_retry_interval, send_fn) + .await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "TIMEOUT")); + + match send { + Ok(Ok(_)) => { + if i > 1 { info!("Message sent, recovered from previous error"); } return Ok(()); } - Err(e) => { - if i < (self.retry_count - 1) { - warn!( - "Unable to send msg, temptative {}/{} - {}", - i + 1, - self.retry_count, - e - ); + Ok(Err(e)) | Err(e) => { + if i < max_retry { + warn!("Unable to send msg, temptative {i}/{max_retry} - {e}"); tokio::time::sleep(self.udp_send_retry_interval).await } else { return Err(e); From 3cee4305c1386714127fdb642aa2ed5d6daa408f Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Wed, 2 Oct 2024 20:13:16 +0200 Subject: [PATCH 3/3] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8117c44..bffa337 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Change the EncodedChunk UUID generation (aka RaptorqHeader) - Change `raptorq` dependency from `1.6` to `2.0` +- Change UDP sender to raise error if timeout` ## [0.6.1] - 2024-04-10