diff --git a/Cargo.toml b/Cargo.toml index d2d4949..f1d1ebb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ homepage = "https://github.com/paritytech/parity-tokio-ipc" description = """ Interprocess communication library for tokio. """ +include = ["src/**/*", "LICENSE-*", "README.md"] [dependencies] futures = "0.3" diff --git a/examples/client.rs b/examples/client.rs index 5a06e9f..a4d9368 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,24 +1,33 @@ -use tokio::io::{AsyncReadExt, AsyncWriteExt}; use parity_tokio_ipc::Endpoint; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::main(flavor = "current_thread")] async fn main() { - let path = std::env::args().nth(1).expect("Run it with server path to connect as argument"); + let path = std::env::args() + .nth(1) + .expect("Run it with server path to connect as argument"); - let mut client = Endpoint::connect(&path).await - .expect("Failed to connect client."); + let mut client = Endpoint::connect(&path) + .await + .expect("Failed to connect client."); - loop { - let mut buf = [0u8; 4]; - println!("SEND: PING"); - client.write_all(b"ping").await.expect("Unable to write message to client"); - client.read_exact(&mut buf[..]).await.expect("Unable to read buffer"); - if let Ok("pong") = std::str::from_utf8(&buf[..]) { - println!("RECEIVED: PONG"); - } else { - break; - } + loop { + let mut buf = [0u8; 4]; + println!("SEND: PING"); + client + .write_all(b"ping") + .await + .expect("Unable to write message to client"); + client + .read_exact(&mut buf[..]) + .await + .expect("Unable to read buffer"); + if let Ok("pong") = std::str::from_utf8(&buf[..]) { + println!("RECEIVED: PONG"); + } else { + break; + } - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - } + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } } diff --git a/examples/server.rs b/examples/server.rs index bad6b6c..c8eda4c 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -4,41 +4,45 @@ use tokio::io::{split, AsyncReadExt, AsyncWriteExt}; use parity_tokio_ipc::{Endpoint, SecurityAttributes}; async fn run_server(path: String) { - let mut endpoint = Endpoint::new(path); - endpoint.set_security_attributes(SecurityAttributes::allow_everyone_create().unwrap()); + let mut endpoint = Endpoint::new(path); + endpoint.set_security_attributes(SecurityAttributes::allow_everyone_create().unwrap()); - let incoming = endpoint.incoming().expect("failed to open new socket"); - futures::pin_mut!(incoming); + let incoming = endpoint.incoming().expect("failed to open new socket"); + futures::pin_mut!(incoming); - while let Some(result) = incoming.next().await - { - match result { - Ok(stream) => { - let (mut reader, mut writer) = split(stream); + while let Some(result) = incoming.next().await { + match result { + Ok(stream) => { + let (mut reader, mut writer) = split(stream); - tokio::spawn(async move { - loop { - let mut buf = [0u8; 4]; - let pong_buf = b"pong"; - if let Err(_) = reader.read_exact(&mut buf).await { - println!("Closing socket"); - break; - } - if let Ok("ping") = std::str::from_utf8(&buf[..]) { - println!("RECIEVED: PING"); - writer.write_all(pong_buf).await.expect("unable to write to socket"); - println!("SEND: PONG"); - } - } - }); - } - _ => unreachable!("ideally") - } - }; + tokio::spawn(async move { + loop { + let mut buf = [0u8; 4]; + let pong_buf = b"pong"; + if reader.read_exact(&mut buf).await.is_err() { + println!("Closing socket"); + break; + } + if let Ok("ping") = std::str::from_utf8(&buf[..]) { + println!("RECIEVED: PING"); + writer + .write_all(pong_buf) + .await + .expect("unable to write to socket"); + println!("SEND: PONG"); + } + } + }); + } + _ => unreachable!("ideally"), + } + } } #[tokio::main(flavor = "current_thread")] async fn main() { - let path = std::env::args().nth(1).expect("Run it with server path as argument"); - run_server(path).await -} \ No newline at end of file + let path = std::env::args() + .nth(1) + .expect("Run it with server path as argument"); + run_server(path).await +} diff --git a/src/lib.rs b/src/lib.rs index e7f13ea..db5b4c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,17 +10,19 @@ macro_rules! doc_comment { ($x:expr) => { #[doc = $x] - extern {} + extern "C" {} }; } doc_comment!(include_str!("../README.md")); -#[cfg(windows)] -mod win; #[cfg(not(windows))] mod unix; +#[cfg(windows)] +mod win; +#[cfg(unix)] +pub use unix::{Connection, Endpoint, SecurityAttributes}; /// Endpoint for IPC transport /// /// # Examples @@ -43,102 +45,116 @@ mod unix; /// } ///``` #[cfg(windows)] -pub use win::{SecurityAttributes, Endpoint, Connection}; -#[cfg(unix)] -pub use unix::{SecurityAttributes, Endpoint, Connection}; +pub use win::{Connection, Endpoint, SecurityAttributes}; /// For testing/examples +#[must_use] pub fn dummy_endpoint() -> String { - let num: u64 = rand::Rng::gen(&mut rand::thread_rng()); - if cfg!(windows) { - format!(r"\\.\pipe\my-pipe-{}", num) - } else { - format!(r"/tmp/my-uds-{}", num) - } + let num: u64 = rand::Rng::gen(&mut rand::thread_rng()); + if cfg!(windows) { + format!(r"\\.\pipe\my-pipe-{}", num) + } else { + format!(r"/tmp/my-uds-{}", num) + } } #[cfg(test)] mod tests { - use futures::{channel::oneshot, StreamExt as _, FutureExt as _}; - use std::time::Duration; - use tokio::io::{split, AsyncReadExt, AsyncWriteExt}; - - use super::{dummy_endpoint, Endpoint, SecurityAttributes}; - use std::path::Path; - use futures::future::{Either, select, ready}; - - async fn run_server(path: String) { - let path = path.to_owned(); - let mut endpoint = Endpoint::new(path); - - endpoint.set_security_attributes( - SecurityAttributes::empty() - .set_mode(0o777) - .unwrap() - ); - let incoming = endpoint.incoming().expect("failed to open up a new socket"); - futures::pin_mut!(incoming); - - while let Some(result) = incoming.next().await { - match result { - Ok(stream) => { - let (mut reader, mut writer) = split(stream); - let mut buf = [0u8; 5]; - reader.read_exact(&mut buf).await.expect("unable to read from socket"); - writer.write_all(&buf[..]).await.expect("unable to write to socket"); - } - _ => unreachable!("ideally") - } - }; - } - - #[tokio::test] - async fn smoke_test() { - let path = dummy_endpoint(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - - let server = select(Box::pin(run_server(path.clone())), shutdown_rx) - .then(|either| { - match either { - Either::Right((_, server)) => { - drop(server); - } - _ => unreachable!("also ideally") - }; - ready(()) - }); - tokio::spawn(server); - - tokio::time::sleep(Duration::from_secs(2)).await; - - println!("Connecting to client 0..."); - let mut client_0 = Endpoint::connect(&path).await - .expect("failed to open client_0"); - tokio::time::sleep(Duration::from_secs(2)).await; - println!("Connecting to client 1..."); - let mut client_1 = Endpoint::connect(&path).await - .expect("failed to open client_1"); - let msg = b"hello"; - - let mut rx_buf = vec![0u8; msg.len()]; - client_0.write_all(msg).await.expect("Unable to write message to client"); - client_0.read_exact(&mut rx_buf).await.expect("Unable to read message from client"); - - let mut rx_buf2 = vec![0u8; msg.len()]; - client_1.write_all(msg).await.expect("Unable to write message to client"); - client_1.read_exact(&mut rx_buf2).await.expect("Unable to read message from client"); - - assert_eq!(rx_buf, msg); - assert_eq!(rx_buf2, msg); - - // shutdown server - if let Ok(()) = shutdown_tx.send(()) { - // wait one second for the file to be deleted. - tokio::time::sleep(Duration::from_secs(1)).await; - let path = Path::new(&path); - // assert that it has - assert!(!path.exists()); - } + use futures::{channel::oneshot, FutureExt as _, StreamExt as _}; + use std::time::Duration; + use tokio::io::{split, AsyncReadExt, AsyncWriteExt}; + + use super::{dummy_endpoint, Endpoint, SecurityAttributes}; + use futures::future::{ready, select, Either}; + use std::path::Path; + + async fn run_server(path: String) { + let path = path.clone(); + let mut endpoint = Endpoint::new(path); + + endpoint.set_security_attributes(SecurityAttributes::empty().set_mode(0o777).unwrap()); + let incoming = endpoint.incoming().expect("failed to open up a new socket"); + futures::pin_mut!(incoming); + + while let Some(result) = incoming.next().await { + match result { + Ok(stream) => { + let (mut reader, mut writer) = split(stream); + let mut buf = [0_u8; 5]; + reader + .read_exact(&mut buf) + .await + .expect("unable to read from socket"); + writer + .write_all(&buf[..]) + .await + .expect("unable to write to socket"); + } + _ => unreachable!("ideally"), + } + } + } + + #[tokio::test] + async fn smoke_test() { + let path = dummy_endpoint(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let server = select(Box::pin(run_server(path.clone())), shutdown_rx).then(|either| { + match either { + Either::Right((_, server)) => { + drop(server); + } + _ => unreachable!("also ideally"), + }; + ready(()) + }); + tokio::spawn(server); + + tokio::time::sleep(Duration::from_secs(2)).await; + + println!("Connecting to client 0..."); + let mut client_0 = Endpoint::connect(&path) + .await + .expect("failed to open client_0"); + tokio::time::sleep(Duration::from_secs(2)).await; + println!("Connecting to client 1..."); + let mut client_1 = Endpoint::connect(&path) + .await + .expect("failed to open client_1"); + let msg = b"hello"; + + let mut rx_buf = vec![0_u8; msg.len()]; + client_0 + .write_all(msg) + .await + .expect("Unable to write message to client"); + client_0 + .read_exact(&mut rx_buf) + .await + .expect("Unable to read message from client"); + + let mut rx_buf2 = vec![0_u8; msg.len()]; + client_1 + .write_all(msg) + .await + .expect("Unable to write message to client"); + client_1 + .read_exact(&mut rx_buf2) + .await + .expect("Unable to read message from client"); + + assert_eq!(rx_buf, msg); + assert_eq!(rx_buf2, msg); + + // shutdown server + if let Ok(()) = shutdown_tx.send(()) { + // wait one second for the file to be deleted. + tokio::time::sleep(Duration::from_secs(1)).await; + let path = Path::new(&path); + // assert that it has + assert!(!path.exists()); + } } #[tokio::test] @@ -150,23 +166,23 @@ mod tests { is_static(endpoint.incoming()); } - #[cfg(windows)] - fn create_pipe_with_permissions(attr: SecurityAttributes) -> ::std::io::Result<()> { - let path = dummy_endpoint(); - - let mut endpoint = Endpoint::new(path); - endpoint.set_security_attributes(attr); - endpoint.incoming().map(|_| ()) - } - - #[cfg(windows)] - #[tokio::test] - async fn test_pipe_permissions() { - create_pipe_with_permissions(SecurityAttributes::empty()) - .expect("failed with no attributes"); - create_pipe_with_permissions(SecurityAttributes::allow_everyone_create().unwrap()) - .expect("failed with attributes for creating"); - create_pipe_with_permissions(SecurityAttributes::empty().allow_everyone_connect().unwrap()) - .expect("failed with attributes for connecting"); - } + #[cfg(windows)] + fn create_pipe_with_permissions(attr: SecurityAttributes) -> ::std::io::Result<()> { + let path = dummy_endpoint(); + + let mut endpoint = Endpoint::new(path); + endpoint.set_security_attributes(attr); + endpoint.incoming().map(|_| ()) + } + + #[cfg(windows)] + #[tokio::test] + async fn test_pipe_permissions() { + create_pipe_with_permissions(SecurityAttributes::empty()) + .expect("failed with no attributes"); + create_pipe_with_permissions(SecurityAttributes::allow_everyone_create().unwrap()) + .expect("failed with attributes for creating"); + create_pipe_with_permissions(SecurityAttributes::allow_everyone_connect().unwrap()) + .expect("failed with attributes for connecting"); + } } diff --git a/src/unix.rs b/src/unix.rs index 1cdf00c..f61dc10 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -1,36 +1,36 @@ +use futures::Stream; use libc::chmod; use std::ffi::CString; use std::io::{self, Error}; -use futures::Stream; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio::net::{UnixListener, UnixStream}; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::{UnixListener, UnixStream}; /// Socket permissions and ownership on UNIX pub struct SecurityAttributes { // read/write permissions for owner, group and others in unix octal. - mode: Option + mode: Option, } impl SecurityAttributes { /// New default security attributes. These only allow access by the /// process’s own user and the system administrator. - pub fn empty() -> Self { + pub const fn empty() -> Self { SecurityAttributes { mode: Some(0o600) } } /// New security attributes that allow everyone to connect. - pub fn allow_everyone_connect(mut self) -> io::Result { + pub const fn allow_everyone_connect(mut self) -> io::Result { self.mode = Some(0o666); Ok(self) } /// Set a custom permission on the socket - pub fn set_mode(mut self, mode: u16) -> io::Result { + pub const fn set_mode(mut self, mode: u16) -> io::Result { self.mode = Some(mode); Ok(self) } @@ -39,10 +39,8 @@ impl SecurityAttributes { /// /// This does not work on unix, where it is equivalent to /// [`SecurityAttributes::allow_everyone_connect`]. - pub fn allow_everyone_create() -> io::Result { - Ok(SecurityAttributes { - mode: None - }) + pub const fn allow_everyone_create() -> io::Result { + Ok(SecurityAttributes { mode: None }) } /// called in unix, after server socket has been created @@ -67,7 +65,10 @@ pub struct Endpoint { impl Endpoint { /// Stream of incoming connections - pub fn incoming(self) -> io::Result> + 'static> { + pub fn incoming( + self, + ) -> io::Result> + 'static> + { let listener = self.inner()?; // the call to bind in `inner()` creates the file // `apply_permission()` will set the file permissions. @@ -99,7 +100,7 @@ impl Endpoint { } /// New IPC endpoint at the given path - pub fn new(path: String) -> Self { + pub const fn new(path: String) -> Self { Endpoint { path, security_attributes: SecurityAttributes::empty(), @@ -118,10 +119,7 @@ struct Incoming { impl Stream for Incoming { type Item = io::Result; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = Pin::into_inner(self); match Pin::new(&mut this.listener).poll_accept(cx) { Poll::Pending => Poll::Pending, @@ -145,7 +143,7 @@ pub struct Connection { } impl Connection { - fn wrap(stream: UnixStream) -> Self { + const fn wrap(stream: UnixStream) -> Self { Self { inner: stream } } } diff --git a/src/win.rs b/src/win.rs index ac987b4..75b2868 100644 --- a/src/win.rs +++ b/src/win.rs @@ -1,469 +1,476 @@ -use winapi::shared::winerror::{ERROR_PIPE_BUSY, ERROR_SUCCESS}; -use winapi::um::accctrl::*; -use winapi::um::aclapi::*; -use winapi::um::minwinbase::{LPTR, PSECURITY_ATTRIBUTES, SECURITY_ATTRIBUTES}; -use winapi::um::securitybaseapi::*; -use winapi::um::winbase::{LocalAlloc, LocalFree}; -use winapi::um::winnt::*; - -use futures::Stream; -use std::io; -use std::marker; -use std::mem; -use std::path::Path; -use std::pin::Pin; -use std::ptr; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; -use tokio::io::{AsyncRead, AsyncWrite}; - -use tokio::net::windows::named_pipe; - -enum NamedPipe { - Server(named_pipe::NamedPipeServer), - Client(named_pipe::NamedPipeClient), -} - -const PIPE_AVAILABILITY_TIMEOUT: Duration = Duration::from_secs(5); - -/// Endpoint implementation for windows -pub struct Endpoint { - path: String, - security_attributes: SecurityAttributes, - created_listener: bool, -} - -impl Endpoint { - /// Stream of incoming connections - pub fn incoming( - mut self, - ) -> io::Result> + 'static> { - let pipe = self.create_listener()?; - - let stream = - futures::stream::try_unfold((pipe, self), |(listener, mut endpoint)| async move { - let () = listener.connect().await?; - - let new_listener = endpoint.create_listener()?; - - let conn = Connection::wrap(NamedPipe::Server(listener)); - - Ok(Some((conn, (new_listener, endpoint)))) - }); - - Ok(stream) - } - - fn create_listener(&mut self) -> io::Result { - let server = unsafe { - named_pipe::ServerOptions::new() - .first_pipe_instance(!self.created_listener) - .reject_remote_clients(true) - .access_inbound(true) - .access_outbound(true) - .in_buffer_size(65536) - .out_buffer_size(65536) - .create_with_security_attributes_raw( - &self.path, - self.security_attributes.as_ptr() as *mut libc::c_void, - ) - }?; - self.created_listener = true; - - Ok(server) - } - - /// Set security attributes for the connection - pub fn set_security_attributes(&mut self, security_attributes: SecurityAttributes) { - self.security_attributes = security_attributes; - } - - /// Returns the path of the endpoint. - pub fn path(&self) -> &str { - &self.path - } - - /// Make new connection using the provided path and running event pool. - pub async fn connect>(path: P) -> io::Result { - let path = path.as_ref(); - - // There is not async equivalent of waiting for a named pipe in Windows, - // so we keep trying or sleeping for a bit, until we hit a timeout - let attempt_start = Instant::now(); - let client = loop { - match named_pipe::ClientOptions::new() - .read(true) - .write(true) - .open(path) - { - Ok(client) => break client, - Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => { - if attempt_start.elapsed() < PIPE_AVAILABILITY_TIMEOUT { - tokio::time::sleep(Duration::from_millis(50)).await; - continue; - } else { - return Err(e); - } - } - Err(e) => return Err(e), - } - }; - - Ok(Connection::wrap(NamedPipe::Client(client))) - } - - /// New IPC endpoint at the given path - pub fn new(path: String) -> Self { - Endpoint { - path, - security_attributes: SecurityAttributes::empty(), - created_listener: false, - } - } -} - -/// IPC connection. -pub struct Connection { - inner: NamedPipe, -} - -impl Connection { - /// Wraps an existing named pipe - fn wrap(pipe: NamedPipe) -> Self { - Self { inner: pipe } - } -} - -impl AsyncRead for Connection { - fn poll_read( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - let this = Pin::into_inner(self); - match this.inner { - NamedPipe::Client(ref mut c) => Pin::new(c).poll_read(ctx, buf), - NamedPipe::Server(ref mut s) => Pin::new(s).poll_read(ctx, buf), - } - } -} - -impl AsyncWrite for Connection { - fn poll_write( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let this = Pin::into_inner(self); - match this.inner { - NamedPipe::Client(ref mut c) => Pin::new(c).poll_write(ctx, buf), - NamedPipe::Server(ref mut s) => Pin::new(s).poll_write(ctx, buf), - } - } - - fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - let this = Pin::into_inner(self); - match this.inner { - NamedPipe::Client(ref mut c) => Pin::new(c).poll_flush(ctx), - NamedPipe::Server(ref mut s) => Pin::new(s).poll_flush(ctx), - } - } - - fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - let this = Pin::into_inner(self); - match this.inner { - NamedPipe::Client(ref mut c) => Pin::new(c).poll_shutdown(ctx), - NamedPipe::Server(ref mut s) => Pin::new(s).poll_shutdown(ctx), - } - } -} - -/// Security attributes. -pub struct SecurityAttributes { - attributes: Option, -} - -pub const DEFAULT_SECURITY_ATTRIBUTES: SecurityAttributes = SecurityAttributes { - attributes: Some(InnerAttributes { - descriptor: SecurityDescriptor { - descriptor_ptr: ptr::null_mut(), - }, - acl: Acl { - acl_ptr: ptr::null_mut(), - }, - attrs: SECURITY_ATTRIBUTES { - nLength: mem::size_of::() as u32, - lpSecurityDescriptor: ptr::null_mut(), - bInheritHandle: 0, - }, - }), -}; - -impl SecurityAttributes { - /// New default security attributes. - pub fn empty() -> SecurityAttributes { - DEFAULT_SECURITY_ATTRIBUTES - } - - /// New default security attributes that allow everyone to connect. - pub fn allow_everyone_connect(&self) -> io::Result { - let attributes = Some(InnerAttributes::allow_everyone( - GENERIC_READ | FILE_WRITE_DATA, - )?); - Ok(SecurityAttributes { attributes }) - } - - /// Set a custom permission on the socket - pub fn set_mode(self, _mode: u32) -> io::Result { - // for now, does nothing. - Ok(self) - } - - /// New default security attributes that allow everyone to create. - pub fn allow_everyone_create() -> io::Result { - let attributes = Some(InnerAttributes::allow_everyone( - GENERIC_READ | GENERIC_WRITE, - )?); - Ok(SecurityAttributes { attributes }) - } - - /// Return raw handle of security attributes. - pub(crate) unsafe fn as_ptr(&mut self) -> PSECURITY_ATTRIBUTES { - match self.attributes.as_mut() { - Some(attributes) => attributes.as_ptr(), - None => ptr::null_mut(), - } - } -} - -unsafe impl Send for SecurityAttributes {} - -struct Sid { - sid_ptr: PSID, -} - -impl Sid { - fn everyone_sid() -> io::Result { - let mut sid_ptr = ptr::null_mut(); - let result = unsafe { - #[allow(const_item_mutation)] - AllocateAndInitializeSid( - SECURITY_WORLD_SID_AUTHORITY.as_mut_ptr() as *mut _, - 1, - SECURITY_WORLD_RID, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - &mut sid_ptr, - ) - }; - if result == 0 { - Err(io::Error::last_os_error()) - } else { - Ok(Sid { sid_ptr }) - } - } - - // Unsafe - the returned pointer is only valid for the lifetime of self. - unsafe fn as_ptr(&self) -> PSID { - self.sid_ptr - } -} - -impl Drop for Sid { - fn drop(&mut self) { - if !self.sid_ptr.is_null() { - unsafe { - FreeSid(self.sid_ptr); - } - } - } -} - -struct AceWithSid<'a> { - explicit_access: EXPLICIT_ACCESS_W, - _marker: marker::PhantomData<&'a Sid>, -} - -impl<'a> AceWithSid<'a> { - fn new(sid: &'a Sid, trustee_type: u32) -> AceWithSid<'a> { - let mut explicit_access = unsafe { mem::zeroed::() }; - explicit_access.Trustee.TrusteeForm = TRUSTEE_IS_SID; - explicit_access.Trustee.TrusteeType = trustee_type; - explicit_access.Trustee.ptstrName = unsafe { sid.as_ptr() as *mut _ }; - - AceWithSid { - explicit_access, - _marker: marker::PhantomData, - } - } - - fn set_access_mode(&mut self, access_mode: u32) -> &mut Self { - self.explicit_access.grfAccessMode = access_mode; - self - } - - fn set_access_permissions(&mut self, access_permissions: u32) -> &mut Self { - self.explicit_access.grfAccessPermissions = access_permissions; - self - } - - fn allow_inheritance(&mut self, inheritance_flags: u32) -> &mut Self { - self.explicit_access.grfInheritance = inheritance_flags; - self - } -} - -struct Acl { - acl_ptr: PACL, -} - -impl Acl { - fn empty() -> io::Result { - Self::new(&mut []) - } - - fn new(entries: &mut [AceWithSid<'_>]) -> io::Result { - let mut acl_ptr = ptr::null_mut(); - let result = unsafe { - SetEntriesInAclW( - entries.len() as u32, - entries.as_mut_ptr() as *mut _, - ptr::null_mut(), - &mut acl_ptr, - ) - }; - - if result != ERROR_SUCCESS { - return Err(io::Error::from_raw_os_error(result as i32)); - } - - Ok(Acl { acl_ptr }) - } - - unsafe fn as_ptr(&self) -> PACL { - self.acl_ptr - } -} - -impl Drop for Acl { - fn drop(&mut self) { - if !self.acl_ptr.is_null() { - unsafe { LocalFree(self.acl_ptr as *mut _) }; - } - } -} - -struct SecurityDescriptor { - descriptor_ptr: PSECURITY_DESCRIPTOR, -} - -impl SecurityDescriptor { - fn new() -> io::Result { - let descriptor_ptr = unsafe { LocalAlloc(LPTR, SECURITY_DESCRIPTOR_MIN_LENGTH) }; - if descriptor_ptr.is_null() { - return Err(io::Error::new( - io::ErrorKind::Other, - "Failed to allocate security descriptor", - )); - } - - if unsafe { - InitializeSecurityDescriptor(descriptor_ptr, SECURITY_DESCRIPTOR_REVISION) == 0 - } { - return Err(io::Error::last_os_error()); - }; - - Ok(SecurityDescriptor { descriptor_ptr }) - } - - fn set_dacl(&mut self, acl: &Acl) -> io::Result<()> { - if unsafe { - SetSecurityDescriptorDacl(self.descriptor_ptr, true as i32, acl.as_ptr(), false as i32) - == 0 - } { - return Err(io::Error::last_os_error()); - } - Ok(()) - } - - unsafe fn as_ptr(&self) -> PSECURITY_DESCRIPTOR { - self.descriptor_ptr - } -} - -impl Drop for SecurityDescriptor { - fn drop(&mut self) { - if !self.descriptor_ptr.is_null() { - unsafe { LocalFree(self.descriptor_ptr) }; - self.descriptor_ptr = ptr::null_mut(); - } - } -} - -struct InnerAttributes { - descriptor: SecurityDescriptor, - acl: Acl, - attrs: SECURITY_ATTRIBUTES, -} - -impl InnerAttributes { - fn empty() -> io::Result { - let descriptor = SecurityDescriptor::new()?; - let mut attrs = unsafe { mem::zeroed::() }; - attrs.nLength = mem::size_of::() as u32; - attrs.lpSecurityDescriptor = unsafe { descriptor.as_ptr() }; - attrs.bInheritHandle = false as i32; - - let acl = Acl::empty().expect("this should never fail"); - - Ok(InnerAttributes { - acl, - descriptor, - attrs, - }) - } - - fn allow_everyone(permissions: u32) -> io::Result { - let mut attributes = Self::empty()?; - let sid = Sid::everyone_sid()?; - - let mut everyone_ace = AceWithSid::new(&sid, TRUSTEE_IS_WELL_KNOWN_GROUP); - everyone_ace - .set_access_mode(SET_ACCESS) - .set_access_permissions(permissions) - .allow_inheritance(false as u32); - - let mut entries = vec![everyone_ace]; - attributes.acl = Acl::new(&mut entries)?; - attributes.descriptor.set_dacl(&attributes.acl)?; - - Ok(attributes) - } - - unsafe fn as_ptr(&mut self) -> PSECURITY_ATTRIBUTES { - &mut self.attrs as *mut _ - } -} - -#[cfg(test)] -mod test { - use super::SecurityAttributes; - - #[test] - fn test_allow_everyone_everything() { - SecurityAttributes::allow_everyone_create() - .expect("failed to create security attributes that allow everyone to create a pipe"); - } - - #[test] - fn test_allow_eveyone_read_write() { - SecurityAttributes::empty() - .allow_everyone_connect() - .expect("failed to create security attributes that allow everyone to read and write to/from a pipe"); - } -} +use winapi::shared::winerror::{ERROR_PIPE_BUSY, ERROR_SUCCESS}; +use winapi::um::accctrl::{ + EXPLICIT_ACCESS_W, SET_ACCESS, TRUSTEE_IS_SID, TRUSTEE_IS_WELL_KNOWN_GROUP, +}; +use winapi::um::aclapi::SetEntriesInAclW; +use winapi::um::minwinbase::{LPTR, PSECURITY_ATTRIBUTES, SECURITY_ATTRIBUTES}; +use winapi::um::securitybaseapi::{ + AllocateAndInitializeSid, FreeSid, InitializeSecurityDescriptor, SetSecurityDescriptorDacl, +}; +use winapi::um::winbase::{LocalAlloc, LocalFree}; +use winapi::um::winnt::{ + FILE_WRITE_DATA, GENERIC_READ, GENERIC_WRITE, PACL, PSECURITY_DESCRIPTOR, PSID, + SECURITY_DESCRIPTOR_MIN_LENGTH, SECURITY_DESCRIPTOR_REVISION, SECURITY_WORLD_RID, + SECURITY_WORLD_SID_AUTHORITY, +}; + +use futures::Stream; +use std::io; +use std::marker; +use std::mem; +use std::path::Path; +use std::pin::Pin; +use std::ptr; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncRead, AsyncWrite}; + +use tokio::net::windows::named_pipe; + +enum NamedPipe { + Server(named_pipe::NamedPipeServer), + Client(named_pipe::NamedPipeClient), +} + +const PIPE_AVAILABILITY_TIMEOUT: Duration = Duration::from_secs(5); + +/// Endpoint implementation for windows +pub struct Endpoint { + path: String, + security_attributes: SecurityAttributes, + created_listener: bool, +} + +impl Endpoint { + /// Stream of incoming connections + pub fn incoming( + mut self, + ) -> io::Result> + 'static> { + let pipe = self.create_listener()?; + + let stream = + futures::stream::try_unfold((pipe, self), |(listener, mut endpoint)| async move { + listener.connect().await?; + + let new_listener = endpoint.create_listener()?; + + let conn = Connection::wrap(NamedPipe::Server(listener)); + + Ok(Some((conn, (new_listener, endpoint)))) + }); + + Ok(stream) + } + + fn create_listener(&mut self) -> io::Result { + let server = unsafe { + named_pipe::ServerOptions::new() + .first_pipe_instance(!self.created_listener) + .reject_remote_clients(true) + .access_inbound(true) + .access_outbound(true) + .in_buffer_size(65536) + .out_buffer_size(65536) + .create_with_security_attributes_raw( + &self.path, + self.security_attributes.as_ptr().cast::(), + ) + }?; + self.created_listener = true; + + Ok(server) + } + + /// Set security attributes for the connection + pub fn set_security_attributes(&mut self, security_attributes: SecurityAttributes) { + self.security_attributes = security_attributes; + } + + /// Returns the path of the endpoint. + pub fn path(&self) -> &str { + &self.path + } + + /// Make new connection using the provided path and running event pool. + pub async fn connect>(path: P) -> io::Result { + let path = path.as_ref(); + + // There is not async equivalent of waiting for a named pipe in Windows, + // so we keep trying or sleeping for a bit, until we hit a timeout + let attempt_start = Instant::now(); + let client = loop { + match named_pipe::ClientOptions::new() + .read(true) + .write(true) + .open(path) + { + Ok(client) => break client, + Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => { + if attempt_start.elapsed() < PIPE_AVAILABILITY_TIMEOUT { + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } else { + return Err(e); + } + } + Err(e) => return Err(e), + } + }; + + Ok(Connection::wrap(NamedPipe::Client(client))) + } + + /// New IPC endpoint at the given path + pub const fn new(path: String) -> Self { + Endpoint { + path, + security_attributes: SecurityAttributes::empty(), + created_listener: false, + } + } +} + +/// IPC connection. +pub struct Connection { + inner: NamedPipe, +} + +impl Connection { + /// Wraps an existing named pipe + const fn wrap(pipe: NamedPipe) -> Self { + Self { inner: pipe } + } +} + +impl AsyncRead for Connection { + fn poll_read( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let this = Pin::into_inner(self); + match this.inner { + NamedPipe::Client(ref mut c) => Pin::new(c).poll_read(ctx, buf), + NamedPipe::Server(ref mut s) => Pin::new(s).poll_read(ctx, buf), + } + } +} + +impl AsyncWrite for Connection { + fn poll_write( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = Pin::into_inner(self); + match this.inner { + NamedPipe::Client(ref mut c) => Pin::new(c).poll_write(ctx, buf), + NamedPipe::Server(ref mut s) => Pin::new(s).poll_write(ctx, buf), + } + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); + match this.inner { + NamedPipe::Client(ref mut c) => Pin::new(c).poll_flush(ctx), + NamedPipe::Server(ref mut s) => Pin::new(s).poll_flush(ctx), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); + match this.inner { + NamedPipe::Client(ref mut c) => Pin::new(c).poll_shutdown(ctx), + NamedPipe::Server(ref mut s) => Pin::new(s).poll_shutdown(ctx), + } + } +} + +/// Security attributes. +pub struct SecurityAttributes { + attributes: Option, +} + +pub const DEFAULT_SECURITY_ATTRIBUTES: SecurityAttributes = SecurityAttributes { + attributes: Some(InnerAttributes { + descriptor: SecurityDescriptor { + descriptor_ptr: ptr::null_mut(), + }, + acl: Acl { + acl_ptr: ptr::null_mut(), + }, + attrs: SECURITY_ATTRIBUTES { + nLength: mem::size_of::() as u32, + lpSecurityDescriptor: ptr::null_mut(), + bInheritHandle: 0, + }, + }), +}; + +impl SecurityAttributes { + /// New default security attributes. + pub const fn empty() -> Self { + DEFAULT_SECURITY_ATTRIBUTES + } + + /// New default security attributes that allow everyone to connect. + pub fn allow_everyone_connect() -> io::Result { + let attributes = Some(InnerAttributes::allow_everyone( + GENERIC_READ | FILE_WRITE_DATA, + )?); + Ok(SecurityAttributes { attributes }) + } + + /// Set a custom permission on the socket + pub const fn set_mode(self, _mode: u32) -> io::Result { + // for now, does nothing. + Ok(self) + } + + /// New default security attributes that allow everyone to create. + pub fn allow_everyone_create() -> io::Result { + let attributes = Some(InnerAttributes::allow_everyone( + GENERIC_READ | GENERIC_WRITE, + )?); + Ok(SecurityAttributes { attributes }) + } + + /// Return raw handle of security attributes. + pub(crate) unsafe fn as_ptr(&mut self) -> PSECURITY_ATTRIBUTES { + match self.attributes.as_mut() { + Some(attributes) => attributes.as_ptr(), + None => ptr::null_mut(), + } + } +} + +unsafe impl Send for SecurityAttributes {} + +struct Sid { + sid_ptr: PSID, +} + +impl Sid { + fn everyone_sid() -> io::Result { + let mut sid_ptr = ptr::null_mut(); + let result = unsafe { + #[allow(const_item_mutation)] + AllocateAndInitializeSid( + SECURITY_WORLD_SID_AUTHORITY.as_mut_ptr().cast(), + 1, + SECURITY_WORLD_RID, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + &mut sid_ptr, + ) + }; + if result == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(Sid { sid_ptr }) + } + } + + // Unsafe - the returned pointer is only valid for the lifetime of self. + const unsafe fn as_ptr(&self) -> PSID { + self.sid_ptr + } +} + +impl Drop for Sid { + fn drop(&mut self) { + if !self.sid_ptr.is_null() { + unsafe { + FreeSid(self.sid_ptr); + } + } + } +} + +struct AceWithSid<'a> { + explicit_access: EXPLICIT_ACCESS_W, + _marker: marker::PhantomData<&'a Sid>, +} + +impl<'a> AceWithSid<'a> { + fn new(sid: &'a Sid, trustee_type: u32) -> AceWithSid<'a> { + let mut explicit_access = unsafe { mem::zeroed::() }; + explicit_access.Trustee.TrusteeForm = TRUSTEE_IS_SID; + explicit_access.Trustee.TrusteeType = trustee_type; + explicit_access.Trustee.ptstrName = unsafe { sid.as_ptr().cast() }; + + AceWithSid { + explicit_access, + _marker: marker::PhantomData, + } + } + + fn set_access_mode(&mut self, access_mode: u32) -> &mut Self { + self.explicit_access.grfAccessMode = access_mode; + self + } + + fn set_access_permissions(&mut self, access_permissions: u32) -> &mut Self { + self.explicit_access.grfAccessPermissions = access_permissions; + self + } + + fn allow_inheritance(&mut self, inheritance_flags: u32) -> &mut Self { + self.explicit_access.grfInheritance = inheritance_flags; + self + } +} + +struct Acl { + acl_ptr: PACL, +} + +impl Acl { + fn empty() -> io::Result { + Self::new(&mut []) + } + + fn new(entries: &mut [AceWithSid<'_>]) -> io::Result { + let mut acl_ptr = ptr::null_mut(); + let result = unsafe { + SetEntriesInAclW( + entries.len() as u32, + entries.as_mut_ptr().cast(), + ptr::null_mut(), + &mut acl_ptr, + ) + }; + + if result != ERROR_SUCCESS { + return Err(io::Error::from_raw_os_error(result as i32)); + } + + Ok(Acl { acl_ptr }) + } + + const unsafe fn as_ptr(&self) -> PACL { + self.acl_ptr + } +} + +impl Drop for Acl { + fn drop(&mut self) { + if !self.acl_ptr.is_null() { + unsafe { LocalFree(self.acl_ptr.cast()) }; + } + } +} + +struct SecurityDescriptor { + descriptor_ptr: PSECURITY_DESCRIPTOR, +} + +impl SecurityDescriptor { + fn new() -> io::Result { + let descriptor_ptr = unsafe { LocalAlloc(LPTR, SECURITY_DESCRIPTOR_MIN_LENGTH) }; + if descriptor_ptr.is_null() { + return Err(io::Error::new( + io::ErrorKind::Other, + "Failed to allocate security descriptor", + )); + } + + if unsafe { + InitializeSecurityDescriptor(descriptor_ptr, SECURITY_DESCRIPTOR_REVISION) == 0 + } { + return Err(io::Error::last_os_error()); + }; + + Ok(SecurityDescriptor { descriptor_ptr }) + } + + fn set_dacl(&mut self, acl: &Acl) -> io::Result<()> { + if unsafe { + SetSecurityDescriptorDacl(self.descriptor_ptr, true as i32, acl.as_ptr(), false as i32) + == 0 + } { + return Err(io::Error::last_os_error()); + } + Ok(()) + } + + const unsafe fn as_ptr(&self) -> PSECURITY_DESCRIPTOR { + self.descriptor_ptr + } +} + +impl Drop for SecurityDescriptor { + fn drop(&mut self) { + if !self.descriptor_ptr.is_null() { + unsafe { LocalFree(self.descriptor_ptr) }; + self.descriptor_ptr = ptr::null_mut(); + } + } +} + +struct InnerAttributes { + descriptor: SecurityDescriptor, + acl: Acl, + attrs: SECURITY_ATTRIBUTES, +} + +impl InnerAttributes { + fn empty() -> io::Result { + let descriptor = SecurityDescriptor::new()?; + let mut attrs = unsafe { mem::zeroed::() }; + attrs.nLength = mem::size_of::() as u32; + attrs.lpSecurityDescriptor = unsafe { descriptor.as_ptr() }; + attrs.bInheritHandle = false as i32; + + let acl = Acl::empty().expect("this should never fail"); + + Ok(InnerAttributes { + descriptor, + acl, + attrs, + }) + } + + fn allow_everyone(permissions: u32) -> io::Result { + let mut attributes = Self::empty()?; + let sid = Sid::everyone_sid()?; + + let mut everyone_ace = AceWithSid::new(&sid, TRUSTEE_IS_WELL_KNOWN_GROUP); + everyone_ace + .set_access_mode(SET_ACCESS) + .set_access_permissions(permissions) + .allow_inheritance(false as u32); + + let mut entries = vec![everyone_ace]; + attributes.acl = Acl::new(&mut entries)?; + attributes.descriptor.set_dacl(&attributes.acl)?; + + Ok(attributes) + } + + unsafe fn as_ptr(&mut self) -> PSECURITY_ATTRIBUTES { + &mut self.attrs as *mut _ + } +} + +#[cfg(test)] +mod test { + use super::SecurityAttributes; + + #[test] + fn test_allow_everyone_everything() { + SecurityAttributes::allow_everyone_create() + .expect("failed to create security attributes that allow everyone to create a pipe"); + } + + #[test] + fn test_allow_eveyone_read_write() { + SecurityAttributes::allow_everyone_connect() + .expect("failed to create security attributes that allow everyone to read and write to/from a pipe"); + } +}