From ca117514c9a189333684cf385569f17b95ba80df Mon Sep 17 00:00:00 2001 From: Maciej Godek Date: Sat, 30 Jan 2021 13:54:18 +0100 Subject: [PATCH] Rework Event type to allow synchronized cleanup Previously, NativeActivity callbacks returned immediately after dispatching an asynchronous Event to the user on a UNIX pipe. This made it impossible to handle destruction events (`onNativeWindowDestroyed`, `onInputQueueDestroyed`) in a manner conforming to the Android docs, which state that all cleanup must be completed *before* the callback returns, cf. https://developer.android.com/ndk/reference/struct/a-native-activity-callbacks#struct_a_native_activity_callbacks_1a6aaafbbfae4c0ac066b9d61ebb3ab97f Therefore, some method of blocking and then notifying the callback thread had to be introduced. In order to accomplish this, I wrote a simple wrapper on a lockless queue (`crossbeam_queue::SegQueue`) and an `eventfd` object, which allows sending arbitrary Rust objects to the user while notifying the Android `Looper` in a manner preserving the source of the notification. This in turn enabled a different semantics of the `Event` type. `Events` now transfer ownership of various Android handles (e.g. `NativeActivity`, `InputQueue`) to the user. The user is responsible for storing these handles. When a corresponding `*Destroyed` event is received, the user must ensure that they complete all cleanup before dropping the contained `EventSyncGuard` object and do not touch the relevant handle afterwards. --- ndk-glue/Cargo.toml | 1 + ndk-glue/src/lib.rs | 384 ++++++++++++++++++++++++++++++++++---------- ndk/src/looper.rs | 12 ++ 3 files changed, 309 insertions(+), 88 deletions(-) diff --git a/ndk-glue/Cargo.toml b/ndk-glue/Cargo.toml index 9733ade6..e1d961dd 100644 --- a/ndk-glue/Cargo.toml +++ b/ndk-glue/Cargo.toml @@ -19,6 +19,7 @@ lazy_static = "1.4.0" libc = "0.2.84" log = "0.4.14" android_logger = { version = "0.9.2", optional = true } +crossbeam-queue = "0.3" [features] default = [] diff --git a/ndk-glue/src/lib.rs b/ndk-glue/src/lib.rs index 886bd031..9646df00 100644 --- a/ndk-glue/src/lib.rs +++ b/ndk-glue/src/lib.rs @@ -1,17 +1,21 @@ +use crossbeam_queue::SegQueue; use lazy_static::lazy_static; use log::Level; use ndk::input_queue::InputQueue; -use ndk::looper::{ForeignLooper, ThreadLooper}; +use ndk::looper::{ForeignLooper, LooperError, ThreadLooper}; use ndk::native_activity::NativeActivity; use ndk::native_window::NativeWindow; use ndk_sys::{AInputQueue, ANativeActivity, ANativeWindow, ARect, ALOOPER_EVENT_INPUT}; -use std::ffi::{CStr, CString}; +use std::ffi::{c_void, CStr, CString}; use std::fs::File; use std::io::{BufRead, BufReader}; use std::os::raw; use std::os::unix::prelude::*; use std::ptr::NonNull; -use std::sync::{RwLock, RwLockReadGuard}; +use std::sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, Weak, +}; use std::thread; pub use ndk_macro::main; @@ -41,56 +45,224 @@ pub fn android_log(level: Level, tag: &CStr, msg: &CStr) { } } -lazy_static! { - static ref NATIVE_WINDOW: RwLock> = Default::default(); - static ref INPUT_QUEUE: RwLock> = Default::default(); - static ref CONTENT_RECT: RwLock = Default::default(); -} +#[derive(Debug)] +pub struct EventPayload(Option<&'static Mutex>>); -static mut NATIVE_ACTIVITY: Option = None; -static mut LOOPER: Option = None; +impl EventPayload { + fn new(val: &'static Mutex>) -> Self { + Self(Some(val)) + } -pub fn native_activity() -> &'static NativeActivity { - unsafe { NATIVE_ACTIVITY.as_ref().unwrap() } + pub fn deliver(mut self) -> Option { + let mutex = self.0.take().unwrap(); + mutex.lock().unwrap().take() + } +} + +impl Drop for EventPayload { + fn drop(&mut self) { + // Even if the user did not take this resource, we remove it from the + // static `Mutex>`, to mark it as delivered. + self.0.map(|mutex| mutex.lock().unwrap().take()); + } } -pub fn native_window() -> RwLockReadGuard<'static, Option> { - NATIVE_WINDOW.read().unwrap() +#[derive(Debug)] +pub struct SyncEventGuard(Sender<()>); + +impl SyncEventGuard { + fn new() -> (Self, Receiver<()>) { + let (tx, rx) = channel(); + (Self(tx), rx) + } } -pub fn input_queue() -> RwLockReadGuard<'static, Option> { - INPUT_QUEUE.read().unwrap() +impl Drop for SyncEventGuard { + fn drop(&mut self) { + let _ = self.0.send(()); + } } -pub fn content_rect() -> Rect { - CONTENT_RECT.read().unwrap().clone() +pub mod ident { + pub const ACTIVITY_CALLBACK: i32 = 0; + pub const INPUT_QUEUE: i32 = 1; + pub const USER: i32 = 2; } -lazy_static! { - static ref PIPE: [RawFd; 2] = { - let mut pipe: [RawFd; 2] = Default::default(); - unsafe { libc::pipe(pipe.as_mut_ptr()) }; - pipe - }; +#[derive(Debug)] +struct EventFd { + raw_fd: RawFd, } -pub fn poll_events() -> Option { - unsafe { - let size = std::mem::size_of::(); - let mut event = Event::Start; - if libc::read(PIPE[0], &mut event as *mut _ as *mut _, size) == size as _ { - Some(event) +impl EventFd { + pub fn new(init: libc::c_uint, flag: libc::c_int) -> Result { + let raw_fd = unsafe { libc::eventfd(init, flag) }; + if raw_fd < 0i32 { + Err(std::io::Error::last_os_error()) } else { - None + Ok(Self { raw_fd }) + } + } + + #[inline] + pub fn to_raw(&self) -> RawFd { + self.raw_fd + } + + #[inline] + pub fn write(&self, n: u64) -> Result<(), std::io::Error> { + let size = std::mem::size_of::(); + let ptr = &n as *const _ as *const _; + match unsafe { libc::write(self.raw_fd, ptr, size) } { + 8isize => Ok(()), + _ => Err(std::io::Error::last_os_error()), } } + + #[inline] + pub fn read(&self) -> Result, std::io::Error> { + let mut n = 0u64; + let ptr = &mut n as *mut _ as *mut _; + let size = std::mem::size_of::(); + match unsafe { libc::read(self.raw_fd, ptr, size) } { + 8isize => Ok(Some(n)), + _ => { + let err = std::io::Error::last_os_error(); + match err.raw_os_error().unwrap() { + libc::EAGAIN => Ok(None), + _ => Err(err), + } + } + } + } + + pub fn add_looper(&self, looper: &ForeignLooper, ident: i32) -> Result<(), LooperError> { + unsafe { + looper.add_fd( + self.raw_fd, + ident, + ALOOPER_EVENT_INPUT as _, + std::ptr::null_mut::(), + ) + } + } +} + +impl Drop for EventFd { + fn drop(&mut self) { + let _ = unsafe { libc::close(self.raw_fd) }; + } } -unsafe fn wake(_activity: *mut ANativeActivity, event: Event) { - log::trace!("{:?}", event); - let size = std::mem::size_of::(); - let res = libc::write(PIPE[1], &event as *const _ as *const _, size); - assert_eq!(res, size as _); +unsafe impl Send for EventFd {} +unsafe impl Sync for EventFd {} + +// Unbounded eventfd based mpsc +// MUST be attached to the LOOPER +// Does not care if all Senders are gone, +// but tries to prevent sends to a dropped Receiver +// and to drop all outstanding events on Receiver drop +#[derive(Debug)] +struct EventQueueInner { + event_fd: EventFd, + queue: SegQueue, +} + +impl EventQueueInner { + fn new() -> Self { + Self { + event_fd: EventFd::new(0u32, libc::EFD_SEMAPHORE | libc::EFD_NONBLOCK) + .expect("Could not open eventfd for ndk_glue::event_queue, cannot proceed"), + queue: SegQueue::new(), + } + } +} + +#[derive(Debug)] +pub struct EventQueueSender(Weak>); + +impl Clone for EventQueueSender { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl EventQueueSender { + pub fn send(&self, event: E) -> Result<(), E> { + match self.0.upgrade() { + Some(inner) => { + inner.queue.push(event); + inner.event_fd.write(1).unwrap(); + Ok(()) + } + None => Err(event), + } + } +} + +#[derive(Debug)] +pub struct EventQueueReceiver(Arc>); + +impl EventQueueReceiver { + pub fn try_recv(&self) -> Option { + let _ = self.0.event_fd.read().unwrap()?; + self.0.queue.pop() + } +} + +impl Drop for EventQueueReceiver { + fn drop(&mut self) { + unsafe { + let _ = LOOPER.as_ref().unwrap().remove_fd(self.0.event_fd.to_raw()); + } + } +} + +// Unsafe because caller MUST ensure that the eventfd is attached to LOOPER +// and that the Receiver does not drop before then. +unsafe fn event_queue_impl() -> (EventQueueSender, EventQueueReceiver) { + let inner = Arc::new(EventQueueInner::new()); + let weak_inner = Arc::downgrade(&inner); + (EventQueueSender(weak_inner), EventQueueReceiver(inner)) +} + +pub fn event_queue(ident: i32) -> (EventQueueSender, EventQueueReceiver) { + let (tx, rx) = unsafe { event_queue_impl() }; + unsafe { + rx.0.event_fd + .add_looper(LOOPER.as_ref().unwrap(), ident) + .unwrap(); + } + (tx, rx) +} + +lazy_static! { + static ref NATIVE_WINDOW: Mutex> = Default::default(); + static ref INPUT_QUEUE: Mutex> = Default::default(); + static ref CONTENT_RECT: Mutex> = Mutex::new(Some(Default::default())); +} + +static mut NATIVE_ACTIVITY: Option = None; +static mut LOOPER: Option = None; +static mut SENDER: Option> = None; +static mut RECEIVER: Option> = None; + +pub fn native_activity() -> &'static NativeActivity { + unsafe { NATIVE_ACTIVITY.as_ref().unwrap() } +} + +pub fn activity_event_rx() -> &'static EventQueueReceiver { + unsafe { RECEIVER.as_ref().unwrap() } +} + +pub fn request_window_redraw() { + unsafe { + SENDER + .as_ref() + .unwrap() + .send(Event::WindowRedrawNeeded) + .unwrap() + }; } #[derive(Clone, Debug, Default, Eq, PartialEq)] @@ -101,8 +273,7 @@ pub struct Rect { pub bottom: u32, } -#[derive(Clone, Debug, Eq, PartialEq)] -#[repr(u8)] +#[derive(Debug)] pub enum Event { Start, Resume, @@ -114,13 +285,15 @@ pub enum Event { LowMemory, WindowLostFocus, WindowHasFocus, - WindowCreated, + WindowCreated(EventPayload), WindowResized, WindowRedrawNeeded, - WindowDestroyed, - InputQueueCreated, - InputQueueDestroyed, - ContentRectChanged, + // Will be sent if and only if `WindowCreated` payload was not `None` + WindowDestroyed(SyncEventGuard), + InputQueueCreated(EventPayload), + // Will be sent if and only if `InputQueueCreated` payload was not `None` + InputQueueDestroyed(SyncEventGuard), + ContentRectChanged(EventPayload), } pub unsafe fn init( @@ -172,61 +345,67 @@ pub unsafe fn init( } }); + let (tx, rx) = event_queue_impl(); + SENDER = Some(tx); + RECEIVER = Some(rx); thread::spawn(move || { let looper = ThreadLooper::prepare(); let foreign = looper.into_foreign(); - foreign - .add_fd( - PIPE[0], - NDK_GLUE_LOOPER_EVENT_PIPE_IDENT, - ALOOPER_EVENT_INPUT as _, - std::ptr::null_mut(), - ) + RECEIVER + .as_ref() + .unwrap() + .0 + .event_fd + .add_looper(&foreign, ident::ACTIVITY_CALLBACK) .unwrap(); LOOPER = Some(foreign); main() }); } -unsafe extern "C" fn on_start(activity: *mut ANativeActivity) { - wake(activity, Event::Start); +unsafe extern "C" fn on_start(_activity: *mut ANativeActivity) { + SENDER.as_ref().unwrap().send(Event::Start).unwrap(); } -unsafe extern "C" fn on_resume(activity: *mut ANativeActivity) { - wake(activity, Event::Resume); +unsafe extern "C" fn on_resume(_activity: *mut ANativeActivity) { + SENDER.as_ref().unwrap().send(Event::Resume).unwrap(); } unsafe extern "C" fn on_save_instance_state( - activity: *mut ANativeActivity, + _activity: *mut ANativeActivity, _out_size: *mut ndk_sys::size_t, ) -> *mut raw::c_void { // TODO - wake(activity, Event::SaveInstanceState); + SENDER + .as_ref() + .unwrap() + .send(Event::SaveInstanceState) + .unwrap(); std::ptr::null_mut() } -unsafe extern "C" fn on_pause(activity: *mut ANativeActivity) { - wake(activity, Event::Pause); +unsafe extern "C" fn on_pause(_activity: *mut ANativeActivity) { + SENDER.as_ref().unwrap().send(Event::Pause).unwrap(); } -unsafe extern "C" fn on_stop(activity: *mut ANativeActivity) { - wake(activity, Event::Stop); +unsafe extern "C" fn on_stop(_activity: *mut ANativeActivity) { + SENDER.as_ref().unwrap().send(Event::Stop).unwrap(); } -unsafe extern "C" fn on_destroy(activity: *mut ANativeActivity) { - wake(activity, Event::Destroy); +unsafe extern "C" fn on_destroy(_activity: *mut ANativeActivity) { + SENDER.as_ref().unwrap().send(Event::Destroy).unwrap(); } -unsafe extern "C" fn on_configuration_changed(activity: *mut ANativeActivity) { - wake(activity, Event::ConfigChanged); +unsafe extern "C" fn on_configuration_changed(_activity: *mut ANativeActivity) { + SENDER.as_ref().unwrap().send(Event::ConfigChanged).unwrap(); } -unsafe extern "C" fn on_low_memory(activity: *mut ANativeActivity) { - wake(activity, Event::LowMemory); +unsafe extern "C" fn on_low_memory(_activity: *mut ANativeActivity) { + SENDER.as_ref().unwrap().send(Event::LowMemory).unwrap(); } unsafe extern "C" fn on_window_focus_changed( - activity: *mut ANativeActivity, + _activity: *mut ANativeActivity, has_focus: raw::c_int, ) { let event = if has_focus == 0 { @@ -234,64 +413,93 @@ unsafe extern "C" fn on_window_focus_changed( } else { Event::WindowHasFocus }; - wake(activity, event); + SENDER.as_ref().unwrap().send(event).unwrap(); } -unsafe extern "C" fn on_window_created(activity: *mut ANativeActivity, window: *mut ANativeWindow) { - *NATIVE_WINDOW.write().unwrap() = Some(NativeWindow::from_ptr(NonNull::new(window).unwrap())); - wake(activity, Event::WindowCreated); +unsafe extern "C" fn on_window_created( + _activity: *mut ANativeActivity, + window: *mut ANativeWindow, +) { + *NATIVE_WINDOW.lock().unwrap() = Some(NativeWindow::from_ptr(NonNull::new(window).unwrap())); + let event = Event::WindowCreated(EventPayload::new(&NATIVE_WINDOW)); + SENDER.as_ref().unwrap().send(event).unwrap(); } unsafe extern "C" fn on_window_resized( - activity: *mut ANativeActivity, + _activity: *mut ANativeActivity, _window: *mut ANativeWindow, ) { - wake(activity, Event::WindowResized); + SENDER.as_ref().unwrap().send(Event::WindowResized).unwrap(); } unsafe extern "C" fn on_window_redraw_needed( - activity: *mut ANativeActivity, + _activity: *mut ANativeActivity, _window: *mut ANativeWindow, ) { - wake(activity, Event::WindowRedrawNeeded); + SENDER + .as_ref() + .unwrap() + .send(Event::WindowRedrawNeeded) + .unwrap(); } unsafe extern "C" fn on_window_destroyed( - activity: *mut ANativeActivity, + _activity: *mut ANativeActivity, _window: *mut ANativeWindow, ) { - *NATIVE_WINDOW.write().unwrap() = None; - wake(activity, Event::WindowDestroyed); + // If the window was delivered (WindowCreated event has already been handled) + // then we need to send a destroyed event. Otherwise -- just preemptively + // empty out the `Option<_>` which ensures the user won't see it. + let was_delivered = NATIVE_WINDOW.lock().unwrap().take().is_none(); + if was_delivered { + let (event_guard, rx) = SyncEventGuard::new(); + SENDER + .as_ref() + .unwrap() + .send(Event::WindowDestroyed(event_guard)) + .unwrap(); + rx.recv().unwrap(); + } } unsafe extern "C" fn on_input_queue_created( - activity: *mut ANativeActivity, + _activity: *mut ANativeActivity, queue: *mut AInputQueue, ) { let input_queue = InputQueue::from_ptr(NonNull::new(queue).unwrap()); let looper = LOOPER.as_ref().unwrap(); - input_queue.attach_looper(looper, NDK_GLUE_LOOPER_INPUT_QUEUE_IDENT); - *INPUT_QUEUE.write().unwrap() = Some(input_queue); - wake(activity, Event::InputQueueCreated); + input_queue.attach_looper(looper, ident::INPUT_QUEUE as _); + *INPUT_QUEUE.lock().unwrap() = Some(input_queue); + let event = Event::InputQueueCreated(EventPayload::new(&INPUT_QUEUE)); + SENDER.as_ref().unwrap().send(event).unwrap(); } unsafe extern "C" fn on_input_queue_destroyed( - activity: *mut ANativeActivity, + _activity: *mut ANativeActivity, queue: *mut AInputQueue, ) { let input_queue = InputQueue::from_ptr(NonNull::new(queue).unwrap()); input_queue.detach_looper(); - *INPUT_QUEUE.write().unwrap() = None; - wake(activity, Event::InputQueueDestroyed); + let was_delivered = INPUT_QUEUE.lock().unwrap().take().is_none(); + if was_delivered { + let (event_guard, rx) = SyncEventGuard::new(); + SENDER + .as_ref() + .unwrap() + .send(Event::InputQueueDestroyed(event_guard)) + .unwrap(); + rx.recv().unwrap(); + } } -unsafe extern "C" fn on_content_rect_changed(activity: *mut ANativeActivity, rect: *const ARect) { +unsafe extern "C" fn on_content_rect_changed(_activity: *mut ANativeActivity, rect: *const ARect) { let rect = Rect { left: (*rect).left as _, top: (*rect).top as _, right: (*rect).right as _, bottom: (*rect).bottom as _, }; - *CONTENT_RECT.write().unwrap() = rect; - wake(activity, Event::ContentRectChanged); + *CONTENT_RECT.lock().unwrap() = Some(rect); + let event = Event::ContentRectChanged(EventPayload::new(&CONTENT_RECT)); + SENDER.as_ref().unwrap().send(event).unwrap(); } diff --git a/ndk/src/looper.rs b/ndk/src/looper.rs index 5e9122e5..c121b6d8 100644 --- a/ndk/src/looper.rs +++ b/ndk/src/looper.rs @@ -262,4 +262,16 @@ impl ForeignLooper { _ => unreachable!(), } } + + pub unsafe fn remove_fd( + &self, + fd: RawFd + ) -> Result { + match ffi::ALooper_removeFd(self.ptr.as_ptr(), fd) { + 1 => Ok(true), + 0 => Ok(false), + -1 => Err(LooperError), + _ => unreachable!(), + } + } }