diff --git a/.gitignore b/.gitignore index c787fcc..96ef6c0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ /target Cargo.lock -/.vscode diff --git a/src/kqueue.rs b/src/kqueue.rs index 464434b..1b7a3b3 100644 --- a/src/kqueue.rs +++ b/src/kqueue.rs @@ -83,7 +83,7 @@ impl Poller { let mode_flags = match mode { PollMode::Oneshot => libc::EV_ONESHOT, PollMode::Level => 0, - PollMode::Edge => libc::EV_DISPATCH, + PollMode::Edge => libc::EV_CLEAR, }; let read_flags = if ev.readable { diff --git a/src/lib.rs b/src/lib.rs index 7e9aad6..972954e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -388,16 +388,21 @@ impl Poller { self.modify_with_mode(source, interest, PollMode::Oneshot) } - /// Modifies interest in a file descriptor or socket to the poller, but with the - /// specified mode. + /// Modifies interest in a file descriptor or socket to the poller, but with the specified + /// mode. /// - /// This is identical to the `modify()` function, but allows specifying the - /// polling mode to use for this socket. + /// This is identical to the `modify()` function, but allows specifying the polling mode + /// to use for this socket. + /// + /// # Performance Notes + /// + /// This function can be used to change a source from one polling mode to another. However, + /// on some platforms, this switch can cause delays in the delivery of events. /// /// # Errors /// - /// If the operating system does not support the specified mode, this function - /// will return an error. + /// If the operating system does not support the specified mode, this function will return + /// an error. pub fn modify_with_mode( &self, source: impl Source, diff --git a/tests/other_modes.rs b/tests/other_modes.rs new file mode 100644 index 0000000..e499d70 --- /dev/null +++ b/tests/other_modes.rs @@ -0,0 +1,170 @@ +//! Tests for level triggered and edge triggered mode. + +#![allow(clippy::unused_io_amount)] + +use std::io::{self, prelude::*}; +use std::net::{TcpListener, TcpStream}; +use std::time::Duration; + +use polling::{Event, PollMode, Poller}; + +#[test] +fn level_triggered() { + // Create our streams. + let (mut reader, mut writer) = tcp_pair().unwrap(); + let reader_token = 1; + + // Create our poller and register our streams. + let poller = Poller::new().unwrap(); + if poller + .add_with_mode(&reader, Event::readable(reader_token), PollMode::Level) + .is_err() + { + // Only panic if we're on a platform that should support level mode. + cfg_if::cfg_if! { + if #[cfg(any(target_os = "solaris", target_os = "illumos"))] { + return; + } else { + panic!("Level mode should be supported on this platform"); + } + } + } + + // Write some data to the writer. + let data = [1, 2, 3, 4, 5]; + writer.write_all(&data).unwrap(); + + // A "readable" notification should be delivered. + let mut events = Vec::new(); + poller + .wait(&mut events, Some(Duration::from_secs(10))) + .unwrap(); + + assert_eq!(events, [Event::readable(reader_token)]); + + // If we read some of the data, the notification should still be available. + reader.read_exact(&mut [0; 3]).unwrap(); + events.clear(); + poller + .wait(&mut events, Some(Duration::from_secs(10))) + .unwrap(); + assert_eq!(events, [Event::readable(reader_token)]); + + // If we read the rest of the data, the notification should be gone. + reader.read_exact(&mut [0; 2]).unwrap(); + events.clear(); + poller + .wait(&mut events, Some(Duration::from_secs(0))) + .unwrap(); + + assert_eq!(events, []); + + // After modifying the stream and sending more data, it should be oneshot. + poller + .modify_with_mode(&reader, Event::readable(reader_token), PollMode::Oneshot) + .unwrap(); + + writer.write(&data).unwrap(); + events.clear(); + + // BUG: Somehow, the notification here is delayed? + poller + .wait(&mut events, Some(Duration::from_secs(10))) + .unwrap(); + + assert_eq!(events, [Event::readable(reader_token)]); + + // After reading, the notification should vanish. + reader.read(&mut [0; 5]).unwrap(); + events.clear(); + poller + .wait(&mut events, Some(Duration::from_secs(0))) + .unwrap(); + + assert_eq!(events, []); +} + +#[test] +fn edge_triggered() { + // Create our streams. + let (mut reader, mut writer) = tcp_pair().unwrap(); + let reader_token = 1; + + // Create our poller and register our streams. + let poller = Poller::new().unwrap(); + if poller + .add_with_mode(&reader, Event::readable(reader_token), PollMode::Edge) + .is_err() + { + // Only panic if we're on a platform that should support level mode. + cfg_if::cfg_if! { + if #[cfg(all( + any( + target_os = "linux", + target_os = "android", + target_os = "macos", + target_os = "ios", + target_os = "tvos", + target_os = "watchos", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd", + target_os = "dragonfly" + ), + not(polling_test_poll_backend) + ))] { + panic!("Edge mode should be supported on this platform"); + } else { + return; + } + } + } + + // Write some data to the writer. + let data = [1, 2, 3, 4, 5]; + writer.write_all(&data).unwrap(); + + // A "readable" notification should be delivered. + let mut events = Vec::new(); + poller + .wait(&mut events, Some(Duration::from_secs(10))) + .unwrap(); + + assert_eq!(events, [Event::readable(reader_token)]); + + // If we read some of the data, the notification should not still be available. + reader.read_exact(&mut [0; 3]).unwrap(); + events.clear(); + poller + .wait(&mut events, Some(Duration::from_secs(0))) + .unwrap(); + assert_eq!(events, []); + + // If we write more data, a notification should be delivered. + writer.write_all(&data).unwrap(); + events.clear(); + poller + .wait(&mut events, Some(Duration::from_secs(10))) + .unwrap(); + assert_eq!(events, [Event::readable(reader_token)]); + + // After modifying the stream and sending more data, it should be oneshot. + poller + .modify_with_mode(&reader, Event::readable(reader_token), PollMode::Oneshot) + .unwrap(); + + writer.write_all(&data).unwrap(); + events.clear(); + poller + .wait(&mut events, Some(Duration::from_secs(10))) + .unwrap(); + + assert_eq!(events, [Event::readable(reader_token)]); +} + +fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> { + let listener = TcpListener::bind("127.0.0.1:0")?; + let a = TcpStream::connect(listener.local_addr()?)?; + let (b, _) = listener.accept()?; + Ok((a, b)) +}