Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

int: Use polling crate as interface to poll system #123

Merged
merged 9 commits into from
Apr 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/loop_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl<'l, Data> EventLoop<'l, Data> {
///
/// Fails if the initialization of the polling system failed.
pub fn try_new() -> crate::Result<Self> {
Self::inner_new(false)
Self::inner_new()
}

/// Create a new event loop in high precision mode
Expand All @@ -274,11 +274,12 @@ impl<'l, Data> EventLoop<'l, Data> {
///
/// Fails if the initialization of the polling system failed.
pub fn try_new_high_precision() -> crate::Result<Self> {
Self::inner_new(true)
// The polling crate uses high precision no matter what.
Self::inner_new()
}
notgull marked this conversation as resolved.
Show resolved Hide resolved

fn inner_new(high_precision: bool) -> crate::Result<Self> {
let poll = Poll::new(high_precision)?;
fn inner_new() -> crate::Result<Self> {
let poll = Poll::new()?;
let handle = LoopHandle {
inner: Rc::new(LoopInner {
poll: RefCell::new(poll),
Expand Down
94 changes: 73 additions & 21 deletions src/sys.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{cell::RefCell, rc::Rc, sync::Arc, time::Duration};
use std::{cell::RefCell, collections::HashMap, rc::Rc, sync::Arc, time::Duration};

#[cfg(unix)]
use std::os::unix::io::AsRawFd;
use std::os::unix::io::{AsRawFd, RawFd as Raw};

#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
use std::os::windows::io::{AsRawSocket, RawSocket as Raw};

#[cfg(unix)]
use io_lifetimes::AsFd;
Expand All @@ -31,16 +31,16 @@ pub enum Mode {
/// Level-triggering
///
/// This FD will report events on every poll as long as the requested interests
/// are available. If the same FD is inserted in multiple event loops, all of
/// them are notified of readiness.
/// are available.
Level,

/// Edge-triggering
///
/// This FD will report events only when it *gains* one of the requested interests.
/// it must thus be fully processed before it'll generate events again. If the same
/// FD is inserted on multiple event loops, it may be that not all of them are notified
/// of readiness, and not necessarily always the same(s) (at least one is notified).
/// it must thus be fully processed before it'll generate events again.
///
/// This mode is not supported on certain platforms, and an error will be returned
/// if it is used.
notgull marked this conversation as resolved.
Show resolved Hide resolved
Edge,
}

Expand Down Expand Up @@ -176,6 +176,13 @@ pub struct Poll {
/// The buffer of events returned by the poller.
events: RefCell<Vec<Event>>,

/// The sources registered as level triggered.
///
/// Some platforms that `polling` supports do not support level-triggered events. As of the time
/// of writing, this only includes Solaris and illumos. To work around this, we emulate level
/// triggered events by keeping this map of file descriptors.
level_triggered: Option<RefCell<HashMap<usize, (Raw, polling::Event)>>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, to make sure I understand correctly, this is emulating level-triggered using oneshot, right?

In that case, does oneshot raise an event if the source is already ready when registered? I assume that yes, as this is necessary for this emulation to work, but I'd like to be sure.

Also, could you expand this comment by explaining how we emulate level-triggers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ran this code:

use polling::{Poller, Event};
use std::net::{TcpListener, TcpStream};
use std::io::prelude::*;

fn main() {
    let (stream1, mut stream2) = tcp_pipe();
    let mut poller = Poller::new().unwrap();
    stream2.write(b"hello").unwrap();
    poller.add(&stream1, Event::readable(0)).unwrap();

    let mut events = vec![];
    poller.wait(&mut events, None).unwrap();
    println!("{:?}", events);
}

fn tcp_pipe() -> (TcpStream, TcpStream) {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let stream1 = TcpStream::connect(addr).unwrap();
    let stream2 = listener.accept().unwrap().0;
    (stream1, stream2)
}

Got this result:

[Event { key: 0, readable: true, writable: false }]

So, yes, it looks like oneshot mode triggers on pre-registration events properly.


pub(crate) timers: Rc<RefCell<TimerWheel>>,
}

Expand All @@ -187,11 +194,19 @@ impl std::fmt::Debug for Poll {
}

impl Poll {
pub(crate) fn new(_high_precision: bool) -> crate::Result<Poll> {
pub(crate) fn new() -> crate::Result<Poll> {
let poller = Poller::new()?;
let level_triggered = if poller.supports_level() {
None
} else {
Some(RefCell::new(HashMap::new()))
};

Ok(Poll {
poller: Arc::new(Poller::new()?),
poller: Arc::new(poller),
events: RefCell::new(Vec::new()),
timers: Rc::new(RefCell::new(TimerWheel::new())),
level_triggered,
})
}

Expand All @@ -213,17 +228,29 @@ impl Poll {
self.poller.wait(&mut events, timeout)?;

// Convert `polling` events to `calloop` events.
let level_triggered = self.level_triggered.as_ref().map(RefCell::borrow);
let mut poll_events = events
.drain(..)
.map(|ev| PollEvent {
readiness: Readiness {
readable: ev.readable,
writable: ev.writable,
error: false,
},
token: Token { key: ev.key },
.map(|ev| {
// If we need to emulate level-triggered events...
if let Some(level_triggered) = level_triggered.as_ref() {
// ...and this event is from a level-triggered source...
if let Some((source, interest)) = level_triggered.get(&ev.key) {
// ...then we need to re-register the source.
self.poller.modify(source, *interest)?;
}
}

Ok(PollEvent {
readiness: Readiness {
readable: ev.readable,
writable: ev.writable,
error: false,
},
token: Token { key: ev.key },
})
})
.collect::<Vec<_>>();
.collect::<std::io::Result<Vec<_>>>()?;

drop(events);

Expand Down Expand Up @@ -276,8 +303,16 @@ impl Poll {
}
};

let ev = cvt_interest(interest, token);
self.poller
.add_with_mode(raw, cvt_interest(interest, token), cvt_mode(mode))?;
.add_with_mode(raw, ev, cvt_mode(mode, self.poller.supports_level()))?;

// If this is level triggered and we're emulating level triggered mode...
if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
// ...then we need to keep track of the source.
let mut level_triggered = level_triggered.borrow_mut();
level_triggered.insert(ev.key, (raw, ev));
}

Ok(())
}
Expand Down Expand Up @@ -308,8 +343,16 @@ impl Poll {
}
};

let ev = cvt_interest(interest, token);
self.poller
.modify_with_mode(raw, cvt_interest(interest, token), cvt_mode(mode))?;
.modify_with_mode(raw, ev, cvt_mode(mode, self.poller.supports_level()))?;

// If this is level triggered and we're emulating level triggered mode...
if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
// ...then we need to keep track of the source.
let mut level_triggered = level_triggered.borrow_mut();
level_triggered.insert(ev.key, (raw, ev));
}

Ok(())
}
Expand All @@ -336,6 +379,11 @@ impl Poll {
};
self.poller.delete(raw)?;

if let Some(level_triggered) = self.level_triggered.as_ref() {
let mut level_triggered = level_triggered.borrow_mut();
level_triggered.retain(|_, (source, _)| *source != raw);
}

Ok(())
}

Expand Down Expand Up @@ -365,7 +413,11 @@ fn cvt_interest(interest: Interest, tok: Token) -> Event {
}
}

fn cvt_mode(mode: Mode) -> PollMode {
fn cvt_mode(mode: Mode, supports_other_modes: bool) -> PollMode {
if !supports_other_modes {
return PollMode::Oneshot;
}

match mode {
Mode::Edge => PollMode::Edge,
Mode::Level => PollMode::Level,
Expand Down