From fd884af70046e0879e40f6d03c8d4b7068db8ead Mon Sep 17 00:00:00 2001 From: James Sturtevant Date: Thu, 2 Mar 2023 15:39:23 -0800 Subject: [PATCH] Fix stuck thread on server shutdown The connect namedpipe thread would be in a suspended state when shutdown on the server is called. Setting the event to a signalled state to wake the thread up so everything can shut down properly. Signed-off-by: James Sturtevant --- src/sync/client.rs | 23 +++++++++---- src/sync/sys/unix/net.rs | 4 +-- src/sync/sys/windows/net.rs | 67 +++++++++++++++++++++++++------------ 3 files changed, 64 insertions(+), 30 deletions(-) diff --git a/src/sync/client.rs b/src/sync/client.rs index 9cc117d8..86ceb777 100644 --- a/src/sync/client.rs +++ b/src/sync/client.rs @@ -45,7 +45,7 @@ impl Client { pub fn connect(sockaddr: &str) -> Result { let conn = ClientConnection::client_connect(sockaddr)?; - Ok(Self::new_client(conn)) + Self::new_client(conn) } #[cfg(unix)] @@ -53,10 +53,15 @@ impl Client { pub fn new(fd: RawFd) -> Client { let conn = ClientConnection::new(fd); - Self::new_client(conn) + // TODO: upgrade the API of Client::new and remove this panic for the major version release + Self::new_client(conn).unwrap_or_else(|e| { + panic!( + "client was not successfully initialized: {}", e + ) + }) } - fn new_client(pipe_client: ClientConnection) -> Client { + fn new_client(pipe_client: ClientConnection) -> Result { let client = Arc::new(pipe_client); let (sender_tx, rx): (Sender, Receiver) = mpsc::channel(); @@ -64,7 +69,7 @@ impl Client { let receiver_map = recver_map_orig.clone(); - let connection = Arc::new(client.get_pipe_connection()); + let connection = Arc::new(client.get_pipe_connection()?); let sender_client = connection.clone(); //Sender @@ -171,10 +176,10 @@ impl Client { trace!("Receiver quit"); }); - Client { + Ok(Client { _connection: client, sender_tx, - } + }) } pub fn request(&self, req: Request) -> Result { let buf = req.encode().map_err(err_to_others_err!(e, ""))?; @@ -220,7 +225,11 @@ impl Drop for ClientConnection { #[cfg(windows)] impl Drop for PipeConnection { fn drop(&mut self) { - self.close().unwrap(); + self.close().unwrap_or_else(|e| { + trace!( + "connection may already be closed: {}", e + ) + }); trace!("pipe connection is dropped"); } } diff --git a/src/sync/sys/unix/net.rs b/src/sync/sys/unix/net.rs index eb29dfb6..3fdf47b8 100644 --- a/src/sync/sys/unix/net.rs +++ b/src/sync/sys/unix/net.rs @@ -305,8 +305,8 @@ impl ClientConnection { Ok(Some(())) } - pub fn get_pipe_connection(&self) -> PipeConnection { - PipeConnection::new(self.fd) + pub fn get_pipe_connection(&self) -> Result { + Ok(PipeConnection::new(self.fd)) } pub fn close_receiver(&self) -> Result<()> { diff --git a/src/sync/sys/windows/net.rs b/src/sync/sys/windows/net.rs index 4ec9d02c..1a5adbca 100644 --- a/src/sync/sys/windows/net.rs +++ b/src/sync/sys/windows/net.rs @@ -30,7 +30,7 @@ use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_ use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX }; use windows_sys::Win32::System::IO::{ GetOverlappedResult, OVERLAPPED }; use windows_sys::Win32::System::Pipes::{ CreateNamedPipeW, ConnectNamedPipe,DisconnectNamedPipe, PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, PIPE_REJECT_REMOTE_CLIENTS }; -use windows_sys::Win32::System::Threading::CreateEventW; +use windows_sys::Win32::System::Threading::{CreateEventW, SetEvent}; const PIPE_BUFFER_SIZE: u32 = 65536; const WAIT_FOR_EVENT: i32 = 1; @@ -38,6 +38,7 @@ const WAIT_FOR_EVENT: i32 = 1; pub struct PipeListener { first_instance: AtomicBool, address: String, + connection_event: isize, } #[repr(C)] @@ -54,12 +55,6 @@ impl Overlapped { ol } - fn new() -> Overlapped { - Overlapped { - inner: UnsafeCell::new(unsafe { std::mem::zeroed() }), - } - } - fn as_mut_ptr(&self) -> *mut OVERLAPPED { self.inner.get() } @@ -67,9 +62,11 @@ impl Overlapped { impl PipeListener { pub(crate) fn new(sockaddr: &str) -> Result { + let connection_event = create_event()?; Ok(PipeListener { first_instance: AtomicBool::new(true), address: sockaddr.to_string(), + connection_event }) } @@ -85,11 +82,21 @@ impl PipeListener { } // Create a new pipe instance for every new client - let np = self.new_instance().unwrap(); - let ol = Overlapped::new(); + let instance = self.new_instance()?; + let np = match PipeConnection::new(instance) { + Ok(np) => np, + Err(e) => { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("failed to create new pipe instance: {:?}", e), + )); + } + }; + + let ol = Overlapped::new_with_event(self.connection_event); trace!("listening for connection"); - let result = unsafe { ConnectNamedPipe(np, ol.as_mut_ptr())}; + let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr())}; if result != 0 { return Err(io::Error::last_os_error()); } @@ -97,18 +104,18 @@ impl PipeListener { match io::Error::last_os_error() { e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => { let mut bytes_transfered = 0; - let res = unsafe {GetOverlappedResult(np, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) }; + let res = unsafe {GetOverlappedResult(np.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) }; match res { 0 => { return Err(io::Error::last_os_error()); } _ => { - Ok(Some(PipeConnection::new(np))) + Ok(Some(np)) } } } e if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => { - Ok(Some(PipeConnection::new(np))) + Ok(Some(np)) } e => { return Err(io::Error::new( @@ -145,7 +152,9 @@ impl PipeListener { } pub fn close(&self) -> Result<()> { - Ok(()) + // release the ConnectNamedPipe thread by signaling the event and clean up event handle + set_event(self.connection_event)?; + close_handle(self.connection_event) } } @@ -170,15 +179,15 @@ pub struct PipeConnection { // "It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device." // "In this situation, there is no way to know which operation caused the object's state to be signaled." impl PipeConnection { - pub(crate) fn new(h: isize) -> PipeConnection { + pub(crate) fn new(h: isize) -> Result { trace!("creating events for thread {:?} on pipe instance {}", std::thread::current().id(), h as i32); - let read_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) }; - let write_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) }; - PipeConnection { + let read_event = create_event()?; + let write_event = create_event()?; + Ok(PipeConnection { named_pipe: h, read_event: read_event, write_event: write_event, - } + }) } pub(crate) fn id(&self) -> i32 { @@ -275,6 +284,22 @@ fn close_handle(handle: isize) -> Result<()> { } } +fn create_event() -> Result { + let result = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) }; + match result { + 0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())), + _ => Ok(result), + } +} + +fn set_event(event: isize) -> Result<()> { + let result = unsafe { SetEvent(event) }; + match result { + 0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())), + _ => Ok(()), + } +} + impl ClientConnection { pub fn client_connect(sockaddr: &str) -> Result { Ok(ClientConnection::new(sockaddr)) @@ -291,14 +316,14 @@ impl ClientConnection { Ok(Some(())) } - pub fn get_pipe_connection(&self) -> PipeConnection { + pub fn get_pipe_connection(&self) -> Result { let mut opts = OpenOptions::new(); opts.read(true) .write(true) .custom_flags(FILE_FLAG_OVERLAPPED); let file = opts.open(self.address.as_str()); - PipeConnection::new(file.unwrap().into_raw_handle() as isize) + return PipeConnection::new(file.unwrap().into_raw_handle() as isize) } pub fn close_receiver(&self) -> Result<()> {