Skip to content

Commit

Permalink
fix(driver,iour): register polladd for notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
Berrysoft committed Feb 26, 2024
1 parent 3a4ade2 commit 647aac5
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
3 changes: 2 additions & 1 deletion compio-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ windows-sys = { workspace = true, features = [
[target.'cfg(target_os = "linux")'.dependencies]
io-uring = { version = "0.6.2", optional = true }
polling = { version = "3.3.0", optional = true }
os_pipe = { workspace = true, optional = true }
paste = { workspace = true }

# Other platform dependencies
Expand All @@ -77,7 +78,7 @@ compio-buf = { workspace = true, features = ["arrayvec"] }

[features]
default = ["io-uring"]
polling = ["dep:polling"]
polling = ["dep:polling", "dep:os_pipe"]

io-uring-sqe128 = []
io-uring-cqe32 = []
Expand Down
67 changes: 38 additions & 29 deletions compio-driver/src/iour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ cfg_if::cfg_if! {
}
}
use io_uring::{
opcode::{AsyncCancel, Read},
opcode::{AsyncCancel, PollAdd},
types::{Fd, SubmitArgs, Timespec},
IoUring,
};
Expand Down Expand Up @@ -76,7 +76,6 @@ pub(crate) struct Driver {
inner: IoUring<SEntry, CEntry>,
squeue: VecDeque<SEntry>,
notifier: Notifier,
notifier_registered: bool,
pool: AsyncifyPool,
pool_completed: Arc<SegQueue<Entry>>,
}
Expand All @@ -88,11 +87,18 @@ impl Driver {
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
instrument!(compio_log::Level::TRACE, "new", ?builder);
trace!("new iour driver");
let mut squeue = VecDeque::with_capacity(builder.capacity as usize);
let notifier = Notifier::new()?;
squeue.push_back(
PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
.multi(true)
.build()
.user_data(Self::NOTIFY),
);
Ok(Self {
inner: IoUring::builder().build(builder.capacity)?,
squeue: VecDeque::with_capacity(builder.capacity as usize),
notifier: Notifier::new()?,
notifier_registered: false,
squeue,
notifier,
pool: builder.create_or_get_thread_pool(),
pool_completed: Arc::new(SegQueue::new()),
})
Expand Down Expand Up @@ -177,7 +183,10 @@ impl Driver {
let completed_entries = cqueue.filter_map(|entry| match entry.user_data() {
Self::CANCEL => None,
Self::NOTIFY => {
self.notifier_registered = false;
const IORING_CQE_F_MORE: u32 = 1 << 1;
let flags = entry.flags();
debug_assert!(flags & IORING_CQE_F_MORE == IORING_CQE_F_MORE);
self.notifier.clear().expect("cannot clear notifier");
None
}
_ => Some(create_entry(entry)),
Expand Down Expand Up @@ -256,19 +265,6 @@ impl Driver {
mut entries: OutEntries<impl Extend<usize>>,
) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
if !self.notifier_registered {
let fd = self.notifier.as_raw_fd();
let dst = self.notifier.dst();
#[allow(clippy::useless_conversion)]
self.squeue.push_back(
Read::new(Fd(fd), dst.as_mut_ptr(), dst.len() as _)
.build()
.user_data(Self::NOTIFY)
.into(),
);
trace!("registered notifier");
self.notifier_registered = true
}
// Anyway we need to submit once, no matter there are entries in squeue.
trace!("start polling");
loop {
Expand Down Expand Up @@ -321,27 +317,40 @@ fn timespec(duration: std::time::Duration) -> Timespec {
#[derive(Debug)]
struct Notifier {
fd: OwnedFd,
read_dst: Box<[u8; 8]>,
}

impl Notifier {
/// Create a new notifier.
fn new() -> io::Result<Self> {
let fd = syscall!(libc::eventfd(0, libc::EFD_CLOEXEC))?;
let fd = syscall!(libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
let fd = unsafe { OwnedFd::from_raw_fd(fd) };
Ok(Self {
fd,
read_dst: Box::new([0; 8]),
})
Ok(Self { fd })
}

fn dst(&mut self) -> &mut [u8] {
self.read_dst.as_mut_slice()
pub fn clear(&self) -> io::Result<()> {
loop {
let mut buffer = [0u64];
let res = syscall!(libc::read(
self.fd.as_raw_fd(),
buffer.as_mut_ptr().cast(),
std::mem::size_of::<u64>()
));
match res {
Ok(len) => {
debug_assert_eq!(len, std::mem::size_of::<u64>() as _);
break Ok(());
}
// Clear the next time:)
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(()),
// Just like read_exact
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => break Err(e),
}
}
}

pub fn handle(&self) -> io::Result<NotifyHandle> {
let fd = self.fd.try_clone()?;
Ok(NotifyHandle::new(fd))
Ok(NotifyHandle::new(self.fd.try_clone()?))
}
}

Expand Down

0 comments on commit 647aac5

Please sign in to comment.