From 6de972a225407a5dc935aecfdbcbdf621f6cff37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 25 Feb 2024 16:49:09 +0800 Subject: [PATCH] test(runtime): add message queue runtime --- compio-runtime/Cargo.toml | 3 + compio-runtime/tests/custom_loop.rs | 97 ++++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 2 deletions(-) diff --git a/compio-runtime/Cargo.toml b/compio-runtime/Cargo.toml index c4e1a669..b2a64c06 100644 --- a/compio-runtime/Cargo.toml +++ b/compio-runtime/Cargo.toml @@ -52,6 +52,9 @@ windows-sys = { workspace = true, features = ["Win32_System_IO"] } os_pipe = { workspace = true } libc = { workspace = true } +[target.'cfg(windows)'.dev-dependencies] +windows-sys = { workspace = true, features = ["Win32_UI_WindowsAndMessaging"] } + [target.'cfg(target_os = "macos")'.dev-dependencies] core-foundation = "0.9" diff --git a/compio-runtime/tests/custom_loop.rs b/compio-runtime/tests/custom_loop.rs index fc51fe79..66fc5f8c 100644 --- a/compio-runtime/tests/custom_loop.rs +++ b/compio-runtime/tests/custom_loop.rs @@ -1,7 +1,7 @@ #[cfg(target_os = "macos")] #[test] fn cf_run_loop() { - use std::{os::raw::c_void, time::Duration}; + use std::{future::Future, os::raw::c_void, time::Duration}; use compio_driver::AsRawFd; use compio_runtime::Runtime; @@ -10,7 +10,6 @@ fn cf_run_loop() { filedescriptor::{kCFFileDescriptorReadCallBack, CFFileDescriptor, CFFileDescriptorRef}, runloop::{kCFRunLoopDefaultMode, CFRunLoop}, }; - use futures_util::Future; struct CFRunLoopRuntime { runtime: Runtime, @@ -80,3 +79,97 @@ fn cf_run_loop() { }); assert_eq!(res, 1); } + +#[cfg(windows)] +#[test] +fn message_queue() { + use std::{future::Future, mem::MaybeUninit, time::Duration}; + + use compio_driver::AsRawFd; + use compio_runtime::Runtime; + use windows_sys::Win32::{ + Foundation::{HANDLE, WAIT_FAILED}, + System::Threading::INFINITE, + UI::WindowsAndMessaging::{ + DispatchMessageW, MsgWaitForMultipleObjectsEx, PeekMessageW, TranslateMessage, + MWMO_ALERTABLE, MWMO_INPUTAVAILABLE, PM_REMOVE, QS_ALLINPUT, + }, + }; + + struct MQRuntime { + runtime: Runtime, + } + + impl MQRuntime { + pub fn new() -> Self { + Self { + runtime: Runtime::new().unwrap(), + } + } + + pub fn block_on(&self, future: F) -> F::Output { + let _guard = self.runtime.enter(); + let mut result = None; + unsafe { + self.runtime + .spawn_unchecked(async { result = Some(future.await) }) + } + .detach(); + loop { + self.runtime.run(); + if let Some(result) = result.take() { + break result; + } + self.runtime.poll_with(|driver, timeout, entries| { + match driver.poll(Some(Duration::ZERO), entries) { + Ok(()) => { + if !entries.is_empty() { + return Ok(()); + } + } + Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {} + Err(e) => return Err(e), + } + + let timeout = match timeout { + Some(timeout) => timeout.as_millis() as u32, + None => INFINITE, + }; + let handle = driver.as_raw_fd() as HANDLE; + let res = unsafe { + MsgWaitForMultipleObjectsEx( + 1, + &handle, + timeout, + QS_ALLINPUT, + MWMO_ALERTABLE | MWMO_INPUTAVAILABLE, + ) + }; + if res == WAIT_FAILED { + return Err(std::io::Error::last_os_error()); + } + + let mut msg = MaybeUninit::uninit(); + let res = unsafe { PeekMessageW(msg.as_mut_ptr(), 0, 0, 0, PM_REMOVE) }; + if res != 0 { + let msg = unsafe { msg.assume_init() }; + unsafe { + TranslateMessage(&msg); + DispatchMessageW(&msg); + } + } + + Ok(()) + }); + } + } + } + + let runtime = MQRuntime::new(); + + let res = runtime.block_on(async { + compio_runtime::time::sleep(Duration::from_secs(1)).await; + 1 + }); + assert_eq!(res, 1); +}