diff --git a/src/concurrency/thread.rs b/src/concurrency/thread.rs index f72591f0c4..af457848de 100644 --- a/src/concurrency/thread.rs +++ b/src/concurrency/thread.rs @@ -172,6 +172,8 @@ pub enum BlockReason { Futex { addr: u64 }, /// Blocked on an InitOnce. InitOnce(InitOnceId), + /// Blocked on epoll. + Epoll, } /// The state of a thread. diff --git a/src/shims/unix/fd.rs b/src/shims/unix/fd.rs index e3b9835e36..3ca5f6bb2d 100644 --- a/src/shims/unix/fd.rs +++ b/src/shims/unix/fd.rs @@ -278,6 +278,14 @@ impl WeakFileDescriptionRef { } } +impl VisitProvenance for WeakFileDescriptionRef { + fn visit_provenance(&self, _visit: &mut VisitWith<'_>) { + // A weak reference can never be the only reference to some pointer or place. + // Since the actual file description is tracked by strong ref somewhere, + // it is ok to make this a NOP operation. + } +} + /// A unique id for file descriptions. While we could use the address, considering that /// is definitely unique, the address would expose interpreter internal state when used /// for sorting things. So instead we generate a unique id per file description that stays diff --git a/src/shims/unix/linux/epoll.rs b/src/shims/unix/linux/epoll.rs index 53f8b06ca6..ee86cf5f26 100644 --- a/src/shims/unix/linux/epoll.rs +++ b/src/shims/unix/linux/epoll.rs @@ -2,8 +2,9 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::io; use std::rc::{Rc, Weak}; +use std::time::Duration; -use crate::shims::unix::fd::{FdId, FileDescriptionRef}; +use crate::shims::unix::fd::{FdId, FileDescriptionRef, WeakFileDescriptionRef}; use crate::shims::unix::*; use crate::*; @@ -19,6 +20,8 @@ struct Epoll { // This is an Rc because EpollInterest need to hold a reference to update // it. ready_list: Rc>>, + /// A list of thread ids blocked on this epoll instance. + thread_id: RefCell>, } /// EpollEventInstance contains information that will be returned by epoll_wait. @@ -58,6 +61,8 @@ pub struct EpollEventInterest { data: u64, /// Ready list of the epoll instance under which this EpollEventInterest is registered. ready_list: Rc>>, + /// The file descriptor value that this EpollEventInterest is registered under. + epfd: i32, } /// EpollReadyEvents reflects the readiness of a file description. @@ -338,6 +343,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { events, data, ready_list: Rc::clone(ready_list), + epfd: epfd_value, })); if op == epoll_ctl_add { @@ -395,7 +401,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { /// The `timeout` argument specifies the number of milliseconds that /// `epoll_wait()` will block. Time is measured against the - /// CLOCK_MONOTONIC clock. + /// CLOCK_MONOTONIC clock. If the timeout is zero, the function will not block, + /// while if the timeout is -1, the function will block + /// until at least one event has been retrieved (or an error + /// occurred). /// A call to `epoll_wait()` will block until either: /// • a file descriptor delivers an event; @@ -421,59 +430,100 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { events_op: &OpTy<'tcx>, maxevents: &OpTy<'tcx>, timeout: &OpTy<'tcx>, - ) -> InterpResult<'tcx, Scalar> { + dest: &MPlaceTy<'tcx>, + ) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - let epfd = this.read_scalar(epfd)?.to_i32()?; + let epfd_value = this.read_scalar(epfd)?.to_i32()?; let events = this.read_immediate(events_op)?; let maxevents = this.read_scalar(maxevents)?.to_i32()?; let timeout = this.read_scalar(timeout)?.to_i32()?; - if epfd <= 0 || maxevents <= 0 { + if epfd_value <= 0 || maxevents <= 0 { let einval = this.eval_libc("EINVAL"); this.set_last_error(einval)?; - return Ok(Scalar::from_i32(-1)); + this.write_int(-1, dest)?; + return Ok(()); } // This needs to come after the maxevents value check, or else maxevents.try_into().unwrap() // will fail. - let events = this.deref_pointer_as( + let event = this.deref_pointer_as( &events, this.libc_array_ty_layout("epoll_event", maxevents.try_into().unwrap()), )?; - // FIXME: Implement blocking support - if timeout != 0 { - throw_unsup_format!("epoll_wait: timeout value can only be 0"); - } - - let Some(epfd) = this.machine.fds.get(epfd) else { - return Ok(Scalar::from_i32(this.fd_not_found()?)); + let Some(epfd) = this.machine.fds.get(epfd_value) else { + let result_value: i32 = this.fd_not_found()?; + this.write_int(result_value, dest)?; + return Ok(()); }; - let epoll_file_description = epfd - .downcast::() - .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?; - - let ready_list = epoll_file_description.get_ready_list(); - let mut ready_list = ready_list.borrow_mut(); - let mut num_of_events: i32 = 0; - let mut array_iter = this.project_array_fields(&events)?; - - while let Some(des) = array_iter.next(this)? { - if let Some(epoll_event_instance) = ready_list_next(this, &mut ready_list) { - this.write_int_fields_named( - &[ - ("events", epoll_event_instance.events.into()), - ("u64", epoll_event_instance.data.into()), - ], - &des.1, - )?; - num_of_events = num_of_events.strict_add(1); - } else { - break; - } + // Create a weak ref of epfd and pass it to callback so we will make sure that epfd + // is not close after the thread unblocks. + let weak_epfd = epfd.downgrade(); + + // We just need to know if the ready list is empty and borrow the thread_ids out. + // The whole logic is wrapped inside a block so we don't need to manually drop epfd later. + let ready_list_empty; + let mut thread_ids; + { + let epoll_file_description = epfd + .downcast::() + .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?; + let binding = epoll_file_description.get_ready_list(); + ready_list_empty = binding.borrow_mut().is_empty(); + thread_ids = epoll_file_description.thread_id.borrow_mut(); } - Ok(Scalar::from_i32(num_of_events)) + if timeout == 0 || !ready_list_empty { + // If the ready list is not empty, or the timeout is 0, we can return immediately. + blocking_epoll_callback(epfd_value, weak_epfd, dest, &event, this)?; + } else { + // Blocking + let timeout = match timeout { + 0.. => { + let duration = Duration::from_millis(timeout.try_into().unwrap()); + Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)) + } + -1 => None, + ..-1 => { + throw_unsup_format!( + "epoll_wait: Only timeout values greater than or equal to -1 are supported." + ); + } + }; + thread_ids.push(this.active_thread()); + let dest = dest.clone(); + this.block_thread( + BlockReason::Epoll, + timeout, + callback!( + @capture<'tcx> { + epfd_value: i32, + weak_epfd: WeakFileDescriptionRef, + dest: MPlaceTy<'tcx>, + event: MPlaceTy<'tcx>, + } + @unblock = |this| { + blocking_epoll_callback(epfd_value, weak_epfd, &dest, &event, this)?; + Ok(()) + } + @timeout = |this| { + // No notification after blocking timeout. + let Some(epfd) = weak_epfd.upgrade() else { + throw_unsup_format!("epoll FD {epfd_value} got closed while blocking.") + }; + // Remove the current active thread_id from the blocked thread_id list. + epfd.downcast::() + .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))? + .thread_id.borrow_mut() + .retain(|&id| id != this.active_thread()); + this.write_int(0, &dest)?; + Ok(()) + } + ), + ); + } + Ok(()) } /// For a specific file description, get its ready events and update the corresponding ready @@ -483,17 +533,47 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { /// /// This *will* report an event if anyone is subscribed to it, without any further filtering, so /// do not call this function when an FD didn't have anything happen to it! - fn check_and_update_readiness(&self, fd_ref: &FileDescriptionRef) -> InterpResult<'tcx, ()> { - let this = self.eval_context_ref(); + fn check_and_update_readiness( + &mut self, + fd_ref: &FileDescriptionRef, + ) -> InterpResult<'tcx, ()> { + let this = self.eval_context_mut(); let id = fd_ref.get_id(); + let mut waiter = Vec::new(); // Get a list of EpollEventInterest that is associated to a specific file description. if let Some(epoll_interests) = this.machine.epoll_interests.get_epoll_interest(id) { for weak_epoll_interest in epoll_interests { if let Some(epoll_interest) = weak_epoll_interest.upgrade() { - check_and_update_one_event_interest(fd_ref, epoll_interest, id, this)?; + let is_updated = check_and_update_one_event_interest( + fd_ref, + epoll_interest.clone(), + id, + this, + )?; + if is_updated { + // Edge-triggered notification only notify one thread even if there are + // multiple threads block on the same epfd. + let epfd = this.machine.fds.get(epoll_interest.borrow().epfd).unwrap(); + + // This unwrap can never fail because if the current epoll instance were + // closed and its epfd value reused, the upgrade of weak_epoll_interest + // above would fail. This guarantee holds because only the epoll instance + // holds a strong ref to epoll_interest. + // FIXME: We can randomly pick a thread to unblock. + if let Some(thread_id) = + epfd.downcast::().unwrap().thread_id.borrow_mut().pop() + { + waiter.push(thread_id); + }; + } } } } + waiter.sort(); + waiter.dedup(); + for thread_id in waiter { + this.unblock_thread(thread_id, BlockReason::Epoll)?; + } Ok(()) } } @@ -517,14 +597,15 @@ fn ready_list_next( } /// This helper function checks whether an epoll notification should be triggered for a specific -/// epoll_interest and, if necessary, triggers the notification. Unlike check_and_update_readiness, -/// this function sends a notification to only one epoll instance. +/// epoll_interest and, if necessary, triggers the notification, and returns whether the +/// notification was added/updated. Unlike check_and_update_readiness, this function sends a +/// notification to only one epoll instance. fn check_and_update_one_event_interest<'tcx>( fd_ref: &FileDescriptionRef, interest: Rc>, id: FdId, ecx: &MiriInterpCx<'tcx>, -) -> InterpResult<'tcx> { +) -> InterpResult<'tcx, bool> { // Get the bitmask of ready events for a file description. let ready_events_bitmask = fd_ref.get_epoll_ready_events()?.get_event_bitmask(ecx); let epoll_event_interest = interest.borrow(); @@ -539,6 +620,46 @@ fn check_and_update_one_event_interest<'tcx>( let event_instance = EpollEventInstance::new(flags, epoll_event_interest.data); // Triggers the notification by inserting it to the ready list. ready_list.insert(epoll_key, event_instance); + return Ok(true); + } + return Ok(false); +} + +/// Callback function after epoll_wait unblocks +fn blocking_epoll_callback<'tcx>( + epfd_value: i32, + weak_epfd: WeakFileDescriptionRef, + dest: &MPlaceTy<'tcx>, + events: &MPlaceTy<'tcx>, + ecx: &mut MiriInterpCx<'tcx>, +) -> InterpResult<'tcx> { + let Some(epfd) = weak_epfd.upgrade() else { + throw_unsup_format!("epoll FD {epfd_value} got closed while blocking.") + }; + + let epoll_file_description = epfd + .downcast::() + .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?; + + let ready_list = epoll_file_description.get_ready_list(); + let mut ready_list = ready_list.borrow_mut(); + let mut num_of_events: i32 = 0; + let mut array_iter = ecx.project_array_fields(events)?; + + while let Some(des) = array_iter.next(ecx)? { + if let Some(epoll_event_instance) = ready_list_next(ecx, &mut ready_list) { + ecx.write_int_fields_named( + &[ + ("events", epoll_event_instance.events.into()), + ("u64", epoll_event_instance.data.into()), + ], + &des.1, + )?; + num_of_events = num_of_events.strict_add(1); + } else { + break; + } } + ecx.write_int(num_of_events, dest)?; Ok(()) } diff --git a/src/shims/unix/linux/foreign_items.rs b/src/shims/unix/linux/foreign_items.rs index 581f0db42e..d64f13f63d 100644 --- a/src/shims/unix/linux/foreign_items.rs +++ b/src/shims/unix/linux/foreign_items.rs @@ -62,8 +62,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { "epoll_wait" => { let [epfd, events, maxevents, timeout] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; - let result = this.epoll_wait(epfd, events, maxevents, timeout)?; - this.write_scalar(result, dest)?; + this.epoll_wait(epfd, events, maxevents, timeout, dest)?; } "eventfd" => { let [val, flag] = diff --git a/tests/fail-dep/tokio/sleep.stderr b/tests/fail-dep/tokio/sleep.stderr deleted file mode 100644 index d5bf00fc17..0000000000 --- a/tests/fail-dep/tokio/sleep.stderr +++ /dev/null @@ -1,15 +0,0 @@ -error: unsupported operation: epoll_wait: timeout value can only be 0 - --> CARGO_REGISTRY/.../epoll.rs:LL:CC - | -LL | / syscall!(epoll_wait( -LL | | self.ep.as_raw_fd(), -LL | | events.as_mut_ptr(), -LL | | events.capacity() as i32, -LL | | timeout, -LL | | )) - | |__________^ epoll_wait: timeout value can only be 0 - | - = help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support - -error: aborting due to 1 previous error - diff --git a/tests/pass-dep/libc/libc-epoll-blocking.rs b/tests/pass-dep/libc/libc-epoll-blocking.rs new file mode 100644 index 0000000000..2a5d3dff07 --- /dev/null +++ b/tests/pass-dep/libc/libc-epoll-blocking.rs @@ -0,0 +1,139 @@ +//@only-target-linux +// test_epoll_block_then_unblock depends on a deterministic schedule. +//@compile-flags: -Zmiri-preemption-rate=0 + +use std::convert::TryInto; +use std::thread; +use std::thread::spawn; + +// This is a set of testcases for blocking epoll. + +fn main() { + test_epoll_block_without_notification(); + test_epoll_block_then_unblock(); + test_notification_after_timeout(); +} + +// Using `as` cast since `EPOLLET` wraps around +const EPOLL_IN_OUT_ET: u32 = (libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET) as _; + +#[track_caller] +fn check_epoll_wait( + epfd: i32, + expected_notifications: &[(u32, u64)], + timeout: i32, +) { + let epoll_event = libc::epoll_event { events: 0, u64: 0 }; + let mut array: [libc::epoll_event; N] = [epoll_event; N]; + let maxsize = N; + let array_ptr = array.as_mut_ptr(); + let res = unsafe { libc::epoll_wait(epfd, array_ptr, maxsize.try_into().unwrap(), timeout) }; + if res < 0 { + panic!("epoll_wait failed: {}", std::io::Error::last_os_error()); + } + assert_eq!( + res, + expected_notifications.len().try_into().unwrap(), + "got wrong number of notifications" + ); + let slice = unsafe { std::slice::from_raw_parts(array_ptr, res.try_into().unwrap()) }; + for (return_event, expected_event) in slice.iter().zip(expected_notifications.iter()) { + let event = return_event.events; + let data = return_event.u64; + assert_eq!(event, expected_event.0, "got wrong events"); + assert_eq!(data, expected_event.1, "got wrong data"); + } +} + +// This test allows epoll_wait to block, then unblock without notification. +fn test_epoll_block_without_notification() { + // Create an epoll instance. + let epfd = unsafe { libc::epoll_create1(0) }; + assert_ne!(epfd, -1); + + // Create an eventfd instances. + let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC; + let fd = unsafe { libc::eventfd(0, flags) }; + + // Register eventfd with epoll. + let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd as u64 }; + let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut ev) }; + assert_eq!(res, 0); + + // epoll_wait to clear notification. + let expected_event = u32::try_from(libc::EPOLLOUT).unwrap(); + let expected_value = fd as u64; + check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0); + + // This epoll wait blocks, and timeout without notification. + check_epoll_wait::<1>(epfd, &[], 5); +} + +// This test triggers notification and unblocks the epoll_wait before timeout. +fn test_epoll_block_then_unblock() { + // Create an epoll instance. + let epfd = unsafe { libc::epoll_create1(0) }; + assert_ne!(epfd, -1); + + // Create a socketpair instance. + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + + // Register one side of the socketpair with epoll. + let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fds[0] as u64 }; + let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[0], &mut ev) }; + assert_eq!(res, 0); + + // epoll_wait to clear notification. + let expected_event = u32::try_from(libc::EPOLLOUT).unwrap(); + let expected_value = fds[0] as u64; + check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0); + + // epoll_wait before triggering notification so it will block then get unblocked before timeout. + let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap(); + let expected_value = fds[0] as u64; + let thread1 = spawn(move || { + thread::yield_now(); + let data = "abcde".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) }; + assert_eq!(res, 5); + }); + check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 10); + thread1.join().unwrap(); +} + +// This test triggers a notification after epoll_wait times out. +fn test_notification_after_timeout() { + // Create an epoll instance. + let epfd = unsafe { libc::epoll_create1(0) }; + assert_ne!(epfd, -1); + + // Create a socketpair instance. + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + + // Register one side of the socketpair with epoll. + let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fds[0] as u64 }; + let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[0], &mut ev) }; + assert_eq!(res, 0); + + // epoll_wait to clear notification. + let expected_event = u32::try_from(libc::EPOLLOUT).unwrap(); + let expected_value = fds[0] as u64; + check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0); + + // epoll_wait timeouts without notification. + check_epoll_wait::<1>(epfd, &[], 10); + + // Trigger epoll notification after timeout. + let data = "abcde".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) }; + assert_eq!(res, 5); + + // Check the result of the notification. + let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap(); + let expected_value = fds[0] as u64; + check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 10); +} diff --git a/tests/pass-dep/libc/libc-epoll.rs b/tests/pass-dep/libc/libc-epoll-no-blocking.rs similarity index 100% rename from tests/pass-dep/libc/libc-epoll.rs rename to tests/pass-dep/libc/libc-epoll-no-blocking.rs diff --git a/tests/fail-dep/tokio/sleep.rs b/tests/pass-dep/tokio/sleep.rs similarity index 59% rename from tests/fail-dep/tokio/sleep.rs rename to tests/pass-dep/tokio/sleep.rs index 0fa5080d48..e6b02c02d0 100644 --- a/tests/fail-dep/tokio/sleep.rs +++ b/tests/pass-dep/tokio/sleep.rs @@ -1,14 +1,12 @@ //@compile-flags: -Zmiri-permissive-provenance -Zmiri-backtrace=full //@only-target-x86_64-unknown-linux: support for tokio only on linux and x86 -//@error-in-other-file: timeout value can only be 0 -//@normalize-stderr-test: " += note:.*\n" -> "" use tokio::time::{sleep, Duration, Instant}; #[tokio::main] async fn main() { let start = Instant::now(); - sleep(Duration::from_secs(1)).await; + sleep(Duration::from_millis(100)).await; let time_elapsed = &start.elapsed().as_millis(); - assert!((1000..1100).contains(time_elapsed), "{}", time_elapsed); + assert!((100..1000).contains(time_elapsed), "{}", time_elapsed); }