Skip to content

Commit

Permalink
feat(io): add AsyncReadBufferPool and AsyncReadAtBufferPool
Browse files Browse the repository at this point in the history
  • Loading branch information
Sherlock-Holo committed Jul 12, 2024
1 parent db360e2 commit ece142e
Show file tree
Hide file tree
Showing 23 changed files with 495 additions and 154 deletions.
13 changes: 11 additions & 2 deletions compio-driver/src/fallback_buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use std::{

use compio_buf::{IntoInner, Slice};

/// Buffer pool
///
/// A buffer pool to allow user no need to specify a specific buffer to do the
/// IO operation
pub struct BufferPool {
buffers: RefCell<VecDeque<Vec<u8>>>,
}
Expand All @@ -21,7 +25,7 @@ impl Debug for BufferPool {

impl BufferPool {
pub(crate) fn new(buffer_len: u16, buffer_size: usize) -> Self {
let buffers = (0..buffer_len)
let buffers = (0..buffer_len.next_power_of_two())
.map(|_| Vec::with_capacity(buffer_size))
.collect();

Expand All @@ -34,11 +38,16 @@ impl BufferPool {
self.buffers.borrow_mut().pop_front()
}

pub(crate) fn add_buffer(&self, buffer: Vec<u8>) {
pub(crate) fn add_buffer(&self, mut buffer: Vec<u8>) {
buffer.clear();
self.buffers.borrow_mut().push_back(buffer)
}
}

/// Buffer borrowed from buffer pool
///
/// When IO operation finish, user will obtain a `BorrowedBuffer` to access the
/// filled data
pub struct BorrowedBuffer<'a> {
buffer: ManuallyDrop<Slice<Vec<u8>>>,
pool: &'a BufferPool,
Expand Down
24 changes: 24 additions & 0 deletions compio-driver/src/fusion/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use std::{
ops::{Deref, DerefMut},
};

/// Buffer pool
///
/// A buffer pool to allow user no need to specify a specific buffer to do the
/// IO operation
pub struct BufferPool {
inner: BufferPollInner,
}
Expand Down Expand Up @@ -40,13 +44,33 @@ impl BufferPool {
inner: BufferPollInner::Poll(buffer_pool),
}
}

pub(crate) fn into_poll(self) -> crate::fallback_buffer_pool::BufferPool {
match self.inner {
BufferPollInner::IoUring(_) => {
panic!("BufferPool type is not io-uring type")
}
BufferPollInner::Poll(inner) => inner,
}
}

pub(crate) fn into_io_uring(self) -> super::iour::buffer_pool::BufferPool {
match self.inner {
BufferPollInner::IoUring(inner) => inner,
BufferPollInner::Poll(_) => panic!("BufferPool type is not poll type"),
}
}
}

enum BufferPollInner {
IoUring(super::iour::buffer_pool::BufferPool),
Poll(crate::fallback_buffer_pool::BufferPool),
}

/// Buffer borrowed from buffer pool
///
/// When IO operation finish, user will obtain a `BorrowedBuffer` to access the
/// filled data
pub struct BorrowedBuffer<'a> {
inner: BorrowedBufferInner<'a>,
}
Expand Down
7 changes: 5 additions & 2 deletions compio-driver/src/fusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,11 @@ impl Driver {
/// # Safety
///
/// caller must make sure release the buffer pool with correct driver
pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
todo!()
pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> {
match &mut self.fuse {
FuseDriver::Poll(driver) => driver.release_buffer_pool(buffer_pool.into_poll()),
FuseDriver::IoUring(driver) => driver.release_buffer_pool(buffer_pool.into_io_uring()),
}
}
}

Expand Down
17 changes: 16 additions & 1 deletion compio-driver/src/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use windows_sys::Win32::{
},
};

use crate::{syscall, AsyncifyPool, Entry, Key, OutEntries, ProactorBuilder};
use crate::{syscall, AsyncifyPool, BufferPool, Entry, Key, OutEntries, ProactorBuilder};

