From 0a16c93401ddf731b35b4c1de3f1ef0af2012351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Thu, 21 Nov 2024 22:20:45 +0900 Subject: [PATCH] fix(poll): simplify push_blocking loop --- compio-driver/src/poll/mod.rs | 56 ++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/compio-driver/src/poll/mod.rs b/compio-driver/src/poll/mod.rs index 3f53e518..56e70bff 100644 --- a/compio-driver/src/poll/mod.rs +++ b/compio-driver/src/poll/mod.rs @@ -244,13 +244,7 @@ impl Driver { Poll::Pending } Decision::Completed(res) => Poll::Ready(Ok(res)), - Decision::Blocking => loop { - if self.push_blocking(user_data) { - break Poll::Pending; - } else { - self.poll_blocking(); - } - }, + Decision::Blocking => self.push_blocking(user_data), #[cfg(aio)] Decision::Aio(AioControl { mut aiocbp, submit }) => { let aiocb = unsafe { aiocbp.as_mut() }; @@ -273,19 +267,20 @@ impl Driver { } match syscall!(submit(aiocbp.as_ptr())) { Ok(_) => Poll::Pending, + // FreeBSD: + // * EOPNOTSUPP: It's on a filesystem without AIO support. Just fallback to + // blocking IO. + // * EAGAIN: The process-wide queue is full. No safe way to remove the (maybe) + // dead entries. + // Solarish: + // * EAGAIN: Allocation failed. Err(e) if matches!( e.raw_os_error(), Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN) ) => { - loop { - if self.push_blocking(user_data) { - return Poll::Pending; - } else { - self.poll_blocking(); - } - } + self.push_blocking(user_data) } Err(e) => Poll::Ready(Err(e)), } @@ -293,21 +288,28 @@ impl Driver { } } - fn push_blocking(&mut self, user_data: usize) -> bool { + fn push_blocking(&mut self, user_data: usize) -> Poll> { let poll = self.poll.clone(); let completed = self.pool_completed.clone(); - self.pool - .dispatch(move || { - let mut op = unsafe { Key::::new_unchecked(user_data) }; - let op_pin = op.as_op_pin(); - let res = match op_pin.operate() { - Poll::Pending => unreachable!("this operation is not non-blocking"), - Poll::Ready(res) => res, - }; - completed.push(Entry::new(user_data, res)); - poll.notify().ok(); - }) - .is_ok() + let mut closure = move || { + let mut op = unsafe { Key::::new_unchecked(user_data) }; + let op_pin = op.as_op_pin(); + let res = match op_pin.operate() { + Poll::Pending => unreachable!("this operation is not non-blocking"), + Poll::Ready(res) => res, + }; + completed.push(Entry::new(user_data, res)); + poll.notify().ok(); + }; + loop { + match self.pool.dispatch(closure) { + Ok(()) => return Poll::Pending, + Err(e) => { + closure = e.0; + self.poll_blocking(); + } + } + } } fn poll_blocking(&mut self) {