From ece142e0fa0a7731c6087314996960a53f7e84a6 Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Thu, 11 Jul 2024 16:29:24 +0800 Subject: [PATCH] feat(io): add AsyncReadBufferPool and AsyncReadAtBufferPool --- compio-driver/src/fallback_buffer_pool.rs | 13 +++- compio-driver/src/fusion/buffer_pool.rs | 24 ++++++++ compio-driver/src/fusion/mod.rs | 7 ++- compio-driver/src/iocp/mod.rs | 17 +++++- compio-driver/src/iocp/op.rs | 39 +++++++++--- compio-driver/src/iour/buffer_pool.rs | 8 +++ compio-driver/src/lib.rs | 8 ++- compio-driver/src/poll/op.rs | 14 ++++- compio-fs/src/async_fd.rs | 46 +++++++++----- compio-fs/src/file.rs | 33 +++++----- compio-fs/src/named_pipe.rs | 74 +++++++++++++++++------ compio-fs/src/pipe.rs | 44 ++++++++++---- compio-fs/src/stdio/unix.rs | 37 +++++++++--- compio-fs/tests/buffer_pool.rs | 21 ++++--- compio-io/src/read/mod.rs | 47 +++++++++++++- compio-net/src/tcp.rs | 43 ++++++++----- compio-net/src/udp.rs | 10 ++- compio-net/src/unix.rs | 43 ++++++++----- compio-net/tests/buffer_pool.rs | 14 ++--- compio-process/src/unix.rs | 41 ++++++++++++- compio-process/src/windows.rs | 41 ++++++++++++- compio-runtime/src/buffer_pool.rs | 19 +++++- compio-runtime/src/runtime/mod.rs | 6 -- 23 files changed, 495 insertions(+), 154 deletions(-) diff --git a/compio-driver/src/fallback_buffer_pool.rs b/compio-driver/src/fallback_buffer_pool.rs index a926b98d..f274bddc 100644 --- a/compio-driver/src/fallback_buffer_pool.rs +++ b/compio-driver/src/fallback_buffer_pool.rs @@ -9,6 +9,10 @@ use std::{ use compio_buf::{IntoInner, Slice}; +/// Buffer pool +/// +/// A buffer pool to allow user no need to specify a specific buffer to do the +/// IO operation pub struct BufferPool { buffers: RefCell>>, } @@ -21,7 +25,7 @@ impl Debug for BufferPool { impl BufferPool { pub(crate) fn new(buffer_len: u16, buffer_size: usize) -> Self { - let buffers = (0..buffer_len) + let buffers = (0..buffer_len.next_power_of_two()) .map(|_| Vec::with_capacity(buffer_size)) .collect(); @@ -34,11 +38,16 @@ impl BufferPool { self.buffers.borrow_mut().pop_front() } - pub(crate) fn add_buffer(&self, buffer: Vec) { + pub(crate) fn add_buffer(&self, mut buffer: Vec) { + buffer.clear(); self.buffers.borrow_mut().push_back(buffer) } } +/// Buffer borrowed from buffer pool +/// +/// When IO operation finish, user will obtain a `BorrowedBuffer` to access the +/// filled data pub struct BorrowedBuffer<'a> { buffer: ManuallyDrop>>, pool: &'a BufferPool, diff --git a/compio-driver/src/fusion/buffer_pool.rs b/compio-driver/src/fusion/buffer_pool.rs index a04248ed..7d47e3b1 100644 --- a/compio-driver/src/fusion/buffer_pool.rs +++ b/compio-driver/src/fusion/buffer_pool.rs @@ -4,6 +4,10 @@ use std::{ ops::{Deref, DerefMut}, }; +/// Buffer pool +/// +/// A buffer pool to allow user no need to specify a specific buffer to do the +/// IO operation pub struct BufferPool { inner: BufferPollInner, } @@ -40,6 +44,22 @@ impl BufferPool { inner: BufferPollInner::Poll(buffer_pool), } } + + pub(crate) fn into_poll(self) -> crate::fallback_buffer_pool::BufferPool { + match self.inner { + BufferPollInner::IoUring(_) => { + panic!("BufferPool type is not io-uring type") + } + BufferPollInner::Poll(inner) => inner, + } + } + + pub(crate) fn into_io_uring(self) -> super::iour::buffer_pool::BufferPool { + match self.inner { + BufferPollInner::IoUring(inner) => inner, + BufferPollInner::Poll(_) => panic!("BufferPool type is not poll type"), + } + } } enum BufferPollInner { @@ -47,6 +67,10 @@ enum BufferPollInner { Poll(crate::fallback_buffer_pool::BufferPool), } +/// Buffer borrowed from buffer pool +/// +/// When IO operation finish, user will obtain a `BorrowedBuffer` to access the +/// filled data pub struct BorrowedBuffer<'a> { inner: BorrowedBufferInner<'a>, } diff --git a/compio-driver/src/fusion/mod.rs b/compio-driver/src/fusion/mod.rs index 64b71b16..d633cd87 100644 --- a/compio-driver/src/fusion/mod.rs +++ b/compio-driver/src/fusion/mod.rs @@ -200,8 +200,11 @@ impl Driver { /// # Safety /// /// caller must make sure release the buffer pool with correct driver - pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> { - todo!() + pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> { + match &mut self.fuse { + FuseDriver::Poll(driver) => driver.release_buffer_pool(buffer_pool.into_poll()), + FuseDriver::IoUring(driver) => driver.release_buffer_pool(buffer_pool.into_io_uring()), + } } } diff --git a/compio-driver/src/iocp/mod.rs b/compio-driver/src/iocp/mod.rs index 3b9f0fa8..37528d59 100644 --- a/compio-driver/src/iocp/mod.rs +++ b/compio-driver/src/iocp/mod.rs @@ -28,7 +28,7 @@ use windows_sys::Win32::{ }, }; -use crate::{syscall, AsyncifyPool, Entry, Key, OutEntries, ProactorBuilder}; +use crate::{syscall, AsyncifyPool, BufferPool, Entry, Key, OutEntries, ProactorBuilder}; pub(crate) mod op; @@ -320,6 +320,21 @@ impl Driver { self.notify_overlapped.clone(), )) } + + pub fn create_buffer_pool( + &mut self, + buffer_len: u16, + buffer_size: usize, + ) -> io::Result { + Ok(BufferPool::new(buffer_len, buffer_size)) + } + + /// # Safety + /// + /// caller must make sure release the buffer pool with correct driver + pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> { + Ok(()) + } } impl AsRawFd for Driver { diff --git a/compio-driver/src/iocp/op.rs b/compio-driver/src/iocp/op.rs index edb0e63d..b1890ab9 100644 --- a/compio-driver/src/iocp/op.rs +++ b/compio-driver/src/iocp/op.rs @@ -5,14 +5,16 @@ use std::{ io::ErrorKind, marker::PhantomPinned, net::Shutdown, - os::{fd::AsRawFd, windows::io::AsRawSocket}, + os::windows::io::AsRawSocket, pin::Pin, ptr::{null, null_mut}, task::Poll, }; use aligned_array::{Aligned, A8}; -use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; +use compio_buf::{ + BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, SetBufInit, Slice, +}; #[cfg(not(feature = "once_cell_try"))] use once_cell::sync::OnceCell as OnceLock; use pin_project_lite::pin_project; @@ -40,7 +42,10 @@ use windows_sys::{ }, }; -use crate::{op::*, syscall, AsRawFd, OpCode, OpType, RawFd, SharedFd}; +use crate::{ + op::*, syscall, AsRawFd, BorrowedBuffer, BufferPool, OpCode, OpType, RawFd, SharedFd, + TakeBuffer, +}; #[inline] fn winapi_result(transferred: u32) -> Poll> { @@ -185,24 +190,32 @@ impl ReadAtBufferPool { let buffer = buffer_pool.get_buffer().ok_or_else(|| { io::Error::new(ErrorKind::Other, "buffer ring has no available buffer") })?; + let len = if len == 0 { + buffer.capacity() + } else { + buffer.capacity().min(len as _) + }; Ok(Self { - read_at: ReadAt::new(fd, offset, buffer.slice(..len as usize)), + read_at: ReadAt::new(fd, offset, buffer.slice(..len)), }) } } impl OpCode for ReadAtBufferPool { unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll> { - self.project().recv.operate(self, optr) + self.project().read_at.operate(optr) } unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> { - self.project().recv.cancel(self, optr) + self.project().read_at.cancel(optr) } } impl TakeBuffer for ReadAtBufferPool { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + fn take_buffer( self, buffer_pool: &BufferPool, @@ -490,24 +503,32 @@ impl RecvBufferPool { let buffer = buffer_pool.get_buffer().ok_or_else(|| { io::Error::new(ErrorKind::Other, "buffer ring has no available buffer") })?; + let len = if len == 0 { + buffer.capacity() + } else { + buffer.capacity().min(len as _) + }; Ok(Self { - recv: Recv::new(fd, buffer.slice(..len as usize)), + recv: Recv::new(fd, buffer.slice(..len)), }) } } impl OpCode for RecvBufferPool { unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll> { - self.project().recv.operate(self, optr) + self.project().recv.operate(optr) } unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> { - self.project().recv.cancel(self, optr) + self.project().recv.cancel(optr) } } impl TakeBuffer for RecvBufferPool { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + fn take_buffer( self, buffer_pool: &BufferPool, diff --git a/compio-driver/src/iour/buffer_pool.rs b/compio-driver/src/iour/buffer_pool.rs index 8bde3b35..6cb61789 100644 --- a/compio-driver/src/iour/buffer_pool.rs +++ b/compio-driver/src/iour/buffer_pool.rs @@ -7,6 +7,10 @@ use std::{ use io_uring::cqueue::buffer_select; use io_uring_buf_ring::IoUringBufRing; +/// Buffer pool +/// +/// A buffer pool to allow user no need to specify a specific buffer to do the +/// IO operation pub struct BufferPool { buf_ring: IoUringBufRing>, } @@ -49,6 +53,10 @@ impl BufferPool { } } +/// Buffer borrowed from buffer pool +/// +/// When IO operation finish, user will obtain a `BorrowedBuffer` to access the +/// filled data pub struct BorrowedBuffer<'a>(io_uring_buf_ring::BorrowedBuffer<'a, Vec>); impl Debug for BorrowedBuffer<'_> { diff --git a/compio-driver/src/lib.rs b/compio-driver/src/lib.rs index 8b7e7451..e474eb77 100644 --- a/compio-driver/src/lib.rs +++ b/compio-driver/src/lib.rs @@ -535,14 +535,20 @@ impl ProactorBuilder { } } +/// Trait to get the selected buffer of an io operation. pub trait TakeBuffer { + /// Buffer pool type type BufferPool; + + /// Selected buffer type type Buffer<'a>; + /// Take the selected buffer with `buffer_pool`, io `result` and `flags`, if + /// io operation is success fn take_buffer( self, buffer_pool: &Self::BufferPool, result: io::Result, - flag: u32, + flags: u32, ) -> io::Result>; } diff --git a/compio-driver/src/poll/op.rs b/compio-driver/src/poll/op.rs index 74ca24d9..74b66d8c 100644 --- a/compio-driver/src/poll/op.rs +++ b/compio-driver/src/poll/op.rs @@ -218,9 +218,14 @@ impl ReadAtBufferPool { let buffer = buffer_pool.get_buffer().ok_or_else(|| { io::Error::new(ErrorKind::Other, "buffer ring has no available buffer") })?; + let len = if len == 0 { + buffer.capacity() + } else { + buffer.capacity().min(len as _) + }; Ok(Self { - read_at: ReadAt::new(fd, offset, buffer.slice(..len as usize)), + read_at: ReadAt::new(fd, offset, buffer.slice(..len)), }) } } @@ -530,9 +535,14 @@ impl RecvBufferPool { let buffer = buffer_pool.get_buffer().ok_or_else(|| { io::Error::new(ErrorKind::Other, "buffer ring has no available buffer") })?; + let len = if len == 0 { + buffer.capacity() + } else { + buffer.capacity().min(len as _) + }; Ok(Self { - recv: Recv::new(fd, buffer.slice(..len as usize)), + recv: Recv::new(fd, buffer.slice(..len)), }) } } diff --git a/compio-fs/src/async_fd.rs b/compio-fs/src/async_fd.rs index e0b0b3f9..b2928d79 100644 --- a/compio-fs/src/async_fd.rs +++ b/compio-fs/src/async_fd.rs @@ -11,7 +11,7 @@ use compio_driver::{ op::{BufResultExt, Recv, RecvBufferPool, Send}, AsRawFd, SharedFd, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; use compio_runtime::{ buffer_pool::{BorrowedBuffer, BufferPool}, Attacher, @@ -49,20 +49,6 @@ impl AsyncFd { } } -impl AsyncFd { - pub async fn read_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - len: u32, - ) -> io::Result> { - let fd = self.to_shared_fd(); - let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len)?; - let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; - - op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) - } -} - impl AsyncRead for AsyncFd { #[inline] async fn read(&mut self, buf: B) -> BufResult { @@ -91,6 +77,36 @@ impl AsyncRead for &AsyncFd { } } +impl AsyncReadBufferPool for AsyncFd { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_buffer_pool(buffer_pool, len).await + } +} + +impl AsyncReadBufferPool for &AsyncFd { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; + let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; + + op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) + } +} + impl AsyncWrite for AsyncFd { #[inline] async fn write(&mut self, buf: B) -> BufResult { diff --git a/compio-fs/src/file.rs b/compio-fs/src/file.rs index 50cce5e1..7096c602 100644 --- a/compio-fs/src/file.rs +++ b/compio-fs/src/file.rs @@ -6,7 +6,7 @@ use compio_driver::{ op::{BufResultExt, CloseFile, ReadAt, ReadAtBufferPool, Sync, WriteAt}, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncReadAt, AsyncWriteAt}; +use compio_io::{AsyncReadAt, AsyncReadAtBufferPool, AsyncWriteAt}; use compio_runtime::{ buffer_pool::{BorrowedBuffer, BufferPool}, Attacher, @@ -150,19 +150,6 @@ impl File { pub async fn sync_data(&self) -> io::Result<()> { self.sync_impl(true).await } - - pub async fn read_at_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - pos: u64, - len: u32, - ) -> io::Result> { - let fd = self.to_shared_fd(); - let op = ReadAtBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, pos, len)?; - let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; - - op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) - } } impl AsyncReadAt for File { @@ -184,6 +171,24 @@ impl AsyncReadAt for File { } } +impl AsyncReadAtBufferPool for File { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_at_buffer_pool<'a>( + &self, + buffer_pool: &'a Self::BufferPool, + pos: u64, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let op = ReadAtBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, pos, len as _)?; + let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; + + op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) + } +} + impl AsyncWriteAt for File { #[inline] async fn write_at(&mut self, buf: T, pos: u64) -> BufResult { diff --git a/compio-fs/src/named_pipe.rs b/compio-fs/src/named_pipe.rs index 1db2d7c0..9a58d257 100644 --- a/compio-fs/src/named_pipe.rs +++ b/compio-fs/src/named_pipe.rs @@ -8,7 +8,9 @@ use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null}; use compio_buf::{BufResult, IoBuf, IoBufMut}; use compio_driver::{impl_raw_fd, op::ConnectNamedPipe, syscall, AsRawFd, RawFd, ToSharedFd}; -use compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt}; +use compio_io::{ + AsyncRead, AsyncReadAt, AsyncReadAtBufferPool, AsyncReadBufferPool, AsyncWrite, AsyncWriteAt, +}; use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use widestring::U16CString; use windows_sys::Win32::{ @@ -177,15 +179,6 @@ impl NamedPipeServer { syscall!(BOOL, DisconnectNamedPipe(self.as_raw_fd() as _))?; Ok(()) } - - #[inline] - pub async fn read_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - len: u32, - ) -> io::Result> { - self.handle.read_buffer_pool(buffer_pool, len).await - } } impl AsyncRead for NamedPipeServer { @@ -202,6 +195,32 @@ impl AsyncRead for &NamedPipeServer { } } +impl AsyncReadBufferPool for NamedPipeServer { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_buffer_pool(buffer_pool, len).await + } +} + +impl AsyncReadBufferPool for &NamedPipeServer { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&self.handle).read_buffer_pool(buffer_pool, len).await + } +} + impl AsyncWrite for NamedPipeServer { #[inline] async fn write(&mut self, buf: T) -> BufResult { @@ -305,15 +324,6 @@ impl NamedPipeClient { // Safety: we're ensuring the lifetime of the named pipe. unsafe { named_pipe_info(self.as_raw_fd()) } } - - #[inline] - pub async fn read_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - len: u32, - ) -> io::Result> { - self.handle.read_buffer_pool(buffer_pool, len).await - } } impl AsyncRead for NamedPipeClient { @@ -331,6 +341,32 @@ impl AsyncRead for &NamedPipeClient { } } +impl AsyncReadBufferPool for NamedPipeClient { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_buffer_pool(buffer_pool, len).await + } +} + +impl AsyncReadBufferPool for &NamedPipeClient { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + self.handle.read_at_buffer_pool(buffer_pool, 0, len).await + } +} + impl AsyncWrite for NamedPipeClient { #[inline] async fn write(&mut self, buf: T) -> BufResult { diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index 11000a92..81ce56f8 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -13,7 +13,7 @@ use compio_driver::{ op::{BufResultExt, Recv, RecvBufferPool, RecvVectored, Send, SendVectored}, syscall, AsRawFd, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use crate::File; @@ -477,18 +477,6 @@ impl Receiver { pub fn close(self) -> impl Future> { self.file.close() } - - pub async fn read_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - len: u32, - ) -> io::Result> { - let fd = self.to_shared_fd(); - let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len)?; - let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; - - op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) - } } impl AsyncRead for Receiver { @@ -515,6 +503,36 @@ impl AsyncRead for &Receiver { } } +impl AsyncReadBufferPool for Receiver { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_buffer_pool(buffer_pool, len).await + } +} + +impl AsyncReadBufferPool for &Receiver { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; + let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; + + op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) + } +} + impl_raw_fd!(Receiver, std::fs::File, file, file); /// Checks if file is a FIFO diff --git a/compio-fs/src/stdio/unix.rs b/compio-fs/src/stdio/unix.rs index 7b03b3f8..444b581f 100644 --- a/compio-fs/src/stdio/unix.rs +++ b/compio-fs/src/stdio/unix.rs @@ -2,7 +2,7 @@ use std::io; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::{AsRawFd, RawFd}; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; #[cfg(doc)] @@ -20,15 +20,6 @@ impl Stdin { // SAFETY: no need to attach on unix Self(unsafe { AsyncFd::new_unchecked(libc::STDIN_FILENO) }) } - - #[inline] - pub async fn read_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - len: u32, - ) -> io::Result> { - self.0.read_buffer_pool(buffer_pool, len).await - } } impl AsyncRead for Stdin { @@ -41,6 +32,32 @@ impl AsyncRead for Stdin { } } +impl AsyncReadBufferPool for Stdin { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_buffer_pool(buffer_pool, len).await + } +} + +impl AsyncReadBufferPool for &Stdin { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&self.0).read_buffer_pool(buffer_pool, len).await + } +} + impl AsyncRead for &Stdin { async fn read(&mut self, buf: B) -> BufResult { (&self.0).read(buf).await diff --git a/compio-fs/tests/buffer_pool.rs b/compio-fs/tests/buffer_pool.rs index 84b6c697..fbacbf36 100644 --- a/compio-fs/tests/buffer_pool.rs +++ b/compio-fs/tests/buffer_pool.rs @@ -1,7 +1,13 @@ -use std::io::{Seek, SeekFrom, Write}; - -use compio_fs::{pipe, AsyncFd, File}; -use compio_io::AsyncWriteExt; +use std::io::Write; +#[cfg(unix)] +use std::io::{Seek, SeekFrom}; + +use compio_fs::File; +#[cfg(unix)] +use compio_fs::{pipe, AsyncFd}; +use compio_io::AsyncReadAtBufferPool; +#[cfg(unix)] +use compio_io::{AsyncReadBufferPool, AsyncWriteExt}; use compio_runtime::buffer_pool::BufferPool; use tempfile::NamedTempFile; @@ -24,13 +30,14 @@ async fn test_read_file() { assert_eq!(buf.as_ref(), HELLO); } +#[cfg(unix)] #[compio_macros::test] async fn test_read_async_fd() { let mut tempfile = tempfile(); tempfile.write_all(HELLO).unwrap(); tempfile.seek(SeekFrom::Start(0)).unwrap(); - let file = AsyncFd::new(tempfile).unwrap(); + let mut file = AsyncFd::new(tempfile).unwrap(); let buffer_pool = BufferPool::new(1, 15).unwrap(); let buf = file.read_buffer_pool(&buffer_pool, 0).await.unwrap(); @@ -38,10 +45,10 @@ async fn test_read_async_fd() { assert_eq!(buf.as_ref(), HELLO); } -#[cfg(target_family = "unix")] +#[cfg(unix)] #[compio_macros::test] async fn test_read_pipe() { - let (rx, mut tx) = pipe::anonymous().unwrap(); + let (mut rx, mut tx) = pipe::anonymous().unwrap(); tx.write_all(HELLO).await.unwrap(); let buffer_pool = BufferPool::new(1, 15).unwrap(); diff --git a/compio-io/src/read/mod.rs b/compio-io/src/read/mod.rs index 69032b96..7c88566f 100644 --- a/compio-io/src/read/mod.rs +++ b/compio-io/src/read/mod.rs @@ -1,6 +1,6 @@ #[cfg(feature = "allocator_api")] use std::alloc::Allocator; -use std::{io::Cursor, rc::Rc, sync::Arc}; +use std::{io, io::Cursor, ops::DerefMut, rc::Rc, sync::Arc}; use compio_buf::{buf_try, t_alloc, BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBufMut}; @@ -130,6 +130,51 @@ pub trait AsyncReadAt { } } +/// # AsyncReadBufferPool +/// +/// Async read with buffer pool +pub trait AsyncReadBufferPool { + /// Filled buffer type + type Buffer<'a>: DerefMut; + + /// Buffer pool type + type BufferPool; + + /// Read some bytes from this source with [`BufferPool`] and return + /// a [`BorrowedBuffer`]. + /// + /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, + /// if `len` > 0, `min(len, inner buffer size)` will be the read max len + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result>; +} + +/// # AsyncReadAtBufferPool +/// +/// Async read with buffer pool and position +pub trait AsyncReadAtBufferPool { + /// Buffer pool type + type BufferPool; + + /// Filled buffer type + type Buffer<'a>: DerefMut; + + /// Read some bytes from this source at position with [`BufferPool`] and + /// return a [`BorrowedBuffer`]. + /// + /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, + /// if `len` > 0, `min(len, inner buffer size)` will be the read max len + async fn read_at_buffer_pool<'a>( + &self, + buffer_pool: &'a Self::BufferPool, + pos: u64, + len: usize, + ) -> io::Result>; +} + macro_rules! impl_read_at { (@ptr $($ty:ty),*) => { $( diff --git a/compio-net/src/tcp.rs b/compio-net/src/tcp.rs index 09599614..592cd6cb 100644 --- a/compio-net/src/tcp.rs +++ b/compio-net/src/tcp.rs @@ -2,7 +2,7 @@ use std::{future::Future, io, net::SocketAddr}; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::impl_raw_fd; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use socket2::{Protocol, SockAddr, Socket as Socket2, Type}; @@ -216,21 +216,6 @@ impl TcpStream { pub fn into_poll_fd(self) -> io::Result> { self.inner.into_poll_fd() } - - /// Read some bytes from this source with buffer pool - /// - /// # Note - /// - /// - If len > 0, will read `len` data at most - /// - If len == 0, will let kernel and `buffer_pool` decide how much data to - /// read - pub async fn recv_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - len: u32, - ) -> io::Result> { - self.inner.recv_buffer_pool(buffer_pool, len).await - } } impl AsyncRead for TcpStream { @@ -245,6 +230,32 @@ impl AsyncRead for TcpStream { } } +impl AsyncReadBufferPool for TcpStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_buffer_pool(buffer_pool, len).await + } +} + +impl AsyncReadBufferPool for &TcpStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + self.inner.recv_buffer_pool(buffer_pool, len as _).await + } +} + impl AsyncRead for &TcpStream { #[inline] async fn read(&mut self, buf: B) -> BufResult { diff --git a/compio-net/src/udp.rs b/compio-net/src/udp.rs index ae2b2768..93e602c6 100644 --- a/compio-net/src/udp.rs +++ b/compio-net/src/udp.rs @@ -251,13 +251,11 @@ impl UdpSocket { .await } - /// Read some bytes from this source with buffer pool + /// Read some bytes from this source with [`BufferPool`] and return + /// a [`BorrowedBuffer`]. /// - /// # Note - /// - /// - If len > 0, will read `len` data at most - /// - If len == 0, will let kernel and `buffer_pool` decide how much data to - /// read + /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, + /// if `len` > 0, `min(len, inner buffer size)` will be the read max len pub async fn recv_buffer_pool<'a>( &self, buffer_pool: &'a BufferPool, diff --git a/compio-net/src/unix.rs b/compio-net/src/unix.rs index 35cf2368..0b08d434 100644 --- a/compio-net/src/unix.rs +++ b/compio-net/src/unix.rs @@ -2,7 +2,7 @@ use std::{future::Future, io, path::Path}; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::impl_raw_fd; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use socket2::{SockAddr, Socket as Socket2, Type}; @@ -198,21 +198,6 @@ impl UnixStream { pub fn into_poll_fd(self) -> io::Result> { self.inner.into_poll_fd() } - - /// Read some bytes from this source with buffer pool - /// - /// # Note - /// - /// - If len > 0, will read `len` data at most - /// - If len == 0, will let kernel and `buffer_pool` decide how much data to - /// read - pub async fn recv_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - len: u32, - ) -> io::Result> { - self.inner.recv_buffer_pool(buffer_pool, len).await - } } impl AsyncRead for UnixStream { @@ -239,6 +224,32 @@ impl AsyncRead for &UnixStream { } } +impl AsyncReadBufferPool for UnixStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_buffer_pool(buffer_pool, len).await + } +} + +impl AsyncReadBufferPool for &UnixStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + self.inner.recv_buffer_pool(buffer_pool, len as _).await + } +} + impl AsyncWrite for UnixStream { #[inline] async fn write(&mut self, buf: T) -> BufResult { diff --git a/compio-net/tests/buffer_pool.rs b/compio-net/tests/buffer_pool.rs index 59b1dc45..13501a3f 100644 --- a/compio-net/tests/buffer_pool.rs +++ b/compio-net/tests/buffer_pool.rs @@ -1,6 +1,6 @@ use std::net::Ipv6Addr; -use compio_io::AsyncWriteExt; +use compio_io::{AsyncReadBufferPool, AsyncWriteExt}; use compio_net::{TcpListener, TcpStream, UdpSocket, UnixListener, UnixStream}; use compio_runtime::buffer_pool::BufferPool; @@ -16,11 +16,11 @@ async fn test_tcp_read_buffer_pool() { .detach(); let buffer_pool = BufferPool::new(1, 4).unwrap(); - let stream = TcpStream::connect(addr).await.unwrap(); + let mut stream = TcpStream::connect(addr).await.unwrap(); assert_eq!( stream - .recv_buffer_pool(&buffer_pool, 0) + .read_buffer_pool(&buffer_pool, 0) .await .unwrap() .as_ref(), @@ -29,7 +29,7 @@ async fn test_tcp_read_buffer_pool() { assert!( stream - .recv_buffer_pool(&buffer_pool, 0) + .read_buffer_pool(&buffer_pool, 0) .await .unwrap() .is_empty() @@ -71,7 +71,7 @@ async fn test_uds_recv_buffer_pool() { let listener = UnixListener::bind(&sock_path).await.unwrap(); - let (mut client, (server, _)) = + let (mut client, (mut server, _)) = futures_util::try_join!(UnixStream::connect(&sock_path), listener.accept()).unwrap(); client.write_all("test").await.unwrap(); @@ -81,7 +81,7 @@ async fn test_uds_recv_buffer_pool() { assert_eq!( server - .recv_buffer_pool(&buffer_pool, 0) + .read_buffer_pool(&buffer_pool, 0) .await .unwrap() .as_ref(), @@ -90,7 +90,7 @@ async fn test_uds_recv_buffer_pool() { assert!( server - .recv_buffer_pool(&buffer_pool, 0) + .read_buffer_pool(&buffer_pool, 0) .await .unwrap() .is_empty() diff --git a/compio-process/src/unix.rs b/compio-process/src/unix.rs index a48abfe8..e9329930 100644 --- a/compio-process/src/unix.rs +++ b/compio-process/src/unix.rs @@ -2,10 +2,11 @@ use std::{io, panic::resume_unwind, process}; use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; use compio_driver::{ - op::{BufResultExt, Recv, Send}, - AsRawFd, RawFd, SharedFd, ToSharedFd, + op::{BufResultExt, Recv, RecvBufferPool, Send}, + AsRawFd, RawFd, SharedFd, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; +use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use crate::{ChildStderr, ChildStdin, ChildStdout}; @@ -35,6 +36,23 @@ impl AsyncRead for ChildStdout { } } +impl AsyncReadBufferPool for ChildStdout { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; + let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; + + op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) + } +} + impl AsRawFd for ChildStderr { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() @@ -55,6 +73,23 @@ impl AsyncRead for ChildStderr { } } +impl AsyncReadBufferPool for ChildStderr { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; + let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; + + op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) + } +} + impl AsRawFd for ChildStdin { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() diff --git a/compio-process/src/windows.rs b/compio-process/src/windows.rs index 7722a28c..5dafc333 100644 --- a/compio-process/src/windows.rs +++ b/compio-process/src/windows.rs @@ -8,10 +8,11 @@ use std::{ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; use compio_driver::{ - op::{BufResultExt, Recv, Send}, - syscall, AsRawFd, OpCode, OpType, RawFd, SharedFd, ToSharedFd, + op::{BufResultExt, Recv, RecvBufferPool, Send}, + syscall, AsRawFd, OpCode, OpType, RawFd, SharedFd, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; +use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use windows_sys::Win32::System::{Threading::GetExitCodeProcess, IO::OVERLAPPED}; use crate::{ChildStderr, ChildStdin, ChildStdout}; @@ -67,6 +68,23 @@ impl AsyncRead for ChildStdout { } } +impl AsyncReadBufferPool for ChildStdout { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; + let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; + + op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) + } +} + impl AsRawFd for ChildStderr { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() @@ -87,6 +105,23 @@ impl AsyncRead for ChildStderr { } } +impl AsyncReadBufferPool for ChildStderr { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; + let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; + + op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags) + } +} + impl AsRawFd for ChildStdin { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() diff --git a/compio-runtime/src/buffer_pool.rs b/compio-runtime/src/buffer_pool.rs index 3c049440..196f9eb7 100644 --- a/compio-runtime/src/buffer_pool.rs +++ b/compio-runtime/src/buffer_pool.rs @@ -1,9 +1,21 @@ -pub type BorrowedBuffer<'a> = compio_driver::BorrowedBuffer<'a>; +//! Buffer pool use std::{io, marker::PhantomData, mem::ManuallyDrop}; use crate::Runtime; +/// Buffer borrowed from buffer pool +/// +/// When IO operation finish, user will obtain a `BorrowedBuffer` to access the +/// filled data +pub type BorrowedBuffer<'a> = compio_driver::BorrowedBuffer<'a>; + +/// Buffer pool +/// +/// A buffer pool to allow user no need to specify a specific buffer to do the +/// IO operation +/// +/// Drop the `BufferPool` will release the buffer pool automatically #[derive(Debug)] pub struct BufferPool { inner: ManuallyDrop, @@ -33,6 +45,11 @@ impl BufferPool { } } + /// Get the inner driver buffer pool reference + /// + /// # Notes + /// + /// You should not use this method unless you are writing your own IO opcode pub fn as_driver_buffer_pool(&self) -> &compio_driver::BufferPool { &self.inner } diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index e984433a..8208c0fb 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -349,12 +349,6 @@ impl Runtime { self.timer_runtime.borrow_mut().wake(); } - /// Create buffer pool with given `buffer_size` and `buffer_len` - /// - /// # Notes - /// - /// If `buffer_len` is not power of 2, it will be upward with - /// [`u16::next_power_of_two`] pub(crate) fn create_buffer_pool( &self, buffer_len: u16,