Skip to content

Commit

Permalink
wasi-sockets: Introduce UDP streams (bytecodealliance#7243)
Browse files Browse the repository at this point in the history
* Introduce UDP streams

Introduce new `inbound-datagram-stream` and `outbound-datagram-stream` types and moved `receive` and `send` methods to those respectively. These streams are returned by `bind` can be individually subscribed to. This resolves a design issue where a UDP server  would end up in a spin loop because `receive` returned EWOULDBLOCK but poll_* always returned immediately because the socket was ready for sending. In this new setup, users can poll each direction separately. Fixes WebAssembly/wasi-sockets#64

Additionally:
- Enable send-like behaviour by making `outbound-datagram::remote-address` optional. Fixes WebAssembly/wasi-sockets#57
- Dropped the `network` parameter from the `connect` call, because `bind` is now _required_ to perform IO.

* Align names with wasi-http

* Revert previous changes to `bind`. Replace `connect` with `stream`

Remove the Mutex again. Instead allow `stream` to be called multiple times, but trap if the previous streams are still active.

* The code block was treated as Rust code.

* Align more closely to wasi-io's input&output-stream

* Use `send` instead of `sendto` on connected sockets.

prtest:full
  • Loading branch information
badeend authored Oct 25, 2023
1 parent d361cf2 commit a6a9bdf
Show file tree
Hide file tree
Showing 11 changed files with 680 additions and 404 deletions.
36 changes: 20 additions & 16 deletions crates/test-programs/src/bin/preview2_udp_sample_application.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use test_programs::wasi::sockets::network::{
IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network,
IpAddress, IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network,
};
use test_programs::wasi::sockets::udp::{Datagram, UdpSocket};
use test_programs::wasi::sockets::udp::{OutgoingDatagram, UdpSocket};

fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddress) {
let unspecified_addr = IpSocketAddress::new(IpAddress::new_unspecified(family), 0);

let first_message = &[];
let second_message = b"Hello, world!";
let third_message = b"Greetings, planet!";
Expand All @@ -13,31 +15,32 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres
let server = UdpSocket::new(family).unwrap();

server.blocking_bind(&net, bind_address).unwrap();
let (server_incoming, _) = server.stream(None).unwrap();
let addr = server.local_address().unwrap();

let client_addr = {
let client = UdpSocket::new(family).unwrap();
client.blocking_connect(&net, addr).unwrap();
client.blocking_bind(&net, unspecified_addr).unwrap();
let (_, client_outgoing) = client.stream(Some(addr)).unwrap();

let datagrams = [
Datagram {
OutgoingDatagram {
data: first_message.to_vec(),
remote_address: addr,
remote_address: None,
},
Datagram {
OutgoingDatagram {
data: second_message.to_vec(),
remote_address: addr,
remote_address: Some(addr),
},
];
client.blocking_send(&datagrams).unwrap();
client_outgoing.blocking_send(&datagrams).unwrap();

client.local_address().unwrap()
};

{
// Check that we've received our sent messages.
// Not guaranteed to work but should work in practice.
let datagrams = server.blocking_receive(2..100).unwrap();
let datagrams = server_incoming.blocking_receive(2..100).unwrap();
assert_eq!(datagrams.len(), 2);

assert_eq!(datagrams[0].data, first_message);
Expand All @@ -50,21 +53,22 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres
// Another client
{
let client = UdpSocket::new(family).unwrap();
client.blocking_connect(&net, addr).unwrap();
client.blocking_bind(&net, unspecified_addr).unwrap();
let (_, client_outgoing) = client.stream(None).unwrap();

let datagrams = [Datagram {
let datagrams = [OutgoingDatagram {
data: third_message.to_vec(),
remote_address: addr,
remote_address: Some(addr),
}];
client.blocking_send(&datagrams).unwrap();
client_outgoing.blocking_send(&datagrams).unwrap();
}

{
// Check that we sent and received our message!
let datagrams = server.blocking_receive(1..100).unwrap();
let datagrams = server_incoming.blocking_receive(1..100).unwrap();
assert_eq!(datagrams.len(), 1);

assert_eq!(datagrams[0].data, third_message); // Not guaranteed to work but should work in practice.
assert_eq!(datagrams[0].data, third_message);
}
}

Expand Down
39 changes: 18 additions & 21 deletions crates/test-programs/src/sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use crate::wasi::sockets::network::{
Network,
};
use crate::wasi::sockets::tcp::TcpSocket;
use crate::wasi::sockets::udp::{Datagram, UdpSocket};
use crate::wasi::sockets::udp::{
IncomingDatagram, IncomingDatagramStream, OutgoingDatagram, OutgoingDatagramStream, UdpSocket,
};
use crate::wasi::sockets::{tcp_create_socket, udp_create_socket};
use std::ops::Range;

Expand Down Expand Up @@ -142,42 +144,42 @@ impl UdpSocket {
}
}
}
}

pub fn blocking_connect(
&self,
network: &Network,
remote_address: IpSocketAddress,
) -> Result<(), ErrorCode> {
impl OutgoingDatagramStream {
fn blocking_check_send(&self, timeout: &Pollable) -> Result<u64, ErrorCode> {
let sub = self.subscribe();

self.start_connect(&network, remote_address)?;

loop {
match self.finish_connect() {
Err(ErrorCode::WouldBlock) => sub.wait(),
match self.check_send() {
Ok(0) => sub.wait_until(timeout)?,
result => return result,
}
}
}

pub fn blocking_send(&self, mut datagrams: &[Datagram]) -> Result<(), ErrorCode> {
pub fn blocking_send(&self, mut datagrams: &[OutgoingDatagram]) -> Result<(), ErrorCode> {
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
let pollable = self.subscribe();

while !datagrams.is_empty() {
match self.send(datagrams) {
let permit = self.blocking_check_send(&timeout)?;
let chunk_len = datagrams.len().min(permit as usize);
match self.send(&datagrams[..chunk_len]) {
Ok(0) => {}
Ok(packets_sent) => {
datagrams = &datagrams[(packets_sent as usize)..];
let packets_sent = packets_sent as usize;
datagrams = &datagrams[packets_sent..];
}
Err(ErrorCode::WouldBlock) => pollable.wait_until(&timeout)?,
Err(err) => return Err(err),
}
}

Ok(())
}
}

pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<Datagram>, ErrorCode> {
impl IncomingDatagramStream {
pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<IncomingDatagram>, ErrorCode> {
let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false);
let pollable = self.subscribe();
let mut datagrams = vec![];
Expand All @@ -187,11 +189,6 @@ impl UdpSocket {
Ok(mut chunk) => {
datagrams.append(&mut chunk);

if datagrams.len() >= count.start as usize {
return Ok(datagrams);
}
}
Err(ErrorCode::WouldBlock) => {
if datagrams.len() >= count.start as usize {
return Ok(datagrams);
} else {
Expand Down
2 changes: 1 addition & 1 deletion crates/wasi-http/wit/deps/sockets/udp-create-socket.wit
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ interface udp-create-socket {
/// Similar to `socket(AF_INET or AF_INET6, SOCK_DGRAM, IPPROTO_UDP)` in POSIX.
///
/// This function does not require a network capability handle. This is considered to be safe because
/// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind`/`connect` is called,
/// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind` is called,
/// the socket is effectively an in-memory configuration object, unable to communicate with the outside world.
///
/// All sockets are non-blocking. Use the wasi-poll interface to block on asynchronous operations.
Expand Down
Loading

0 comments on commit a6a9bdf

Please sign in to comment.