pub(crate) mod op;

Expand Down Expand Up @@ -320,6 +320,21 @@ impl Driver {
self.notify_overlapped.clone(),
))
}

pub fn create_buffer_pool(
&mut self,
buffer_len: u16,
buffer_size: usize,
) -> io::Result<BufferPool> {
Ok(BufferPool::new(buffer_len, buffer_size))
}

/// # Safety
///
/// caller must make sure release the buffer pool with correct driver
pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
Ok(())
}
}

impl AsRawFd for Driver {
Expand Down
39 changes: 30 additions & 9 deletions compio-driver/src/iocp/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ use std::{
io::ErrorKind,
marker::PhantomPinned,
net::Shutdown,
os::{fd::AsRawFd, windows::io::AsRawSocket},
os::windows::io::AsRawSocket,
pin::Pin,
ptr::{null, null_mut},
task::Poll,
};

use aligned_array::{Aligned, A8};
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use compio_buf::{
BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, SetBufInit, Slice,
};
#[cfg(not(feature = "once_cell_try"))]
use once_cell::sync::OnceCell as OnceLock;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -40,7 +42,10 @@ use windows_sys::{
},
};

use crate::{op::*, syscall, AsRawFd, OpCode, OpType, RawFd, SharedFd};
use crate::{
op::*, syscall, AsRawFd, BorrowedBuffer, BufferPool, OpCode, OpType, RawFd, SharedFd,
TakeBuffer,
};

#[inline]
fn winapi_result(transferred: u32) -> Poll<io::Result<usize>> {
Expand Down Expand Up @@ -185,24 +190,32 @@ impl<S> ReadAtBufferPool<S> {
let buffer = buffer_pool.get_buffer().ok_or_else(|| {
io::Error::new(ErrorKind::Other, "buffer ring has no available buffer")
})?;
let len = if len == 0 {
buffer.capacity()
} else {
buffer.capacity().min(len as _)
};

Ok(Self {
read_at: ReadAt::new(fd, offset, buffer.slice(..len as usize)),
read_at: ReadAt::new(fd, offset, buffer.slice(..len)),
})
}
}

impl<S: AsRawFd> OpCode for ReadAtBufferPool<S> {
unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
self.project().recv.operate(self, optr)
self.project().read_at.operate(optr)
}

unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
self.project().recv.cancel(self, optr)
self.project().read_at.cancel(optr)
}
}

impl<S> TakeBuffer<usize> for ReadAtBufferPool<S> {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

fn take_buffer(
self,
buffer_pool: &BufferPool,
Expand Down Expand Up @@ -490,24 +503,32 @@ impl<S> RecvBufferPool<S> {
let buffer = buffer_pool.get_buffer().ok_or_else(|| {
io::Error::new(ErrorKind::Other, "buffer ring has no available buffer")
})?;
let len = if len == 0 {
buffer.capacity()
} else {
buffer.capacity().min(len as _)
};

Ok(Self {
recv: Recv::new(fd, buffer.slice(..len as usize)),
recv: Recv::new(fd, buffer.slice(..len)),
})
}
}

impl<S: AsRawFd> OpCode for RecvBufferPool<S> {
unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
self.project().recv.operate(self, optr)
self.project().recv.operate(optr)
}

unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
self.project().recv.cancel(self, optr)
self.project().recv.cancel(optr)
}
}

impl<S> TakeBuffer<usize> for RecvBufferPool<S> {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

fn take_buffer(
self,
buffer_pool: &BufferPool,
Expand Down
8 changes: 8 additions & 0 deletions compio-driver/src/iour/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use std::{
use io_uring::cqueue::buffer_select;
use io_uring_buf_ring::IoUringBufRing;

/// Buffer pool
///
/// A buffer pool to allow user no need to specify a specific buffer to do the
/// IO operation
pub struct BufferPool {
buf_ring: IoUringBufRing<Vec<u8>>,
}
Expand Down Expand Up @@ -49,6 +53,10 @@ impl BufferPool {
}
}

/// Buffer borrowed from buffer pool
///
/// When IO operation finish, user will obtain a `BorrowedBuffer` to access the
/// filled data
pub struct BorrowedBuffer<'a>(io_uring_buf_ring::BorrowedBuffer<'a, Vec<u8>>);

impl Debug for BorrowedBuffer<'_> {
Expand Down
8 changes: 7 additions & 1 deletion compio-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,14 +535,20 @@ impl ProactorBuilder {
}
}

