diff --git a/src/sync/client.rs b/src/sync/client.rs index 9cc117d8..d4d036b8 100644 --- a/src/sync/client.rs +++ b/src/sync/client.rs @@ -45,7 +45,7 @@ impl Client { pub fn connect(sockaddr: &str) -> Result { let conn = ClientConnection::client_connect(sockaddr)?; - Ok(Self::new_client(conn)) + Self::new_client(conn) } #[cfg(unix)] @@ -53,10 +53,14 @@ impl Client { pub fn new(fd: RawFd) -> Client { let conn = ClientConnection::new(fd); - Self::new_client(conn) + Self::new_client(conn).unwrap_or_else(|e| { + panic!( + "client was not successfully initialized: {}", e + ) + }) } - fn new_client(pipe_client: ClientConnection) -> Client { + fn new_client(pipe_client: ClientConnection) -> Result { let client = Arc::new(pipe_client); let (sender_tx, rx): (Sender, Receiver) = mpsc::channel(); @@ -64,7 +68,7 @@ impl Client { let receiver_map = recver_map_orig.clone(); - let connection = Arc::new(client.get_pipe_connection()); + let connection = Arc::new(client.get_pipe_connection()?); let sender_client = connection.clone(); //Sender @@ -171,10 +175,10 @@ impl Client { trace!("Receiver quit"); }); - Client { + Ok(Client { _connection: client, sender_tx, - } + }) } pub fn request(&self, req: Request) -> Result { let buf = req.encode().map_err(err_to_others_err!(e, ""))?; @@ -220,7 +224,11 @@ impl Drop for ClientConnection { #[cfg(windows)] impl Drop for PipeConnection { fn drop(&mut self) { - self.close().unwrap(); + self.close().unwrap_or_else(|e| { + trace!( + "connection may already be closed: {}", e + ) + }); trace!("pipe connection is dropped"); } } diff --git a/src/sync/sys/unix/net.rs b/src/sync/sys/unix/net.rs index 4cf855f1..fb344c5a 100644 --- a/src/sync/sys/unix/net.rs +++ b/src/sync/sys/unix/net.rs @@ -304,8 +304,8 @@ impl ClientConnection { Ok(Some(())) } - pub fn get_pipe_connection(&self) -> PipeConnection { - PipeConnection::new(self.fd) + pub fn get_pipe_connection(&self) -> Result { + Ok(PipeConnection::new(self.fd)) } pub fn close_receiver(&self) -> Result<()> { diff --git a/src/sync/sys/windows/net.rs b/src/sync/sys/windows/net.rs index 8a385962..b153fad6 100644 --- a/src/sync/sys/windows/net.rs +++ b/src/sync/sys/windows/net.rs @@ -30,7 +30,7 @@ use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_ use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX }; use windows_sys::Win32::System::IO::{ GetOverlappedResult, OVERLAPPED }; use windows_sys::Win32::System::Pipes::{ CreateNamedPipeW, ConnectNamedPipe,DisconnectNamedPipe, PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, PIPE_REJECT_REMOTE_CLIENTS }; -use windows_sys::Win32::System::Threading::CreateEventW; +use windows_sys::Win32::System::Threading::{CreateEventW, SetEvent}; const PIPE_BUFFER_SIZE: u32 = 65536; const WAIT_FOR_EVENT: i32 = 1; @@ -38,6 +38,7 @@ const WAIT_FOR_EVENT: i32 = 1; pub struct PipeListener { first_instance: AtomicBool, address: String, + connection_event: isize, } #[repr(C)] @@ -54,12 +55,6 @@ impl Overlapped { ol } - fn new() -> Overlapped { - Overlapped { - inner: UnsafeCell::new(unsafe { std::mem::zeroed() }), - } - } - fn as_mut_ptr(&self) -> *mut OVERLAPPED { self.inner.get() } @@ -67,9 +62,11 @@ impl Overlapped { impl PipeListener { pub(crate) fn new(sockaddr: &str) -> Result { + let connection_event = create_event()?; Ok(PipeListener { first_instance: AtomicBool::new(true), address: sockaddr.to_string(), + connection_event }) } @@ -83,11 +80,21 @@ impl PipeListener { } // Create a new pipe instance for every new client - let np = self.new_instance().unwrap(); - let ol = Overlapped::new(); + let instance = self.new_instance()?; + let np = match PipeConnection::new(instance) { + Ok(np) => np, + Err(e) => { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("failed to create new pipe instance: {:?}", e), + )); + } + }; + + let ol = Overlapped::new_with_event(self.connection_event); trace!("listening for connection"); - let result = unsafe { ConnectNamedPipe(np, ol.as_mut_ptr())}; + let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr())}; if result != 0 { return Err(io::Error::last_os_error()); } @@ -95,18 +102,18 @@ impl PipeListener { match io::Error::last_os_error() { e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => { let mut bytes_transfered = 0; - let res = unsafe {GetOverlappedResult(np, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) }; + let res = unsafe {GetOverlappedResult(np.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) }; match res { 0 => { return Err(io::Error::last_os_error()); } _ => { - Ok(Some(PipeConnection::new(np))) + Ok(Some(np)) } } } e if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => { - Ok(Some(PipeConnection::new(np))) + Ok(Some(np)) } e => { return Err(io::Error::new( @@ -143,7 +150,9 @@ impl PipeListener { } pub fn close(&self) -> Result<()> { - Ok(()) + // release the ConnectNamedPipe thread by signaling the event and clean up event handle + set_event(self.connection_event)?; + close_handle(self.connection_event) } } @@ -168,15 +177,15 @@ pub struct PipeConnection { // "It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device." // "In this situation, there is no way to know which operation caused the object's state to be signaled." impl PipeConnection { - pub(crate) fn new(h: isize) -> PipeConnection { + pub(crate) fn new(h: isize) -> Result { trace!("creating events for thread {:?} on pipe instance {}", std::thread::current().id(), h as i32); - let read_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) }; - let write_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) }; - PipeConnection { + let read_event = create_event()?; + let write_event = create_event()?; + Ok(PipeConnection { named_pipe: h, read_event: read_event, write_event: write_event, - } + }) } pub(crate) fn id(&self) -> i32 { @@ -273,6 +282,22 @@ fn close_handle(handle: isize) -> Result<()> { } } +fn create_event() -> Result { + let result = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) }; + match result { + 0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())), + _ => Ok(result), + } +} + +fn set_event(event: isize) -> Result<()> { + let result = unsafe { SetEvent(event) }; + match result { + 0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())), + _ => Ok(()), + } +} + impl ClientConnection { pub fn client_connect(sockaddr: &str) -> Result { Ok(ClientConnection::new(sockaddr)) @@ -289,14 +314,14 @@ impl ClientConnection { Ok(Some(())) } - pub fn get_pipe_connection(&self) -> PipeConnection { + pub fn get_pipe_connection(&self) -> Result { let mut opts = OpenOptions::new(); opts.read(true) .write(true) .custom_flags(FILE_FLAG_OVERLAPPED); let file = opts.open(self.address.as_str()); - PipeConnection::new(file.unwrap().into_raw_handle() as isize) + return PipeConnection::new(file.unwrap().into_raw_handle() as isize) } pub fn close_receiver(&self) -> Result<()> {