Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: Expose UdpSocket::try_clone #6226

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions tokio/src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,16 @@ impl UdpSocket {
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.io.take_error()
}

/// Creates a new independently owned handle to the underlying socket.
///
/// Cloned sockets don't share wakers, allowing multiple tasks to perform I/O in the same
/// direction concurrently.
///
/// See [`std::net::UdpSocket::try_clone`] for further details.
pub fn try_clone(&self) -> io::Result<Self> {
Self::from_std(self.as_socket().try_clone()?.into())
}
}

impl TryFrom<std::net::UdpSocket> for UdpSocket {
Expand Down
47 changes: 47 additions & 0 deletions tokio/tests/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,50 @@ async fn poll_ready() {
}
}
}

#[tokio::test]
async fn concurrent_recv() {
use std::sync::atomic::{AtomicBool, Ordering};

// Create two copies of the same server socket
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let server2 = server.try_clone().unwrap();
let saddr = server.local_addr().unwrap();

// Create client
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();

const MSG1: [u8; 3] = [1, 2, 3];
const MSG2: [u8; 3] = [4, 5, 6];

let got_msg1 = Arc::new(AtomicBool::new(false));
let got_msg2 = Arc::new(AtomicBool::new(false));

async fn recv(socket: UdpSocket, m1: Arc<AtomicBool>, m2: Arc<AtomicBool>) {
let mut buf = [0; 3];
socket.recv(&mut buf).await.unwrap();
match buf {
MSG1 => &m1,
MSG2 => &m2,
_ => return,
}
.store(true, Ordering::Relaxed);
}

// Try to receive a message on each clone of the socket
let recv1 = tokio::spawn(recv(server, got_msg1.clone(), got_msg2.clone()));
let recv2 = tokio::spawn(recv(server2, got_msg1.clone(), got_msg2.clone()));

// Let the spawned tasks run
tokio::task::yield_now().await;

client.send_to(&MSG1, saddr).await.unwrap();
client.send_to(&MSG2, saddr).await.unwrap();

// Both receivers should wake and read a datagram
recv1.await.unwrap();
recv2.await.unwrap();

// Both messages should have been received
assert!(got_msg1.load(Ordering::Relaxed) && got_msg1.load(Ordering::Relaxed));
}
Loading