/// Trait to get the selected buffer of an io operation.
pub trait TakeBuffer<T> {
/// Buffer pool type
type BufferPool;

/// Selected buffer type
type Buffer<'a>;

/// Take the selected buffer with `buffer_pool`, io `result` and `flags`, if
/// io operation is success
fn take_buffer(
self,
buffer_pool: &Self::BufferPool,
result: io::Result<T>,
flag: u32,
flags: u32,
) -> io::Result<Self::Buffer<'_>>;
}
14 changes: 12 additions & 2 deletions compio-driver/src/poll/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,14 @@ impl<S> ReadAtBufferPool<S> {
let buffer = buffer_pool.get_buffer().ok_or_else(|| {
io::Error::new(ErrorKind::Other, "buffer ring has no available buffer")
})?;
let len = if len == 0 {
buffer.capacity()
} else {
buffer.capacity().min(len as _)
};

Ok(Self {
read_at: ReadAt::new(fd, offset, buffer.slice(..len as usize)),
read_at: ReadAt::new(fd, offset, buffer.slice(..len)),
})
}
}
Expand Down Expand Up @@ -530,9 +535,14 @@ impl<S> RecvBufferPool<S> {
let buffer = buffer_pool.get_buffer().ok_or_else(|| {
io::Error::new(ErrorKind::Other, "buffer ring has no available buffer")
})?;
let len = if len == 0 {
buffer.capacity()
} else {
buffer.capacity().min(len as _)
};

Ok(Self {
recv: Recv::new(fd, buffer.slice(..len as usize)),
recv: Recv::new(fd, buffer.slice(..len)),
})
}
}
Expand Down
46 changes: 31 additions & 15 deletions compio-fs/src/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use compio_driver::{
op::{BufResultExt, Recv, RecvBufferPool, Send},
AsRawFd, SharedFd, TakeBuffer, ToSharedFd,
};
use compio_io::{AsyncRead, AsyncWrite};
use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite};
use compio_runtime::{
buffer_pool::{BorrowedBuffer, BufferPool},
Attacher,
Expand Down Expand Up @@ -49,20 +49,6 @@ impl<T: AsRawFd> AsyncFd<T> {
}
}

impl<T: AsRawFd + 'static> AsyncFd<T> {
pub async fn read_buffer_pool<'a>(
&self,
buffer_pool: &'a BufferPool,
len: u32,
) -> io::Result<BorrowedBuffer<'a>> {
let fd = self.to_shared_fd();
let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len)?;
let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await;

op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags)
}
}

impl<T: AsRawFd + 'static> AsyncRead for AsyncFd<T> {
#[inline]
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
Expand Down Expand Up @@ -91,6 +77,36 @@ impl<T: AsRawFd + 'static> AsyncRead for &AsyncFd<T> {
}
}

impl<T: AsRawFd + 'static> AsyncReadBufferPool for AsyncFd<T> {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_buffer_pool(buffer_pool, len).await
}
}

impl<T: AsRawFd + 'static> AsyncReadBufferPool for &AsyncFd<T> {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
let fd = self.to_shared_fd();
let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?;
let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await;

op.take_buffer(buffer_pool.as_driver_buffer_pool(), res, flags)
}
}

impl<T: AsRawFd + 'static> AsyncWrite for AsyncFd<T> {
#[inline]
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
Expand Down
Loading

0 comments on commit ece142e

Please sign in to comment.