Skip to content

Commit

Permalink
replace legacy impl with Poll
Browse files Browse the repository at this point in the history
  • Loading branch information
loongs-zhang committed Jan 28, 2025
1 parent fca9c7c commit ff53fe4
Show file tree
Hide file tree
Showing 19 changed files with 184 additions and 123 deletions.
3 changes: 2 additions & 1 deletion monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ windows-sys = { version = "0.48.0", features = [
"Win32_System_IO",
"Win32_Storage_FileSystem",
"Win32_Security",
"Win32_System_Console",
] }

# unix dependencies
Expand Down Expand Up @@ -87,7 +88,7 @@ utils = ["nix"]
# enable debug if you want to know what runtime does
debug = ["tracing"]
# enable legacy driver support(will make monoio available for older kernel and macOS)
legacy = ["mio"]
legacy = ["poll-io"]
# iouring support
iouring = ["io-uring"]
# tokio-compatible(only have effect when legacy is enabled and iouring is not)
Expand Down
File renamed without changes.
117 changes: 25 additions & 92 deletions monoio/src/driver/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,18 @@ use std::{
use super::{
op::{CompletionMeta, Op, OpAble},
ready::{self, Ready},
scheduled_io::ScheduledIo,
Driver, Inner, CURRENT,
};
use crate::utils::slab::Slab;

#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
#[cfg(windows)]
pub(super) mod iocp;
use crate::driver::poll::iocp::SocketState;

#[cfg(feature = "sync")]
mod waker;
#[cfg(feature = "sync")]
pub(crate) use waker::UnparkHandle;

pub(crate) struct LegacyInner {
pub(crate) io_dispatch: Slab<ScheduledIo>,
#[cfg(unix)]
events: mio::Events,
#[cfg(unix)]
poll: mio::Poll,
#[cfg(windows)]
events: iocp::Events,
#[cfg(windows)]
poll: iocp::Poller,
pub(crate) poll: crate::driver::poll::Poll,

#[cfg(feature = "sync")]
shared_waker: std::sync::Arc<waker::EventWaker>,
Expand Down Expand Up @@ -66,35 +54,23 @@ impl LegacyDriver {
}

pub(crate) fn new_with_entries(entries: u32) -> io::Result<Self> {
#[cfg(unix)]
let poll = mio::Poll::new()?;
#[cfg(windows)]
let poll = iocp::Poller::new()?;
let poll = crate::driver::poll::Poll::with_capacity(entries as usize)?;

#[cfg(all(unix, feature = "sync"))]
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
poll.registry(),
TOKEN_WAKEUP,
)?));
#[cfg(all(windows, feature = "sync"))]
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(iocp::Waker::new(
&poll,
TOKEN_WAKEUP,
)?));
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(
crate::driver::poll::iocp::Waker::new(&poll, TOKEN_WAKEUP)?,
));
#[cfg(feature = "sync")]
let (waker_sender, waker_receiver) = flume::unbounded::<std::task::Waker>();
#[cfg(feature = "sync")]
let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);

