Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasi-sockets: Introduce UDP streams #7243

Merged
merged 6 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions crates/test-programs/src/bin/preview2_udp_sample_application.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use test_programs::wasi::sockets::network::{
IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network,
IpAddress, IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network,
};
use test_programs::wasi::sockets::udp::{Datagram, UdpSocket};
use test_programs::wasi::sockets::udp::{OutgoingDatagram, UdpSocket};

fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddress) {
let unspecified_addr = IpSocketAddress::new(IpAddress::new_unspecified(family), 0);

let first_message = &[];
let second_message = b"Hello, world!";
let third_message = b"Greetings, planet!";
Expand All @@ -13,31 +15,32 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres
let server = UdpSocket::new(family).unwrap();

server.blocking_bind(&net, bind_address).unwrap();
let (server_incoming, _) = server.stream(None).unwrap();
let addr = server.local_address().unwrap();

let client_addr = {
let client = UdpSocket::new(family).unwrap();
client.blocking_connect(&net, addr).unwrap();
client.blocking_bind(&net, unspecified_addr).unwrap();
let (_, client_outgoing) = client.stream(Some(addr)).unwrap();

let datagrams = [
Datagram {
OutgoingDatagram {
data: first_message.to_vec(),
remote_address: addr,
remote_address: None,
},
Datagram {
OutgoingDatagram {
data: second_message.to_vec(),
remote_address: addr,
remote_address: Some(addr),
},
];
client.blocking_send(&datagrams).unwrap();
client_outgoing.blocking_send(&datagrams).unwrap();

client.local_address().unwrap()
};

{
// Check that we've received our sent messages.
// Not guaranteed to work but should work in practice.
let datagrams = server.blocking_receive(2..100).unwrap();
let datagrams = server_incoming.blocking_receive(2..100).unwrap();
assert_eq!(datagrams.len(), 2);

assert_eq!(datagrams[0].data, first_message);
Expand All @@ -50,21 +53,22 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres
// Another client
{
let client = UdpSocket::new(family).unwrap();
client.blocking_connect(&net, addr).unwrap();
client.blocking_bind(&net, unspecified_addr).unwrap();
let (_, client_outgoing) = client.stream(None).unwrap();

let datagrams = [Datagram {
let datagrams = [OutgoingDatagram {
data: third_message.to_vec(),
remote_address: addr,
remote_address: Some(addr),
}];
client.blocking_send(&datagrams).unwrap();
client_outgoing.blocking_send(&datagrams).unwrap();
}

{
// Check that we sent and received our message!
let datagrams = server.blocking_receive(1..100).unwrap();
let datagrams = server_incoming.blocking_receive(1..100).unwrap();
assert_eq!(datagrams.len(), 1);

assert_eq!(datagrams[0].data, third_message); // Not guaranteed to work but should work in practice.
assert_eq!(datagrams[0].data, third_message);
}
}

Expand Down
39 changes: 18 additions & 21 deletions crates/test-programs/src/sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use crate::wasi::sockets::network::{
Network,
};
use crate::wasi::sockets::tcp::TcpSocket;
use crate::wasi::sockets::udp::{Datagram, UdpSocket};
use crate::wasi::sockets::udp::{
IncomingDatagram, IncomingDatagramStream, OutgoingDatagram, OutgoingDatagramStream, UdpSocket,
};
use crate::wasi::sockets::{tcp_create_socket, udp_create_socket};
use std::ops::Range;

Expand Down Expand Up @@ -142,42 +144,42 @@ impl UdpSocket {
}
}
}
}

pub fn blocking_connect(
&self,
network: &Network,
remote_address: IpSocketAddress,
) -> Result<(), ErrorCode> {
impl OutgoingDatagramStream {
fn blocking_check_send(&self, timeout: &Pollable) -> Result<u64, ErrorCode> {
let sub = self.subscribe();

self.start_connect(&network, remote_address)?;

loop {
match self.finish_connect() {
Err(ErrorCode::WouldBlock) => sub.wait(),
match self.check_send() {
Ok(0) => sub.wait_until(timeout)?,
result => return result,
}
}
}

pub fn blocking_send(&self, mut datagrams: &[Datagram]) -> Result<(), ErrorCode> {
pub fn blocking_send(&self, mut datagrams: &[OutgoingDatagram]) -> Result<(), ErrorCode> {
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
let pollable = self.subscribe();

while !datagrams.is_empty() {
match self.send(datagrams) {
let permit = self.blocking_check_send(&timeout)?;
let chunk_len = datagrams.len().min(permit as usize);
match self.send(&datagrams[..chunk_len]) {
Ok(0) => {}
Ok(packets_sent) => {
datagrams = &datagrams[(packets_sent as usize)..];
let packets_sent = packets_sent as usize;
datagrams = &datagrams[packets_sent..];
}
Err(ErrorCode::WouldBlock) => pollable.wait_until(&timeout)?,
Err(err) => return Err(err),
}
}

Ok(())
}
}

pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<Datagram>, ErrorCode> {
impl IncomingDatagramStream {
pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<IncomingDatagram>, ErrorCode> {
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
let pollable = self.subscribe();
let mut datagrams = vec![];
Expand All @@ -187,11 +189,6 @@ impl UdpSocket {
Ok(mut chunk) => {
datagrams.append(&mut chunk);

if datagrams.len() >= count.start as usize {
return Ok(datagrams);
}
}
Err(ErrorCode::WouldBlock) => {
if datagrams.len() >= count.start as usize {
return Ok(datagrams);
} else {
Expand Down
2 changes: 1 addition & 1 deletion crates/wasi-http/wit/deps/sockets/udp-create-socket.wit
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ interface udp-create-socket {
/// Similar to `socket(AF_INET or AF_INET6, SOCK_DGRAM, IPPROTO_UDP)` in POSIX.
///
/// This function does not require a network capability handle. This is considered to be safe because
/// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind`/`connect` is called,
/// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind` is called,
/// the socket is effectively an in-memory configuration object, unable to communicate with the outside world.
///
/// All sockets are non-blocking. Use the wasi-poll interface to block on asynchronous operations.
Expand Down
Loading
Loading