From 093d640ad79d3eae43a57a0439174f0a470f7670 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 18 Jun 2020 12:11:37 +0200 Subject: [PATCH 1/3] fix(net): ensure the reactor and runtime are running If this is not done, then reactor is not running, resulting in the sockets not actually connecting. Closes #818 --- src/net/tcp/listener.rs | 4 ++++ src/net/tcp/stream.rs | 4 ++++ src/net/udp/mod.rs | 4 ++++ src/os/unix/net/datagram.rs | 8 ++++++++ src/os/unix/net/listener.rs | 11 ++++++++++- src/os/unix/net/stream.rs | 10 +++++++++- 6 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 72c5d3a80..09f5812fb 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -75,6 +75,8 @@ impl TcpListener { /// /// [`local_addr`]: #method.local_addr pub async fn bind(addrs: A) -> io::Result { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + let mut last_err = None; let addrs = addrs.to_socket_addrs().await?; @@ -200,6 +202,8 @@ impl<'a> Stream for Incoming<'a> { impl From for TcpListener { /// Converts a `std::net::TcpListener` into its asynchronous equivalent. fn from(listener: std::net::TcpListener) -> TcpListener { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + TcpListener { watcher: Async::new(listener).expect("TcpListener is known to be good"), } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index b854143ff..63232fa35 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -71,6 +71,8 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub async fn connect(addrs: A) -> io::Result { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + let mut last_err = None; let addrs = addrs.to_socket_addrs().await?; @@ -356,6 +358,8 @@ impl Write for &TcpStream { impl From for TcpStream { /// Converts a `std::net::TcpStream` into its asynchronous equivalent. fn from(stream: std::net::TcpStream) -> TcpStream { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + TcpStream { watcher: Arc::new(Async::new(stream).expect("TcpStream is known to be good")), } diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 18f6fc700..d361a6fce 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -68,6 +68,8 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn bind(addrs: A) -> io::Result { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + let mut last_err = None; let addrs = addrs.to_socket_addrs().await?; @@ -479,6 +481,8 @@ impl UdpSocket { impl From for UdpSocket { /// Converts a `std::net::UdpSocket` into its asynchronous equivalent. fn from(socket: std::net::UdpSocket) -> UdpSocket { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + UdpSocket { watcher: Async::new(socket).expect("UdpSocket is known to be good"), } diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index 6a30b0279..52c6b07f1 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -45,6 +45,8 @@ pub struct UnixDatagram { impl UnixDatagram { fn new(socket: StdUnixDatagram) -> UnixDatagram { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + UnixDatagram { watcher: Async::new(socket).expect("UnixDatagram is known to be good"), } @@ -64,6 +66,8 @@ impl UnixDatagram { /// # Ok(()) }) } /// ``` pub async fn bind>(path: P) -> io::Result { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + let path = path.as_ref().to_owned(); let socket = Async::::bind(path)?; Ok(UnixDatagram { watcher: socket }) @@ -305,6 +309,8 @@ impl fmt::Debug for UnixDatagram { impl From for UnixDatagram { /// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent. fn from(datagram: StdUnixDatagram) -> UnixDatagram { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + UnixDatagram { watcher: Async::new(datagram).expect("UnixDatagram is known to be good"), } @@ -319,6 +325,8 @@ impl AsRawFd for UnixDatagram { impl FromRawFd for UnixDatagram { unsafe fn from_raw_fd(fd: RawFd) -> UnixDatagram { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + let raw = StdUnixDatagram::from_raw_fd(fd); let datagram = Async::::new(raw).expect("invalid file descriptor"); UnixDatagram { watcher: datagram } diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index ac033075d..a63bd4b65 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -68,6 +68,8 @@ impl UnixListener { /// # Ok(()) }) } /// ``` pub async fn bind>(path: P) -> io::Result { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + let path = path.as_ref().to_owned(); let listener = Async::::bind(path)?; @@ -93,7 +95,12 @@ impl UnixListener { pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { let (stream, addr) = self.watcher.accept().await?; - Ok((UnixStream { watcher: Arc::new(stream) }, addr)) + Ok(( + UnixStream { + watcher: Arc::new(stream), + }, + addr, + )) } /// Returns a stream of incoming connections. @@ -187,6 +194,8 @@ impl Stream for Incoming<'_> { impl From for UnixListener { /// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent. fn from(listener: StdUnixListener) -> UnixListener { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + UnixListener { watcher: Async::new(listener).expect("UnixListener is known to be good"), } diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index b1ba5bca0..74bd6aef9 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -57,6 +57,8 @@ impl UnixStream { /// # Ok(()) }) } /// ``` pub async fn connect>(path: P) -> io::Result { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + let path = path.as_ref().to_owned(); let stream = Arc::new(Async::::connect(path).await?); @@ -79,6 +81,8 @@ impl UnixStream { /// # Ok(()) }) } /// ``` pub fn pair() -> io::Result<(UnixStream, UnixStream)> { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + let (a, b) = Async::::pair()?; let a = UnixStream { watcher: Arc::new(a), @@ -224,8 +228,12 @@ impl fmt::Debug for UnixStream { impl From for UnixStream { /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent. fn from(stream: StdUnixStream) -> UnixStream { + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + let stream = Async::new(stream).expect("UnixStream is known to be good"); - UnixStream { watcher: Arc::new(stream) } + UnixStream { + watcher: Arc::new(stream), + } } } From 1c1c168e1b55e01f932a38bf0629ec7467bf6162 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 18 Jun 2020 12:37:14 +0200 Subject: [PATCH 2/3] fix(timer): ensure the runtime is working for timers --- src/future/future/delay.rs | 4 ++-- src/future/timeout.rs | 4 ++-- src/io/timeout.rs | 4 ++-- src/stream/interval.rs | 6 +++--- src/stream/stream/delay.rs | 4 ++-- src/stream/stream/throttle.rs | 6 +++--- src/stream/stream/timeout.rs | 4 ++-- src/utils.rs | 7 +++++++ 8 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/future/future/delay.rs b/src/future/future/delay.rs index b6c30bcc3..092639d91 100644 --- a/src/future/future/delay.rs +++ b/src/future/future/delay.rs @@ -5,7 +5,7 @@ use std::time::Duration; use pin_project_lite::pin_project; use crate::task::{Context, Poll}; -use crate::utils::Timer; +use crate::utils::{timer_after, Timer}; pin_project! { #[doc(hidden)] @@ -20,7 +20,7 @@ pin_project! { impl DelayFuture { pub fn new(future: F, dur: Duration) -> DelayFuture { - let delay = Timer::after(dur); + let delay = timer_after(dur); DelayFuture { future, delay } } diff --git a/src/future/timeout.rs b/src/future/timeout.rs index 4a9d93c7f..384662149 100644 --- a/src/future/timeout.rs +++ b/src/future/timeout.rs @@ -7,7 +7,7 @@ use std::time::Duration; use pin_project_lite::pin_project; use crate::task::{Context, Poll}; -use crate::utils::Timer; +use crate::utils::{timer_after, Timer}; /// Awaits a future or times out after a duration of time. /// @@ -51,7 +51,7 @@ impl TimeoutFuture { pub(super) fn new(future: F, dur: Duration) -> TimeoutFuture { TimeoutFuture { future, - delay: Timer::after(dur), + delay: timer_after(dur), } } } diff --git a/src/io/timeout.rs b/src/io/timeout.rs index ce33fea1d..073c2f6e9 100644 --- a/src/io/timeout.rs +++ b/src/io/timeout.rs @@ -6,7 +6,7 @@ use std::time::Duration; use pin_project_lite::pin_project; use crate::io; -use crate::utils::Timer; +use crate::utils::{timer_after, Timer}; /// Awaits an I/O future or times out after a duration of time. /// @@ -37,7 +37,7 @@ where F: Future>, { Timeout { - timeout: Timer::after(dur), + timeout: timer_after(dur), future: f, } .await diff --git a/src/stream/interval.rs b/src/stream/interval.rs index 4e5c92b02..0a7eb4807 100644 --- a/src/stream/interval.rs +++ b/src/stream/interval.rs @@ -4,7 +4,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use crate::stream::Stream; -use crate::utils::Timer; +use crate::utils::{timer_after, Timer}; /// Creates a new stream that yields at a set interval. /// @@ -45,7 +45,7 @@ use crate::utils::Timer; #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub fn interval(dur: Duration) -> Interval { Interval { - delay: Timer::after(dur), + delay: timer_after(dur), interval: dur, } } @@ -72,7 +72,7 @@ impl Stream for Interval { return Poll::Pending; } let interval = self.interval; - let _ = std::mem::replace(&mut self.delay, Timer::after(interval)); + let _ = std::mem::replace(&mut self.delay, timer_after(interval)); Poll::Ready(Some(())) } } diff --git a/src/stream/stream/delay.rs b/src/stream/stream/delay.rs index 0ba42b052..9a7f947c6 100644 --- a/src/stream/stream/delay.rs +++ b/src/stream/stream/delay.rs @@ -6,7 +6,7 @@ use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -use crate::utils::Timer; +use crate::utils::{timer_after, Timer}; pin_project! { #[doc(hidden)] @@ -24,7 +24,7 @@ impl Delay { pub(super) fn new(stream: S, dur: Duration) -> Self { Delay { stream, - delay: Timer::after(dur), + delay: timer_after(dur), delay_done: false, } } diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 2f9333a7a..d0e2cdd14 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -6,7 +6,7 @@ use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -use crate::utils::Timer; +use crate::utils::{timer_after, Timer}; pin_project! { /// A stream that only yields one element once every `duration`. @@ -35,7 +35,7 @@ impl Throttle { stream, duration, blocked: false, - delay: Timer::after(Duration::default()), + delay: timer_after(Duration::default()), } } } @@ -59,7 +59,7 @@ impl Stream for Throttle { Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { *this.blocked = true; - let _ = std::mem::replace(&mut *this.delay, Timer::after(*this.duration)); + let _ = std::mem::replace(&mut *this.delay, timer_after(*this.duration)); Poll::Ready(Some(v)) } } diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 28e52aebd..0e0ee912c 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -8,7 +8,7 @@ use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -use crate::utils::Timer; +use crate::utils::{timer_after, Timer}; pin_project! { /// A stream with timeout time set @@ -23,7 +23,7 @@ pin_project! { impl Timeout { pub(crate) fn new(stream: S, dur: Duration) -> Self { - let delay = Timer::after(dur); + let delay = timer_after(dur); Self { stream, delay } } diff --git a/src/utils.rs b/src/utils.rs index e064570ec..3ca9d15b6 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -64,6 +64,13 @@ mod timer { pub type Timer = smol::Timer; } +pub(crate) fn timer_after(dur: std::time::Duration) -> timer::Timer { + #[cfg(not(target_os = "unknown"))] + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + + Timer::after(dur) +} + #[cfg(any( all(target_arch = "wasm32", feature = "default"), all(feature = "unstable", not(feature = "default")) From 06a2fb8c4ff38ebcc31a1de9ab9c4b8b148b43a3 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 18 Jun 2020 13:10:37 +0200 Subject: [PATCH 3/3] fix export --- src/utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils.rs b/src/utils.rs index 3ca9d15b6..31290e333 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -64,6 +64,7 @@ mod timer { pub type Timer = smol::Timer; } +#[cfg(any(feature = "unstable", feature = "default"))] pub(crate) fn timer_after(dur: std::time::Duration) -> timer::Timer { #[cfg(not(target_os = "unknown"))] once_cell::sync::Lazy::force(&crate::rt::RUNTIME);