Skip to content

Commit

Permalink
Introduce UDP streams
Browse files Browse the repository at this point in the history
Introduce new `inbound-datagram-stream` and `outbound-datagram-stream` types and moved `receive` and `send` methods to those respectively. These streams are returned by `bind` can be individually subscribed to. This resolves a design issue where a UDP server  would end up in a spin loop because `receive` returned EWOULDBLOCK but poll_* always returned immediately because the socket was ready for sending. In this new setup, users can poll each direction separately. Fixes WebAssembly/wasi-sockets#64

Additionally:
- Enable send-like behaviour by making `outbound-datagram::remote-address` optional. Fixes WebAssembly/wasi-sockets#57
- Dropped the `network` parameter from the `connect` call, because `bind` is now _required_ to perform IO.
  • Loading branch information
badeend committed Oct 23, 2023
1 parent 3176f03 commit 10b79d9
Show file tree
Hide file tree
Showing 7 changed files with 523 additions and 364 deletions.
37 changes: 20 additions & 17 deletions crates/test-programs/src/bin/preview2_udp_sample_application.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use test_programs::wasi::sockets::network::{
IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network,
IpAddress, IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network,
};
use test_programs::wasi::sockets::udp::{Datagram, UdpSocket};
use test_programs::wasi::sockets::udp::{OutboundDatagram, UdpSocket};

fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddress) {
let unspecified_addr = IpSocketAddress::new(IpAddress::new_unspecified(family), 0);

let first_message = &[];
let second_message = b"Hello, world!";
let third_message = b"Greetings, planet!";
Expand All @@ -12,32 +14,32 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres

let server = UdpSocket::new(family).unwrap();

server.blocking_bind(&net, bind_address).unwrap();
let (server_inbound, _) = server.blocking_bind(&net, bind_address).unwrap();
let addr = server.local_address().unwrap();

let client_addr = {
let client = UdpSocket::new(family).unwrap();
client.blocking_connect(&net, addr).unwrap();
let (_, client_outbound) = client.blocking_bind(&net, unspecified_addr).unwrap();
client.blocking_connect(addr).unwrap();

let datagrams = [
Datagram {
OutboundDatagram {
data: first_message.to_vec(),
remote_address: addr,
remote_address: None,
},
Datagram {
OutboundDatagram {
data: second_message.to_vec(),
remote_address: addr,
remote_address: Some(addr),
},
];
client.blocking_send(&datagrams).unwrap();
client_outbound.blocking_send(&datagrams).unwrap();

client.local_address().unwrap()
};

{
// Check that we've received our sent messages.
// Not guaranteed to work but should work in practice.
let datagrams = server.blocking_receive(2..100).unwrap();
let datagrams = server_inbound.blocking_receive(2..100).unwrap();
assert_eq!(datagrams.len(), 2);

assert_eq!(datagrams[0].data, first_message);
Expand All @@ -50,21 +52,22 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres
// Another client
{
let client = UdpSocket::new(family).unwrap();
client.blocking_connect(&net, addr).unwrap();
let (_, client_outbound) = client.blocking_bind(&net, unspecified_addr).unwrap();
// Send without connect

let datagrams = [Datagram {
let datagrams = [OutboundDatagram {
data: third_message.to_vec(),
remote_address: addr,
remote_address: Some(addr),
}];
client.blocking_send(&datagrams).unwrap();
client_outbound.blocking_send(&datagrams).unwrap();
}

{
// Check that we sent and received our message!
let datagrams = server.blocking_receive(1..100).unwrap();
let datagrams = server_inbound.blocking_receive(1..100).unwrap();
assert_eq!(datagrams.len(), 1);

assert_eq!(datagrams[0].data, third_message); // Not guaranteed to work but should work in practice.
assert_eq!(datagrams[0].data, third_message);
}
}

Expand Down
22 changes: 12 additions & 10 deletions crates/test-programs/src/sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use crate::wasi::sockets::network::{
Network,
};
use crate::wasi::sockets::tcp::TcpSocket;
use crate::wasi::sockets::udp::{Datagram, UdpSocket};
use crate::wasi::sockets::udp::{
InboundDatagram, InboundDatagramStream, OutboundDatagram, OutboundDatagramStream, UdpSocket,
};
use crate::wasi::sockets::{tcp_create_socket, udp_create_socket};
use std::ops::Range;

Expand Down Expand Up @@ -130,7 +132,7 @@ impl UdpSocket {
&self,
network: &Network,
local_address: IpSocketAddress,
) -> Result<(), ErrorCode> {
) -> Result<(InboundDatagramStream, OutboundDatagramStream), ErrorCode> {
let sub = self.subscribe();

self.start_bind(&network, local_address)?;
Expand All @@ -143,14 +145,10 @@ impl UdpSocket {
}
}

pub fn blocking_connect(
&self,
network: &Network,
remote_address: IpSocketAddress,
) -> Result<(), ErrorCode> {
pub fn blocking_connect(&self, remote_address: IpSocketAddress) -> Result<(), ErrorCode> {
let sub = self.subscribe();

self.start_connect(&network, remote_address)?;
self.start_connect(remote_address)?;

loop {
match self.finish_connect() {
Expand All @@ -159,8 +157,10 @@ impl UdpSocket {
}
}
}
}

pub fn blocking_send(&self, mut datagrams: &[Datagram]) -> Result<(), ErrorCode> {
impl OutboundDatagramStream {
pub fn blocking_send(&self, mut datagrams: &[OutboundDatagram]) -> Result<(), ErrorCode> {
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
let pollable = self.subscribe();

Expand All @@ -176,8 +176,10 @@ impl UdpSocket {

Ok(())
}
}

pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<Datagram>, ErrorCode> {
impl InboundDatagramStream {
pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<InboundDatagram>, ErrorCode> {
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
let pollable = self.subscribe();
let mut datagrams = vec![];
Expand Down
Loading

0 comments on commit 10b79d9

Please sign in to comment.