From 10901451bbf569994f3e5213ff686363d6231dab Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 12 Aug 2024 11:45:38 +0200 Subject: [PATCH 1/6] try to only use tokio::mpsc in iroh-net --- Cargo.lock | 2 +- iroh-net/Cargo.toml | 11 +++-- .../src/discovery/local_swarm_discovery.rs | 41 +++++++++-------- iroh-net/src/magicsock.rs | 15 ++++--- iroh-net/src/magicsock/udp_conn.rs | 5 ++- iroh-net/src/net/netmon/actor.rs | 6 +-- iroh-net/src/net/netmon/bsd.rs | 4 +- iroh-net/src/util.rs | 45 +++++++++++++++++++ 8 files changed, 92 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00f2b29a0f..f229a55ce6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2839,7 +2839,6 @@ name = "iroh-net" version = "0.22.0" dependencies = [ "anyhow", - "async-channel", "axum", "backoff", "base64 0.22.1", @@ -2913,6 +2912,7 @@ dependencies = [ "tokio", "tokio-rustls 0.24.1", "tokio-rustls-acme", + "tokio-stream", "tokio-tungstenite", "tokio-tungstenite-wasm", "tokio-util", diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 6e0e154dc8..f2085f2927 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -17,7 +17,6 @@ workspace = true [dependencies] anyhow = { version = "1" } -async-channel = "2.3.1" base64 = "0.22.1" backoff = "0.4.0" bytes = "1" @@ -58,7 +57,6 @@ ring = "0.17" rustls = { version = "0.21.11", default-features = false, features = ["dangerous_configuration"] } serde = { version = "1", features = ["derive", "rc"] } smallvec = "1.11.1" -swarm-discovery = { version = "0.2.1", optional = true } socket2 = "0.5.3" stun-rs = "0.1.5" surge-ping = "0.8.0" @@ -92,6 +90,11 @@ tokio-rustls-acme = { version = "0.3", optional = true } iroh-metrics = { version = "0.22.0", path = "../iroh-metrics", default-features = false } strum = { version = "0.26.2", features = ["derive"] } +# local_swarm_discovery +swarm-discovery = { version = "0.2.1", optional = true } +tokio-stream = { version = "0.1.15", optional = true } + + [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] netlink-packet-core = "0.7.0" netlink-packet-route = "0.17.0" @@ -127,7 +130,7 @@ harness = false duct = "0.13.6" [features] -default = ["metrics"] +default = ["metrics", "local_swarm_discovery", "iroh-relay"] iroh-relay = [ "dep:tokio-rustls-acme", "dep:axum", @@ -140,7 +143,7 @@ iroh-relay = [ ] metrics = ["iroh-metrics/metrics"] test-utils = ["iroh-relay"] -local_swarm_discovery = ["dep:swarm-discovery"] +local_swarm_discovery = ["dep:swarm-discovery", "dep:tokio-stream"] [[bin]] name = "iroh-relay" diff --git a/iroh-net/src/discovery/local_swarm_discovery.rs b/iroh-net/src/discovery/local_swarm_discovery.rs index a42594a6e8..9b3af51be1 100644 --- a/iroh-net/src/discovery/local_swarm_discovery.rs +++ b/iroh-net/src/discovery/local_swarm_discovery.rs @@ -11,17 +11,16 @@ use std::{ use anyhow::Result; use derive_more::FromStr; -use futures_lite::{stream::Boxed as BoxStream, StreamExt}; +use futures_lite::stream::Boxed as BoxStream; use tracing::{debug, error, trace, warn}; -use async_channel::Sender; use iroh_base::key::PublicKey; use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer}; -use tokio::task::JoinSet; +use tokio::{sync::mpsc, task::JoinSet}; use crate::{ discovery::{Discovery, DiscoveryItem}, - util::AbortingJoinHandle, + util::{send_blocking, AbortingJoinHandle}, AddrInfo, Endpoint, NodeId, }; @@ -39,13 +38,13 @@ const DISCOVERY_DURATION: Duration = Duration::from_secs(10); pub struct LocalSwarmDiscovery { #[allow(dead_code)] handle: AbortingJoinHandle<()>, - sender: Sender, + sender: mpsc::Sender, } #[derive(Debug)] enum Message { Discovery(String, Peer), - SendAddrs(NodeId, Sender>), + SendAddrs(NodeId, mpsc::Sender>), ChangeLocalAddrs(AddrInfo), Timeout(NodeId, usize), } @@ -62,7 +61,7 @@ impl LocalSwarmDiscovery { /// This relies on [`tokio::runtime::Handle::current`] and will panic if called outside of the context of a tokio runtime. pub fn new(node_id: NodeId) -> Result { debug!("Creating new LocalSwarmDiscovery service"); - let (send, recv) = async_channel::bounded(64); + let (send, mut recv) = mpsc::channel(64); let task_sender = send.clone(); let rt = tokio::runtime::Handle::current(); let discovery = LocalSwarmDiscovery::spawn_discoverer( @@ -75,19 +74,21 @@ impl LocalSwarmDiscovery { let handle = tokio::spawn(async move { let mut node_addrs: HashMap = HashMap::default(); let mut last_id = 0; - let mut senders: HashMap>>> = - HashMap::default(); + let mut senders: HashMap< + PublicKey, + HashMap>>, + > = HashMap::default(); let mut timeouts = JoinSet::new(); loop { trace!(?node_addrs, "LocalSwarmDiscovery Service loop tick"); let msg = match recv.recv().await { - Err(err) => { - error!("LocalSwarmDiscovery service error: {err:?}"); + None => { + error!("LocalSwarmDiscovery channel closed"); error!("closing LocalSwarmDiscovery"); timeouts.abort_all(); return; } - Ok(msg) => msg, + Some(msg) => msg, }; match msg { Message::Discovery(discovered_node_id, peer_info) => { @@ -189,7 +190,7 @@ impl LocalSwarmDiscovery { fn spawn_discoverer( node_id: PublicKey, - sender: Sender, + sender: mpsc::Sender, socketaddrs: BTreeSet, rt: &tokio::runtime::Handle, ) -> Result { @@ -200,9 +201,11 @@ impl LocalSwarmDiscovery { "Received peer information from LocalSwarmDiscovery" ); - sender - .send_blocking(Message::Discovery(node_id.to_string(), peer.clone())) - .ok(); + send_blocking( + &sender, + Message::Discovery(node_id.to_string(), peer.clone()), + ) + .ok(); }; let addrs = LocalSwarmDiscovery::socketaddrs_to_addrs(socketaddrs); let mut discoverer = @@ -247,7 +250,7 @@ impl From<&Peer> for DiscoveryItem { impl Discovery for LocalSwarmDiscovery { fn resolve(&self, _ep: Endpoint, node_id: NodeId) -> Option>> { - let (send, recv) = async_channel::bounded(20); + let (send, recv) = mpsc::channel(20); let discovery_sender = self.sender.clone(); tokio::spawn(async move { discovery_sender @@ -255,7 +258,8 @@ impl Discovery for LocalSwarmDiscovery { .await .ok(); }); - Some(recv.boxed()) + let stream = tokio_stream::wrappers::ReceiverStream::new(recv); + Some(Box::pin(stream)) } fn publish(&self, info: &AddrInfo) { @@ -273,6 +277,7 @@ impl Discovery for LocalSwarmDiscovery { #[cfg(test)] mod tests { use super::*; + use futures_lite::StreamExt; use testresult::TestResult; #[tokio::test] diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 2f54e459ce..995ec326f4 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -177,7 +177,7 @@ pub(crate) struct MagicSock { proxy_url: Option, /// Used for receiving relay messages. - relay_recv_receiver: async_channel::Receiver, + relay_recv_receiver: parking_lot::Mutex>, /// Stores wakers, to be called when relay_recv_ch receives new data. network_recv_wakers: parking_lot::Mutex>, network_send_wakers: parking_lot::Mutex>, @@ -788,12 +788,13 @@ impl MagicSock { if self.is_closed() { break; } - match self.relay_recv_receiver.try_recv() { - Err(async_channel::TryRecvError::Empty) => { + let mut relay_recv_receiver = self.relay_recv_receiver.lock(); + match relay_recv_receiver.try_recv() { + Err(mpsc::error::TryRecvError::Empty) => { self.network_recv_wakers.lock().replace(cx.waker().clone()); break; } - Err(async_channel::TryRecvError::Closed) => { + Err(mpsc::error::TryRecvError::Disconnected) => { return Poll::Ready(Err(io::Error::new( io::ErrorKind::NotConnected, "connection closed", @@ -1378,7 +1379,7 @@ impl Handle { insecure_skip_relay_cert_verify, } = opts; - let (relay_recv_sender, relay_recv_receiver) = async_channel::bounded(128); + let (relay_recv_sender, relay_recv_receiver) = mpsc::channel(128); let (pconn4, pconn6) = bind(port)?; let port = pconn4.port(); @@ -1412,7 +1413,7 @@ impl Handle { local_addrs: std::sync::RwLock::new((ipv4_addr, ipv6_addr)), closing: AtomicBool::new(false), closed: AtomicBool::new(false), - relay_recv_receiver, + relay_recv_receiver: parking_lot::Mutex::new(relay_recv_receiver), network_recv_wakers: parking_lot::Mutex::new(None), network_send_wakers: parking_lot::Mutex::new(None), actor_sender: actor_sender.clone(), @@ -1704,7 +1705,7 @@ struct Actor { relay_actor_sender: mpsc::Sender, relay_actor_cancel_token: CancellationToken, /// Channel to send received relay messages on, for processing. - relay_recv_sender: async_channel::Sender, + relay_recv_sender: mpsc::Sender, /// When set, is an AfterFunc timer that will call MagicSock::do_periodic_stun. periodic_re_stun_timer: time::Interval, /// The `NetInfo` provided in the last call to `net_info_func`. It's used to deduplicate calls to netInfoFunc. diff --git a/iroh-net/src/magicsock/udp_conn.rs b/iroh-net/src/magicsock/udp_conn.rs index f4d641db26..e6d6444d09 100644 --- a/iroh-net/src/magicsock/udp_conn.rs +++ b/iroh-net/src/magicsock/udp_conn.rs @@ -151,6 +151,7 @@ mod tests { use super::*; use anyhow::Result; + use tokio::sync::mpsc; const ALPN: &[u8] = b"n0/test/1"; @@ -192,7 +193,7 @@ mod tests { let (m2, _m2_key) = wrap_socket(m2)?; let m1_addr = SocketAddr::new(network.local_addr(), m1.local_addr()?.port()); - let (m1_send, m1_recv) = async_channel::bounded(8); + let (m1_send, mut m1_recv) = mpsc::channel(8); let m1_task = tokio::task::spawn(async move { if let Some(conn) = m1.accept().await { @@ -220,7 +221,7 @@ mod tests { drop(send_bi); // make sure the right values arrived - let val = m1_recv.recv().await?; + let val = m1_recv.recv().await.unwrap(); assert_eq!(val, b"hello"); m1_task.await??; diff --git a/iroh-net/src/net/netmon/actor.rs b/iroh-net/src/net/netmon/actor.rs index 083e482caa..b30aee228a 100644 --- a/iroh-net/src/net/netmon/actor.rs +++ b/iroh-net/src/net/netmon/actor.rs @@ -57,7 +57,7 @@ pub(super) struct Actor { /// OS specific monitor. #[allow(dead_code)] route_monitor: RouteMonitor, - mon_receiver: async_channel::Receiver, + mon_receiver: mpsc::Receiver, actor_receiver: mpsc::Receiver, actor_sender: mpsc::Sender, /// Callback registry. @@ -84,7 +84,7 @@ impl Actor { let wall_time = Instant::now(); // Use flume channels, as tokio::mpsc is not safe to use across ffi boundaries. - let (mon_sender, mon_receiver) = async_channel::bounded(MON_CHAN_CAPACITY); + let (mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY); let route_monitor = RouteMonitor::new(mon_sender)?; let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY); @@ -129,7 +129,7 @@ impl Actor { debounce_interval.reset_immediately(); } } - Ok(_event) = self.mon_receiver.recv() => { + Some(_event) = self.mon_receiver.recv() => { trace!("network activity detected"); last_event.replace(false); debounce_interval.reset_immediately(); diff --git a/iroh-net/src/net/netmon/bsd.rs b/iroh-net/src/net/netmon/bsd.rs index 20daef64ba..969f85a6d3 100644 --- a/iroh-net/src/net/netmon/bsd.rs +++ b/iroh-net/src/net/netmon/bsd.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use tokio::{io::AsyncReadExt, task::JoinHandle}; +use tokio::{io::AsyncReadExt, sync::mpsc, task::JoinHandle}; use tracing::{trace, warn}; #[cfg(any(target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))] @@ -23,7 +23,7 @@ impl Drop for RouteMonitor { } impl RouteMonitor { - pub(super) fn new(sender: async_channel::Sender) -> Result { + pub(super) fn new(sender: mpsc::Sender) -> Result { let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?; socket.set_nonblocking(true)?; let socket_std: std::os::unix::net::UnixStream = socket.into(); diff --git a/iroh-net/src/util.rs b/iroh-net/src/util.rs index 4986c773b1..6483135240 100644 --- a/iroh-net/src/util.rs +++ b/iroh-net/src/util.rs @@ -7,6 +7,7 @@ use std::{ task::{Context, Poll}, }; +use tokio::sync::mpsc; use futures_lite::future::Boxed as BoxFuture; use futures_util::{future::Shared, FutureExt}; @@ -132,3 +133,47 @@ impl Future for MaybeFuture { pub(crate) fn relay_only_mode() -> bool { std::option_env!("DEV_RELAY_ONLY").is_some() } + +/// Send an element blocking, automtically handling the existence of a runtime. +pub fn send_blocking(sender: &mpsc::Sender, el: T) -> std::result::Result<(), mpsc::error::SendError> { + match tokio::runtime::Handle::try_current() { + Ok(h) => { + dbg!(&h); + h.spawn(sender.send(el)); + // can not send result + Ok(()) + } + Err(err) => { + dbg!(err); + sender.blocking_send(el) + } + } +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_send_blocking_no_runtime() { + let (sender, mut receiver) = mpsc::channel(2); + + send_blocking(&sender, "hello").unwrap(); + send_blocking(&sender, "world").unwrap(); + + assert_eq!(receiver.blocking_recv().unwrap(), "hello"); + assert_eq!(receiver.blocking_recv().unwrap(), "world"); + } + + #[tokio::test] + async fn test_send_blocking_with_runtime() { + let (sender, mut receiver) = mpsc::channel(2); + + send_blocking(&sender, "hello").unwrap(); + send_blocking(&sender, "world").unwrap(); + + assert_eq!(receiver.recv().await.unwrap(), "hello"); + assert_eq!(receiver.recv().await.unwrap(), "world"); + } +} From 8a2e2f51f02ea9a081ed7d517255e92d70a1df14 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 13 Aug 2024 11:48:46 +0300 Subject: [PATCH 2/6] extend sender lifetime --- iroh-net/src/util.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/iroh-net/src/util.rs b/iroh-net/src/util.rs index 6483135240..1fcbb7945b 100644 --- a/iroh-net/src/util.rs +++ b/iroh-net/src/util.rs @@ -135,11 +135,14 @@ pub(crate) fn relay_only_mode() -> bool { } /// Send an element blocking, automtically handling the existence of a runtime. -pub fn send_blocking(sender: &mpsc::Sender, el: T) -> std::result::Result<(), mpsc::error::SendError> { +pub fn send_blocking(sender: &mpsc::Sender, el: T) -> std::result::Result<(), mpsc::error::SendError> { match tokio::runtime::Handle::try_current() { Ok(h) => { dbg!(&h); - h.spawn(sender.send(el)); + let sender = sender.clone(); + h.spawn(async move { + sender.send(el).await.ok(); + }); // can not send result Ok(()) } From 35d1f654e1f7b39c9c49f7093ba79a59456b4e92 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 13 Aug 2024 12:52:13 +0300 Subject: [PATCH 3/6] replace async_channel also in the os dependent impls of RouteMonitor --- iroh-net/src/net/netmon/android.rs | 3 ++- iroh-net/src/net/netmon/linux.rs | 4 ++-- iroh-net/src/net/netmon/windows.rs | 3 ++- iroh-net/src/util.rs | 11 ++++++----- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/iroh-net/src/net/netmon/android.rs b/iroh-net/src/net/netmon/android.rs index f92eb721f0..ace7e8f326 100644 --- a/iroh-net/src/net/netmon/android.rs +++ b/iroh-net/src/net/netmon/android.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use tokio::sync::mpsc; use super::actor::NetworkMessage; @@ -6,7 +7,7 @@ use super::actor::NetworkMessage; pub(super) struct RouteMonitor {} impl RouteMonitor { - pub(super) fn new(_sender: async_channel::Sender) -> Result { + pub(super) fn new(_sender: mpsc::Sender) -> Result { // Very sad monitor. Android doesn't allow us to do this Ok(RouteMonitor {}) diff --git a/iroh-net/src/net/netmon/linux.rs b/iroh-net/src/net/netmon/linux.rs index 12976b37e8..7a422ad9c3 100644 --- a/iroh-net/src/net/netmon/linux.rs +++ b/iroh-net/src/net/netmon/linux.rs @@ -9,7 +9,7 @@ use netlink_packet_core::NetlinkPayload; use netlink_packet_route::{address, constants::*, route, RtnlMessage}; use netlink_sys::{AsyncSocket, SocketAddr}; use rtnetlink::new_connection; -use tokio::task::JoinHandle; +use tokio::{sync::mpsc, task::JoinHandle}; use tracing::{info, trace, warn}; use crate::net::ip::is_link_local; @@ -49,7 +49,7 @@ macro_rules! get_nla { } impl RouteMonitor { - pub(super) fn new(sender: async_channel::Sender) -> Result { + pub(super) fn new(sender: mpsc::Sender) -> Result { let (mut conn, mut _handle, mut messages) = new_connection()?; // Specify flags to listen on. diff --git a/iroh-net/src/net/netmon/windows.rs b/iroh-net/src/net/netmon/windows.rs index da77899d0f..548c03e044 100644 --- a/iroh-net/src/net/netmon/windows.rs +++ b/iroh-net/src/net/netmon/windows.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::Result; use libc::c_void; +use tokio::sync::mpsc; use tracing::{trace, warn}; use windows::Win32::{ Foundation::{BOOLEAN, HANDLE as Handle}, @@ -19,7 +20,7 @@ pub(super) struct RouteMonitor { } impl RouteMonitor { - pub(super) fn new(sender: async_channel::Sender) -> Result { + pub(super) fn new(sender: mpsc::Sender) -> Result { // Register two callbacks with the windows api let mut cb_handler = CallbackHandler::default(); diff --git a/iroh-net/src/util.rs b/iroh-net/src/util.rs index 1fcbb7945b..90bf63b721 100644 --- a/iroh-net/src/util.rs +++ b/iroh-net/src/util.rs @@ -7,9 +7,9 @@ use std::{ task::{Context, Poll}, }; -use tokio::sync::mpsc; use futures_lite::future::Boxed as BoxFuture; use futures_util::{future::Shared, FutureExt}; +use tokio::sync::mpsc; pub mod chain; @@ -134,11 +134,13 @@ pub(crate) fn relay_only_mode() -> bool { std::option_env!("DEV_RELAY_ONLY").is_some() } -/// Send an element blocking, automtically handling the existence of a runtime. -pub fn send_blocking(sender: &mpsc::Sender, el: T) -> std::result::Result<(), mpsc::error::SendError> { +/// Send an element blocking, automatically handling the existence of a runtime. +pub fn send_blocking( + sender: &mpsc::Sender, + el: T, +) -> std::result::Result<(), mpsc::error::SendError> { match tokio::runtime::Handle::try_current() { Ok(h) => { - dbg!(&h); let sender = sender.clone(); h.spawn(async move { sender.send(el).await.ok(); @@ -153,7 +155,6 @@ pub fn send_blocking(sender: &mpsc::Sender, el: T) -> std: } } - #[cfg(test)] mod tests { use super::*; From 6e3f280c109dfd93fc27ca795099fa741216e641 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 13 Aug 2024 14:16:51 +0300 Subject: [PATCH 4/6] remove the weird send_blocking thing --- .../src/discovery/local_swarm_discovery.rs | 14 +++--- iroh-net/src/util.rs | 49 ------------------- 2 files changed, 8 insertions(+), 55 deletions(-) diff --git a/iroh-net/src/discovery/local_swarm_discovery.rs b/iroh-net/src/discovery/local_swarm_discovery.rs index 9b3af51be1..41af08f59b 100644 --- a/iroh-net/src/discovery/local_swarm_discovery.rs +++ b/iroh-net/src/discovery/local_swarm_discovery.rs @@ -20,7 +20,7 @@ use tokio::{sync::mpsc, task::JoinSet}; use crate::{ discovery::{Discovery, DiscoveryItem}, - util::{send_blocking, AbortingJoinHandle}, + util::AbortingJoinHandle, AddrInfo, Endpoint, NodeId, }; @@ -194,6 +194,7 @@ impl LocalSwarmDiscovery { socketaddrs: BTreeSet, rt: &tokio::runtime::Handle, ) -> Result { + let spawn_rt = rt.clone(); let callback = move |node_id: &str, peer: &Peer| { trace!( node_id, @@ -201,11 +202,12 @@ impl LocalSwarmDiscovery { "Received peer information from LocalSwarmDiscovery" ); - send_blocking( - &sender, - Message::Discovery(node_id.to_string(), peer.clone()), - ) - .ok(); + let sender = sender.clone(); + let node_id = node_id.to_string(); + let peer = peer.clone(); + spawn_rt.spawn(async move { + sender.send(Message::Discovery(node_id, peer)).await.ok(); + }); }; let addrs = LocalSwarmDiscovery::socketaddrs_to_addrs(socketaddrs); let mut discoverer = diff --git a/iroh-net/src/util.rs b/iroh-net/src/util.rs index 90bf63b721..4986c773b1 100644 --- a/iroh-net/src/util.rs +++ b/iroh-net/src/util.rs @@ -9,7 +9,6 @@ use std::{ use futures_lite::future::Boxed as BoxFuture; use futures_util::{future::Shared, FutureExt}; -use tokio::sync::mpsc; pub mod chain; @@ -133,51 +132,3 @@ impl Future for MaybeFuture { pub(crate) fn relay_only_mode() -> bool { std::option_env!("DEV_RELAY_ONLY").is_some() } - -/// Send an element blocking, automatically handling the existence of a runtime. -pub fn send_blocking( - sender: &mpsc::Sender, - el: T, -) -> std::result::Result<(), mpsc::error::SendError> { - match tokio::runtime::Handle::try_current() { - Ok(h) => { - let sender = sender.clone(); - h.spawn(async move { - sender.send(el).await.ok(); - }); - // can not send result - Ok(()) - } - Err(err) => { - dbg!(err); - sender.blocking_send(el) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_send_blocking_no_runtime() { - let (sender, mut receiver) = mpsc::channel(2); - - send_blocking(&sender, "hello").unwrap(); - send_blocking(&sender, "world").unwrap(); - - assert_eq!(receiver.blocking_recv().unwrap(), "hello"); - assert_eq!(receiver.blocking_recv().unwrap(), "world"); - } - - #[tokio::test] - async fn test_send_blocking_with_runtime() { - let (sender, mut receiver) = mpsc::channel(2); - - send_blocking(&sender, "hello").unwrap(); - send_blocking(&sender, "world").unwrap(); - - assert_eq!(receiver.recv().await.unwrap(), "hello"); - assert_eq!(receiver.recv().await.unwrap(), "world"); - } -} From 145830cab23a23b6f82cf93981696c0e0a659bc0 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 13 Aug 2024 14:30:22 +0300 Subject: [PATCH 5/6] use blocking_send in windows (note: should this be try_send? --- iroh-net/src/net/netmon/windows.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh-net/src/net/netmon/windows.rs b/iroh-net/src/net/netmon/windows.rs index 548c03e044..1b60a425cf 100644 --- a/iroh-net/src/net/netmon/windows.rs +++ b/iroh-net/src/net/netmon/windows.rs @@ -27,14 +27,14 @@ impl RouteMonitor { // 1. Unicast Address Changes let s = sender.clone(); cb_handler.register_unicast_address_change_callback(Box::new(move || { - if let Err(err) = s.send_blocking(NetworkMessage::Change) { + if let Err(err) = s.blocking_send(NetworkMessage::Change) { warn!("unable to send: unicast change notification: {:?}", err); } }))?; // 2. Route Changes cb_handler.register_route_change_callback(Box::new(move || { - if let Err(err) = sender.send_blocking(NetworkMessage::Change) { + if let Err(err) = sender.blocking_send(NetworkMessage::Change) { warn!("unable to send: route change notification: {:?}", err); } }))?; From 9603ce4817bfc5350c4bbce4b6dbd12c7533ee94 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 13 Aug 2024 19:22:50 +0300 Subject: [PATCH 6/6] fix features --- iroh-net/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index f2085f2927..4c00a036c2 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -130,7 +130,7 @@ harness = false duct = "0.13.6" [features] -default = ["metrics", "local_swarm_discovery", "iroh-relay"] +default = ["metrics"] iroh-relay = [ "dep:tokio-rustls-acme", "dep:axum",