From 5a4109c23b87c76ecbcae492732fc46657034007 Mon Sep 17 00:00:00 2001 From: zonyitoo Date: Sat, 26 Jun 2021 00:43:54 +0800 Subject: [PATCH] clearing write readiness if TFO connect returns EINPROGRESS - ref #555 - imperfect until tokio-rs/tokio#3888 was merged --- Cargo.lock | 14 +-- Cargo.toml | 3 + .../src/net/sys/unix/bsd/freebsd.rs | 72 +++++------ .../shadowsocks/src/net/sys/unix/bsd/macos.rs | 56 +++++---- .../shadowsocks/src/net/sys/unix/linux/mod.rs | 113 ++++++++++-------- crates/shadowsocks/src/net/sys/windows/mod.rs | 100 ++++++++-------- 6 files changed, 194 insertions(+), 164 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9f171838d75..bca8c8dc4c5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -531,9 +531,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" dependencies = [ "libc", ] @@ -1015,9 +1015,9 @@ checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" [[package]] name = "openssl-sys" -version = "0.9.64" +version = "0.9.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "209efc2fe0e980c8849efacdb567f975a1c80245c4f6980d6f012733bfa851af" +checksum = "7a7907e3bfa08bb85105209cdfcb6c63d109f8f6c1ed6ca318fff5c1853fbc1d" dependencies = [ "autocfg", "cc", @@ -1814,8 +1814,7 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fb2ed024293bb19f7a5dc54fe83bf86532a44c12a2bb8ba40d64a4509395ca2" +source = "git+https://github.com/zonyitoo/tokio.git#f051b2cae1eede47f3ef1b0b53097104f05097e9" dependencies = [ "autocfg", "bytes", @@ -1844,8 +1843,7 @@ dependencies = [ [[package]] name = "tokio-macros" version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" +source = "git+https://github.com/zonyitoo/tokio.git#f051b2cae1eede47f3ef1b0b53097104f05097e9" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 6dcfe64c690c..f119462531f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,3 +141,6 @@ byteorder = "1.3" env_logger = "0.8" byte_string = "1.0" tokio = { version = "1", features = ["net", "time", "macros", "io-util"]} + +[patch.crates-io] +tokio = { git = "https://github.com/zonyitoo/tokio.git" } diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs b/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs index cd4c6a523002..224dc5e92211 100644 --- a/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs +++ b/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs @@ -29,7 +29,7 @@ enum TcpStreamState { } /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -107,59 +107,59 @@ impl AsyncRead for TcpStream { impl AsyncWrite for TcpStream { fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { loop { - let this = self.as_mut().project(); + let TcpStreamProj { inner, state } = self.as_mut().project(); + + match *state { + TcpStreamState::Connected => return inner.poll_write(cx, buf), - match this.state { TcpStreamState::FastOpenConnect(addr) => { // TCP_FASTOPEN was supported since FreeBSD 12.0 // // Example program: // - // Wait until socket is writable - ready!(this.inner.poll_write_ready(cx))?; - - unsafe { - let saddr = SockAddr::from(*addr); - - let ret = libc::sendto( - this.inner.as_raw_fd(), - buf.as_ptr() as *const libc::c_void, - buf.len(), - 0, // Yes, BSD doesn't need MSG_FASTOPEN - saddr.as_ptr(), - saddr.len(), - ); - - if ret >= 0 { - // Connect successfully. - *(this.state) = TcpStreamState::Connected; - return Ok(ret as usize).into(); - } else { - // Error occurs - let err = io::Error::last_os_error(); - - // EAGAIN, EWOULDBLOCK - if err.kind() != ErrorKind::WouldBlock { + let saddr = SockAddr::from(addr); + + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { + let ret = libc::sendto( + stream.as_raw_fd(), + buf.as_ptr() as *const libc::c_void, + buf.len(), + 0, // Yes, BSD doesn't need MSG_FASTOPEN + saddr.as_ptr(), + saddr.len(), + ); + + if ret >= 0 { + Ok(ret as usize) + } else { + // Error occurs + let err = io::Error::last_os_error(); + // EINPROGRESS if let Some(libc::EINPROGRESS) = err.raw_os_error() { // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). // // So in this state. We have to loop again to call `poll_write` for sending the first packet. - *(this.state) = TcpStreamState::Connected; + *state = TcpStreamState::Connected; + + // Let `poll_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) } else { - // Other errors - return Err(err).into(); + // Other errors, including EAGAIN, EWOULDBLOCK + Err(err) } - } else { - // Pending on poll_write_ready } } - } - } + }))?; - TcpStreamState::Connected => return this.inner.poll_write(cx, buf), + // Connect successfully with fast open + *state = TcpStreamState::Connected; + return Ok(n).into(); + } } } } diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs b/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs index dce91dfc731f..c2831ea8f10e 100644 --- a/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs +++ b/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs @@ -30,7 +30,7 @@ enum TcpStreamState { } /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -119,9 +119,11 @@ impl AsyncRead for TcpStream { impl AsyncWrite for TcpStream { fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { loop { - let this = self.as_mut().project(); + let TcpStreamProj { inner, state } = self.as_mut().project(); + + match *state { + TcpStreamState::Connected => return inner.poll_write(cx, buf), - match this.state { TcpStreamState::FastOpenWrite => { // `CONNECT_RESUME_ON_READ_WRITE` is set when calling `connectx`, // so the first call of `send` will perform the actual SYN with TFO cookie. @@ -129,30 +131,38 @@ impl AsyncWrite for TcpStream { // (NOT SURE) If remote server doesn't support TFO or this is the first connection, // it may return EINPROGRESS just like other platforms (Linux, FreeBSD). - match ready!(this.inner.poll_write(cx, buf)) { - Ok(n) => { - *(this.state) = TcpStreamState::Connected; - return Ok(n).into(); - } - Err(err) => { - // EAGAIN and EWOULDBLOCK should have been handled by tokio - // - // EINPROGRESS - if let Some(libc::EINPROGRESS) = err.raw_os_error() { - // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. - // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). - // - // So in this state. We have to loop again to call `poll_write` for sending the first packet. - *(this.state) = TcpStreamState::Connected; + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { + let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0); + if ret >= 0 { + Ok(ret as usize) } else { - // Other errors - return Err(err).into(); + let err = io::Error::last_os_error(); + // EAGAIN and EWOULDBLOCK should have been handled by tokio + // + // EINPROGRESS + if let Some(libc::EINPROGRESS) = err.raw_os_error() { + // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. + // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). + // + // So in this state. We have to loop again to call `poll_write` for sending the first packet. + *state = TcpStreamState::Connected; + + // Let `poll_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) + } else { + // Other errors, including EAGAIN + Err(err) + } } } - } - } + }))?; - TcpStreamState::Connected => return this.inner.poll_write(cx, buf), + // Connected successfully with fast open + *state = TcpStreamState::Connected; + return Ok(n).into(); + } } } } diff --git a/crates/shadowsocks/src/net/sys/unix/linux/mod.rs b/crates/shadowsocks/src/net/sys/unix/linux/mod.rs index b4739927475d..3a3eea3307c8 100644 --- a/crates/shadowsocks/src/net/sys/unix/linux/mod.rs +++ b/crates/shadowsocks/src/net/sys/unix/linux/mod.rs @@ -32,7 +32,7 @@ enum TcpStreamState { } /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -179,81 +179,94 @@ impl AsyncRead for TcpStream { impl AsyncWrite for TcpStream { fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { loop { - let this = self.as_mut().project(); + let TcpStreamProj { inner, state } = self.as_mut().project(); + + match *state { + TcpStreamState::Connected => return inner.poll_write(cx, buf), - match this.state { TcpStreamState::FastOpenConnect(addr) => { // Fallback mode. Must be kernal < 4.11 // // Uses sendto as BSD-like systems - // Wait until socket is writable - ready!(this.inner.poll_write_ready(cx))?; - - unsafe { - let saddr = SockAddr::from(*addr); - - let ret = libc::sendto( - this.inner.as_raw_fd(), - buf.as_ptr() as *const libc::c_void, - buf.len(), - libc::MSG_FASTOPEN, - saddr.as_ptr(), - saddr.len(), - ); - - if ret >= 0 { - // Connect successfully. - *(this.state) = TcpStreamState::Connected; - return Ok(ret as usize).into(); - } else { - // Error occurs - let err = io::Error::last_os_error(); - - // EAGAIN, EWOULDBLOCK - if err.kind() != ErrorKind::WouldBlock { + let saddr = SockAddr::from(addr); + + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { + let ret = libc::sendto( + stream.as_raw_fd(), + buf.as_ptr() as *const libc::c_void, + buf.len(), + libc::MSG_FASTOPEN, + saddr.as_ptr(), + saddr.len(), + ); + + if ret >= 0 { + Ok(ret as usize) + } else { + // Error occurs + let err = io::Error::last_os_error(); + // EINPROGRESS if let Some(libc::EINPROGRESS) = err.raw_os_error() { // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). // // So in this state. We have to loop again to call `poll_write` for sending the first packet. - *(this.state) = TcpStreamState::Connected; + *state = TcpStreamState::Connected; + + // Let `try_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) } else { - // Other errors - return Err(err).into(); + // Other errors, including EAGAIN, EWOULDBLOCK + Err(err) } - } else { - // Pending on poll_write_ready } } - } + }))?; + + // Connect successfully with fast open + *state = TcpStreamState::Connected; + return Ok(n).into(); } TcpStreamState::FastOpenWrite => { // First `write` after `TCP_FASTOPEN_CONNECT` // Kernel >= 4.11 - match ready!(this.inner.poll_write(cx, buf)) { - Ok(n) => { - *(this.state) = TcpStreamState::Connected; - return Ok(n).into(); - } - Err(err) => { - // EAGAIN and EWOULDBLOCK should have been handled by tokio - // - // EINPROGRESS - if let Some(libc::EINPROGRESS) = err.raw_os_error() { - // loop again to call `poll_write` for sending the first packet - *(this.state) = TcpStreamState::Connected; + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { + let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0); + + if ret >= 0 { + Ok(ret as usize) } else { - return Err(err).into(); + let err = io::Error::last_os_error(); + // EINPROGRESS + if let Some(libc::EINPROGRESS) = err.raw_os_error() { + // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. + // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). + // + // So in this state. We have to loop again to call `poll_write` for sending the first packet. + *state = TcpStreamState::Connected; + + // Let `poll_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) + } else { + // Other errors, including EAGAIN, EWOULDBLOCK + Err(err) + } } } - } - } + }))?; - TcpStreamState::Connected => return this.inner.poll_write(cx, buf), + // Connect successfully with fast open + *state = TcpStreamState::Connected; + return Ok(n).into(); + } } } } diff --git a/crates/shadowsocks/src/net/sys/windows/mod.rs b/crates/shadowsocks/src/net/sys/windows/mod.rs index 96d272b5f247..e7f4e0936d53 100644 --- a/crates/shadowsocks/src/net/sys/windows/mod.rs +++ b/crates/shadowsocks/src/net/sys/windows/mod.rs @@ -112,7 +112,7 @@ unsafe impl Send for TcpStreamState {} unsafe impl Sync for TcpStreamState {} /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -228,22 +228,22 @@ fn set_update_connect_context(sock: SOCKET) -> io::Result<()> { } impl AsyncWrite for TcpStream { - fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - let this = self.project(); - + fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { loop { - match this.state { - TcpStreamState::Connected => { - return this.inner.poll_write(cx, buf); - } + let TcpStreamProj { inner, state } = self.as_mut().project(); + + match *state { + TcpStreamState::Connected => return inner.poll_write(cx, buf), + TcpStreamState::FastOpenConnect(addr) => { + let saddr = SockAddr::from(addr); + unsafe { // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex let connect_ex = PFN_CONNECTEX_OPT .expect("LPFN_CONNECTEX function doesn't exist. It is only supported after Windows 10"); - let saddr = SockAddr::from(*addr); - let sock = this.inner.as_raw_socket() as SOCKET; + let sock = inner.as_raw_socket() as SOCKET; let mut overlapped: Box = Box::new(mem::zeroed()); @@ -266,7 +266,7 @@ impl AsyncWrite for TcpStream { debug_assert!(bytes_sent as usize <= buf.len()); - *(this.state) = TcpStreamState::Connected; + *state = TcpStreamState::Connected; return Ok(bytes_sent as usize).into(); } @@ -276,47 +276,53 @@ impl AsyncWrite for TcpStream { } // ConnectEx pending (ERROR_IO_PENDING), check later in FastOpenConnecting - *(this.state) = TcpStreamState::FastOpenConnecting(overlapped); + *state = TcpStreamState::FastOpenConnecting(overlapped); } } - TcpStreamState::FastOpenConnecting(ref mut overlapped) => { - // Wait until socket is writable - ready!(this.inner.poll_write_ready(cx))?; - - unsafe { - let sock = this.inner.as_raw_socket() as SOCKET; - - let mut bytes_sent: DWORD = 0; - let mut flags: DWORD = 0; - - // Fetch ConnectEx's result in a non-blocking way. - let ret: BOOL = WSAGetOverlappedResult( - sock, - overlapped.as_mut() as LPOVERLAPPED, - &mut bytes_sent as LPDWORD, - FALSE, // fWait = false, non-blocking, returns WSA_IO_INCOMPLETE - &mut flags as LPDWORD, - ); - - if ret == TRUE { - // Get ConnectEx's result successfully. Socket is connected - - // Make getpeername() works - set_update_connect_context(sock)?; - - debug_assert!(bytes_sent as usize <= buf.len()); - *(this.state) = TcpStreamState::Connected; - return Ok(bytes_sent as usize).into(); + TcpStreamState::FastOpenConnecting(ref mut overlapped) => { + let n = ready!(inner.poll_write_io(cx, || { + unsafe { + let sock = inner.as_raw_socket() as SOCKET; + + let mut bytes_sent: DWORD = 0; + let mut flags: DWORD = 0; + + // Fetch ConnectEx's result in a non-blocking way. + let ret: BOOL = WSAGetOverlappedResult( + sock, + overlapped.as_mut() as LPOVERLAPPED, + &mut bytes_sent as LPDWORD, + FALSE, // fWait = false, non-blocking, returns WSA_IO_INCOMPLETE + &mut flags as LPDWORD, + ); + + if ret == TRUE { + // Get ConnectEx's result successfully. Socket is connected + + // Make getpeername() works + set_update_connect_context(sock)?; + + debug_assert!(bytes_sent as usize <= buf.len()); + + return Ok(bytes_sent as usize); + } + + let err = WSAGetLastError(); + if err == WSA_IO_INCOMPLETE { + // ConnectEx is still not connected. Wait for the next round + // + // Let `try_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) + } else { + Err(io::Error::from_raw_os_error(err)) + } } + }))?; - let err = WSAGetLastError(); - if err == WSA_IO_INCOMPLETE { - // ConnectEx is still not connected. Wait for the next round - } else { - return Err(io::Error::from_raw_os_error(err)).into(); - } - } + // Connect successfully with fast open + *state = TcpStreamState::Connected; + return Ok(n).into(); } } }