let inner = LegacyInner {
io_dispatch: Slab::new(),
#[cfg(unix)]
events: mio::Events::with_capacity(entries as usize),
#[cfg(unix)]
poll,
#[cfg(windows)]
events: iocp::Events::with_capacity(entries as usize),
#[cfg(windows)]
poll,
#[cfg(feature = "sync")]
shared_waker,
Expand Down Expand Up @@ -150,66 +126,47 @@ impl LegacyDriver {
timeout = Some(Duration::ZERO);
}

// here we borrow 2 mut self, but its safe.
let events = unsafe { &mut (*self.inner.get()).events };
match inner.poll.poll(events, timeout) {
// here we borrow 2 mut self, but it's safe.
let poll = unsafe { &mut (*self.inner.get()).poll };
let events = unsafe { &mut (*self.inner.get()).poll.events };
match poll.poll(events, timeout) {
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
#[cfg(unix)]
let iter = events.iter();
#[cfg(windows)]
let iter = events.events.iter();
for event in iter {
for event in events.iter() {
let token = event.token();

#[cfg(feature = "sync")]
if token != TOKEN_WAKEUP {
inner.dispatch(token, Ready::from_mio(event));
inner.poll.dispatch(token, Ready::from_mio(event));
}

#[cfg(not(feature = "sync"))]
inner.dispatch(token, Ready::from_mio(event));
inner.poll.dispatch(token, Ready::from_mio(event));
}
Ok(())
}

#[cfg(windows)]
pub(crate) fn register(
this: &Rc<UnsafeCell<LegacyInner>>,
state: &mut iocp::SocketState,
state: &mut SocketState,
interest: mio::Interest,
) -> io::Result<usize> {
let inner = unsafe { &mut *this.get() };
let io = ScheduledIo::default();
let token = inner.io_dispatch.insert(io);

match inner.poll.register(state, mio::Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
inner.io_dispatch.remove(token);
Err(e)
}
}
inner.poll.register(state, interest)
}

#[cfg(windows)]
pub(crate) fn deregister(
this: &Rc<UnsafeCell<LegacyInner>>,
token: usize,
state: &mut iocp::SocketState,
state: &mut SocketState,
) -> io::Result<()> {
let inner = unsafe { &mut *this.get() };

// try to deregister fd first, on success we will remove it from slab.
match inner.poll.deregister(state) {
Ok(_) => {
inner.io_dispatch.remove(token);
Ok(())
}
Err(e) => Err(e),
}
inner.poll.deregister(state, token)
}

#[cfg(unix)]
Expand All @@ -219,16 +176,7 @@ impl LegacyDriver {
interest: mio::Interest,
) -> io::Result<usize> {
let inner = unsafe { &mut *this.get() };
let token = inner.io_dispatch.insert(ScheduledIo::new());

let registry = inner.poll.registry();
match registry.register(source, mio::Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
inner.io_dispatch.remove(token);
Err(e)
}
}
inner.poll.register(source, interest)
}

#[cfg(unix)]
Expand All @@ -238,31 +186,12 @@ impl LegacyDriver {
source: &mut impl mio::event::Source,
) -> io::Result<()> {
let inner = unsafe { &mut *this.get() };

// try to deregister fd first, on success we will remove it from slab.
match inner.poll.registry().deregister(source) {
Ok(_) => {
inner.io_dispatch.remove(token);
Ok(())
}
Err(e) => Err(e),
}
inner.poll.deregister(source, token)
}
}

impl LegacyInner {
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let mut sio = match self.io_dispatch.get(token.0) {
Some(io) => io,
None => {
return;
}
};
let ref_mut = sio.as_mut();
ref_mut.set_readiness(|curr| curr | ready);
ref_mut.wake(ready);
}

pub(crate) fn poll_op<T: OpAble>(
this: &Rc<UnsafeCell<Self>>,
data: &mut T,
Expand All @@ -282,7 +211,11 @@ impl LegacyInner {
};

// wait io ready and do syscall
let mut scheduled_io = inner.io_dispatch.get(index).expect("scheduled_io lost");
let mut scheduled_io = inner
.poll
.io_dispatch
.get(index)
.expect("scheduled_io lost");
let ref_mut = scheduled_io.as_mut();

let readiness = ready!(ref_mut.poll_readiness(cx, direction));
Expand Down Expand Up @@ -324,7 +257,7 @@ impl LegacyInner {
ready::Direction::Read => Ready::READ_CANCELED,
ready::Direction::Write => Ready::WRITE_CANCELED,
};
inner.dispatch(mio::Token(index), ready);
inner.poll.dispatch(mio::Token(index), ready);
}

