Skip to content

Commit

Permalink
test(runtime): add message queue runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
Berrysoft committed Feb 25, 2024
1 parent 9f7b7d7 commit 6de972a
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 2 deletions.
3 changes: 3 additions & 0 deletions compio-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ windows-sys = { workspace = true, features = ["Win32_System_IO"] }
os_pipe = { workspace = true }
libc = { workspace = true }

[target.'cfg(windows)'.dev-dependencies]
windows-sys = { workspace = true, features = ["Win32_UI_WindowsAndMessaging"] }

[target.'cfg(target_os = "macos")'.dev-dependencies]
core-foundation = "0.9"

Expand Down
97 changes: 95 additions & 2 deletions compio-runtime/tests/custom_loop.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(target_os = "macos")]
#[test]
fn cf_run_loop() {
use std::{os::raw::c_void, time::Duration};
use std::{future::Future, os::raw::c_void, time::Duration};

use compio_driver::AsRawFd;
use compio_runtime::Runtime;
Expand All @@ -10,7 +10,6 @@ fn cf_run_loop() {
filedescriptor::{kCFFileDescriptorReadCallBack, CFFileDescriptor, CFFileDescriptorRef},
runloop::{kCFRunLoopDefaultMode, CFRunLoop},
};
use futures_util::Future;

struct CFRunLoopRuntime {
runtime: Runtime,
Expand Down Expand Up @@ -80,3 +79,97 @@ fn cf_run_loop() {
});
assert_eq!(res, 1);
}

#[cfg(windows)]
#[test]
fn message_queue() {
use std::{future::Future, mem::MaybeUninit, time::Duration};

use compio_driver::AsRawFd;
use compio_runtime::Runtime;
use windows_sys::Win32::{
Foundation::{HANDLE, WAIT_FAILED},
System::Threading::INFINITE,
UI::WindowsAndMessaging::{
DispatchMessageW, MsgWaitForMultipleObjectsEx, PeekMessageW, TranslateMessage,
MWMO_ALERTABLE, MWMO_INPUTAVAILABLE, PM_REMOVE, QS_ALLINPUT,
},
};

struct MQRuntime {
runtime: Runtime,
}

impl MQRuntime {
pub fn new() -> Self {
Self {
runtime: Runtime::new().unwrap(),
}
}

pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let _guard = self.runtime.enter();
let mut result = None;
unsafe {
self.runtime
.spawn_unchecked(async { result = Some(future.await) })
}
.detach();
loop {
self.runtime.run();
if let Some(result) = result.take() {
break result;
}
self.runtime.poll_with(|driver, timeout, entries| {
match driver.poll(Some(Duration::ZERO), entries) {
Ok(()) => {
if !entries.is_empty() {
return Ok(());
}
}
Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {}
Err(e) => return Err(e),
}

let timeout = match timeout {
Some(timeout) => timeout.as_millis() as u32,
None => INFINITE,
};
let handle = driver.as_raw_fd() as HANDLE;
let res = unsafe {
MsgWaitForMultipleObjectsEx(
1,
&handle,
timeout,
QS_ALLINPUT,
MWMO_ALERTABLE | MWMO_INPUTAVAILABLE,
)
};
if res == WAIT_FAILED {
return Err(std::io::Error::last_os_error());
}

let mut msg = MaybeUninit::uninit();
let res = unsafe { PeekMessageW(msg.as_mut_ptr(), 0, 0, 0, PM_REMOVE) };
if res != 0 {
let msg = unsafe { msg.assume_init() };
unsafe {
TranslateMessage(&msg);
DispatchMessageW(&msg);
}
}

Ok(())
});
}
}
}

let runtime = MQRuntime::new();

let res = runtime.block_on(async {
compio_runtime::time::sleep(Duration::from_secs(1)).await;
1
});
assert_eq!(res, 1);
}

0 comments on commit 6de972a

Please sign in to comment.