Skip to content

Commit

Permalink
chore: remove unsued async support code
Browse files Browse the repository at this point in the history
Resolve the broken CI caused by async_io feature

Signed-off-by: Yang Kaiyong <yangkaiyong.yky@antgroup.com>
  • Loading branch information
Yang Kaiyong committed Jan 22, 2025
1 parent 5603c65 commit 776ad9f
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 180 deletions.
179 changes: 0 additions & 179 deletions src/transport/fusedev/linux_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,182 +713,3 @@ mod tests {
se.mount().unwrap();
}
}

#[cfg(feature = "async_io")]
pub use asyncio::FuseDevTask;

#[cfg(feature = "async_io")]
/// Task context to handle fuse request in asynchronous mode.
mod asyncio {
use std::os::unix::io::RawFd;
use std::sync::Arc;

use crate::api::filesystem::AsyncFileSystem;
use crate::api::server::Server;
use crate::transport::{FuseBuf, Reader, Writer};

/// Task context to handle fuse request in asynchronous mode.
///
/// This structure provides a context to handle fuse request in asynchronous mode, including
/// the fuse fd, a internal buffer and a `Server` instance to serve requests.
///
/// ## Examples
/// ```ignore
/// let buf_size = 0x1_0000;
/// let state = AsyncExecutorState::new();
/// let mut task = FuseDevTask::new(buf_size, fuse_dev_fd, fs_server, state.clone());
///
/// // Run the task
/// executor.spawn(async move { task.poll_handler().await });
///
/// // Stop the task
/// state.quiesce();
/// ```
pub struct FuseDevTask<F: AsyncFileSystem + Sync> {
fd: RawFd,
buf: Vec<u8>,
state: AsyncExecutorState,
server: Arc<Server<F>>,
}

impl<F: AsyncFileSystem + Sync> FuseDevTask<F> {
/// Create a new fuse task context for asynchronous IO.
///
/// # Parameters
/// - buf_size: size of buffer to receive requests from/send reply to the fuse fd
/// - fd: fuse device file descriptor
/// - server: `Server` instance to serve requests from the fuse fd
/// - state: shared state object to control the task object
///
/// # Safety
/// The caller must ensure `fd` is valid during the lifetime of the returned task object.
pub fn new(
buf_size: usize,
fd: RawFd,
server: Arc<Server<F>>,
state: AsyncExecutorState,
) -> Self {
FuseDevTask {
fd,
server,
state,
buf: vec![0x0u8; buf_size],
}
}

/// Handler to process fuse requests in asynchronous mode.
///
/// An async fn to handle requests from the fuse fd. It works in asynchronous IO mode when:
/// - receiving request from fuse fd
/// - handling requests by calling Server::async_handle_requests()
/// - sending reply to fuse fd
///
/// The async fn repeatedly return Poll::Pending when polled until the state has been set
/// to quiesce mode.
pub async fn poll_handler(&mut self) {
// TODO: register self.buf as io uring buffers.
let drive = AsyncDriver::default();

while !self.state.quiescing() {
let result = AsyncUtil::read(drive.clone(), self.fd, &mut self.buf, 0).await;
match result {
Ok(len) => {
// ###############################################
// Note: it's a heavy hack to reuse the same underlying data
// buffer for both Reader and Writer, in order to reduce memory
// consumption. Here we assume Reader won't be used anymore once
// we start to write to the Writer. To get rid of this hack,
// just allocate a dedicated data buffer for Writer.
let buf = unsafe {
std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf.len())
};
// Reader::new() and Writer::new() should always return success.
let reader =
Reader::<()>::new(FuseBuf::new(&mut self.buf[0..len])).unwrap();
let writer = Writer::new(self.fd, buf).unwrap();
let result = unsafe {
self.server
.async_handle_message(drive.clone(), reader, writer, None, None)
.await
};

if let Err(e) = result {
// TODO: error handling
error!("failed to handle fuse request, {}", e);
}
}
Err(e) => {
// TODO: error handling
error!("failed to read request from fuse device fd, {}", e);
}
}
}

// TODO: unregister self.buf as io uring buffers.

// Report that the task has been quiesced.
self.state.report();
}
}

impl<F: AsyncFileSystem + Sync> Clone for FuseDevTask<F> {
fn clone(&self) -> Self {
FuseDevTask {
fd: self.fd,
server: self.server.clone(),
state: self.state.clone(),
buf: vec![0x0u8; self.buf.capacity()],
}
}
}

#[cfg(test)]
mod tests {
use std::os::unix::io::AsRawFd;

use super::*;
use crate::api::{Vfs, VfsOptions};
use crate::async_util::{AsyncDriver, AsyncExecutor};

#[test]
fn test_fuse_task() {
let state = AsyncExecutorState::new();
let fs = Vfs::<AsyncDriver, ()>::new(VfsOptions::default());
let _server = Arc::new(Server::<Vfs<AsyncDriver, ()>, AsyncDriver, ()>::new(fs));
let file = vmm_sys_util::tempfile::TempFile::new().unwrap();
let _fd = file.as_file().as_raw_fd();

let mut executor = AsyncExecutor::new(32);
executor.setup().unwrap();

/*
// Create three tasks, which could handle three concurrent fuse requests.
let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
executor
.spawn(async move { task.poll_handler().await })
.unwrap();
let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
executor
.spawn(async move { task.poll_handler().await })
.unwrap();
let mut task = FuseDevTask::new(0x1000, fd, server.clone(), state.clone());
executor
.spawn(async move { task.poll_handler().await })
.unwrap();
*/

for _i in 0..10 {
executor.run_once(false).unwrap();
}

// Set existing flag
state.quiesce();
// Close the fusedev fd, so all pending async io requests will be aborted.
drop(file);

for _i in 0..10 {
executor.run_once(false).unwrap();
}
}
}
}
1 change: 0 additions & 1 deletion src/transport/virtiofs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ mod async_io {

/// Disabled since vm-virtio doesn't export any DescriptorChain constructors.
/// Should re-enable once it does.
#[allow(unexpected_cfgs)]
#[cfg(testff)]
mod tests {
use super::*;
Expand Down

0 comments on commit 776ad9f

Please sign in to comment.