pub(crate) fn submit_with_data<T>(
Expand Down
4 changes: 2 additions & 2 deletions monoio/src/driver/legacy/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::driver::unpark::Unpark;
pub(crate) struct EventWaker {
// raw waker
#[cfg(windows)]
waker: super::iocp::Waker,
waker: super::super::poll::iocp::Waker,
#[cfg(unix)]
waker: mio::Waker,
// Atomic awake status
Expand All @@ -20,7 +20,7 @@ impl EventWaker {
}

#[cfg(windows)]
pub(crate) fn new(waker: super::iocp::Waker) -> Self {
pub(crate) fn new(waker: super::super::poll::iocp::Waker) -> Self {
Self {
waker,
awake: std::sync::atomic::AtomicBool::new(true),
Expand Down
6 changes: 5 additions & 1 deletion monoio/src/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// Monoio Driver.
#[allow(dead_code)]
pub(crate) mod op;
#[cfg(all(feature = "poll-io", unix))]
#[cfg(feature = "poll-io")]
pub(crate) mod poll;
#[cfg(any(feature = "legacy", feature = "poll-io"))]
pub(crate) mod ready;
Expand All @@ -16,6 +16,10 @@ pub(crate) mod thread;
mod legacy;
#[cfg(all(target_os = "linux", feature = "iouring"))]
mod uring;
// todo add completion IOCP
#[allow(warnings)]
#[cfg(windows)]
mod iocp;

mod util;

Expand Down
9 changes: 4 additions & 5 deletions monoio/src/driver/op/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,10 @@ impl<T: IoBuf> Op<SendMsgUnix<T>> {
buf: T,
socket_addr: Option<UnixSocketAddr>,
) -> io::Result<Self> {
let mut info: Box<(Option<UnixSocketAddr>, IoVecMeta, libc::msghdr)> = Box::new((
socket_addr.map(Into::into),
IoVecMeta::from(&buf),
unsafe { std::mem::zeroed() },
));
let mut info: Box<(Option<UnixSocketAddr>, IoVecMeta, libc::msghdr)> =
Box::new((socket_addr, IoVecMeta::from(&buf), unsafe {
std::mem::zeroed()
}));

info.2.msg_iov = info.1.write_iovec_ptr();
info.2.msg_iovlen = info.1.write_iovec_len() as _;
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use core::slice::Iter;

use mio::Token;
use windows_sys::Win32::System::IO::OVERLAPPED_ENTRY;

Expand Down Expand Up @@ -117,4 +119,8 @@ impl Events {
*status = unsafe { std::mem::zeroed() };
}
}

pub fn iter(&self) -> Iter<'_, Event> {
self.events.iter()
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
mod afd;
mod core;
mod event;
mod state;
mod waker;

pub use core::*;
use std::{
collections::VecDeque,
os::windows::prelude::RawSocket,
os::windows::{
io::{AsRawHandle, RawHandle},
prelude::RawSocket,
},
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Expand All @@ -25,6 +26,8 @@ use windows_sys::Win32::{
System::IO::{OVERLAPPED, OVERLAPPED_ENTRY},
};

pub use crate::driver::iocp::*;

pub struct Poller {
is_polling: AtomicBool,
cp: Arc<CompletionPort>,
Expand Down Expand Up @@ -229,7 +232,7 @@ impl Poller {
const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;

let mut afd_group = self.afd.lock().unwrap();
if afd_group.len() == 0 {
if afd_group.is_empty() {
self._alloc_afd_group(&mut afd_group)?;
} else {
// + 1 reference in Vec
Expand Down Expand Up @@ -286,6 +289,12 @@ impl Drop for Poller {
}
}

impl AsRawHandle for Poller {
fn as_raw_handle(&self) -> RawHandle {
self.cp.as_raw_handle()
}
}

pub fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>> {
let sock_ptr: *const Mutex<SockState> = ptr as *const _;
unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) }
Expand Down
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit ff53fe4

Please sign in to comment.