From 15aaf7f7ed460d05a32f1cfe41731b2d23ad6d2b Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Mon, 28 Jun 2021 16:34:45 +0200 Subject: [PATCH] Add read/try_read etc methods to NamedPipeServer --- examples/named-pipe-ready.rs | 72 +++- tokio/src/net/windows/named_pipe.rs | 490 ++++++++++++++++++++++++++++ tokio/tests/named_pipe.rs | 64 +++- 3 files changed, 611 insertions(+), 15 deletions(-) diff --git a/examples/named-pipe-ready.rs b/examples/named-pipe-ready.rs index 5037f2a8744..fce17466c4a 100644 --- a/examples/named-pipe-ready.rs +++ b/examples/named-pipe-ready.rs @@ -2,7 +2,7 @@ use std::io; #[cfg(windows)] async fn windows_main() -> io::Result<()> { - use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Interest}; + use tokio::io::Interest; use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions}; const PIPE_NAME: &str = r"\\.\pipe\named-pipe-single-client"; @@ -13,11 +13,62 @@ async fn windows_main() -> io::Result<()> { // Note: we wait for a client to connect. server.connect().await?; - let mut server = BufReader::new(server); + let buf = { + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + + loop { + server.readable().await?; + + let buf = &mut read_buf[read_buf_cursor..]; + + match server.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + + read_buf + }; + + { + let write_buf = b"pong\n"; + let mut write_buf_cursor = 0; + + loop { + let buf = &write_buf[write_buf_cursor..]; + + if buf.is_empty() { + break; + } + + server.writable().await?; + + match server.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } - let mut buf = String::new(); - server.read_line(&mut buf).await?; - server.write_all(b"pong\n").await?; Ok::<_, io::Error>(buf) }); @@ -33,9 +84,12 @@ async fn windows_main() -> io::Result<()> { let mut write_buf_cursor = 0; loop { - let ready = client - .ready(Interest::READABLE | Interest::WRITABLE) - .await?; + let mut interest = Interest::READABLE; + if write_buf_cursor < write_buf.len() { + interest |= Interest::WRITABLE; + } + + let ready = client.ready(interest).await?; if ready.is_readable() { let buf = &mut read_buf[read_buf_cursor..]; @@ -85,7 +139,7 @@ async fn windows_main() -> io::Result<()> { let (server, client) = tokio::try_join!(server, client)?; - assert_eq!(server?, "ping\n"); + assert_eq!(server?, *b"ping\n"); assert_eq!(client?, "pong\n"); Ok(()) diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 396556ce4f6..3434e5ad102 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -225,6 +225,496 @@ impl NamedPipeServer { pub fn disconnect(&self) -> io::Result<()> { self.io.disconnect() } + + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same pipe on a single + /// task without splitting the pipe. + /// + /// # Examples + /// + /// Concurrently read and write to the pipe on the same task without + /// splitting. + /// + /// ```no_run + /// use tokio::io::Interest; + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let server = named_pipe::ServerOptions::new() + /// .create(PIPE_NAME)?; + /// + /// loop { + /// let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?; + /// + /// if ready.is_readable() { + /// let mut data = vec![0; 1024]; + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match server.try_read(&mut data) { + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// if ready.is_writable() { + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match server.try_write(b"hello world") { + /// Ok(n) => { + /// println!("write {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// } + /// } + /// ``` + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Wait for the pipe to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let server = named_pipe::ServerOptions::new() + /// .create(PIPE_NAME)?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the pipe to be readable + /// server.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match server.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + + /// Polls for read readiness. + /// + /// If the pipe is not currently ready for reading, this method will + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for reading, `Waker::wake` will be called on the waker. + /// + /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a + /// second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`readable`] is not feasible. Where possible, using [`readable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for reading. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`readable`]: method@Self::readable + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_read_ready(cx).map_ok(|_| ()) + } + + /// Try to read data from the pipe into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: NamedPipeServer::readable() + /// [`ready()`]: NamedPipeServer::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed + /// and will no longer yield data. If the pipe is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let server = named_pipe::ServerOptions::new() + /// .create(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the pipe to be readable + /// server.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf = [0; 4096]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match server.try_read(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read(&self, buf: &mut [u8]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read(buf)) + } + + /// Try to read data from the pipe into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: NamedPipeServer::try_read() + /// [`readable()`]: NamedPipeServer::readable() + /// [`ready()`]: NamedPipeServer::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed + /// and will no longer yield data. If the pipe is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io::{self, IoSliceMut}; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let server = named_pipe::ServerOptions::new() + /// .create(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the pipe to be readable + /// server.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf_a = [0; 512]; + /// let mut buf_b = [0; 1024]; + /// let mut bufs = [ + /// IoSliceMut::new(&mut buf_a), + /// IoSliceMut::new(&mut buf_b), + /// ]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match server.try_read_vectored(&mut bufs) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) + } + + /// Wait for the pipe to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let server = named_pipe::ServerOptions::new() + /// .create(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the pipe to be writable + /// server.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match server.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Polls for write readiness. + /// + /// If the pipe is not currently ready for writing, this method will + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for writing, `Waker::wake` will be called on the waker. + /// + /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a + /// second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`writable`] is not feasible. Where possible, using [`writable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for writing. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`writable`]: method@Self::writable + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_write_ready(cx).map_ok(|_| ()) + } + + /// Try to write a buffer to the pipe, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let server = named_pipe::ServerOptions::new() + /// .create(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the pipe to be writable + /// server.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match server.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write(&self, buf: &[u8]) -> io::Result { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) + } + + /// Try to write several buffers to the pipe, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: NamedPipeServer::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let server = named_pipe::ServerOptions::new() + /// .create(PIPE_NAME)?; + /// + /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; + /// + /// loop { + /// // Wait for the pipe to be writable + /// server.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match server.try_write_vectored(&bufs) { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) + } } impl AsyncRead for NamedPipeServer { diff --git a/tokio/tests/named_pipe.rs b/tokio/tests/named_pipe.rs index a16a324d254..fd33203f835 100644 --- a/tokio/tests/named_pipe.rs +++ b/tokio/tests/named_pipe.rs @@ -150,7 +150,7 @@ async fn test_named_pipe_multi_client() -> io::Result<()> { #[tokio::test] async fn test_named_pipe_multi_client_ready() -> io::Result<()> { - use tokio::io::{AsyncBufReadExt as _, BufReader, Interest}; + use tokio::io::Interest; const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready"; const N: usize = 10; @@ -164,7 +164,8 @@ async fn test_named_pipe_multi_client_ready() -> io::Result<()> { for _ in 0..N { // Wait for client to connect. server.connect().await?; - let mut inner = BufReader::new(server); + + let inner_server = server; // Construct the next server to be connected before sending the one // we already have of onto a task. This ensures that the server @@ -174,10 +175,61 @@ async fn test_named_pipe_multi_client_ready() -> io::Result<()> { server = ServerOptions::new().create(PIPE_NAME)?; let _ = tokio::spawn(async move { - let mut buf = String::new(); - inner.read_line(&mut buf).await?; - inner.write_all(b"pong\n").await?; - inner.flush().await?; + let server = inner_server; + + { + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + + loop { + server.readable().await?; + + let buf = &mut read_buf[read_buf_cursor..]; + + match server.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + }; + + { + let write_buf = b"pong\n"; + let mut write_buf_cursor = 0; + + loop { + server.writable().await?; + let buf = &write_buf[write_buf_cursor..]; + + match server.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + + if write_buf_cursor == write_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + Ok::<_, io::Error>(()) }); }