diff --git a/crates/test-programs/src/bin/preview2_udp_sample_application.rs b/crates/test-programs/src/bin/preview2_udp_sample_application.rs index de8a0a27ca46..41da701e685e 100644 --- a/crates/test-programs/src/bin/preview2_udp_sample_application.rs +++ b/crates/test-programs/src/bin/preview2_udp_sample_application.rs @@ -1,9 +1,11 @@ use test_programs::wasi::sockets::network::{ - IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network, + IpAddress, IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network, }; -use test_programs::wasi::sockets::udp::{Datagram, UdpSocket}; +use test_programs::wasi::sockets::udp::{OutgoingDatagram, UdpSocket}; fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddress) { + let unspecified_addr = IpSocketAddress::new(IpAddress::new_unspecified(family), 0); + let first_message = &[]; let second_message = b"Hello, world!"; let third_message = b"Greetings, planet!"; @@ -13,31 +15,32 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres let server = UdpSocket::new(family).unwrap(); server.blocking_bind(&net, bind_address).unwrap(); + let (server_incoming, _) = server.stream(None).unwrap(); let addr = server.local_address().unwrap(); let client_addr = { let client = UdpSocket::new(family).unwrap(); - client.blocking_connect(&net, addr).unwrap(); + client.blocking_bind(&net, unspecified_addr).unwrap(); + let (_, client_outgoing) = client.stream(Some(addr)).unwrap(); let datagrams = [ - Datagram { + OutgoingDatagram { data: first_message.to_vec(), - remote_address: addr, + remote_address: None, }, - Datagram { + OutgoingDatagram { data: second_message.to_vec(), - remote_address: addr, + remote_address: Some(addr), }, ]; - client.blocking_send(&datagrams).unwrap(); + client_outgoing.blocking_send(&datagrams).unwrap(); client.local_address().unwrap() }; { // Check that we've received our sent messages. - // Not guaranteed to work but should work in practice. - let datagrams = server.blocking_receive(2..100).unwrap(); + let datagrams = server_incoming.blocking_receive(2..100).unwrap(); assert_eq!(datagrams.len(), 2); assert_eq!(datagrams[0].data, first_message); @@ -50,21 +53,22 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres // Another client { let client = UdpSocket::new(family).unwrap(); - client.blocking_connect(&net, addr).unwrap(); + client.blocking_bind(&net, unspecified_addr).unwrap(); + let (_, client_outgoing) = client.stream(None).unwrap(); - let datagrams = [Datagram { + let datagrams = [OutgoingDatagram { data: third_message.to_vec(), - remote_address: addr, + remote_address: Some(addr), }]; - client.blocking_send(&datagrams).unwrap(); + client_outgoing.blocking_send(&datagrams).unwrap(); } { // Check that we sent and received our message! - let datagrams = server.blocking_receive(1..100).unwrap(); + let datagrams = server_incoming.blocking_receive(1..100).unwrap(); assert_eq!(datagrams.len(), 1); - assert_eq!(datagrams[0].data, third_message); // Not guaranteed to work but should work in practice. + assert_eq!(datagrams[0].data, third_message); } } diff --git a/crates/test-programs/src/sockets.rs b/crates/test-programs/src/sockets.rs index 994e0e55e191..3033582b6066 100644 --- a/crates/test-programs/src/sockets.rs +++ b/crates/test-programs/src/sockets.rs @@ -7,7 +7,9 @@ use crate::wasi::sockets::network::{ Network, }; use crate::wasi::sockets::tcp::TcpSocket; -use crate::wasi::sockets::udp::{Datagram, UdpSocket}; +use crate::wasi::sockets::udp::{ + IncomingDatagram, IncomingDatagramStream, OutgoingDatagram, OutgoingDatagramStream, UdpSocket, +}; use crate::wasi::sockets::{tcp_create_socket, udp_create_socket}; use std::ops::Range; @@ -142,42 +144,42 @@ impl UdpSocket { } } } +} - pub fn blocking_connect( - &self, - network: &Network, - remote_address: IpSocketAddress, - ) -> Result<(), ErrorCode> { +impl OutgoingDatagramStream { + fn blocking_check_send(&self, timeout: &Pollable) -> Result { let sub = self.subscribe(); - self.start_connect(&network, remote_address)?; - loop { - match self.finish_connect() { - Err(ErrorCode::WouldBlock) => sub.wait(), + match self.check_send() { + Ok(0) => sub.wait_until(timeout)?, result => return result, } } } - pub fn blocking_send(&self, mut datagrams: &[Datagram]) -> Result<(), ErrorCode> { + pub fn blocking_send(&self, mut datagrams: &[OutgoingDatagram]) -> Result<(), ErrorCode> { let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false); - let pollable = self.subscribe(); while !datagrams.is_empty() { - match self.send(datagrams) { + let permit = self.blocking_check_send(&timeout)?; + let chunk_len = datagrams.len().min(permit as usize); + match self.send(&datagrams[..chunk_len]) { + Ok(0) => {} Ok(packets_sent) => { - datagrams = &datagrams[(packets_sent as usize)..]; + let packets_sent = packets_sent as usize; + datagrams = &datagrams[packets_sent..]; } - Err(ErrorCode::WouldBlock) => pollable.wait_until(&timeout)?, Err(err) => return Err(err), } } Ok(()) } +} - pub fn blocking_receive(&self, count: Range) -> Result, ErrorCode> { +impl IncomingDatagramStream { + pub fn blocking_receive(&self, count: Range) -> Result, ErrorCode> { let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false); let pollable = self.subscribe(); let mut datagrams = vec![]; @@ -187,11 +189,6 @@ impl UdpSocket { Ok(mut chunk) => { datagrams.append(&mut chunk); - if datagrams.len() >= count.start as usize { - return Ok(datagrams); - } - } - Err(ErrorCode::WouldBlock) => { if datagrams.len() >= count.start as usize { return Ok(datagrams); } else { diff --git a/crates/wasi-http/wit/deps/sockets/udp-create-socket.wit b/crates/wasi-http/wit/deps/sockets/udp-create-socket.wit index 4e61d30fa222..cc58234d8455 100644 --- a/crates/wasi-http/wit/deps/sockets/udp-create-socket.wit +++ b/crates/wasi-http/wit/deps/sockets/udp-create-socket.wit @@ -8,7 +8,7 @@ interface udp-create-socket { /// Similar to `socket(AF_INET or AF_INET6, SOCK_DGRAM, IPPROTO_UDP)` in POSIX. /// /// This function does not require a network capability handle. This is considered to be safe because - /// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind`/`connect` is called, + /// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind` is called, /// the socket is effectively an in-memory configuration object, unable to communicate with the outside world. /// /// All sockets are non-blocking. Use the wasi-poll interface to block on asynchronous operations. diff --git a/crates/wasi-http/wit/deps/sockets/udp.wit b/crates/wasi-http/wit/deps/sockets/udp.wit index 8641cc2c5dbd..232c2b41c7fb 100644 --- a/crates/wasi-http/wit/deps/sockets/udp.wit +++ b/crates/wasi-http/wit/deps/sockets/udp.wit @@ -3,17 +3,34 @@ interface udp { use wasi:io/poll@0.2.0-rc-2023-11-05.{pollable}; use network.{network, error-code, ip-socket-address, ip-address-family}; + /// A received datagram. + record incoming-datagram { + /// The payload. + /// + /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + data: list, - record datagram { - data: list, // Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + /// The source address. + /// + /// This field is guaranteed to match the remote address the stream was initialized with, if any. + /// + /// Equivalent to the `src_addr` out parameter of `recvfrom`. remote-address: ip-socket-address, + } + + /// A datagram to be sent out. + record outgoing-datagram { + /// The payload. + data: list, - /// Possible future additions: - /// local-address: ip-socket-address, // IP_PKTINFO / IP_RECVDSTADDR / IPV6_PKTINFO - /// local-interface: u32, // IP_PKTINFO / IP_RECVIF - /// ttl: u8, // IP_RECVTTL - /// dscp: u6, // IP_RECVTOS - /// ecn: u2, // IP_RECVTOS + /// The destination address. + /// + /// The requirements on this field depend on how the stream was initialized: + /// - with a remote address: this field must be None or match the stream's remote address exactly. + /// - without a remote address: this field is required. + /// + /// If this value is None, the send operation is equivalent to `send` in POSIX. Otherwise it is equivalent to `sendto`. + remote-address: option, } @@ -24,9 +41,7 @@ interface udp { /// /// If the IP address is zero (`0.0.0.0` in IPv4, `::` in IPv6), it is left to the implementation to decide which /// network interface(s) to bind to. - /// If the TCP/UDP port is zero, the socket will be bound to a random free port. - /// - /// When a socket is not explicitly bound, the first invocation to connect will implicitly bind the socket. + /// If the port is zero, the socket will be bound to a random free port. /// /// Unlike in POSIX, this function is async. This enables interactive WASI hosts to inject permission prompts. /// @@ -49,95 +64,47 @@ interface udp { start-bind: func(network: borrow, local-address: ip-socket-address) -> result<_, error-code>; finish-bind: func() -> result<_, error-code>; - /// Set the destination address. - /// - /// The local-address is updated based on the best network path to `remote-address`. - /// - /// When a destination address is set: - /// - all receive operations will only return datagrams sent from the provided `remote-address`. - /// - the `send` function can only be used to send to this destination. - /// - /// Note that this function does not generate any network traffic and the peer is not aware of this "connection". - /// - /// Unlike in POSIX, this function is async. This enables interactive WASI hosts to inject permission prompts. - /// - /// # Typical `start` errors + /// Set up inbound & outbound communication channels, optionally to a specific peer. + /// + /// This function only changes the local socket configuration and does not generate any network traffic. + /// On success, the `remote-address` of the socket is updated. The `local-address` may be updated as well, + /// based on the best network path to `remote-address`. + /// + /// When a `remote-address` is provided, the returned streams are limited to communicating with that specific peer: + /// - `send` can only be used to send to this destination. + /// - `receive` will only return datagrams sent from the provided `remote-address`. + /// + /// This method may be called multiple times on the same socket to change its association, but + /// only the most recently returned pair of streams will be operational. Implementations may trap if + /// the streams returned by a previous invocation haven't been dropped yet before calling `stream` again. + /// + /// The POSIX equivalent in pseudo-code is: + /// ```text + /// if (was previously connected) { + /// connect(s, AF_UNSPEC) + /// } + /// if (remote_address is Some) { + /// connect(s, remote_address) + /// } + /// ``` + /// + /// Unlike in POSIX, the socket must already be explicitly bound. + /// + /// # Typical errors /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is already bound to a different network. The `network` passed to `connect` must be identical to the one passed to `bind`. - /// - /// # Typical `finish` errors + /// - `invalid-state`: The socket is not bound. /// - `address-in-use`: Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE, EADDRNOTAVAIL on Linux, EAGAIN on BSD) - /// - `not-in-progress`: A `connect` operation is not in progress. - /// - `would-block`: Can't finish the operation, it is still in progress. (EWOULDBLOCK, EAGAIN) + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) /// /// # References /// - /// - /// - /// - - start-connect: func(network: borrow, remote-address: ip-socket-address) -> result<_, error-code>; - finish-connect: func() -> result<_, error-code>; - - /// Receive messages on the socket. - /// - /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. - /// The returned list may contain fewer elements than requested, but never more. - /// If `max-results` is 0, this function returns successfully with an empty list. - /// - /// # Typical errors - /// - `invalid-state`: The socket is not bound to any local address. (EINVAL) - /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `would-block`: There is no pending data available to be read at the moment. (EWOULDBLOCK, EAGAIN) - /// - /// # References - /// - - /// - - /// - - /// - - /// - - /// - - /// - - /// - - receive: func(max-results: u64) -> result, error-code>; - - /// Send messages on the socket. - /// - /// This function attempts to send all provided `datagrams` on the socket without blocking and - /// returns how many messages were actually sent (or queued for sending). - /// - /// This function semantically behaves the same as iterating the `datagrams` list and sequentially - /// sending each individual datagram until either the end of the list has been reached or the first error occurred. - /// If at least one datagram has been sent successfully, this function never returns an error. - /// - /// If the input list is empty, the function returns `ok(0)`. - /// - /// The remote address option is required. To send a message to the "connected" peer, - /// call `remote-address` to get their address. - /// - /// # Typical errors - /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) - /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) - /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is in "connected" mode and the `datagram.remote-address` does not match the address passed to `connect`. (EISCONN) - /// - `invalid-state`: The socket is not bound to any local address. Unlike POSIX, this function does not perform an implicit bind. - /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) - /// - `would-block`: The send buffer is currently full. (EWOULDBLOCK, EAGAIN) - /// - /// # References - /// - - /// - - /// - - /// - - /// - - /// - - /// - - /// - - send: func(datagrams: list) -> result; + %stream: func(remote-address: option) -> result, error-code>; /// Get the current bound address. /// @@ -146,7 +113,7 @@ interface udp { /// > stored in the object pointed to by `address` is unspecified. /// /// WASI is stricter and requires `local-address` to return `invalid-state` when the socket hasn't been bound yet. - /// + /// /// # Typical errors /// - `invalid-state`: The socket is not bound to any local address. /// @@ -157,10 +124,10 @@ interface udp { /// - local-address: func() -> result; - /// Get the address set with `connect`. + /// Get the address the socket is currently streaming to. /// /// # Typical errors - /// - `invalid-state`: The socket is not connected to a remote address. (ENOTCONN) + /// - `invalid-state`: The socket is not streaming to a specific remote address. (ENOTCONN) /// /// # References /// - @@ -192,11 +159,11 @@ interface udp { /// The kernel buffer space reserved for sends/receives on this socket. /// /// Note #1: an implementation may choose to cap or round the buffer size when setting the value. - /// In other words, after setting a value, reading the same setting back may return a different value. + /// In other words, after setting a value, reading the same setting back may return a different value. /// /// Note #2: there is not necessarily a direct relationship between the kernel buffer size and the bytes of - /// actual data to be sent/received by the application, because the kernel might also use the buffer space - /// for internal metadata structures. + /// actual data to be sent/received by the application, because the kernel might also use the buffer space + /// for internal metadata structures. /// /// Equivalent to the SO_RCVBUF and SO_SNDBUF socket options. receive-buffer-size: func() -> result; @@ -210,4 +177,93 @@ interface udp { /// It's planned to be removed when `future` is natively supported in Preview3. subscribe: func() -> pollable; } + + resource incoming-datagram-stream { + /// Receive messages on the socket. + /// + /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. + /// The returned list may contain fewer elements than requested, but never more. + /// + /// This function returns successfully with an empty list when either: + /// - `max-results` is 0, or: + /// - `max-results` is greater than 0, but no results are immediately available. + /// This function never returns `error(would-block)`. + /// + /// # Typical errors + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + /// + /// # References + /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - + receive: func(max-results: u64) -> result, error-code>; + + /// Create a `pollable` which will resolve once the stream is ready to receive again. + /// + /// Note: this function is here for WASI Preview2 only. + /// It's planned to be removed when `future` is natively supported in Preview3. + subscribe: func() -> pollable; + } + + resource outgoing-datagram-stream { + /// Check readiness for sending. This function never blocks. + /// + /// Returns the number of datagrams permitted for the next call to `send`, + /// or an error. Calling `send` with more datagrams than this function has + /// permitted will trap. + /// + /// When this function returns ok(0), the `subscribe` pollable will + /// become ready when this function will report at least ok(1), or an + /// error. + /// + /// Never returns `would-block`. + check-send: func() -> result; + + /// Send messages on the socket. + /// + /// This function attempts to send all provided `datagrams` on the socket without blocking and + /// returns how many messages were actually sent (or queued for sending). This function never + /// returns `error(would-block)`. If none of the datagrams were able to be sent, `ok(0)` is returned. + /// + /// This function semantically behaves the same as iterating the `datagrams` list and sequentially + /// sending each individual datagram until either the end of the list has been reached or the first error occurred. + /// If at least one datagram has been sent successfully, this function never returns an error. + /// + /// If the input list is empty, the function returns `ok(0)`. + /// + /// Each call to `send` must be permitted by a preceding `check-send`. Implementations must trap if + /// either `check-send` was not called or `datagrams` contains more items than `check-send` permitted. + /// + /// # Typical errors + /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) + /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) + /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) + /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) + /// - `invalid-argument`: The socket is in "connected" mode and `remote-address` is `some` value that does not match the address passed to `stream`. (EISCONN) + /// - `invalid-argument`: The socket is not "connected" and no value for `remote-address` was provided. (EDESTADDRREQ) + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) + /// + /// # References + /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - + send: func(datagrams: list) -> result; + + /// Create a `pollable` which will resolve once the stream is ready to send again. + /// + /// Note: this function is here for WASI Preview2 only. + /// It's planned to be removed when `future` is natively supported in Preview3. + subscribe: func() -> pollable; + } } diff --git a/crates/wasi/src/preview2/error.rs b/crates/wasi/src/preview2/error.rs index ccae912d4668..f92317c1fef7 100644 --- a/crates/wasi/src/preview2/error.rs +++ b/crates/wasi/src/preview2/error.rs @@ -61,6 +61,13 @@ impl TrappableError { { self.err.downcast() } + + pub fn downcast_ref(&self) -> Option<&T> + where + T: Error + Send + Sync + 'static, + { + self.err.downcast_ref() + } } impl From for TrappableError diff --git a/crates/wasi/src/preview2/host/udp.rs b/crates/wasi/src/preview2/host/udp.rs index 857925eb6b1d..a97bd1bcd187 100644 --- a/crates/wasi/src/preview2/host/udp.rs +++ b/crates/wasi/src/preview2/host/udp.rs @@ -5,23 +5,27 @@ use crate::preview2::{ sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network}, sockets::udp, }, - udp::UdpState, + udp::{IncomingDatagramStream, OutgoingDatagramStream, SendState, UdpState}, + Subscribe, }; -use crate::preview2::{Pollable, SocketResult, WasiView}; +use crate::preview2::{Pollable, SocketError, SocketResult, WasiView}; +use anyhow::anyhow; +use async_trait::async_trait; use cap_net_ext::{AddressFamily, PoolExt}; use io_lifetimes::AsSocketlike; use rustix::io::Errno; use rustix::net::sockopt; +use tokio::io::Interest; use wasmtime::component::Resource; /// Theoretical maximum byte size of a UDP datagram, the real limit is lower, /// but we do not account for e.g. the transport layer here for simplicity. /// In practice, datagrams are typically less than 1500 bytes. -const MAX_UDP_DATAGRAM_SIZE: usize = 65535; +const MAX_UDP_DATAGRAM_SIZE: usize = u16::MAX as usize; impl udp::Host for T {} -impl crate::preview2::host::udp::udp::HostUdpSocket for T { +impl udp::HostUdpSocket for T { fn start_bind( &mut self, this: Resource, @@ -30,16 +34,15 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { ) -> SocketResult<()> { let table = self.table_mut(); let socket = table.get(&this)?; + let network = table.get(&network)?; + let local_address: SocketAddr = local_address.into(); match socket.udp_state { UdpState::Default => {} - UdpState::BindStarted | UdpState::Connecting(..) => { - return Err(ErrorCode::ConcurrencyConflict.into()) - } - UdpState::Bound | UdpState::Connected(..) => return Err(ErrorCode::InvalidState.into()), + UdpState::BindStarted => return Err(ErrorCode::ConcurrencyConflict.into()), + UdpState::Bound | UdpState::Connected => return Err(ErrorCode::InvalidState.into()), } - let network = table.get(&network)?; let binder = network.pool.udp_binder(local_address)?; // Perform the OS bind call. @@ -68,157 +71,58 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { } } - fn start_connect( + fn stream( &mut self, this: Resource, - network: Resource, - remote_address: IpSocketAddress, - ) -> SocketResult<()> { + remote_address: Option, + ) -> SocketResult<( + Resource, + Resource, + )> { let table = self.table_mut(); - let socket = table.get(&this)?; - let network = table.get(&network)?; - - match socket.udp_state { - UdpState::Default | UdpState::Bound => {} - UdpState::BindStarted | UdpState::Connecting(..) => { - return Err(ErrorCode::ConcurrencyConflict.into()) - } - UdpState::Connected(..) => return Err(ErrorCode::InvalidState.into()), - } - let connecter = network.pool.udp_connecter(remote_address)?; + let has_active_streams = table + .iter_children(&this)? + .any(|c| c.is::() || c.is::()); - // Do an OS `connect`. - connecter.connect_existing_udp_socket( - &*socket - .udp_socket() - .as_socketlike_view::(), - )?; - - let socket = table.get_mut(&this)?; - socket.udp_state = UdpState::Connecting(remote_address); - Ok(()) - } + if has_active_streams { + return Err(SocketError::trap(anyhow!("UDP streams not dropped yet"))); + } - fn finish_connect(&mut self, this: Resource) -> SocketResult<()> { - let table = self.table_mut(); let socket = table.get_mut(&this)?; + let remote_address = remote_address.map(SocketAddr::from); match socket.udp_state { - UdpState::Connecting(addr) => { - socket.udp_state = UdpState::Connected(addr); - Ok(()) - } - _ => Err(ErrorCode::NotInProgress.into()), + UdpState::Bound | UdpState::Connected => {} + _ => return Err(ErrorCode::InvalidState.into()), } - } - fn receive( - &mut self, - this: Resource, - max_results: u64, - ) -> SocketResult> { - if max_results == 0 { - return Ok(vec![]); + if let UdpState::Connected = socket.udp_state { + // FIXME: Allow multiple (dis)connects. This needs to be supported by rustix first. + // rustix::net::disconnect(socket.udp_socket())?; + // socket.udp_state = UdpState::Bound; + return Err(ErrorCode::NotSupported.into()); } - let table = self.table(); - let socket = table.get(&this)?; - - let udp_socket = socket.udp_socket(); - let mut datagrams = vec![]; - let mut buf = [0; MAX_UDP_DATAGRAM_SIZE]; - match socket.udp_state { - UdpState::Default | UdpState::BindStarted => return Err(ErrorCode::InvalidState.into()), - UdpState::Bound | UdpState::Connecting(..) => { - for i in 0..max_results { - match udp_socket.try_recv_from(&mut buf) { - Ok((size, remote_address)) => datagrams.push(udp::Datagram { - data: buf[..size].into(), - remote_address: remote_address.into(), - }), - Err(_e) if i > 0 => { - return Ok(datagrams); - } - Err(e) => return Err(e.into()), - } - } - } - UdpState::Connected(remote_address) => { - for i in 0..max_results { - match udp_socket.try_recv(&mut buf) { - Ok(size) => datagrams.push(udp::Datagram { - data: buf[..size].into(), - remote_address, - }), - Err(_e) if i > 0 => { - return Ok(datagrams); - } - Err(e) => return Err(e.into()), - } - } - } + if let Some(connect_addr) = remote_address { + rustix::net::connect(socket.udp_socket(), &connect_addr)?; + socket.udp_state = UdpState::Connected; } - Ok(datagrams) - } - fn send( - &mut self, - this: Resource, - datagrams: Vec, - ) -> SocketResult { - if datagrams.is_empty() { - return Ok(0); + let incoming_stream = IncomingDatagramStream { + inner: socket.inner.clone(), + remote_address, + }; + let outgoing_stream = OutgoingDatagramStream { + inner: socket.inner.clone(), + remote_address, + send_state: SendState::Idle, }; - let table = self.table(); - let socket = table.get(&this)?; - let udp_socket = socket.udp_socket(); - let mut count = 0; - match socket.udp_state { - UdpState::Default | UdpState::BindStarted => return Err(ErrorCode::InvalidState.into()), - UdpState::Bound | UdpState::Connecting(..) => { - for udp::Datagram { - data, - remote_address, - } in datagrams - { - match udp_socket.try_send_to(&data, remote_address.into()) { - Ok(_size) => count += 1, - Err(_e) if count > 0 => { - return Ok(count); - } - Err(e) => return Err(e.into()), - } - } - } - UdpState::Connected(addr) => { - let addr = SocketAddr::from(addr); - for udp::Datagram { - data, - remote_address, - } in datagrams - { - if SocketAddr::from(remote_address) != addr { - // From WIT documentation: - // If at least one datagram has been sent successfully, this function never returns an error. - if count == 0 { - return Err(ErrorCode::InvalidArgument.into()); - } else { - return Ok(count); - } - } - match udp_socket.try_send(&data) { - Ok(_size) => count += 1, - Err(_e) if count > 0 => { - return Ok(count); - } - Err(e) => return Err(e.into()), - } - } - } - } - Ok(count) + Ok(( + self.table_mut().push_child(incoming_stream, &this)?, + self.table_mut().push_child(outgoing_stream, &this)?, + )) } fn local_address(&mut self, this: Resource) -> SocketResult { @@ -354,3 +258,226 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { Ok(()) } } + +impl udp::HostIncomingDatagramStream for T { + fn receive( + &mut self, + this: Resource, + max_results: u64, + ) -> SocketResult> { + // Returns Ok(None) when the message was dropped. + fn recv_one( + stream: &IncomingDatagramStream, + ) -> SocketResult> { + let mut buf = [0; MAX_UDP_DATAGRAM_SIZE]; + let (size, received_addr) = stream.inner.try_recv_from(&mut buf)?; + + match stream.remote_address { + Some(connected_addr) if connected_addr != received_addr => { + // Normally, this should have already been checked for us by the OS. + return Ok(None); + } + _ => {} + } + + // FIXME: check permission to receive from `received_addr`. + Ok(Some(udp::IncomingDatagram { + data: buf[..size].into(), + remote_address: received_addr.into(), + })) + } + + let table = self.table(); + let stream = table.get(&this)?; + let max_results: usize = max_results.try_into().unwrap_or(usize::MAX); + + if max_results == 0 { + return Ok(vec![]); + } + + let mut datagrams = vec![]; + + while datagrams.len() < max_results { + match recv_one(stream) { + Ok(Some(datagram)) => { + datagrams.push(datagram); + } + Ok(None) => { + // Message was dropped + } + Err(_) if datagrams.len() > 0 => { + return Ok(datagrams); + } + Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => { + return Ok(datagrams); + } + Err(e) => { + return Err(e); + } + } + } + + Ok(datagrams) + } + + fn subscribe( + &mut self, + this: Resource, + ) -> anyhow::Result> { + crate::preview2::poll::subscribe(self.table_mut(), this) + } + + fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { + let table = self.table_mut(); + + // As in the filesystem implementation, we assume closing a socket + // doesn't block. + let dropped = table.delete(this)?; + drop(dropped); + + Ok(()) + } +} + +#[async_trait] +impl Subscribe for IncomingDatagramStream { + async fn ready(&mut self) { + // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. + self.inner + .ready(Interest::READABLE) + .await + .expect("failed to await UDP socket readiness"); + } +} + +impl udp::HostOutgoingDatagramStream for T { + fn check_send(&mut self, this: Resource) -> SocketResult { + let table = self.table_mut(); + let stream = table.get_mut(&this)?; + + let permit = match stream.send_state { + SendState::Idle => { + const PERMIT: usize = 16; + stream.send_state = SendState::Permitted(PERMIT); + PERMIT + } + SendState::Permitted(n) => n, + SendState::Waiting => 0, + }; + + Ok(permit.try_into().unwrap()) + } + + fn send( + &mut self, + this: Resource, + datagrams: Vec, + ) -> SocketResult { + fn send_one( + stream: &OutgoingDatagramStream, + datagram: &udp::OutgoingDatagram, + ) -> SocketResult<()> { + if datagram.data.len() > MAX_UDP_DATAGRAM_SIZE { + return Err(ErrorCode::DatagramTooLarge.into()); + } + + let provided_addr = datagram.remote_address.map(SocketAddr::from); + let addr = match (stream.remote_address, provided_addr) { + (None, Some(addr)) => addr, + (Some(addr), None) => addr, + (Some(connected_addr), Some(provided_addr)) if connected_addr == provided_addr => { + connected_addr + } + _ => return Err(ErrorCode::InvalidArgument.into()), + }; + + // FIXME: check permission to send to `addr`. + if stream.remote_address == Some(addr) { + stream.inner.try_send(&datagram.data)?; + } else { + stream.inner.try_send_to(&datagram.data, addr)?; + } + + Ok(()) + } + + let table = self.table_mut(); + let stream = table.get_mut(&this)?; + + match stream.send_state { + SendState::Permitted(n) if n >= datagrams.len() => { + stream.send_state = SendState::Idle; + } + SendState::Permitted(_) => { + return Err(SocketError::trap(anyhow::anyhow!( + "unpermitted: argument exceeds permitted size" + ))) + } + SendState::Idle | SendState::Waiting => { + return Err(SocketError::trap(anyhow::anyhow!( + "unpermitted: must call check-send first" + ))) + } + } + + if datagrams.is_empty() { + return Ok(0); + } + + let mut count = 0; + + for datagram in datagrams { + match send_one(stream, &datagram) { + Ok(_) => count += 1, + Err(_) if count > 0 => { + // WIT: "If at least one datagram has been sent successfully, this function never returns an error." + return Ok(count); + } + Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => { + stream.send_state = SendState::Waiting; + return Ok(count); + } + Err(e) => { + return Err(e); + } + } + } + + Ok(count) + } + + fn subscribe( + &mut self, + this: Resource, + ) -> anyhow::Result> { + crate::preview2::poll::subscribe(self.table_mut(), this) + } + + fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { + let table = self.table_mut(); + + // As in the filesystem implementation, we assume closing a socket + // doesn't block. + let dropped = table.delete(this)?; + drop(dropped); + + Ok(()) + } +} + +#[async_trait] +impl Subscribe for OutgoingDatagramStream { + async fn ready(&mut self) { + match self.send_state { + SendState::Idle | SendState::Permitted(_) => {} + SendState::Waiting => { + // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. + self.inner + .ready(Interest::WRITABLE) + .await + .expect("failed to await UDP socket readiness"); + self.send_state = SendState::Idle; + } + } + } +} diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index abdfe4df5ca3..f9d0977453b2 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -161,6 +161,8 @@ pub mod bindings { "wasi:sockets/network/network": super::network::Network, "wasi:sockets/tcp/tcp-socket": super::tcp::TcpSocket, "wasi:sockets/udp/udp-socket": super::udp::UdpSocket, + "wasi:sockets/udp/incoming-datagram-stream": super::udp::IncomingDatagramStream, + "wasi:sockets/udp/outgoing-datagram-stream": super::udp::OutgoingDatagramStream, "wasi:sockets/ip-name-lookup/resolve-address-stream": super::ip_name_lookup::ResolveAddressStream, "wasi:filesystem/types/directory-entry-stream": super::filesystem::ReaddirIterator, "wasi:filesystem/types/descriptor": super::filesystem::Descriptor, diff --git a/crates/wasi/src/preview2/table.rs b/crates/wasi/src/preview2/table.rs index 2454fce03272..444e7b25f363 100644 --- a/crates/wasi/src/preview2/table.rs +++ b/crates/wasi/src/preview2/table.rs @@ -234,6 +234,21 @@ impl Table { (item, v) }) } + + /// Iterate over all children belonging to the provided parent + pub fn iter_children( + &self, + parent: &Resource, + ) -> Result, TableError> + where + T: 'static, + { + let parent_entry = self.map.get(&parent.rep()).ok_or(TableError::NotPresent)?; + Ok(parent_entry.children.iter().map(|child_index| { + let child = self.map.get(child_index).expect("missing child"); + child.entry.as_ref() + })) + } } impl Default for Table { diff --git a/crates/wasi/src/preview2/udp.rs b/crates/wasi/src/preview2/udp.rs index 2c879f99a6c9..504dba0b17f5 100644 --- a/crates/wasi/src/preview2/udp.rs +++ b/crates/wasi/src/preview2/udp.rs @@ -1,12 +1,11 @@ -use crate::preview2::bindings::sockets::network::IpSocketAddress; use crate::preview2::poll::Subscribe; use crate::preview2::with_ambient_tokio_runtime; use async_trait::async_trait; use cap_net_ext::{AddressFamily, Blocking, UdpSocketExt}; use io_lifetimes::raw::{FromRawSocketlike, IntoRawSocketlike}; use std::io; +use std::net::SocketAddr; use std::sync::Arc; -use tokio::io::Interest; /// The state of a UDP socket. /// @@ -23,11 +22,8 @@ pub(crate) enum UdpState { /// is not yet listening for connections. Bound, - /// A connect call is in progress. - Connecting(IpSocketAddress), - /// The socket is "connected" to a peer address. - Connected(IpSocketAddress), + Connected, } /// A host UDP socket, plus associated bookkeeping. @@ -49,44 +45,60 @@ pub struct UdpSocket { #[async_trait] impl Subscribe for UdpSocket { async fn ready(&mut self) { - // Some states are ready immediately. - match self.udp_state { - UdpState::BindStarted => return, - _ => {} - } - - // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. - self.inner - .ready(Interest::READABLE | Interest::WRITABLE) - .await - .expect("failed to await UDP socket readiness"); + // None of the socket-level operations block natively } } impl UdpSocket { /// Create a new socket in the given family. pub fn new(family: AddressFamily) -> io::Result { - // Create a new host socket and set it to non-blocking, which is needed - // by our async implementation. - let udp_socket = cap_std::net::UdpSocket::new(family, Blocking::No)?; - Self::from_udp_socket(udp_socket, family) - } - - pub fn from_udp_socket( - udp_socket: cap_std::net::UdpSocket, - family: AddressFamily, - ) -> io::Result { - let fd = udp_socket.into_raw_socketlike(); - let std_socket = unsafe { std::net::UdpSocket::from_raw_socketlike(fd) }; - let socket = with_ambient_tokio_runtime(|| tokio::net::UdpSocket::try_from(std_socket))?; - Ok(Self { - inner: Arc::new(socket), + Ok(UdpSocket { + inner: Arc::new(Self::new_tokio_socket(family)?), udp_state: UdpState::Default, family, }) } + fn new_tokio_socket(family: AddressFamily) -> io::Result { + // Create a new host socket and set it to non-blocking, which is needed + // by our async implementation. + let cap_std_socket = cap_std::net::UdpSocket::new(family, Blocking::No)?; + let fd = cap_std_socket.into_raw_socketlike(); + let std_socket = unsafe { std::net::UdpSocket::from_raw_socketlike(fd) }; + let tokio_socket = + with_ambient_tokio_runtime(|| tokio::net::UdpSocket::try_from(std_socket))?; + + Ok(tokio_socket) + } + pub fn udp_socket(&self) -> &tokio::net::UdpSocket { &self.inner } } + +pub struct IncomingDatagramStream { + pub(crate) inner: Arc, + + /// If this has a value, the stream is "connected". + pub(crate) remote_address: Option, +} + +pub struct OutgoingDatagramStream { + pub(crate) inner: Arc, + + /// If this has a value, the stream is "connected". + pub(crate) remote_address: Option, + + pub(crate) send_state: SendState, +} + +pub(crate) enum SendState { + /// Waiting for the API consumer to call `check-send`. + Idle, + + /// Ready to send up to x datagrams. + Permitted(usize), + + /// Waiting for the OS. + Waiting, +} diff --git a/crates/wasi/wit/deps/sockets/udp-create-socket.wit b/crates/wasi/wit/deps/sockets/udp-create-socket.wit index 4e61d30fa222..cc58234d8455 100644 --- a/crates/wasi/wit/deps/sockets/udp-create-socket.wit +++ b/crates/wasi/wit/deps/sockets/udp-create-socket.wit @@ -8,7 +8,7 @@ interface udp-create-socket { /// Similar to `socket(AF_INET or AF_INET6, SOCK_DGRAM, IPPROTO_UDP)` in POSIX. /// /// This function does not require a network capability handle. This is considered to be safe because - /// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind`/`connect` is called, + /// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind` is called, /// the socket is effectively an in-memory configuration object, unable to communicate with the outside world. /// /// All sockets are non-blocking. Use the wasi-poll interface to block on asynchronous operations. diff --git a/crates/wasi/wit/deps/sockets/udp.wit b/crates/wasi/wit/deps/sockets/udp.wit index 8641cc2c5dbd..232c2b41c7fb 100644 --- a/crates/wasi/wit/deps/sockets/udp.wit +++ b/crates/wasi/wit/deps/sockets/udp.wit @@ -3,17 +3,34 @@ interface udp { use wasi:io/poll@0.2.0-rc-2023-11-05.{pollable}; use network.{network, error-code, ip-socket-address, ip-address-family}; + /// A received datagram. + record incoming-datagram { + /// The payload. + /// + /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + data: list, - record datagram { - data: list, // Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + /// The source address. + /// + /// This field is guaranteed to match the remote address the stream was initialized with, if any. + /// + /// Equivalent to the `src_addr` out parameter of `recvfrom`. remote-address: ip-socket-address, + } + + /// A datagram to be sent out. + record outgoing-datagram { + /// The payload. + data: list, - /// Possible future additions: - /// local-address: ip-socket-address, // IP_PKTINFO / IP_RECVDSTADDR / IPV6_PKTINFO - /// local-interface: u32, // IP_PKTINFO / IP_RECVIF - /// ttl: u8, // IP_RECVTTL - /// dscp: u6, // IP_RECVTOS - /// ecn: u2, // IP_RECVTOS + /// The destination address. + /// + /// The requirements on this field depend on how the stream was initialized: + /// - with a remote address: this field must be None or match the stream's remote address exactly. + /// - without a remote address: this field is required. + /// + /// If this value is None, the send operation is equivalent to `send` in POSIX. Otherwise it is equivalent to `sendto`. + remote-address: option, } @@ -24,9 +41,7 @@ interface udp { /// /// If the IP address is zero (`0.0.0.0` in IPv4, `::` in IPv6), it is left to the implementation to decide which /// network interface(s) to bind to. - /// If the TCP/UDP port is zero, the socket will be bound to a random free port. - /// - /// When a socket is not explicitly bound, the first invocation to connect will implicitly bind the socket. + /// If the port is zero, the socket will be bound to a random free port. /// /// Unlike in POSIX, this function is async. This enables interactive WASI hosts to inject permission prompts. /// @@ -49,95 +64,47 @@ interface udp { start-bind: func(network: borrow, local-address: ip-socket-address) -> result<_, error-code>; finish-bind: func() -> result<_, error-code>; - /// Set the destination address. - /// - /// The local-address is updated based on the best network path to `remote-address`. - /// - /// When a destination address is set: - /// - all receive operations will only return datagrams sent from the provided `remote-address`. - /// - the `send` function can only be used to send to this destination. - /// - /// Note that this function does not generate any network traffic and the peer is not aware of this "connection". - /// - /// Unlike in POSIX, this function is async. This enables interactive WASI hosts to inject permission prompts. - /// - /// # Typical `start` errors + /// Set up inbound & outbound communication channels, optionally to a specific peer. + /// + /// This function only changes the local socket configuration and does not generate any network traffic. + /// On success, the `remote-address` of the socket is updated. The `local-address` may be updated as well, + /// based on the best network path to `remote-address`. + /// + /// When a `remote-address` is provided, the returned streams are limited to communicating with that specific peer: + /// - `send` can only be used to send to this destination. + /// - `receive` will only return datagrams sent from the provided `remote-address`. + /// + /// This method may be called multiple times on the same socket to change its association, but + /// only the most recently returned pair of streams will be operational. Implementations may trap if + /// the streams returned by a previous invocation haven't been dropped yet before calling `stream` again. + /// + /// The POSIX equivalent in pseudo-code is: + /// ```text + /// if (was previously connected) { + /// connect(s, AF_UNSPEC) + /// } + /// if (remote_address is Some) { + /// connect(s, remote_address) + /// } + /// ``` + /// + /// Unlike in POSIX, the socket must already be explicitly bound. + /// + /// # Typical errors /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is already bound to a different network. The `network` passed to `connect` must be identical to the one passed to `bind`. - /// - /// # Typical `finish` errors + /// - `invalid-state`: The socket is not bound. /// - `address-in-use`: Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE, EADDRNOTAVAIL on Linux, EAGAIN on BSD) - /// - `not-in-progress`: A `connect` operation is not in progress. - /// - `would-block`: Can't finish the operation, it is still in progress. (EWOULDBLOCK, EAGAIN) + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) /// /// # References /// - /// - /// - /// - - start-connect: func(network: borrow, remote-address: ip-socket-address) -> result<_, error-code>; - finish-connect: func() -> result<_, error-code>; - - /// Receive messages on the socket. - /// - /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. - /// The returned list may contain fewer elements than requested, but never more. - /// If `max-results` is 0, this function returns successfully with an empty list. - /// - /// # Typical errors - /// - `invalid-state`: The socket is not bound to any local address. (EINVAL) - /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `would-block`: There is no pending data available to be read at the moment. (EWOULDBLOCK, EAGAIN) - /// - /// # References - /// - - /// - - /// - - /// - - /// - - /// - - /// - - /// - - receive: func(max-results: u64) -> result, error-code>; - - /// Send messages on the socket. - /// - /// This function attempts to send all provided `datagrams` on the socket without blocking and - /// returns how many messages were actually sent (or queued for sending). - /// - /// This function semantically behaves the same as iterating the `datagrams` list and sequentially - /// sending each individual datagram until either the end of the list has been reached or the first error occurred. - /// If at least one datagram has been sent successfully, this function never returns an error. - /// - /// If the input list is empty, the function returns `ok(0)`. - /// - /// The remote address option is required. To send a message to the "connected" peer, - /// call `remote-address` to get their address. - /// - /// # Typical errors - /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) - /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) - /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is in "connected" mode and the `datagram.remote-address` does not match the address passed to `connect`. (EISCONN) - /// - `invalid-state`: The socket is not bound to any local address. Unlike POSIX, this function does not perform an implicit bind. - /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) - /// - `would-block`: The send buffer is currently full. (EWOULDBLOCK, EAGAIN) - /// - /// # References - /// - - /// - - /// - - /// - - /// - - /// - - /// - - /// - - send: func(datagrams: list) -> result; + %stream: func(remote-address: option) -> result, error-code>; /// Get the current bound address. /// @@ -146,7 +113,7 @@ interface udp { /// > stored in the object pointed to by `address` is unspecified. /// /// WASI is stricter and requires `local-address` to return `invalid-state` when the socket hasn't been bound yet. - /// + /// /// # Typical errors /// - `invalid-state`: The socket is not bound to any local address. /// @@ -157,10 +124,10 @@ interface udp { /// - local-address: func() -> result; - /// Get the address set with `connect`. + /// Get the address the socket is currently streaming to. /// /// # Typical errors - /// - `invalid-state`: The socket is not connected to a remote address. (ENOTCONN) + /// - `invalid-state`: The socket is not streaming to a specific remote address. (ENOTCONN) /// /// # References /// - @@ -192,11 +159,11 @@ interface udp { /// The kernel buffer space reserved for sends/receives on this socket. /// /// Note #1: an implementation may choose to cap or round the buffer size when setting the value. - /// In other words, after setting a value, reading the same setting back may return a different value. + /// In other words, after setting a value, reading the same setting back may return a different value. /// /// Note #2: there is not necessarily a direct relationship between the kernel buffer size and the bytes of - /// actual data to be sent/received by the application, because the kernel might also use the buffer space - /// for internal metadata structures. + /// actual data to be sent/received by the application, because the kernel might also use the buffer space + /// for internal metadata structures. /// /// Equivalent to the SO_RCVBUF and SO_SNDBUF socket options. receive-buffer-size: func() -> result; @@ -210,4 +177,93 @@ interface udp { /// It's planned to be removed when `future` is natively supported in Preview3. subscribe: func() -> pollable; } + + resource incoming-datagram-stream { + /// Receive messages on the socket. + /// + /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. + /// The returned list may contain fewer elements than requested, but never more. + /// + /// This function returns successfully with an empty list when either: + /// - `max-results` is 0, or: + /// - `max-results` is greater than 0, but no results are immediately available. + /// This function never returns `error(would-block)`. + /// + /// # Typical errors + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + /// + /// # References + /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - + receive: func(max-results: u64) -> result, error-code>; + + /// Create a `pollable` which will resolve once the stream is ready to receive again. + /// + /// Note: this function is here for WASI Preview2 only. + /// It's planned to be removed when `future` is natively supported in Preview3. + subscribe: func() -> pollable; + } + + resource outgoing-datagram-stream { + /// Check readiness for sending. This function never blocks. + /// + /// Returns the number of datagrams permitted for the next call to `send`, + /// or an error. Calling `send` with more datagrams than this function has + /// permitted will trap. + /// + /// When this function returns ok(0), the `subscribe` pollable will + /// become ready when this function will report at least ok(1), or an + /// error. + /// + /// Never returns `would-block`. + check-send: func() -> result; + + /// Send messages on the socket. + /// + /// This function attempts to send all provided `datagrams` on the socket without blocking and + /// returns how many messages were actually sent (or queued for sending). This function never + /// returns `error(would-block)`. If none of the datagrams were able to be sent, `ok(0)` is returned. + /// + /// This function semantically behaves the same as iterating the `datagrams` list and sequentially + /// sending each individual datagram until either the end of the list has been reached or the first error occurred. + /// If at least one datagram has been sent successfully, this function never returns an error. + /// + /// If the input list is empty, the function returns `ok(0)`. + /// + /// Each call to `send` must be permitted by a preceding `check-send`. Implementations must trap if + /// either `check-send` was not called or `datagrams` contains more items than `check-send` permitted. + /// + /// # Typical errors + /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) + /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) + /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) + /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) + /// - `invalid-argument`: The socket is in "connected" mode and `remote-address` is `some` value that does not match the address passed to `stream`. (EISCONN) + /// - `invalid-argument`: The socket is not "connected" and no value for `remote-address` was provided. (EDESTADDRREQ) + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) + /// + /// # References + /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - + send: func(datagrams: list) -> result; + + /// Create a `pollable` which will resolve once the stream is ready to send again. + /// + /// Note: this function is here for WASI Preview2 only. + /// It's planned to be removed when `future` is natively supported in Preview3. + subscribe: func() -> pollable; + } }