From 10b79d94a0e077d49d8932607a5cafb5203f4697 Mon Sep 17 00:00:00 2001 From: Dave Bakker Date: Sat, 14 Oct 2023 18:21:47 +0200 Subject: [PATCH 1/6] Introduce UDP streams Introduce new `inbound-datagram-stream` and `outbound-datagram-stream` types and moved `receive` and `send` methods to those respectively. These streams are returned by `bind` can be individually subscribed to. This resolves a design issue where a UDP server would end up in a spin loop because `receive` returned EWOULDBLOCK but poll_* always returned immediately because the socket was ready for sending. In this new setup, users can poll each direction separately. Fixes https://github.com/WebAssembly/wasi-sockets/issues/64 Additionally: - Enable send-like behaviour by making `outbound-datagram::remote-address` optional. Fixes https://github.com/WebAssembly/wasi-sockets/pull/57 - Dropped the `network` parameter from the `connect` call, because `bind` is now _required_ to perform IO. --- .../bin/preview2_udp_sample_application.rs | 37 +- crates/test-programs/src/sockets.rs | 22 +- crates/wasi-http/wit/deps/sockets/udp.wit | 183 ++++++---- crates/wasi/src/preview2/host/udp.rs | 327 ++++++++++-------- crates/wasi/src/preview2/mod.rs | 2 + crates/wasi/src/preview2/udp.rs | 133 +++++-- crates/wasi/wit/deps/sockets/udp.wit | 183 ++++++---- 7 files changed, 523 insertions(+), 364 deletions(-) diff --git a/crates/test-programs/src/bin/preview2_udp_sample_application.rs b/crates/test-programs/src/bin/preview2_udp_sample_application.rs index de8a0a27ca46..0422aea9195b 100644 --- a/crates/test-programs/src/bin/preview2_udp_sample_application.rs +++ b/crates/test-programs/src/bin/preview2_udp_sample_application.rs @@ -1,9 +1,11 @@ use test_programs::wasi::sockets::network::{ - IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network, + IpAddress, IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network, }; -use test_programs::wasi::sockets::udp::{Datagram, UdpSocket}; +use test_programs::wasi::sockets::udp::{OutboundDatagram, UdpSocket}; fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddress) { + let unspecified_addr = IpSocketAddress::new(IpAddress::new_unspecified(family), 0); + let first_message = &[]; let second_message = b"Hello, world!"; let third_message = b"Greetings, planet!"; @@ -12,32 +14,32 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres let server = UdpSocket::new(family).unwrap(); - server.blocking_bind(&net, bind_address).unwrap(); + let (server_inbound, _) = server.blocking_bind(&net, bind_address).unwrap(); let addr = server.local_address().unwrap(); let client_addr = { let client = UdpSocket::new(family).unwrap(); - client.blocking_connect(&net, addr).unwrap(); + let (_, client_outbound) = client.blocking_bind(&net, unspecified_addr).unwrap(); + client.blocking_connect(addr).unwrap(); let datagrams = [ - Datagram { + OutboundDatagram { data: first_message.to_vec(), - remote_address: addr, + remote_address: None, }, - Datagram { + OutboundDatagram { data: second_message.to_vec(), - remote_address: addr, + remote_address: Some(addr), }, ]; - client.blocking_send(&datagrams).unwrap(); + client_outbound.blocking_send(&datagrams).unwrap(); client.local_address().unwrap() }; { // Check that we've received our sent messages. - // Not guaranteed to work but should work in practice. - let datagrams = server.blocking_receive(2..100).unwrap(); + let datagrams = server_inbound.blocking_receive(2..100).unwrap(); assert_eq!(datagrams.len(), 2); assert_eq!(datagrams[0].data, first_message); @@ -50,21 +52,22 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres // Another client { let client = UdpSocket::new(family).unwrap(); - client.blocking_connect(&net, addr).unwrap(); + let (_, client_outbound) = client.blocking_bind(&net, unspecified_addr).unwrap(); + // Send without connect - let datagrams = [Datagram { + let datagrams = [OutboundDatagram { data: third_message.to_vec(), - remote_address: addr, + remote_address: Some(addr), }]; - client.blocking_send(&datagrams).unwrap(); + client_outbound.blocking_send(&datagrams).unwrap(); } { // Check that we sent and received our message! - let datagrams = server.blocking_receive(1..100).unwrap(); + let datagrams = server_inbound.blocking_receive(1..100).unwrap(); assert_eq!(datagrams.len(), 1); - assert_eq!(datagrams[0].data, third_message); // Not guaranteed to work but should work in practice. + assert_eq!(datagrams[0].data, third_message); } } diff --git a/crates/test-programs/src/sockets.rs b/crates/test-programs/src/sockets.rs index 994e0e55e191..def2002332fc 100644 --- a/crates/test-programs/src/sockets.rs +++ b/crates/test-programs/src/sockets.rs @@ -7,7 +7,9 @@ use crate::wasi::sockets::network::{ Network, }; use crate::wasi::sockets::tcp::TcpSocket; -use crate::wasi::sockets::udp::{Datagram, UdpSocket}; +use crate::wasi::sockets::udp::{ + InboundDatagram, InboundDatagramStream, OutboundDatagram, OutboundDatagramStream, UdpSocket, +}; use crate::wasi::sockets::{tcp_create_socket, udp_create_socket}; use std::ops::Range; @@ -130,7 +132,7 @@ impl UdpSocket { &self, network: &Network, local_address: IpSocketAddress, - ) -> Result<(), ErrorCode> { + ) -> Result<(InboundDatagramStream, OutboundDatagramStream), ErrorCode> { let sub = self.subscribe(); self.start_bind(&network, local_address)?; @@ -143,14 +145,10 @@ impl UdpSocket { } } - pub fn blocking_connect( - &self, - network: &Network, - remote_address: IpSocketAddress, - ) -> Result<(), ErrorCode> { + pub fn blocking_connect(&self, remote_address: IpSocketAddress) -> Result<(), ErrorCode> { let sub = self.subscribe(); - self.start_connect(&network, remote_address)?; + self.start_connect(remote_address)?; loop { match self.finish_connect() { @@ -159,8 +157,10 @@ impl UdpSocket { } } } +} - pub fn blocking_send(&self, mut datagrams: &[Datagram]) -> Result<(), ErrorCode> { +impl OutboundDatagramStream { + pub fn blocking_send(&self, mut datagrams: &[OutboundDatagram]) -> Result<(), ErrorCode> { let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false); let pollable = self.subscribe(); @@ -176,8 +176,10 @@ impl UdpSocket { Ok(()) } +} - pub fn blocking_receive(&self, count: Range) -> Result, ErrorCode> { +impl InboundDatagramStream { + pub fn blocking_receive(&self, count: Range) -> Result, ErrorCode> { let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false); let pollable = self.subscribe(); let mut datagrams = vec![]; diff --git a/crates/wasi-http/wit/deps/sockets/udp.wit b/crates/wasi-http/wit/deps/sockets/udp.wit index 8641cc2c5dbd..731d7ec01b93 100644 --- a/crates/wasi-http/wit/deps/sockets/udp.wit +++ b/crates/wasi-http/wit/deps/sockets/udp.wit @@ -3,17 +3,35 @@ interface udp { use wasi:io/poll@0.2.0-rc-2023-11-05.{pollable}; use network.{network, error-code, ip-socket-address, ip-address-family}; - - record datagram { - data: list, // Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + /// A received datagram. + record inbound-datagram { + /// The payload. + /// + /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + data: list, + + /// The source address. + /// + /// If the socket is connected, this field is guaranteed to equal the configured remote address. + /// + /// Equivalent to the `src_addr` out parameter of `recvfrom`. remote-address: ip-socket-address, + } - /// Possible future additions: - /// local-address: ip-socket-address, // IP_PKTINFO / IP_RECVDSTADDR / IPV6_PKTINFO - /// local-interface: u32, // IP_PKTINFO / IP_RECVIF - /// ttl: u8, // IP_RECVTTL - /// dscp: u6, // IP_RECVTOS - /// ecn: u2, // IP_RECVTOS + /// A datagram to be sent out. + record outbound-datagram { + /// The payload. + /// + /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + data: list, + + /// The destination address. + /// + /// On connected sockets this field must be None or match the connected remote address exactly. + /// On unconnected sockets, this field is required. + /// + /// If this value is None, the send operation is equivalent to `send` in POSIX. Otherwise it is equivalent to `sendto`. + remote-address: option, } @@ -47,11 +65,11 @@ interface udp { /// - /// - start-bind: func(network: borrow, local-address: ip-socket-address) -> result<_, error-code>; - finish-bind: func() -> result<_, error-code>; + finish-bind: func() -> result, error-code>; /// Set the destination address. /// - /// The local-address is updated based on the best network path to `remote-address`. + /// The local-address may be updated based on the best network path to `remote-address`. /// /// When a destination address is set: /// - all receive operations will only return datagrams sent from the provided `remote-address`. @@ -59,14 +77,15 @@ interface udp { /// /// Note that this function does not generate any network traffic and the peer is not aware of this "connection". /// - /// Unlike in POSIX, this function is async. This enables interactive WASI hosts to inject permission prompts. + /// Unlike in POSIX: + /// - The socket must already be explicitly bound. + /// - This function is async. This enables interactive WASI hosts to inject permission prompts. /// /// # Typical `start` errors /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is already bound to a different network. The `network` passed to `connect` must be identical to the one passed to `bind`. /// /// # Typical `finish` errors /// - `address-in-use`: Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE, EADDRNOTAVAIL on Linux, EAGAIN on BSD) @@ -78,67 +97,9 @@ interface udp { /// - /// - /// - - start-connect: func(network: borrow, remote-address: ip-socket-address) -> result<_, error-code>; + start-connect: func(remote-address: ip-socket-address) -> result<_, error-code>; finish-connect: func() -> result<_, error-code>; - /// Receive messages on the socket. - /// - /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. - /// The returned list may contain fewer elements than requested, but never more. - /// If `max-results` is 0, this function returns successfully with an empty list. - /// - /// # Typical errors - /// - `invalid-state`: The socket is not bound to any local address. (EINVAL) - /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `would-block`: There is no pending data available to be read at the moment. (EWOULDBLOCK, EAGAIN) - /// - /// # References - /// - - /// - - /// - - /// - - /// - - /// - - /// - - /// - - receive: func(max-results: u64) -> result, error-code>; - - /// Send messages on the socket. - /// - /// This function attempts to send all provided `datagrams` on the socket without blocking and - /// returns how many messages were actually sent (or queued for sending). - /// - /// This function semantically behaves the same as iterating the `datagrams` list and sequentially - /// sending each individual datagram until either the end of the list has been reached or the first error occurred. - /// If at least one datagram has been sent successfully, this function never returns an error. - /// - /// If the input list is empty, the function returns `ok(0)`. - /// - /// The remote address option is required. To send a message to the "connected" peer, - /// call `remote-address` to get their address. - /// - /// # Typical errors - /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) - /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) - /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is in "connected" mode and the `datagram.remote-address` does not match the address passed to `connect`. (EISCONN) - /// - `invalid-state`: The socket is not bound to any local address. Unlike POSIX, this function does not perform an implicit bind. - /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) - /// - `would-block`: The send buffer is currently full. (EWOULDBLOCK, EAGAIN) - /// - /// # References - /// - - /// - - /// - - /// - - /// - - /// - - /// - - /// - - send: func(datagrams: list) -> result; - /// Get the current bound address. /// /// POSIX mentions: @@ -146,7 +107,7 @@ interface udp { /// > stored in the object pointed to by `address` is unspecified. /// /// WASI is stricter and requires `local-address` to return `invalid-state` when the socket hasn't been bound yet. - /// + /// /// # Typical errors /// - `invalid-state`: The socket is not bound to any local address. /// @@ -192,11 +153,11 @@ interface udp { /// The kernel buffer space reserved for sends/receives on this socket. /// /// Note #1: an implementation may choose to cap or round the buffer size when setting the value. - /// In other words, after setting a value, reading the same setting back may return a different value. + /// In other words, after setting a value, reading the same setting back may return a different value. /// /// Note #2: there is not necessarily a direct relationship between the kernel buffer size and the bytes of - /// actual data to be sent/received by the application, because the kernel might also use the buffer space - /// for internal metadata structures. + /// actual data to be sent/received by the application, because the kernel might also use the buffer space + /// for internal metadata structures. /// /// Equivalent to the SO_RCVBUF and SO_SNDBUF socket options. receive-buffer-size: func() -> result; @@ -210,4 +171,74 @@ interface udp { /// It's planned to be removed when `future` is natively supported in Preview3. subscribe: func() -> pollable; } + + resource inbound-datagram-stream { + /// Receive messages on the socket. + /// + /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. + /// The returned list may contain fewer elements than requested, but never more. + /// If `max-results` is 0, this function returns successfully with an empty list. + /// + /// # Typical errors + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + /// - `would-block`: There is no pending data available to be read at the moment. (EWOULDBLOCK, EAGAIN) + /// + /// # References + /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - + receive: func(max-results: u64) -> result, error-code>; + + /// Create a `pollable` which will resolve once the stream is ready to receive again. + /// + /// Note: this function is here for WASI Preview2 only. + /// It's planned to be removed when `future` is natively supported in Preview3. + subscribe: func() -> pollable; + } + + resource outbound-datagram-stream { + /// Send messages on the socket. + /// + /// This function attempts to send all provided `datagrams` on the socket without blocking and + /// returns how many messages were actually sent (or queued for sending). + /// + /// This function semantically behaves the same as iterating the `datagrams` list and sequentially + /// sending each individual datagram until either the end of the list has been reached or the first error occurred. + /// If at least one datagram has been sent successfully, this function never returns an error. + /// + /// If the input list is empty, the function returns `ok(0)`. + /// + /// # Typical errors + /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) + /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) + /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) + /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) + /// - `invalid-argument`: The socket is in "connected" mode and `remote-address` is `some` value that does not match the address passed to `connect`. (EISCONN) + /// - `invalid-argument`: The socket is not "connected" and no value for `remote-address` was provided. (EDESTADDRREQ) + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) + /// - `would-block`: The send buffer is currently full. (EWOULDBLOCK, EAGAIN) + /// + /// # References + /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - + send: func(datagrams: list) -> result; + + /// Create a `pollable` which will resolve once the stream is ready to send again. + /// + /// Note: this function is here for WASI Preview2 only. + /// It's planned to be removed when `future` is natively supported in Preview3. + subscribe: func() -> pollable; + } } diff --git a/crates/wasi/src/preview2/host/udp.rs b/crates/wasi/src/preview2/host/udp.rs index 857925eb6b1d..e959021dcdc0 100644 --- a/crates/wasi/src/preview2/host/udp.rs +++ b/crates/wasi/src/preview2/host/udp.rs @@ -5,7 +5,7 @@ use crate::preview2::{ sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network}, sockets::udp, }, - udp::UdpState, + udp::{UdpSocketInner, UdpState}, }; use crate::preview2::{Pollable, SocketResult, WasiView}; use cap_net_ext::{AddressFamily, PoolExt}; @@ -21,15 +21,17 @@ const MAX_UDP_DATAGRAM_SIZE: usize = 65535; impl udp::Host for T {} -impl crate::preview2::host::udp::udp::HostUdpSocket for T { +impl udp::HostUdpSocket for T { fn start_bind( &mut self, this: Resource, network: Resource, local_address: IpSocketAddress, ) -> SocketResult<()> { - let table = self.table_mut(); - let socket = table.get(&this)?; + let table = self.table(); + let mut socket = table.get(&this)?.inner.lock().unwrap(); + let network = table.get(&network)?; + let local_address: SocketAddr = local_address.into(); match socket.udp_state { UdpState::Default => {} @@ -39,7 +41,6 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { UdpState::Bound | UdpState::Connected(..) => return Err(ErrorCode::InvalidState.into()), } - let network = table.get(&network)?; let binder = network.pool.udp_binder(local_address)?; // Perform the OS bind call. @@ -49,34 +50,48 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { .as_socketlike_view::(), )?; - let socket = table.get_mut(&this)?; socket.udp_state = UdpState::BindStarted; Ok(()) } - fn finish_bind(&mut self, this: Resource) -> SocketResult<()> { + fn finish_bind( + &mut self, + this: Resource, + ) -> SocketResult<( + Resource, + Resource, + )> { let table = self.table_mut(); - let socket = table.get_mut(&this)?; + let outer = table.get(&this)?; + { + let mut socket = outer.inner.lock().unwrap(); - match socket.udp_state { - UdpState::BindStarted => { - socket.udp_state = UdpState::Bound; - Ok(()) + match socket.udp_state { + UdpState::BindStarted => {} + _ => return Err(ErrorCode::NotInProgress.into()), } - _ => Err(ErrorCode::NotInProgress.into()), + + socket.udp_state = UdpState::Bound; } + + let inbound_stream = outer.new_inbound_stream(); + let outbound_stream = outer.new_outbound_stream(); + + Ok(( + self.table_mut().push_child(inbound_stream, &this)?, + self.table_mut().push_child(outbound_stream, &this)?, + )) } fn start_connect( &mut self, this: Resource, - network: Resource, remote_address: IpSocketAddress, ) -> SocketResult<()> { - let table = self.table_mut(); - let socket = table.get(&this)?; - let network = table.get(&network)?; + let table = self.table(); + let mut socket = table.get(&this)?.inner.lock().unwrap(); + let remote_address: SocketAddr = remote_address.into(); match socket.udp_state { UdpState::Default | UdpState::Bound => {} @@ -86,23 +101,15 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { UdpState::Connected(..) => return Err(ErrorCode::InvalidState.into()), } - let connecter = network.pool.udp_connecter(remote_address)?; + rustix::net::connect(socket.udp_socket(), &remote_address)?; - // Do an OS `connect`. - connecter.connect_existing_udp_socket( - &*socket - .udp_socket() - .as_socketlike_view::(), - )?; - - let socket = table.get_mut(&this)?; socket.udp_state = UdpState::Connecting(remote_address); Ok(()) } fn finish_connect(&mut self, this: Resource) -> SocketResult<()> { - let table = self.table_mut(); - let socket = table.get_mut(&this)?; + let table = self.table(); + let mut socket = table.get(&this)?.inner.lock().unwrap(); match socket.udp_state { UdpState::Connecting(addr) => { @@ -113,117 +120,9 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { } } - fn receive( - &mut self, - this: Resource, - max_results: u64, - ) -> SocketResult> { - if max_results == 0 { - return Ok(vec![]); - } - - let table = self.table(); - let socket = table.get(&this)?; - - let udp_socket = socket.udp_socket(); - let mut datagrams = vec![]; - let mut buf = [0; MAX_UDP_DATAGRAM_SIZE]; - match socket.udp_state { - UdpState::Default | UdpState::BindStarted => return Err(ErrorCode::InvalidState.into()), - UdpState::Bound | UdpState::Connecting(..) => { - for i in 0..max_results { - match udp_socket.try_recv_from(&mut buf) { - Ok((size, remote_address)) => datagrams.push(udp::Datagram { - data: buf[..size].into(), - remote_address: remote_address.into(), - }), - Err(_e) if i > 0 => { - return Ok(datagrams); - } - Err(e) => return Err(e.into()), - } - } - } - UdpState::Connected(remote_address) => { - for i in 0..max_results { - match udp_socket.try_recv(&mut buf) { - Ok(size) => datagrams.push(udp::Datagram { - data: buf[..size].into(), - remote_address, - }), - Err(_e) if i > 0 => { - return Ok(datagrams); - } - Err(e) => return Err(e.into()), - } - } - } - } - Ok(datagrams) - } - - fn send( - &mut self, - this: Resource, - datagrams: Vec, - ) -> SocketResult { - if datagrams.is_empty() { - return Ok(0); - }; - let table = self.table(); - let socket = table.get(&this)?; - - let udp_socket = socket.udp_socket(); - let mut count = 0; - match socket.udp_state { - UdpState::Default | UdpState::BindStarted => return Err(ErrorCode::InvalidState.into()), - UdpState::Bound | UdpState::Connecting(..) => { - for udp::Datagram { - data, - remote_address, - } in datagrams - { - match udp_socket.try_send_to(&data, remote_address.into()) { - Ok(_size) => count += 1, - Err(_e) if count > 0 => { - return Ok(count); - } - Err(e) => return Err(e.into()), - } - } - } - UdpState::Connected(addr) => { - let addr = SocketAddr::from(addr); - for udp::Datagram { - data, - remote_address, - } in datagrams - { - if SocketAddr::from(remote_address) != addr { - // From WIT documentation: - // If at least one datagram has been sent successfully, this function never returns an error. - if count == 0 { - return Err(ErrorCode::InvalidArgument.into()); - } else { - return Ok(count); - } - } - match udp_socket.try_send(&data) { - Ok(_size) => count += 1, - Err(_e) if count > 0 => { - return Ok(count); - } - Err(e) => return Err(e.into()), - } - } - } - } - Ok(count) - } - fn local_address(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); let addr = socket .udp_socket() .as_socketlike_view::() @@ -233,7 +132,7 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { fn remote_address(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); let addr = socket .udp_socket() .as_socketlike_view::() @@ -246,7 +145,7 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { this: Resource, ) -> Result { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); match socket.family { AddressFamily::Ipv4 => Ok(IpAddressFamily::Ipv4), AddressFamily::Ipv6 => Ok(IpAddressFamily::Ipv6), @@ -255,19 +154,19 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { fn ipv6_only(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); Ok(sockopt::get_ipv6_v6only(socket.udp_socket())?) } fn set_ipv6_only(&mut self, this: Resource, value: bool) -> SocketResult<()> { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); Ok(sockopt::set_ipv6_v6only(socket.udp_socket(), value)?) } fn unicast_hop_limit(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); // We don't track whether the socket is IPv4 or IPv6 so try one and // fall back to the other. @@ -288,7 +187,7 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { value: u8, ) -> SocketResult<()> { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); // We don't track whether the socket is IPv4 or IPv6 so try one and // fall back to the other. @@ -301,7 +200,7 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { fn receive_buffer_size(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); Ok(sockopt::get_socket_recv_buffer_size(socket.udp_socket())? as u64) } @@ -311,7 +210,7 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { value: u64, ) -> SocketResult<()> { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); let value = value.try_into().map_err(|_| ErrorCode::OutOfMemory)?; Ok(sockopt::set_socket_recv_buffer_size( socket.udp_socket(), @@ -321,7 +220,7 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { fn send_buffer_size(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); Ok(sockopt::get_socket_send_buffer_size(socket.udp_socket())? as u64) } @@ -331,7 +230,7 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { value: u64, ) -> SocketResult<()> { let table = self.table(); - let socket = table.get(&this)?; + let socket = table.get(&this)?.inner.lock().unwrap(); let value = value.try_into().map_err(|_| ErrorCode::OutOfMemory)?; Ok(sockopt::set_socket_send_buffer_size( socket.udp_socket(), @@ -354,3 +253,137 @@ impl crate::preview2::host::udp::udp::HostUdpSocket for T { Ok(()) } } + +impl udp::HostInboundDatagramStream for T { + fn receive( + &mut self, + this: Resource, + max_results: u64, + ) -> SocketResult> { + fn recv_one(socket: &UdpSocketInner) -> SocketResult { + let mut buf = [0; MAX_UDP_DATAGRAM_SIZE]; + let (size, received_addr) = socket.udp_socket().try_recv_from(&mut buf)?; + + match socket.remote_address() { + Some(connected_addr) if connected_addr != received_addr => { + // Normally, this should have already been checked for us by the OS. + // Drop message... + return Err(ErrorCode::WouldBlock.into()); + } + _ => {} + } + + // FIXME: check permission to receive from `received_addr`. + Ok(udp::InboundDatagram { + data: buf[..size].into(), + remote_address: received_addr.into(), + }) + } + + let table = self.table(); + let socket = table.get(&this)?.inner.lock().unwrap(); + + if max_results == 0 { + return Ok(vec![]); + } + + let mut datagrams = vec![]; + + for _ in 0..max_results { + match recv_one(&socket) { + Ok(datagram) => { + datagrams.push(datagram); + } + Err(_e) if datagrams.len() > 0 => { + return Ok(datagrams); + } + Err(e) => return Err(e), + } + } + + Ok(datagrams) + } + + fn subscribe( + &mut self, + this: Resource, + ) -> anyhow::Result> { + crate::preview2::poll::subscribe(self.table_mut(), this) + } + + fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { + let table = self.table_mut(); + + // As in the filesystem implementation, we assume closing a socket + // doesn't block. + let dropped = table.delete(this)?; + drop(dropped); + + Ok(()) + } +} + +impl udp::HostOutboundDatagramStream for T { + fn send( + &mut self, + this: Resource, + datagrams: Vec, + ) -> SocketResult { + fn send_one(socket: &UdpSocketInner, datagram: &udp::OutboundDatagram) -> SocketResult<()> { + let provided_addr = datagram.remote_address.map(SocketAddr::from); + let addr = match (socket.remote_address(), provided_addr) { + (None, Some(addr)) => addr, + (Some(addr), None) => addr, + (Some(connected_addr), Some(provided_addr)) if connected_addr == provided_addr => { + connected_addr + } + _ => return Err(ErrorCode::InvalidArgument.into()), + }; + + // FIXME: check permission to send to `addr`. + socket.udp_socket().try_send_to(&datagram.data, addr)?; + + Ok(()) + } + + let table = self.table(); + let socket = table.get(&this)?.inner.lock().unwrap(); + + if datagrams.is_empty() { + return Ok(0); + } + + let mut count = 0; + + for datagram in datagrams { + match send_one(&socket, &datagram) { + Ok(_size) => count += 1, + Err(_e) if count > 0 => { + // WIT: "If at least one datagram has been sent successfully, this function never returns an error." + return Ok(count); + } + Err(e) => return Err(e.into()), + } + } + + Ok(count) + } + + fn subscribe( + &mut self, + this: Resource, + ) -> anyhow::Result> { + crate::preview2::poll::subscribe(self.table_mut(), this) + } + + fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { + let table = self.table_mut(); + + // As in the filesystem implementation, we assume closing a socket + // doesn't block. + let dropped = table.delete(this)?; + drop(dropped); + + Ok(()) + } +} diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index abdfe4df5ca3..fdd0a240abf9 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -161,6 +161,8 @@ pub mod bindings { "wasi:sockets/network/network": super::network::Network, "wasi:sockets/tcp/tcp-socket": super::tcp::TcpSocket, "wasi:sockets/udp/udp-socket": super::udp::UdpSocket, + "wasi:sockets/udp/inbound-datagram-stream": super::udp::InboundDatagramStream, + "wasi:sockets/udp/outbound-datagram-stream": super::udp::OutboundDatagramStream, "wasi:sockets/ip-name-lookup/resolve-address-stream": super::ip_name_lookup::ResolveAddressStream, "wasi:filesystem/types/directory-entry-stream": super::filesystem::ReaddirIterator, "wasi:filesystem/types/descriptor": super::filesystem::Descriptor, diff --git a/crates/wasi/src/preview2/udp.rs b/crates/wasi/src/preview2/udp.rs index 2c879f99a6c9..b08479ea410a 100644 --- a/crates/wasi/src/preview2/udp.rs +++ b/crates/wasi/src/preview2/udp.rs @@ -1,11 +1,11 @@ -use crate::preview2::bindings::sockets::network::IpSocketAddress; use crate::preview2::poll::Subscribe; use crate::preview2::with_ambient_tokio_runtime; use async_trait::async_trait; use cap_net_ext::{AddressFamily, Blocking, UdpSocketExt}; use io_lifetimes::raw::{FromRawSocketlike, IntoRawSocketlike}; use std::io; -use std::sync::Arc; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; use tokio::io::Interest; /// The state of a UDP socket. @@ -24,20 +24,15 @@ pub(crate) enum UdpState { Bound, /// A connect call is in progress. - Connecting(IpSocketAddress), + Connecting(SocketAddr), /// The socket is "connected" to a peer address. - Connected(IpSocketAddress), + Connected(SocketAddr), } -/// A host UDP socket, plus associated bookkeeping. -/// -/// The inner state is wrapped in an Arc because the same underlying socket is -/// used for implementing the stream types. -pub struct UdpSocket { - /// The part of a `UdpSocket` which is reference-counted so that we - /// can pass it to async tasks. - pub(crate) inner: Arc, +/// Operational data shared between the UdpSocket, InboundDatagramStream & OutboundDatagramStream +pub(crate) struct UdpSocketInner { + pub(crate) native_socket: Arc, /// The current state in the bind/connect progression. pub(crate) udp_state: UdpState, @@ -46,47 +41,109 @@ pub struct UdpSocket { pub(crate) family: AddressFamily, } +/// A host UDP socket. +pub struct UdpSocket { + pub(crate) inner: Arc>, +} + #[async_trait] impl Subscribe for UdpSocket { async fn ready(&mut self) { - // Some states are ready immediately. - match self.udp_state { - UdpState::BindStarted => return, - _ => {} - } - - // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. - self.inner - .ready(Interest::READABLE | Interest::WRITABLE) - .await - .expect("failed to await UDP socket readiness"); + // None of the socket-level operations block natively } } impl UdpSocket { /// Create a new socket in the given family. pub fn new(family: AddressFamily) -> io::Result { + let inner = UdpSocketInner { + native_socket: Arc::new(Self::new_tokio_socket(family)?), + udp_state: UdpState::Default, + family, + }; + + Ok(UdpSocket { + inner: Arc::new(Mutex::new(inner)), + }) + } + + fn new_tokio_socket(family: AddressFamily) -> io::Result { // Create a new host socket and set it to non-blocking, which is needed // by our async implementation. - let udp_socket = cap_std::net::UdpSocket::new(family, Blocking::No)?; - Self::from_udp_socket(udp_socket, family) + let cap_std_socket = cap_std::net::UdpSocket::new(family, Blocking::No)?; + let fd = cap_std_socket.into_raw_socketlike(); + let std_socket = unsafe { std::net::UdpSocket::from_raw_socketlike(fd) }; + let tokio_socket = + with_ambient_tokio_runtime(|| tokio::net::UdpSocket::try_from(std_socket))?; + + Ok(tokio_socket) } - pub fn from_udp_socket( - udp_socket: cap_std::net::UdpSocket, - family: AddressFamily, - ) -> io::Result { - let fd = udp_socket.into_raw_socketlike(); - let std_socket = unsafe { std::net::UdpSocket::from_raw_socketlike(fd) }; - let socket = with_ambient_tokio_runtime(|| tokio::net::UdpSocket::try_from(std_socket))?; - Ok(Self { - inner: Arc::new(socket), - udp_state: UdpState::Default, - family, - }) + pub(crate) fn new_inbound_stream(&self) -> InboundDatagramStream { + InboundDatagramStream { + inner: self.inner.clone(), + } + } + + pub(crate) fn new_outbound_stream(&self) -> OutboundDatagramStream { + OutboundDatagramStream { + inner: self.inner.clone(), + } + } +} + +impl UdpSocketInner { + pub fn remote_address(&self) -> Option { + match self.udp_state { + UdpState::Connected(addr) => Some(addr), + UdpState::Connecting(_) // Don't use address. From the consumer's perspective connecting isn't finished yet. + | _ => None, + } } pub fn udp_socket(&self) -> &tokio::net::UdpSocket { - &self.inner + &self.native_socket + } +} + +pub struct InboundDatagramStream { + pub(crate) inner: Arc>, +} + +#[async_trait] +impl Subscribe for InboundDatagramStream { + async fn ready(&mut self) { + let native_socket = { + // Make sure the lock guard is released before the await. + let inner = self.inner.lock().unwrap(); + inner.native_socket.clone() + }; + + // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. + native_socket + .ready(Interest::READABLE) + .await + .expect("failed to await UDP socket readiness"); + } +} + +pub struct OutboundDatagramStream { + pub(crate) inner: Arc>, +} + +#[async_trait] +impl Subscribe for OutboundDatagramStream { + async fn ready(&mut self) { + let native_socket = { + // Make sure the lock guard is released before the await. + let inner = self.inner.lock().unwrap(); + inner.native_socket.clone() + }; + + // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. + native_socket + .ready(Interest::WRITABLE) + .await + .expect("failed to await UDP socket readiness"); } } diff --git a/crates/wasi/wit/deps/sockets/udp.wit b/crates/wasi/wit/deps/sockets/udp.wit index 8641cc2c5dbd..731d7ec01b93 100644 --- a/crates/wasi/wit/deps/sockets/udp.wit +++ b/crates/wasi/wit/deps/sockets/udp.wit @@ -3,17 +3,35 @@ interface udp { use wasi:io/poll@0.2.0-rc-2023-11-05.{pollable}; use network.{network, error-code, ip-socket-address, ip-address-family}; - - record datagram { - data: list, // Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + /// A received datagram. + record inbound-datagram { + /// The payload. + /// + /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + data: list, + + /// The source address. + /// + /// If the socket is connected, this field is guaranteed to equal the configured remote address. + /// + /// Equivalent to the `src_addr` out parameter of `recvfrom`. remote-address: ip-socket-address, + } - /// Possible future additions: - /// local-address: ip-socket-address, // IP_PKTINFO / IP_RECVDSTADDR / IPV6_PKTINFO - /// local-interface: u32, // IP_PKTINFO / IP_RECVIF - /// ttl: u8, // IP_RECVTTL - /// dscp: u6, // IP_RECVTOS - /// ecn: u2, // IP_RECVTOS + /// A datagram to be sent out. + record outbound-datagram { + /// The payload. + /// + /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. + data: list, + + /// The destination address. + /// + /// On connected sockets this field must be None or match the connected remote address exactly. + /// On unconnected sockets, this field is required. + /// + /// If this value is None, the send operation is equivalent to `send` in POSIX. Otherwise it is equivalent to `sendto`. + remote-address: option, } @@ -47,11 +65,11 @@ interface udp { /// - /// - start-bind: func(network: borrow, local-address: ip-socket-address) -> result<_, error-code>; - finish-bind: func() -> result<_, error-code>; + finish-bind: func() -> result, error-code>; /// Set the destination address. /// - /// The local-address is updated based on the best network path to `remote-address`. + /// The local-address may be updated based on the best network path to `remote-address`. /// /// When a destination address is set: /// - all receive operations will only return datagrams sent from the provided `remote-address`. @@ -59,14 +77,15 @@ interface udp { /// /// Note that this function does not generate any network traffic and the peer is not aware of this "connection". /// - /// Unlike in POSIX, this function is async. This enables interactive WASI hosts to inject permission prompts. + /// Unlike in POSIX: + /// - The socket must already be explicitly bound. + /// - This function is async. This enables interactive WASI hosts to inject permission prompts. /// /// # Typical `start` errors /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is already bound to a different network. The `network` passed to `connect` must be identical to the one passed to `bind`. /// /// # Typical `finish` errors /// - `address-in-use`: Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE, EADDRNOTAVAIL on Linux, EAGAIN on BSD) @@ -78,67 +97,9 @@ interface udp { /// - /// - /// - - start-connect: func(network: borrow, remote-address: ip-socket-address) -> result<_, error-code>; + start-connect: func(remote-address: ip-socket-address) -> result<_, error-code>; finish-connect: func() -> result<_, error-code>; - /// Receive messages on the socket. - /// - /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. - /// The returned list may contain fewer elements than requested, but never more. - /// If `max-results` is 0, this function returns successfully with an empty list. - /// - /// # Typical errors - /// - `invalid-state`: The socket is not bound to any local address. (EINVAL) - /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `would-block`: There is no pending data available to be read at the moment. (EWOULDBLOCK, EAGAIN) - /// - /// # References - /// - - /// - - /// - - /// - - /// - - /// - - /// - - /// - - receive: func(max-results: u64) -> result, error-code>; - - /// Send messages on the socket. - /// - /// This function attempts to send all provided `datagrams` on the socket without blocking and - /// returns how many messages were actually sent (or queued for sending). - /// - /// This function semantically behaves the same as iterating the `datagrams` list and sequentially - /// sending each individual datagram until either the end of the list has been reached or the first error occurred. - /// If at least one datagram has been sent successfully, this function never returns an error. - /// - /// If the input list is empty, the function returns `ok(0)`. - /// - /// The remote address option is required. To send a message to the "connected" peer, - /// call `remote-address` to get their address. - /// - /// # Typical errors - /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) - /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) - /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is in "connected" mode and the `datagram.remote-address` does not match the address passed to `connect`. (EISCONN) - /// - `invalid-state`: The socket is not bound to any local address. Unlike POSIX, this function does not perform an implicit bind. - /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) - /// - `would-block`: The send buffer is currently full. (EWOULDBLOCK, EAGAIN) - /// - /// # References - /// - - /// - - /// - - /// - - /// - - /// - - /// - - /// - - send: func(datagrams: list) -> result; - /// Get the current bound address. /// /// POSIX mentions: @@ -146,7 +107,7 @@ interface udp { /// > stored in the object pointed to by `address` is unspecified. /// /// WASI is stricter and requires `local-address` to return `invalid-state` when the socket hasn't been bound yet. - /// + /// /// # Typical errors /// - `invalid-state`: The socket is not bound to any local address. /// @@ -192,11 +153,11 @@ interface udp { /// The kernel buffer space reserved for sends/receives on this socket. /// /// Note #1: an implementation may choose to cap or round the buffer size when setting the value. - /// In other words, after setting a value, reading the same setting back may return a different value. + /// In other words, after setting a value, reading the same setting back may return a different value. /// /// Note #2: there is not necessarily a direct relationship between the kernel buffer size and the bytes of - /// actual data to be sent/received by the application, because the kernel might also use the buffer space - /// for internal metadata structures. + /// actual data to be sent/received by the application, because the kernel might also use the buffer space + /// for internal metadata structures. /// /// Equivalent to the SO_RCVBUF and SO_SNDBUF socket options. receive-buffer-size: func() -> result; @@ -210,4 +171,74 @@ interface udp { /// It's planned to be removed when `future` is natively supported in Preview3. subscribe: func() -> pollable; } + + resource inbound-datagram-stream { + /// Receive messages on the socket. + /// + /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. + /// The returned list may contain fewer elements than requested, but never more. + /// If `max-results` is 0, this function returns successfully with an empty list. + /// + /// # Typical errors + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + /// - `would-block`: There is no pending data available to be read at the moment. (EWOULDBLOCK, EAGAIN) + /// + /// # References + /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - + receive: func(max-results: u64) -> result, error-code>; + + /// Create a `pollable` which will resolve once the stream is ready to receive again. + /// + /// Note: this function is here for WASI Preview2 only. + /// It's planned to be removed when `future` is natively supported in Preview3. + subscribe: func() -> pollable; + } + + resource outbound-datagram-stream { + /// Send messages on the socket. + /// + /// This function attempts to send all provided `datagrams` on the socket without blocking and + /// returns how many messages were actually sent (or queued for sending). + /// + /// This function semantically behaves the same as iterating the `datagrams` list and sequentially + /// sending each individual datagram until either the end of the list has been reached or the first error occurred. + /// If at least one datagram has been sent successfully, this function never returns an error. + /// + /// If the input list is empty, the function returns `ok(0)`. + /// + /// # Typical errors + /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) + /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) + /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) + /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) + /// - `invalid-argument`: The socket is in "connected" mode and `remote-address` is `some` value that does not match the address passed to `connect`. (EISCONN) + /// - `invalid-argument`: The socket is not "connected" and no value for `remote-address` was provided. (EDESTADDRREQ) + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) + /// - `would-block`: The send buffer is currently full. (EWOULDBLOCK, EAGAIN) + /// + /// # References + /// - + /// - + /// - + /// - + /// - + /// - + /// - + /// - + send: func(datagrams: list) -> result; + + /// Create a `pollable` which will resolve once the stream is ready to send again. + /// + /// Note: this function is here for WASI Preview2 only. + /// It's planned to be removed when `future` is natively supported in Preview3. + subscribe: func() -> pollable; + } } From ffd1248a63f3d6e01d1653f8824cb7c127dc2f06 Mon Sep 17 00:00:00 2001 From: Dave Bakker Date: Mon, 16 Oct 2023 21:38:39 +0200 Subject: [PATCH 2/6] Align names with wasi-http --- .../bin/preview2_udp_sample_application.rs | 22 +++++------ crates/test-programs/src/sockets.rs | 12 +++--- crates/wasi-http/wit/deps/sockets/udp.wit | 14 +++---- crates/wasi/src/preview2/host/udp.rs | 38 +++++++++---------- crates/wasi/src/preview2/mod.rs | 4 +- crates/wasi/src/preview2/udp.rs | 18 ++++----- crates/wasi/wit/deps/sockets/udp.wit | 14 +++---- 7 files changed, 61 insertions(+), 61 deletions(-) diff --git a/crates/test-programs/src/bin/preview2_udp_sample_application.rs b/crates/test-programs/src/bin/preview2_udp_sample_application.rs index 0422aea9195b..61c8950cbbf4 100644 --- a/crates/test-programs/src/bin/preview2_udp_sample_application.rs +++ b/crates/test-programs/src/bin/preview2_udp_sample_application.rs @@ -1,7 +1,7 @@ use test_programs::wasi::sockets::network::{ IpAddress, IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, Network, }; -use test_programs::wasi::sockets::udp::{OutboundDatagram, UdpSocket}; +use test_programs::wasi::sockets::udp::{OutgoingDatagram, UdpSocket}; fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddress) { let unspecified_addr = IpSocketAddress::new(IpAddress::new_unspecified(family), 0); @@ -14,32 +14,32 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres let server = UdpSocket::new(family).unwrap(); - let (server_inbound, _) = server.blocking_bind(&net, bind_address).unwrap(); + let (server_incoming, _) = server.blocking_bind(&net, bind_address).unwrap(); let addr = server.local_address().unwrap(); let client_addr = { let client = UdpSocket::new(family).unwrap(); - let (_, client_outbound) = client.blocking_bind(&net, unspecified_addr).unwrap(); + let (_, client_outgoing) = client.blocking_bind(&net, unspecified_addr).unwrap(); client.blocking_connect(addr).unwrap(); let datagrams = [ - OutboundDatagram { + OutgoingDatagram { data: first_message.to_vec(), remote_address: None, }, - OutboundDatagram { + OutgoingDatagram { data: second_message.to_vec(), remote_address: Some(addr), }, ]; - client_outbound.blocking_send(&datagrams).unwrap(); + client_outgoing.blocking_send(&datagrams).unwrap(); client.local_address().unwrap() }; { // Check that we've received our sent messages. - let datagrams = server_inbound.blocking_receive(2..100).unwrap(); + let datagrams = server_incoming.blocking_receive(2..100).unwrap(); assert_eq!(datagrams.len(), 2); assert_eq!(datagrams[0].data, first_message); @@ -52,19 +52,19 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres // Another client { let client = UdpSocket::new(family).unwrap(); - let (_, client_outbound) = client.blocking_bind(&net, unspecified_addr).unwrap(); + let (_, client_outgoing) = client.blocking_bind(&net, unspecified_addr).unwrap(); // Send without connect - let datagrams = [OutboundDatagram { + let datagrams = [OutgoingDatagram { data: third_message.to_vec(), remote_address: Some(addr), }]; - client_outbound.blocking_send(&datagrams).unwrap(); + client_outgoing.blocking_send(&datagrams).unwrap(); } { // Check that we sent and received our message! - let datagrams = server_inbound.blocking_receive(1..100).unwrap(); + let datagrams = server_incoming.blocking_receive(1..100).unwrap(); assert_eq!(datagrams.len(), 1); assert_eq!(datagrams[0].data, third_message); diff --git a/crates/test-programs/src/sockets.rs b/crates/test-programs/src/sockets.rs index def2002332fc..9531603b2bd7 100644 --- a/crates/test-programs/src/sockets.rs +++ b/crates/test-programs/src/sockets.rs @@ -8,7 +8,7 @@ use crate::wasi::sockets::network::{ }; use crate::wasi::sockets::tcp::TcpSocket; use crate::wasi::sockets::udp::{ - InboundDatagram, InboundDatagramStream, OutboundDatagram, OutboundDatagramStream, UdpSocket, + IncomingDatagram, IncomingDatagramStream, OutgoingDatagram, OutgoingDatagramStream, UdpSocket, }; use crate::wasi::sockets::{tcp_create_socket, udp_create_socket}; use std::ops::Range; @@ -132,7 +132,7 @@ impl UdpSocket { &self, network: &Network, local_address: IpSocketAddress, - ) -> Result<(InboundDatagramStream, OutboundDatagramStream), ErrorCode> { + ) -> Result<(IncomingDatagramStream, OutgoingDatagramStream), ErrorCode> { let sub = self.subscribe(); self.start_bind(&network, local_address)?; @@ -159,8 +159,8 @@ impl UdpSocket { } } -impl OutboundDatagramStream { - pub fn blocking_send(&self, mut datagrams: &[OutboundDatagram]) -> Result<(), ErrorCode> { +impl OutgoingDatagramStream { + pub fn blocking_send(&self, mut datagrams: &[OutgoingDatagram]) -> Result<(), ErrorCode> { let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false); let pollable = self.subscribe(); @@ -178,8 +178,8 @@ impl OutboundDatagramStream { } } -impl InboundDatagramStream { - pub fn blocking_receive(&self, count: Range) -> Result, ErrorCode> { +impl IncomingDatagramStream { + pub fn blocking_receive(&self, count: Range) -> Result, ErrorCode> { let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false); let pollable = self.subscribe(); let mut datagrams = vec![]; diff --git a/crates/wasi-http/wit/deps/sockets/udp.wit b/crates/wasi-http/wit/deps/sockets/udp.wit index 731d7ec01b93..d6418fe6d39d 100644 --- a/crates/wasi-http/wit/deps/sockets/udp.wit +++ b/crates/wasi-http/wit/deps/sockets/udp.wit @@ -4,7 +4,7 @@ interface udp { use network.{network, error-code, ip-socket-address, ip-address-family}; /// A received datagram. - record inbound-datagram { + record incoming-datagram { /// The payload. /// /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. @@ -19,7 +19,7 @@ interface udp { } /// A datagram to be sent out. - record outbound-datagram { + record outgoing-datagram { /// The payload. /// /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. @@ -65,7 +65,7 @@ interface udp { /// - /// - start-bind: func(network: borrow, local-address: ip-socket-address) -> result<_, error-code>; - finish-bind: func() -> result, error-code>; + finish-bind: func() -> result, error-code>; /// Set the destination address. /// @@ -172,7 +172,7 @@ interface udp { subscribe: func() -> pollable; } - resource inbound-datagram-stream { + resource incoming-datagram-stream { /// Receive messages on the socket. /// /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. @@ -192,7 +192,7 @@ interface udp { /// - /// - /// - - receive: func(max-results: u64) -> result, error-code>; + receive: func(max-results: u64) -> result, error-code>; /// Create a `pollable` which will resolve once the stream is ready to receive again. /// @@ -201,7 +201,7 @@ interface udp { subscribe: func() -> pollable; } - resource outbound-datagram-stream { + resource outgoing-datagram-stream { /// Send messages on the socket. /// /// This function attempts to send all provided `datagrams` on the socket without blocking and @@ -233,7 +233,7 @@ interface udp { /// - /// - /// - - send: func(datagrams: list) -> result; + send: func(datagrams: list) -> result; /// Create a `pollable` which will resolve once the stream is ready to send again. /// diff --git a/crates/wasi/src/preview2/host/udp.rs b/crates/wasi/src/preview2/host/udp.rs index e959021dcdc0..53570e4b2dfc 100644 --- a/crates/wasi/src/preview2/host/udp.rs +++ b/crates/wasi/src/preview2/host/udp.rs @@ -59,8 +59,8 @@ impl udp::HostUdpSocket for T { &mut self, this: Resource, ) -> SocketResult<( - Resource, - Resource, + Resource, + Resource, )> { let table = self.table_mut(); let outer = table.get(&this)?; @@ -75,12 +75,12 @@ impl udp::HostUdpSocket for T { socket.udp_state = UdpState::Bound; } - let inbound_stream = outer.new_inbound_stream(); - let outbound_stream = outer.new_outbound_stream(); + let incoming_stream = outer.new_incoming_stream(); + let outgoing_stream = outer.new_outgoing_stream(); Ok(( - self.table_mut().push_child(inbound_stream, &this)?, - self.table_mut().push_child(outbound_stream, &this)?, + self.table_mut().push_child(incoming_stream, &this)?, + self.table_mut().push_child(outgoing_stream, &this)?, )) } @@ -254,13 +254,13 @@ impl udp::HostUdpSocket for T { } } -impl udp::HostInboundDatagramStream for T { +impl udp::HostIncomingDatagramStream for T { fn receive( &mut self, - this: Resource, + this: Resource, max_results: u64, - ) -> SocketResult> { - fn recv_one(socket: &UdpSocketInner) -> SocketResult { + ) -> SocketResult> { + fn recv_one(socket: &UdpSocketInner) -> SocketResult { let mut buf = [0; MAX_UDP_DATAGRAM_SIZE]; let (size, received_addr) = socket.udp_socket().try_recv_from(&mut buf)?; @@ -274,7 +274,7 @@ impl udp::HostInboundDatagramStream for T { } // FIXME: check permission to receive from `received_addr`. - Ok(udp::InboundDatagram { + Ok(udp::IncomingDatagram { data: buf[..size].into(), remote_address: received_addr.into(), }) @@ -306,12 +306,12 @@ impl udp::HostInboundDatagramStream for T { fn subscribe( &mut self, - this: Resource, + this: Resource, ) -> anyhow::Result> { crate::preview2::poll::subscribe(self.table_mut(), this) } - fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { + fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { let table = self.table_mut(); // As in the filesystem implementation, we assume closing a socket @@ -323,13 +323,13 @@ impl udp::HostInboundDatagramStream for T { } } -impl udp::HostOutboundDatagramStream for T { +impl udp::HostOutgoingDatagramStream for T { fn send( &mut self, - this: Resource, - datagrams: Vec, + this: Resource, + datagrams: Vec, ) -> SocketResult { - fn send_one(socket: &UdpSocketInner, datagram: &udp::OutboundDatagram) -> SocketResult<()> { + fn send_one(socket: &UdpSocketInner, datagram: &udp::OutgoingDatagram) -> SocketResult<()> { let provided_addr = datagram.remote_address.map(SocketAddr::from); let addr = match (socket.remote_address(), provided_addr) { (None, Some(addr)) => addr, @@ -371,12 +371,12 @@ impl udp::HostOutboundDatagramStream for T { fn subscribe( &mut self, - this: Resource, + this: Resource, ) -> anyhow::Result> { crate::preview2::poll::subscribe(self.table_mut(), this) } - fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { + fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { let table = self.table_mut(); // As in the filesystem implementation, we assume closing a socket diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index fdd0a240abf9..f9d0977453b2 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -161,8 +161,8 @@ pub mod bindings { "wasi:sockets/network/network": super::network::Network, "wasi:sockets/tcp/tcp-socket": super::tcp::TcpSocket, "wasi:sockets/udp/udp-socket": super::udp::UdpSocket, - "wasi:sockets/udp/inbound-datagram-stream": super::udp::InboundDatagramStream, - "wasi:sockets/udp/outbound-datagram-stream": super::udp::OutboundDatagramStream, + "wasi:sockets/udp/incoming-datagram-stream": super::udp::IncomingDatagramStream, + "wasi:sockets/udp/outgoing-datagram-stream": super::udp::OutgoingDatagramStream, "wasi:sockets/ip-name-lookup/resolve-address-stream": super::ip_name_lookup::ResolveAddressStream, "wasi:filesystem/types/directory-entry-stream": super::filesystem::ReaddirIterator, "wasi:filesystem/types/descriptor": super::filesystem::Descriptor, diff --git a/crates/wasi/src/preview2/udp.rs b/crates/wasi/src/preview2/udp.rs index b08479ea410a..cebd9ca8d2c6 100644 --- a/crates/wasi/src/preview2/udp.rs +++ b/crates/wasi/src/preview2/udp.rs @@ -30,7 +30,7 @@ pub(crate) enum UdpState { Connected(SocketAddr), } -/// Operational data shared between the UdpSocket, InboundDatagramStream & OutboundDatagramStream +/// Operational data shared between the UdpSocket, IncomingDatagramStream & OutgoingDatagramStream pub(crate) struct UdpSocketInner { pub(crate) native_socket: Arc, @@ -79,14 +79,14 @@ impl UdpSocket { Ok(tokio_socket) } - pub(crate) fn new_inbound_stream(&self) -> InboundDatagramStream { - InboundDatagramStream { + pub(crate) fn new_incoming_stream(&self) -> IncomingDatagramStream { + IncomingDatagramStream { inner: self.inner.clone(), } } - pub(crate) fn new_outbound_stream(&self) -> OutboundDatagramStream { - OutboundDatagramStream { + pub(crate) fn new_outgoing_stream(&self) -> OutgoingDatagramStream { + OutgoingDatagramStream { inner: self.inner.clone(), } } @@ -106,12 +106,12 @@ impl UdpSocketInner { } } -pub struct InboundDatagramStream { +pub struct IncomingDatagramStream { pub(crate) inner: Arc>, } #[async_trait] -impl Subscribe for InboundDatagramStream { +impl Subscribe for IncomingDatagramStream { async fn ready(&mut self) { let native_socket = { // Make sure the lock guard is released before the await. @@ -127,12 +127,12 @@ impl Subscribe for InboundDatagramStream { } } -pub struct OutboundDatagramStream { +pub struct OutgoingDatagramStream { pub(crate) inner: Arc>, } #[async_trait] -impl Subscribe for OutboundDatagramStream { +impl Subscribe for OutgoingDatagramStream { async fn ready(&mut self) { let native_socket = { // Make sure the lock guard is released before the await. diff --git a/crates/wasi/wit/deps/sockets/udp.wit b/crates/wasi/wit/deps/sockets/udp.wit index 731d7ec01b93..d6418fe6d39d 100644 --- a/crates/wasi/wit/deps/sockets/udp.wit +++ b/crates/wasi/wit/deps/sockets/udp.wit @@ -4,7 +4,7 @@ interface udp { use network.{network, error-code, ip-socket-address, ip-address-family}; /// A received datagram. - record inbound-datagram { + record incoming-datagram { /// The payload. /// /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. @@ -19,7 +19,7 @@ interface udp { } /// A datagram to be sent out. - record outbound-datagram { + record outgoing-datagram { /// The payload. /// /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. @@ -65,7 +65,7 @@ interface udp { /// - /// - start-bind: func(network: borrow, local-address: ip-socket-address) -> result<_, error-code>; - finish-bind: func() -> result, error-code>; + finish-bind: func() -> result, error-code>; /// Set the destination address. /// @@ -172,7 +172,7 @@ interface udp { subscribe: func() -> pollable; } - resource inbound-datagram-stream { + resource incoming-datagram-stream { /// Receive messages on the socket. /// /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. @@ -192,7 +192,7 @@ interface udp { /// - /// - /// - - receive: func(max-results: u64) -> result, error-code>; + receive: func(max-results: u64) -> result, error-code>; /// Create a `pollable` which will resolve once the stream is ready to receive again. /// @@ -201,7 +201,7 @@ interface udp { subscribe: func() -> pollable; } - resource outbound-datagram-stream { + resource outgoing-datagram-stream { /// Send messages on the socket. /// /// This function attempts to send all provided `datagrams` on the socket without blocking and @@ -233,7 +233,7 @@ interface udp { /// - /// - /// - - send: func(datagrams: list) -> result; + send: func(datagrams: list) -> result; /// Create a `pollable` which will resolve once the stream is ready to send again. /// From 35eab24a971e30575277322254466ff4e498cb0f Mon Sep 17 00:00:00 2001 From: Dave Bakker Date: Wed, 18 Oct 2023 21:44:17 +0200 Subject: [PATCH 3/6] Revert previous changes to `bind`. Replace `connect` with `stream` Remove the Mutex again. Instead allow `stream` to be called multiple times, but trap if the previous streams are still active. --- .../bin/preview2_udp_sample_application.rs | 11 +- crates/test-programs/src/sockets.rs | 15 +- .../wit/deps/sockets/udp-create-socket.wit | 2 +- crates/wasi-http/wit/deps/sockets/udp.wit | 60 ++++--- crates/wasi/src/preview2/host/udp.rs | 152 +++++++++--------- crates/wasi/src/preview2/table.rs | 15 ++ crates/wasi/src/preview2/udp.rs | 81 +++------- .../wit/deps/sockets/udp-create-socket.wit | 2 +- crates/wasi/wit/deps/sockets/udp.wit | 60 ++++--- 9 files changed, 193 insertions(+), 205 deletions(-) diff --git a/crates/test-programs/src/bin/preview2_udp_sample_application.rs b/crates/test-programs/src/bin/preview2_udp_sample_application.rs index 61c8950cbbf4..41da701e685e 100644 --- a/crates/test-programs/src/bin/preview2_udp_sample_application.rs +++ b/crates/test-programs/src/bin/preview2_udp_sample_application.rs @@ -14,13 +14,14 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres let server = UdpSocket::new(family).unwrap(); - let (server_incoming, _) = server.blocking_bind(&net, bind_address).unwrap(); + server.blocking_bind(&net, bind_address).unwrap(); + let (server_incoming, _) = server.stream(None).unwrap(); let addr = server.local_address().unwrap(); let client_addr = { let client = UdpSocket::new(family).unwrap(); - let (_, client_outgoing) = client.blocking_bind(&net, unspecified_addr).unwrap(); - client.blocking_connect(addr).unwrap(); + client.blocking_bind(&net, unspecified_addr).unwrap(); + let (_, client_outgoing) = client.stream(Some(addr)).unwrap(); let datagrams = [ OutgoingDatagram { @@ -52,8 +53,8 @@ fn test_sample_application(family: IpAddressFamily, bind_address: IpSocketAddres // Another client { let client = UdpSocket::new(family).unwrap(); - let (_, client_outgoing) = client.blocking_bind(&net, unspecified_addr).unwrap(); - // Send without connect + client.blocking_bind(&net, unspecified_addr).unwrap(); + let (_, client_outgoing) = client.stream(None).unwrap(); let datagrams = [OutgoingDatagram { data: third_message.to_vec(), diff --git a/crates/test-programs/src/sockets.rs b/crates/test-programs/src/sockets.rs index 9531603b2bd7..a065312c130f 100644 --- a/crates/test-programs/src/sockets.rs +++ b/crates/test-programs/src/sockets.rs @@ -132,7 +132,7 @@ impl UdpSocket { &self, network: &Network, local_address: IpSocketAddress, - ) -> Result<(IncomingDatagramStream, OutgoingDatagramStream), ErrorCode> { + ) -> Result<(), ErrorCode> { let sub = self.subscribe(); self.start_bind(&network, local_address)?; @@ -144,19 +144,6 @@ impl UdpSocket { } } } - - pub fn blocking_connect(&self, remote_address: IpSocketAddress) -> Result<(), ErrorCode> { - let sub = self.subscribe(); - - self.start_connect(remote_address)?; - - loop { - match self.finish_connect() { - Err(ErrorCode::WouldBlock) => sub.wait(), - result => return result, - } - } - } } impl OutgoingDatagramStream { diff --git a/crates/wasi-http/wit/deps/sockets/udp-create-socket.wit b/crates/wasi-http/wit/deps/sockets/udp-create-socket.wit index 4e61d30fa222..cc58234d8455 100644 --- a/crates/wasi-http/wit/deps/sockets/udp-create-socket.wit +++ b/crates/wasi-http/wit/deps/sockets/udp-create-socket.wit @@ -8,7 +8,7 @@ interface udp-create-socket { /// Similar to `socket(AF_INET or AF_INET6, SOCK_DGRAM, IPPROTO_UDP)` in POSIX. /// /// This function does not require a network capability handle. This is considered to be safe because - /// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind`/`connect` is called, + /// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind` is called, /// the socket is effectively an in-memory configuration object, unable to communicate with the outside world. /// /// All sockets are non-blocking. Use the wasi-poll interface to block on asynchronous operations. diff --git a/crates/wasi-http/wit/deps/sockets/udp.wit b/crates/wasi-http/wit/deps/sockets/udp.wit index d6418fe6d39d..224f56a77634 100644 --- a/crates/wasi-http/wit/deps/sockets/udp.wit +++ b/crates/wasi-http/wit/deps/sockets/udp.wit @@ -12,7 +12,7 @@ interface udp { /// The source address. /// - /// If the socket is connected, this field is guaranteed to equal the configured remote address. + /// This field is guaranteed to match the remote address the stream was initialized with, if any. /// /// Equivalent to the `src_addr` out parameter of `recvfrom`. remote-address: ip-socket-address, @@ -27,8 +27,9 @@ interface udp { /// The destination address. /// - /// On connected sockets this field must be None or match the connected remote address exactly. - /// On unconnected sockets, this field is required. + /// The requirements on this field depend on how the stream was initialized: + /// - with a remote address: this field must be None or match the stream's remote address exactly. + /// - without a remote address: this field is required. /// /// If this value is None, the send operation is equivalent to `send` in POSIX. Otherwise it is equivalent to `sendto`. remote-address: option, @@ -42,9 +43,7 @@ interface udp { /// /// If the IP address is zero (`0.0.0.0` in IPv4, `::` in IPv6), it is left to the implementation to decide which /// network interface(s) to bind to. - /// If the TCP/UDP port is zero, the socket will be bound to a random free port. - /// - /// When a socket is not explicitly bound, the first invocation to connect will implicitly bind the socket. + /// If the port is zero, the socket will be bound to a random free port. /// /// Unlike in POSIX, this function is async. This enables interactive WASI hosts to inject permission prompts. /// @@ -65,40 +64,49 @@ interface udp { /// - /// - start-bind: func(network: borrow, local-address: ip-socket-address) -> result<_, error-code>; - finish-bind: func() -> result, error-code>; + finish-bind: func() -> result<_, error-code>; - /// Set the destination address. + /// Set up inbound & outbound communication channels, optionally to a specific peer. /// - /// The local-address may be updated based on the best network path to `remote-address`. + /// This function only changes the local socket configuration and does not generate any network traffic. + /// On success, the `remote-address` of the socket is updated. The `local-address` may be updated as well, + /// based on the best network path to `remote-address`. /// - /// When a destination address is set: - /// - all receive operations will only return datagrams sent from the provided `remote-address`. - /// - the `send` function can only be used to send to this destination. + /// When a `remote-address` is provided, the returned streams are limited to communicating with that specific peer: + /// - `send` can only be used to send to this destination. + /// - `receive` will only return datagrams sent from the provided `remote-address`. /// - /// Note that this function does not generate any network traffic and the peer is not aware of this "connection". + /// This method may be called multiple times on the same socket to change its association, but + /// only the most recently returned pair of streams will be operational. Implementations may trap if + /// the streams returned by a previous invocation haven't been dropped yet before calling `stream` again. /// - /// Unlike in POSIX: - /// - The socket must already be explicitly bound. - /// - This function is async. This enables interactive WASI hosts to inject permission prompts. + /// The POSIX equivalent in pseudo-code is: + /// ``` + /// if (was previously connected) { + /// connect(s, AF_UNSPEC) + /// } + /// if (remote_address is Some) { + /// connect(s, remote_address) + /// } + /// ``` /// - /// # Typical `start` errors + /// Unlike in POSIX, the socket must already be explicitly bound. + /// + /// # Typical errors /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - /// # Typical `finish` errors + /// - `invalid-state`: The socket is not bound. /// - `address-in-use`: Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE, EADDRNOTAVAIL on Linux, EAGAIN on BSD) - /// - `not-in-progress`: A `connect` operation is not in progress. - /// - `would-block`: Can't finish the operation, it is still in progress. (EWOULDBLOCK, EAGAIN) + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) /// /// # References /// - /// - /// - /// - - start-connect: func(remote-address: ip-socket-address) -> result<_, error-code>; - finish-connect: func() -> result<_, error-code>; + %stream: func(remote-address: option) -> result, error-code>; /// Get the current bound address. /// @@ -118,10 +126,10 @@ interface udp { /// - local-address: func() -> result; - /// Get the address set with `connect`. + /// Get the address the socket is currently streaming to. /// /// # Typical errors - /// - `invalid-state`: The socket is not connected to a remote address. (ENOTCONN) + /// - `invalid-state`: The socket is not streaming to a specific remote address. (ENOTCONN) /// /// # References /// - @@ -218,7 +226,7 @@ interface udp { /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is in "connected" mode and `remote-address` is `some` value that does not match the address passed to `connect`. (EISCONN) + /// - `invalid-argument`: The socket is in "connected" mode and `remote-address` is `some` value that does not match the address passed to `stream`. (EISCONN) /// - `invalid-argument`: The socket is not "connected" and no value for `remote-address` was provided. (EDESTADDRREQ) /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) diff --git a/crates/wasi/src/preview2/host/udp.rs b/crates/wasi/src/preview2/host/udp.rs index 53570e4b2dfc..03462659e9ba 100644 --- a/crates/wasi/src/preview2/host/udp.rs +++ b/crates/wasi/src/preview2/host/udp.rs @@ -5,9 +5,10 @@ use crate::preview2::{ sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network}, sockets::udp, }, - udp::{UdpSocketInner, UdpState}, + udp::{IncomingDatagramStream, OutgoingDatagramStream, UdpState}, }; -use crate::preview2::{Pollable, SocketResult, WasiView}; +use crate::preview2::{Pollable, SocketError, SocketResult, WasiView}; +use anyhow::anyhow; use cap_net_ext::{AddressFamily, PoolExt}; use io_lifetimes::AsSocketlike; use rustix::io::Errno; @@ -28,17 +29,15 @@ impl udp::HostUdpSocket for T { network: Resource, local_address: IpSocketAddress, ) -> SocketResult<()> { - let table = self.table(); - let mut socket = table.get(&this)?.inner.lock().unwrap(); + let table = self.table_mut(); + let socket = table.get(&this)?; let network = table.get(&network)?; let local_address: SocketAddr = local_address.into(); match socket.udp_state { UdpState::Default => {} - UdpState::BindStarted | UdpState::Connecting(..) => { - return Err(ErrorCode::ConcurrencyConflict.into()) - } - UdpState::Bound | UdpState::Connected(..) => return Err(ErrorCode::InvalidState.into()), + UdpState::BindStarted => return Err(ErrorCode::ConcurrencyConflict.into()), + UdpState::Bound | UdpState::Connected => return Err(ErrorCode::InvalidState.into()), } let binder = network.pool.udp_binder(local_address)?; @@ -50,79 +49,81 @@ impl udp::HostUdpSocket for T { .as_socketlike_view::(), )?; + let socket = table.get_mut(&this)?; socket.udp_state = UdpState::BindStarted; Ok(()) } - fn finish_bind( + fn finish_bind(&mut self, this: Resource) -> SocketResult<()> { + let table = self.table_mut(); + let socket = table.get_mut(&this)?; + + match socket.udp_state { + UdpState::BindStarted => { + socket.udp_state = UdpState::Bound; + Ok(()) + } + _ => Err(ErrorCode::NotInProgress.into()), + } + } + + fn stream( &mut self, this: Resource, + remote_address: Option, ) -> SocketResult<( Resource, Resource, )> { let table = self.table_mut(); - let outer = table.get(&this)?; - { - let mut socket = outer.inner.lock().unwrap(); - match socket.udp_state { - UdpState::BindStarted => {} - _ => return Err(ErrorCode::NotInProgress.into()), - } + let has_active_streams = table + .iter_children(&this)? + .any(|c| c.is::() || c.is::()); - socket.udp_state = UdpState::Bound; + if has_active_streams { + return Err(SocketError::trap(anyhow!("UDP streams not dropped yet"))); } - let incoming_stream = outer.new_incoming_stream(); - let outgoing_stream = outer.new_outgoing_stream(); - - Ok(( - self.table_mut().push_child(incoming_stream, &this)?, - self.table_mut().push_child(outgoing_stream, &this)?, - )) - } - - fn start_connect( - &mut self, - this: Resource, - remote_address: IpSocketAddress, - ) -> SocketResult<()> { - let table = self.table(); - let mut socket = table.get(&this)?.inner.lock().unwrap(); - let remote_address: SocketAddr = remote_address.into(); + let socket = table.get_mut(&this)?; + let remote_address = remote_address.map(SocketAddr::from); match socket.udp_state { - UdpState::Default | UdpState::Bound => {} - UdpState::BindStarted | UdpState::Connecting(..) => { - return Err(ErrorCode::ConcurrencyConflict.into()) - } - UdpState::Connected(..) => return Err(ErrorCode::InvalidState.into()), + UdpState::Bound | UdpState::Connected => {} + _ => return Err(ErrorCode::InvalidState.into()), } - rustix::net::connect(socket.udp_socket(), &remote_address)?; + if let UdpState::Connected = socket.udp_state { + // FIXME: Allow multiple (dis)connects. This needs to be supported by rustix first. + // rustix::net::disconnect(socket.udp_socket())?; + // socket.udp_state = UdpState::Bound; + return Err(ErrorCode::NotSupported.into()); + } - socket.udp_state = UdpState::Connecting(remote_address); - Ok(()) - } + if let Some(connect_addr) = remote_address { + rustix::net::connect(socket.udp_socket(), &connect_addr)?; + socket.udp_state = UdpState::Connected; + } - fn finish_connect(&mut self, this: Resource) -> SocketResult<()> { - let table = self.table(); - let mut socket = table.get(&this)?.inner.lock().unwrap(); + let incoming_stream = IncomingDatagramStream { + inner: socket.inner.clone(), + remote_address, + }; + let outgoing_stream = OutgoingDatagramStream { + inner: socket.inner.clone(), + remote_address, + }; - match socket.udp_state { - UdpState::Connecting(addr) => { - socket.udp_state = UdpState::Connected(addr); - Ok(()) - } - _ => Err(ErrorCode::NotInProgress.into()), - } + Ok(( + self.table_mut().push_child(incoming_stream, &this)?, + self.table_mut().push_child(outgoing_stream, &this)?, + )) } fn local_address(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; let addr = socket .udp_socket() .as_socketlike_view::() @@ -132,7 +133,7 @@ impl udp::HostUdpSocket for T { fn remote_address(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; let addr = socket .udp_socket() .as_socketlike_view::() @@ -145,7 +146,7 @@ impl udp::HostUdpSocket for T { this: Resource, ) -> Result { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; match socket.family { AddressFamily::Ipv4 => Ok(IpAddressFamily::Ipv4), AddressFamily::Ipv6 => Ok(IpAddressFamily::Ipv6), @@ -154,19 +155,19 @@ impl udp::HostUdpSocket for T { fn ipv6_only(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; Ok(sockopt::get_ipv6_v6only(socket.udp_socket())?) } fn set_ipv6_only(&mut self, this: Resource, value: bool) -> SocketResult<()> { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; Ok(sockopt::set_ipv6_v6only(socket.udp_socket(), value)?) } fn unicast_hop_limit(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; // We don't track whether the socket is IPv4 or IPv6 so try one and // fall back to the other. @@ -187,7 +188,7 @@ impl udp::HostUdpSocket for T { value: u8, ) -> SocketResult<()> { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; // We don't track whether the socket is IPv4 or IPv6 so try one and // fall back to the other. @@ -200,7 +201,7 @@ impl udp::HostUdpSocket for T { fn receive_buffer_size(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; Ok(sockopt::get_socket_recv_buffer_size(socket.udp_socket())? as u64) } @@ -210,7 +211,7 @@ impl udp::HostUdpSocket for T { value: u64, ) -> SocketResult<()> { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; let value = value.try_into().map_err(|_| ErrorCode::OutOfMemory)?; Ok(sockopt::set_socket_recv_buffer_size( socket.udp_socket(), @@ -220,7 +221,7 @@ impl udp::HostUdpSocket for T { fn send_buffer_size(&mut self, this: Resource) -> SocketResult { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; Ok(sockopt::get_socket_send_buffer_size(socket.udp_socket())? as u64) } @@ -230,7 +231,7 @@ impl udp::HostUdpSocket for T { value: u64, ) -> SocketResult<()> { let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; let value = value.try_into().map_err(|_| ErrorCode::OutOfMemory)?; Ok(sockopt::set_socket_send_buffer_size( socket.udp_socket(), @@ -260,11 +261,11 @@ impl udp::HostIncomingDatagramStream for T { this: Resource, max_results: u64, ) -> SocketResult> { - fn recv_one(socket: &UdpSocketInner) -> SocketResult { + fn recv_one(stream: &IncomingDatagramStream) -> SocketResult { let mut buf = [0; MAX_UDP_DATAGRAM_SIZE]; - let (size, received_addr) = socket.udp_socket().try_recv_from(&mut buf)?; + let (size, received_addr) = stream.inner.try_recv_from(&mut buf)?; - match socket.remote_address() { + match stream.remote_address { Some(connected_addr) if connected_addr != received_addr => { // Normally, this should have already been checked for us by the OS. // Drop message... @@ -281,7 +282,7 @@ impl udp::HostIncomingDatagramStream for T { } let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; if max_results == 0 { return Ok(vec![]); @@ -290,7 +291,7 @@ impl udp::HostIncomingDatagramStream for T { let mut datagrams = vec![]; for _ in 0..max_results { - match recv_one(&socket) { + match recv_one(socket) { Ok(datagram) => { datagrams.push(datagram); } @@ -329,9 +330,12 @@ impl udp::HostOutgoingDatagramStream for T { this: Resource, datagrams: Vec, ) -> SocketResult { - fn send_one(socket: &UdpSocketInner, datagram: &udp::OutgoingDatagram) -> SocketResult<()> { + fn send_one( + stream: &OutgoingDatagramStream, + datagram: &udp::OutgoingDatagram, + ) -> SocketResult<()> { let provided_addr = datagram.remote_address.map(SocketAddr::from); - let addr = match (socket.remote_address(), provided_addr) { + let addr = match (stream.remote_address, provided_addr) { (None, Some(addr)) => addr, (Some(addr), None) => addr, (Some(connected_addr), Some(provided_addr)) if connected_addr == provided_addr => { @@ -341,13 +345,13 @@ impl udp::HostOutgoingDatagramStream for T { }; // FIXME: check permission to send to `addr`. - socket.udp_socket().try_send_to(&datagram.data, addr)?; + stream.inner.try_send_to(&datagram.data, addr)?; Ok(()) } let table = self.table(); - let socket = table.get(&this)?.inner.lock().unwrap(); + let socket = table.get(&this)?; if datagrams.is_empty() { return Ok(0); @@ -356,7 +360,7 @@ impl udp::HostOutgoingDatagramStream for T { let mut count = 0; for datagram in datagrams { - match send_one(&socket, &datagram) { + match send_one(socket, &datagram) { Ok(_size) => count += 1, Err(_e) if count > 0 => { // WIT: "If at least one datagram has been sent successfully, this function never returns an error." diff --git a/crates/wasi/src/preview2/table.rs b/crates/wasi/src/preview2/table.rs index 2454fce03272..444e7b25f363 100644 --- a/crates/wasi/src/preview2/table.rs +++ b/crates/wasi/src/preview2/table.rs @@ -234,6 +234,21 @@ impl Table { (item, v) }) } + + /// Iterate over all children belonging to the provided parent + pub fn iter_children( + &self, + parent: &Resource, + ) -> Result, TableError> + where + T: 'static, + { + let parent_entry = self.map.get(&parent.rep()).ok_or(TableError::NotPresent)?; + Ok(parent_entry.children.iter().map(|child_index| { + let child = self.map.get(child_index).expect("missing child"); + child.entry.as_ref() + })) + } } impl Default for Table { diff --git a/crates/wasi/src/preview2/udp.rs b/crates/wasi/src/preview2/udp.rs index cebd9ca8d2c6..bfb0b9dca7f4 100644 --- a/crates/wasi/src/preview2/udp.rs +++ b/crates/wasi/src/preview2/udp.rs @@ -5,7 +5,7 @@ use cap_net_ext::{AddressFamily, Blocking, UdpSocketExt}; use io_lifetimes::raw::{FromRawSocketlike, IntoRawSocketlike}; use std::io; use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tokio::io::Interest; /// The state of a UDP socket. @@ -23,16 +23,18 @@ pub(crate) enum UdpState { /// is not yet listening for connections. Bound, - /// A connect call is in progress. - Connecting(SocketAddr), - /// The socket is "connected" to a peer address. - Connected(SocketAddr), + Connected, } -/// Operational data shared between the UdpSocket, IncomingDatagramStream & OutgoingDatagramStream -pub(crate) struct UdpSocketInner { - pub(crate) native_socket: Arc, +/// A host UDP socket, plus associated bookkeeping. +/// +/// The inner state is wrapped in an Arc because the same underlying socket is +/// used for implementing the stream types. +pub struct UdpSocket { + /// The part of a `UdpSocket` which is reference-counted so that we + /// can pass it to async tasks. + pub(crate) inner: Arc, /// The current state in the bind/connect progression. pub(crate) udp_state: UdpState, @@ -41,11 +43,6 @@ pub(crate) struct UdpSocketInner { pub(crate) family: AddressFamily, } -/// A host UDP socket. -pub struct UdpSocket { - pub(crate) inner: Arc>, -} - #[async_trait] impl Subscribe for UdpSocket { async fn ready(&mut self) { @@ -56,14 +53,10 @@ impl Subscribe for UdpSocket { impl UdpSocket { /// Create a new socket in the given family. pub fn new(family: AddressFamily) -> io::Result { - let inner = UdpSocketInner { - native_socket: Arc::new(Self::new_tokio_socket(family)?), + Ok(UdpSocket { + inner: Arc::new(Self::new_tokio_socket(family)?), udp_state: UdpState::Default, family, - }; - - Ok(UdpSocket { - inner: Arc::new(Mutex::new(inner)), }) } @@ -79,48 +72,23 @@ impl UdpSocket { Ok(tokio_socket) } - pub(crate) fn new_incoming_stream(&self) -> IncomingDatagramStream { - IncomingDatagramStream { - inner: self.inner.clone(), - } - } - - pub(crate) fn new_outgoing_stream(&self) -> OutgoingDatagramStream { - OutgoingDatagramStream { - inner: self.inner.clone(), - } - } -} - -impl UdpSocketInner { - pub fn remote_address(&self) -> Option { - match self.udp_state { - UdpState::Connected(addr) => Some(addr), - UdpState::Connecting(_) // Don't use address. From the consumer's perspective connecting isn't finished yet. - | _ => None, - } - } - pub fn udp_socket(&self) -> &tokio::net::UdpSocket { - &self.native_socket + &self.inner } } pub struct IncomingDatagramStream { - pub(crate) inner: Arc>, + pub(crate) inner: Arc, + + /// If this has a value, the stream is "connected". + pub(crate) remote_address: Option, } #[async_trait] impl Subscribe for IncomingDatagramStream { async fn ready(&mut self) { - let native_socket = { - // Make sure the lock guard is released before the await. - let inner = self.inner.lock().unwrap(); - inner.native_socket.clone() - }; - // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. - native_socket + self.inner .ready(Interest::READABLE) .await .expect("failed to await UDP socket readiness"); @@ -128,20 +96,17 @@ impl Subscribe for IncomingDatagramStream { } pub struct OutgoingDatagramStream { - pub(crate) inner: Arc>, + pub(crate) inner: Arc, + + /// If this has a value, the stream is "connected". + pub(crate) remote_address: Option, } #[async_trait] impl Subscribe for OutgoingDatagramStream { async fn ready(&mut self) { - let native_socket = { - // Make sure the lock guard is released before the await. - let inner = self.inner.lock().unwrap(); - inner.native_socket.clone() - }; - // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. - native_socket + self.inner .ready(Interest::WRITABLE) .await .expect("failed to await UDP socket readiness"); diff --git a/crates/wasi/wit/deps/sockets/udp-create-socket.wit b/crates/wasi/wit/deps/sockets/udp-create-socket.wit index 4e61d30fa222..cc58234d8455 100644 --- a/crates/wasi/wit/deps/sockets/udp-create-socket.wit +++ b/crates/wasi/wit/deps/sockets/udp-create-socket.wit @@ -8,7 +8,7 @@ interface udp-create-socket { /// Similar to `socket(AF_INET or AF_INET6, SOCK_DGRAM, IPPROTO_UDP)` in POSIX. /// /// This function does not require a network capability handle. This is considered to be safe because - /// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind`/`connect` is called, + /// at time of creation, the socket is not bound to any `network` yet. Up to the moment `bind` is called, /// the socket is effectively an in-memory configuration object, unable to communicate with the outside world. /// /// All sockets are non-blocking. Use the wasi-poll interface to block on asynchronous operations. diff --git a/crates/wasi/wit/deps/sockets/udp.wit b/crates/wasi/wit/deps/sockets/udp.wit index d6418fe6d39d..224f56a77634 100644 --- a/crates/wasi/wit/deps/sockets/udp.wit +++ b/crates/wasi/wit/deps/sockets/udp.wit @@ -12,7 +12,7 @@ interface udp { /// The source address. /// - /// If the socket is connected, this field is guaranteed to equal the configured remote address. + /// This field is guaranteed to match the remote address the stream was initialized with, if any. /// /// Equivalent to the `src_addr` out parameter of `recvfrom`. remote-address: ip-socket-address, @@ -27,8 +27,9 @@ interface udp { /// The destination address. /// - /// On connected sockets this field must be None or match the connected remote address exactly. - /// On unconnected sockets, this field is required. + /// The requirements on this field depend on how the stream was initialized: + /// - with a remote address: this field must be None or match the stream's remote address exactly. + /// - without a remote address: this field is required. /// /// If this value is None, the send operation is equivalent to `send` in POSIX. Otherwise it is equivalent to `sendto`. remote-address: option, @@ -42,9 +43,7 @@ interface udp { /// /// If the IP address is zero (`0.0.0.0` in IPv4, `::` in IPv6), it is left to the implementation to decide which /// network interface(s) to bind to. - /// If the TCP/UDP port is zero, the socket will be bound to a random free port. - /// - /// When a socket is not explicitly bound, the first invocation to connect will implicitly bind the socket. + /// If the port is zero, the socket will be bound to a random free port. /// /// Unlike in POSIX, this function is async. This enables interactive WASI hosts to inject permission prompts. /// @@ -65,40 +64,49 @@ interface udp { /// - /// - start-bind: func(network: borrow, local-address: ip-socket-address) -> result<_, error-code>; - finish-bind: func() -> result, error-code>; + finish-bind: func() -> result<_, error-code>; - /// Set the destination address. + /// Set up inbound & outbound communication channels, optionally to a specific peer. /// - /// The local-address may be updated based on the best network path to `remote-address`. + /// This function only changes the local socket configuration and does not generate any network traffic. + /// On success, the `remote-address` of the socket is updated. The `local-address` may be updated as well, + /// based on the best network path to `remote-address`. /// - /// When a destination address is set: - /// - all receive operations will only return datagrams sent from the provided `remote-address`. - /// - the `send` function can only be used to send to this destination. + /// When a `remote-address` is provided, the returned streams are limited to communicating with that specific peer: + /// - `send` can only be used to send to this destination. + /// - `receive` will only return datagrams sent from the provided `remote-address`. /// - /// Note that this function does not generate any network traffic and the peer is not aware of this "connection". + /// This method may be called multiple times on the same socket to change its association, but + /// only the most recently returned pair of streams will be operational. Implementations may trap if + /// the streams returned by a previous invocation haven't been dropped yet before calling `stream` again. /// - /// Unlike in POSIX: - /// - The socket must already be explicitly bound. - /// - This function is async. This enables interactive WASI hosts to inject permission prompts. + /// The POSIX equivalent in pseudo-code is: + /// ``` + /// if (was previously connected) { + /// connect(s, AF_UNSPEC) + /// } + /// if (remote_address is Some) { + /// connect(s, remote_address) + /// } + /// ``` /// - /// # Typical `start` errors + /// Unlike in POSIX, the socket must already be explicitly bound. + /// + /// # Typical errors /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - /// # Typical `finish` errors + /// - `invalid-state`: The socket is not bound. /// - `address-in-use`: Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE, EADDRNOTAVAIL on Linux, EAGAIN on BSD) - /// - `not-in-progress`: A `connect` operation is not in progress. - /// - `would-block`: Can't finish the operation, it is still in progress. (EWOULDBLOCK, EAGAIN) + /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) /// /// # References /// - /// - /// - /// - - start-connect: func(remote-address: ip-socket-address) -> result<_, error-code>; - finish-connect: func() -> result<_, error-code>; + %stream: func(remote-address: option) -> result, error-code>; /// Get the current bound address. /// @@ -118,10 +126,10 @@ interface udp { /// - local-address: func() -> result; - /// Get the address set with `connect`. + /// Get the address the socket is currently streaming to. /// /// # Typical errors - /// - `invalid-state`: The socket is not connected to a remote address. (ENOTCONN) + /// - `invalid-state`: The socket is not streaming to a specific remote address. (ENOTCONN) /// /// # References /// - @@ -218,7 +226,7 @@ interface udp { /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) /// - `invalid-argument`: The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) /// - `invalid-argument`: The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) - /// - `invalid-argument`: The socket is in "connected" mode and `remote-address` is `some` value that does not match the address passed to `connect`. (EISCONN) + /// - `invalid-argument`: The socket is in "connected" mode and `remote-address` is `some` value that does not match the address passed to `stream`. (EISCONN) /// - `invalid-argument`: The socket is not "connected" and no value for `remote-address` was provided. (EDESTADDRREQ) /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) From dc8433c92916dd1b5d4152c299872a64739b9376 Mon Sep 17 00:00:00 2001 From: Dave Bakker Date: Fri, 20 Oct 2023 21:45:23 +0200 Subject: [PATCH 4/6] The code block was treated as Rust code. --- crates/wasi-http/wit/deps/sockets/udp.wit | 10 +++++----- crates/wasi/wit/deps/sockets/udp.wit | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/wasi-http/wit/deps/sockets/udp.wit b/crates/wasi-http/wit/deps/sockets/udp.wit index 224f56a77634..c955522be705 100644 --- a/crates/wasi-http/wit/deps/sockets/udp.wit +++ b/crates/wasi-http/wit/deps/sockets/udp.wit @@ -11,9 +11,9 @@ interface udp { data: list, /// The source address. - /// + /// /// This field is guaranteed to match the remote address the stream was initialized with, if any. - /// + /// /// Equivalent to the `src_addr` out parameter of `recvfrom`. remote-address: ip-socket-address, } @@ -79,16 +79,16 @@ interface udp { /// This method may be called multiple times on the same socket to change its association, but /// only the most recently returned pair of streams will be operational. Implementations may trap if /// the streams returned by a previous invocation haven't been dropped yet before calling `stream` again. - /// + /// /// The POSIX equivalent in pseudo-code is: - /// ``` + /// /// if (was previously connected) { /// connect(s, AF_UNSPEC) /// } /// if (remote_address is Some) { /// connect(s, remote_address) /// } - /// ``` + /// /// /// Unlike in POSIX, the socket must already be explicitly bound. /// diff --git a/crates/wasi/wit/deps/sockets/udp.wit b/crates/wasi/wit/deps/sockets/udp.wit index 224f56a77634..c955522be705 100644 --- a/crates/wasi/wit/deps/sockets/udp.wit +++ b/crates/wasi/wit/deps/sockets/udp.wit @@ -11,9 +11,9 @@ interface udp { data: list, /// The source address. - /// + /// /// This field is guaranteed to match the remote address the stream was initialized with, if any. - /// + /// /// Equivalent to the `src_addr` out parameter of `recvfrom`. remote-address: ip-socket-address, } @@ -79,16 +79,16 @@ interface udp { /// This method may be called multiple times on the same socket to change its association, but /// only the most recently returned pair of streams will be operational. Implementations may trap if /// the streams returned by a previous invocation haven't been dropped yet before calling `stream` again. - /// + /// /// The POSIX equivalent in pseudo-code is: - /// ``` + /// /// if (was previously connected) { /// connect(s, AF_UNSPEC) /// } /// if (remote_address is Some) { /// connect(s, remote_address) /// } - /// ``` + /// /// /// Unlike in POSIX, the socket must already be explicitly bound. /// From 4fe5054fd0a41b0875573b83fa0c9e8ea2a6003d Mon Sep 17 00:00:00 2001 From: Dave Bakker Date: Mon, 23 Oct 2023 22:09:51 +0200 Subject: [PATCH 5/6] Align more closely to wasi-io's input&output-stream --- crates/test-programs/src/sockets.rs | 26 +++-- crates/wasi-http/wit/deps/sockets/udp.wit | 33 ++++-- crates/wasi/src/preview2/error.rs | 7 ++ crates/wasi/src/preview2/host/udp.rs | 124 ++++++++++++++++++---- crates/wasi/src/preview2/udp.rs | 32 ++---- crates/wasi/wit/deps/sockets/udp.wit | 33 ++++-- 6 files changed, 190 insertions(+), 65 deletions(-) diff --git a/crates/test-programs/src/sockets.rs b/crates/test-programs/src/sockets.rs index a065312c130f..3033582b6066 100644 --- a/crates/test-programs/src/sockets.rs +++ b/crates/test-programs/src/sockets.rs @@ -147,16 +147,29 @@ impl UdpSocket { } impl OutgoingDatagramStream { + fn blocking_check_send(&self, timeout: &Pollable) -> Result { + let sub = self.subscribe(); + + loop { + match self.check_send() { + Ok(0) => sub.wait_until(timeout)?, + result => return result, + } + } + } + pub fn blocking_send(&self, mut datagrams: &[OutgoingDatagram]) -> Result<(), ErrorCode> { let timeout = monotonic_clock::subscribe(TIMEOUT_NS, false); - let pollable = self.subscribe(); while !datagrams.is_empty() { - match self.send(datagrams) { + let permit = self.blocking_check_send(&timeout)?; + let chunk_len = datagrams.len().min(permit as usize); + match self.send(&datagrams[..chunk_len]) { + Ok(0) => {} Ok(packets_sent) => { - datagrams = &datagrams[(packets_sent as usize)..]; + let packets_sent = packets_sent as usize; + datagrams = &datagrams[packets_sent..]; } - Err(ErrorCode::WouldBlock) => pollable.wait_until(&timeout)?, Err(err) => return Err(err), } } @@ -176,11 +189,6 @@ impl IncomingDatagramStream { Ok(mut chunk) => { datagrams.append(&mut chunk); - if datagrams.len() >= count.start as usize { - return Ok(datagrams); - } - } - Err(ErrorCode::WouldBlock) => { if datagrams.len() >= count.start as usize { return Ok(datagrams); } else { diff --git a/crates/wasi-http/wit/deps/sockets/udp.wit b/crates/wasi-http/wit/deps/sockets/udp.wit index c955522be705..232c2b41c7fb 100644 --- a/crates/wasi-http/wit/deps/sockets/udp.wit +++ b/crates/wasi-http/wit/deps/sockets/udp.wit @@ -21,8 +21,6 @@ interface udp { /// A datagram to be sent out. record outgoing-datagram { /// The payload. - /// - /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. data: list, /// The destination address. @@ -81,14 +79,14 @@ interface udp { /// the streams returned by a previous invocation haven't been dropped yet before calling `stream` again. /// /// The POSIX equivalent in pseudo-code is: - /// + /// ```text /// if (was previously connected) { /// connect(s, AF_UNSPEC) /// } /// if (remote_address is Some) { /// connect(s, remote_address) /// } - /// + /// ``` /// /// Unlike in POSIX, the socket must already be explicitly bound. /// @@ -185,11 +183,14 @@ interface udp { /// /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. /// The returned list may contain fewer elements than requested, but never more. - /// If `max-results` is 0, this function returns successfully with an empty list. + /// + /// This function returns successfully with an empty list when either: + /// - `max-results` is 0, or: + /// - `max-results` is greater than 0, but no results are immediately available. + /// This function never returns `error(would-block)`. /// /// # Typical errors /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `would-block`: There is no pending data available to be read at the moment. (EWOULDBLOCK, EAGAIN) /// /// # References /// - @@ -210,10 +211,24 @@ interface udp { } resource outgoing-datagram-stream { + /// Check readiness for sending. This function never blocks. + /// + /// Returns the number of datagrams permitted for the next call to `send`, + /// or an error. Calling `send` with more datagrams than this function has + /// permitted will trap. + /// + /// When this function returns ok(0), the `subscribe` pollable will + /// become ready when this function will report at least ok(1), or an + /// error. + /// + /// Never returns `would-block`. + check-send: func() -> result; + /// Send messages on the socket. /// /// This function attempts to send all provided `datagrams` on the socket without blocking and - /// returns how many messages were actually sent (or queued for sending). + /// returns how many messages were actually sent (or queued for sending). This function never + /// returns `error(would-block)`. If none of the datagrams were able to be sent, `ok(0)` is returned. /// /// This function semantically behaves the same as iterating the `datagrams` list and sequentially /// sending each individual datagram until either the end of the list has been reached or the first error occurred. @@ -221,6 +236,9 @@ interface udp { /// /// If the input list is empty, the function returns `ok(0)`. /// + /// Each call to `send` must be permitted by a preceding `check-send`. Implementations must trap if + /// either `check-send` was not called or `datagrams` contains more items than `check-send` permitted. + /// /// # Typical errors /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) @@ -230,7 +248,6 @@ interface udp { /// - `invalid-argument`: The socket is not "connected" and no value for `remote-address` was provided. (EDESTADDRREQ) /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) - /// - `would-block`: The send buffer is currently full. (EWOULDBLOCK, EAGAIN) /// /// # References /// - diff --git a/crates/wasi/src/preview2/error.rs b/crates/wasi/src/preview2/error.rs index ccae912d4668..f92317c1fef7 100644 --- a/crates/wasi/src/preview2/error.rs +++ b/crates/wasi/src/preview2/error.rs @@ -61,6 +61,13 @@ impl TrappableError { { self.err.downcast() } + + pub fn downcast_ref(&self) -> Option<&T> + where + T: Error + Send + Sync + 'static, + { + self.err.downcast_ref() + } } impl From for TrappableError diff --git a/crates/wasi/src/preview2/host/udp.rs b/crates/wasi/src/preview2/host/udp.rs index 03462659e9ba..b140ec2a75ce 100644 --- a/crates/wasi/src/preview2/host/udp.rs +++ b/crates/wasi/src/preview2/host/udp.rs @@ -5,20 +5,23 @@ use crate::preview2::{ sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network}, sockets::udp, }, - udp::{IncomingDatagramStream, OutgoingDatagramStream, UdpState}, + udp::{IncomingDatagramStream, OutgoingDatagramStream, SendState, UdpState}, + Subscribe, }; use crate::preview2::{Pollable, SocketError, SocketResult, WasiView}; use anyhow::anyhow; +use async_trait::async_trait; use cap_net_ext::{AddressFamily, PoolExt}; use io_lifetimes::AsSocketlike; use rustix::io::Errno; use rustix::net::sockopt; +use tokio::io::Interest; use wasmtime::component::Resource; /// Theoretical maximum byte size of a UDP datagram, the real limit is lower, /// but we do not account for e.g. the transport layer here for simplicity. /// In practice, datagrams are typically less than 1500 bytes. -const MAX_UDP_DATAGRAM_SIZE: usize = 65535; +const MAX_UDP_DATAGRAM_SIZE: usize = u16::MAX as usize; impl udp::Host for T {} @@ -113,6 +116,7 @@ impl udp::HostUdpSocket for T { let outgoing_stream = OutgoingDatagramStream { inner: socket.inner.clone(), remote_address, + send_state: SendState::Idle, }; Ok(( @@ -261,28 +265,31 @@ impl udp::HostIncomingDatagramStream for T { this: Resource, max_results: u64, ) -> SocketResult> { - fn recv_one(stream: &IncomingDatagramStream) -> SocketResult { + // Returns Ok(None) when the message was dropped. + fn recv_one( + stream: &IncomingDatagramStream, + ) -> SocketResult> { let mut buf = [0; MAX_UDP_DATAGRAM_SIZE]; let (size, received_addr) = stream.inner.try_recv_from(&mut buf)?; match stream.remote_address { Some(connected_addr) if connected_addr != received_addr => { // Normally, this should have already been checked for us by the OS. - // Drop message... - return Err(ErrorCode::WouldBlock.into()); + return Ok(None); } _ => {} } // FIXME: check permission to receive from `received_addr`. - Ok(udp::IncomingDatagram { + Ok(Some(udp::IncomingDatagram { data: buf[..size].into(), remote_address: received_addr.into(), - }) + })) } let table = self.table(); - let socket = table.get(&this)?; + let stream = table.get(&this)?; + let max_results: usize = max_results.try_into().unwrap_or(usize::MAX); if max_results == 0 { return Ok(vec![]); @@ -290,15 +297,23 @@ impl udp::HostIncomingDatagramStream for T { let mut datagrams = vec![]; - for _ in 0..max_results { - match recv_one(socket) { - Ok(datagram) => { + while datagrams.len() < max_results { + match recv_one(stream) { + Ok(Some(datagram)) => { datagrams.push(datagram); } - Err(_e) if datagrams.len() > 0 => { + Ok(None) => { + // Message was dropped + } + Err(_) if datagrams.len() > 0 => { return Ok(datagrams); } - Err(e) => return Err(e), + Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => { + return Ok(datagrams); + } + Err(e) => { + return Err(e); + } } } @@ -324,7 +339,35 @@ impl udp::HostIncomingDatagramStream for T { } } +#[async_trait] +impl Subscribe for IncomingDatagramStream { + async fn ready(&mut self) { + // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. + self.inner + .ready(Interest::READABLE) + .await + .expect("failed to await UDP socket readiness"); + } +} + impl udp::HostOutgoingDatagramStream for T { + fn check_send(&mut self, this: Resource) -> SocketResult { + let table = self.table_mut(); + let stream = table.get_mut(&this)?; + + let permit = match stream.send_state { + SendState::Idle => { + const PERMIT: usize = 16; + stream.send_state = SendState::Permitted(PERMIT); + PERMIT + } + SendState::Permitted(n) => n, + SendState::Waiting => 0, + }; + + Ok(permit.try_into().unwrap()) + } + fn send( &mut self, this: Resource, @@ -334,6 +377,10 @@ impl udp::HostOutgoingDatagramStream for T { stream: &OutgoingDatagramStream, datagram: &udp::OutgoingDatagram, ) -> SocketResult<()> { + if datagram.data.len() > MAX_UDP_DATAGRAM_SIZE { + return Err(ErrorCode::DatagramTooLarge.into()); + } + let provided_addr = datagram.remote_address.map(SocketAddr::from); let addr = match (stream.remote_address, provided_addr) { (None, Some(addr)) => addr, @@ -350,8 +397,24 @@ impl udp::HostOutgoingDatagramStream for T { Ok(()) } - let table = self.table(); - let socket = table.get(&this)?; + let table = self.table_mut(); + let stream = table.get_mut(&this)?; + + match stream.send_state { + SendState::Permitted(n) if n >= datagrams.len() => { + stream.send_state = SendState::Idle; + } + SendState::Permitted(_) => { + return Err(SocketError::trap(anyhow::anyhow!( + "unpermitted: argument exceeds permitted size" + ))) + } + SendState::Idle | SendState::Waiting => { + return Err(SocketError::trap(anyhow::anyhow!( + "unpermitted: must call check-send first" + ))) + } + } if datagrams.is_empty() { return Ok(0); @@ -360,13 +423,19 @@ impl udp::HostOutgoingDatagramStream for T { let mut count = 0; for datagram in datagrams { - match send_one(socket, &datagram) { - Ok(_size) => count += 1, - Err(_e) if count > 0 => { + match send_one(stream, &datagram) { + Ok(_) => count += 1, + Err(_) if count > 0 => { // WIT: "If at least one datagram has been sent successfully, this function never returns an error." return Ok(count); } - Err(e) => return Err(e.into()), + Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => { + stream.send_state = SendState::Waiting; + return Ok(count); + } + Err(e) => { + return Err(e); + } } } @@ -391,3 +460,20 @@ impl udp::HostOutgoingDatagramStream for T { Ok(()) } } + +#[async_trait] +impl Subscribe for OutgoingDatagramStream { + async fn ready(&mut self) { + match self.send_state { + SendState::Idle | SendState::Permitted(_) => {} + SendState::Waiting => { + // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. + self.inner + .ready(Interest::WRITABLE) + .await + .expect("failed to await UDP socket readiness"); + self.send_state = SendState::Idle; + } + } + } +} diff --git a/crates/wasi/src/preview2/udp.rs b/crates/wasi/src/preview2/udp.rs index bfb0b9dca7f4..504dba0b17f5 100644 --- a/crates/wasi/src/preview2/udp.rs +++ b/crates/wasi/src/preview2/udp.rs @@ -6,7 +6,6 @@ use io_lifetimes::raw::{FromRawSocketlike, IntoRawSocketlike}; use std::io; use std::net::SocketAddr; use std::sync::Arc; -use tokio::io::Interest; /// The state of a UDP socket. /// @@ -84,31 +83,22 @@ pub struct IncomingDatagramStream { pub(crate) remote_address: Option, } -#[async_trait] -impl Subscribe for IncomingDatagramStream { - async fn ready(&mut self) { - // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. - self.inner - .ready(Interest::READABLE) - .await - .expect("failed to await UDP socket readiness"); - } -} - pub struct OutgoingDatagramStream { pub(crate) inner: Arc, /// If this has a value, the stream is "connected". pub(crate) remote_address: Option, + + pub(crate) send_state: SendState, } -#[async_trait] -impl Subscribe for OutgoingDatagramStream { - async fn ready(&mut self) { - // FIXME: Add `Interest::ERROR` when we update to tokio 1.32. - self.inner - .ready(Interest::WRITABLE) - .await - .expect("failed to await UDP socket readiness"); - } +pub(crate) enum SendState { + /// Waiting for the API consumer to call `check-send`. + Idle, + + /// Ready to send up to x datagrams. + Permitted(usize), + + /// Waiting for the OS. + Waiting, } diff --git a/crates/wasi/wit/deps/sockets/udp.wit b/crates/wasi/wit/deps/sockets/udp.wit index c955522be705..232c2b41c7fb 100644 --- a/crates/wasi/wit/deps/sockets/udp.wit +++ b/crates/wasi/wit/deps/sockets/udp.wit @@ -21,8 +21,6 @@ interface udp { /// A datagram to be sent out. record outgoing-datagram { /// The payload. - /// - /// Theoretical max size: ~64 KiB. In practice, typically less than 1500 bytes. data: list, /// The destination address. @@ -81,14 +79,14 @@ interface udp { /// the streams returned by a previous invocation haven't been dropped yet before calling `stream` again. /// /// The POSIX equivalent in pseudo-code is: - /// + /// ```text /// if (was previously connected) { /// connect(s, AF_UNSPEC) /// } /// if (remote_address is Some) { /// connect(s, remote_address) /// } - /// + /// ``` /// /// Unlike in POSIX, the socket must already be explicitly bound. /// @@ -185,11 +183,14 @@ interface udp { /// /// This function attempts to receive up to `max-results` datagrams on the socket without blocking. /// The returned list may contain fewer elements than requested, but never more. - /// If `max-results` is 0, this function returns successfully with an empty list. + /// + /// This function returns successfully with an empty list when either: + /// - `max-results` is 0, or: + /// - `max-results` is greater than 0, but no results are immediately available. + /// This function never returns `error(would-block)`. /// /// # Typical errors /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) - /// - `would-block`: There is no pending data available to be read at the moment. (EWOULDBLOCK, EAGAIN) /// /// # References /// - @@ -210,10 +211,24 @@ interface udp { } resource outgoing-datagram-stream { + /// Check readiness for sending. This function never blocks. + /// + /// Returns the number of datagrams permitted for the next call to `send`, + /// or an error. Calling `send` with more datagrams than this function has + /// permitted will trap. + /// + /// When this function returns ok(0), the `subscribe` pollable will + /// become ready when this function will report at least ok(1), or an + /// error. + /// + /// Never returns `would-block`. + check-send: func() -> result; + /// Send messages on the socket. /// /// This function attempts to send all provided `datagrams` on the socket without blocking and - /// returns how many messages were actually sent (or queued for sending). + /// returns how many messages were actually sent (or queued for sending). This function never + /// returns `error(would-block)`. If none of the datagrams were able to be sent, `ok(0)` is returned. /// /// This function semantically behaves the same as iterating the `datagrams` list and sequentially /// sending each individual datagram until either the end of the list has been reached or the first error occurred. @@ -221,6 +236,9 @@ interface udp { /// /// If the input list is empty, the function returns `ok(0)`. /// + /// Each call to `send` must be permitted by a preceding `check-send`. Implementations must trap if + /// either `check-send` was not called or `datagrams` contains more items than `check-send` permitted. + /// /// # Typical errors /// - `invalid-argument`: The `remote-address` has the wrong address family. (EAFNOSUPPORT) /// - `invalid-argument`: `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) @@ -230,7 +248,6 @@ interface udp { /// - `invalid-argument`: The socket is not "connected" and no value for `remote-address` was provided. (EDESTADDRREQ) /// - `remote-unreachable`: The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) /// - `datagram-too-large`: The datagram is too large. (EMSGSIZE) - /// - `would-block`: The send buffer is currently full. (EWOULDBLOCK, EAGAIN) /// /// # References /// - From 585c5500260cff17acb011b0a85b6f3f516a363d Mon Sep 17 00:00:00 2001 From: Dave Bakker Date: Tue, 24 Oct 2023 21:13:57 +0200 Subject: [PATCH 6/6] Use `send` instead of `sendto` on connected sockets. prtest:full --- crates/wasi/src/preview2/host/udp.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/wasi/src/preview2/host/udp.rs b/crates/wasi/src/preview2/host/udp.rs index b140ec2a75ce..a97bd1bcd187 100644 --- a/crates/wasi/src/preview2/host/udp.rs +++ b/crates/wasi/src/preview2/host/udp.rs @@ -392,7 +392,11 @@ impl udp::HostOutgoingDatagramStream for T { }; // FIXME: check permission to send to `addr`. - stream.inner.try_send_to(&datagram.data, addr)?; + if stream.remote_address == Some(addr) { + stream.inner.try_send(&datagram.data)?; + } else { + stream.inner.try_send_to(&datagram.data, addr)?; + } Ok(()) }