Skip to content

Commit

Permalink
Merge pull request #32 from Xanewok/mio-07
Browse files Browse the repository at this point in the history
Upgrade to Tokio 1.0 (and mio 0.7)
  • Loading branch information
Xanewok authored Jul 1, 2021
2 parents 298ef40 + b86fd38 commit d946e3f
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 162 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ Interprocess communication library for tokio.
[dependencies]
futures = "0.3"
log = "0.4"
mio-named-pipes = "0.1"
miow = "0.3.3"
rand = "0.7"
tokio = { version = "0.2", features = ["io-driver", "io-util", "uds", "stream", "rt-core", "macros", "time"] }
tokio = { version = "1.7.0", features = ["net", "time"] }
libc = "0.2.65"

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["winbase", "winnt", "accctrl", "aclapi", "securitybaseapi", "minwinbase", "winbase"] }

[dev-dependencies]
tokio = { version = "1.7.0", features = ["io-util", "rt", "time", "macros"] }
6 changes: 3 additions & 3 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use tokio::{self, prelude::*};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use parity_tokio_ipc::Endpoint;

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
let path = std::env::args().nth(1).expect("Run it with server path to connect as argument");

Expand All @@ -19,6 +19,6 @@ async fn main() {
break;
}

tokio::time::delay_for(std::time::Duration::from_secs(2)).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
11 changes: 4 additions & 7 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
use futures::StreamExt as _;
use tokio::{
prelude::*,
self,
io::split,
};
use tokio::io::{split, AsyncReadExt, AsyncWriteExt};

use parity_tokio_ipc::{Endpoint, SecurityAttributes};

async fn run_server(path: String) {
let mut endpoint = Endpoint::new(path);
endpoint.set_security_attributes(SecurityAttributes::allow_everyone_create().unwrap());

let mut incoming = endpoint.incoming().expect("failed to open new socket");
let incoming = endpoint.incoming().expect("failed to open new socket");
futures::pin_mut!(incoming);

while let Some(result) = incoming.next().await
{
Expand Down Expand Up @@ -40,7 +37,7 @@ async fn run_server(path: String) {
};
}

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
let path = std::env::args().nth(1).expect("Run it with server path as argument");
run_server(path).await
Expand Down
15 changes: 6 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,9 @@ pub fn dummy_endpoint() -> String {

#[cfg(test)]
mod tests {
use tokio::prelude::*;
use futures::{channel::oneshot, StreamExt as _, FutureExt as _};
use std::time::Duration;
use tokio::{
self,
io::split,
};
use tokio::io::{split, AsyncReadExt, AsyncWriteExt};

use super::{dummy_endpoint, Endpoint, SecurityAttributes};
use std::path::Path;
Expand All @@ -68,7 +64,8 @@ mod tests {
.set_mode(0o777)
.unwrap()
);
let mut incoming = endpoint.incoming().expect("failed to open up a new socket");
let incoming = endpoint.incoming().expect("failed to open up a new socket");
futures::pin_mut!(incoming);

while let Some(result) = incoming.next().await {
match result {
Expand Down Expand Up @@ -100,12 +97,12 @@ mod tests {
});
tokio::spawn(server);

tokio::time::delay_for(Duration::from_secs(2)).await;
tokio::time::sleep(Duration::from_secs(2)).await;

println!("Connecting to client 0...");
let mut client_0 = Endpoint::connect(&path).await
.expect("failed to open client_0");
tokio::time::delay_for(Duration::from_secs(2)).await;
tokio::time::sleep(Duration::from_secs(2)).await;
println!("Connecting to client 1...");
let mut client_1 = Endpoint::connect(&path).await
.expect("failed to open client_1");
Expand All @@ -125,7 +122,7 @@ mod tests {
// shutdown server
if let Ok(()) = shutdown_tx.send(()) {
// wait one second for the file to be deleted.
tokio::time::delay_for(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let path = Path::new(&path);
// assert that it has
assert!(!path.exists());
Expand Down
18 changes: 8 additions & 10 deletions src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use libc::chmod;
use std::ffi::CString;
use std::io::{self, Error};
use futures::Stream;
use tokio::prelude::*;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::{UnixListener, UnixStream};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::mem::MaybeUninit;

/// Socket permissions and ownership on UNIX
pub struct SecurityAttributes {
Expand Down Expand Up @@ -68,7 +67,7 @@ pub struct Endpoint {

impl Endpoint {
/// Stream of incoming connections
pub fn incoming(self) -> io::Result<impl Stream<Item = tokio::io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
pub fn incoming(self) -> io::Result<impl Stream<Item = std::io::Result<impl AsyncRead + AsyncWrite>> + 'static> {
let listener = self.inner()?;
// the call to bind in `inner()` creates the file
// `apply_permission()` will set the file permissions.
Expand Down Expand Up @@ -124,7 +123,10 @@ impl Stream for Incoming {
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);
Pin::new(&mut this.listener).poll_next(cx)
match Pin::new(&mut this.listener).poll_accept(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => Poll::Ready(Some(result.map(|(stream, _addr)| stream))),
}
}
}

Expand All @@ -149,15 +151,11 @@ impl Connection {
}

impl AsyncRead for Connection {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}

fn poll_read(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = Pin::into_inner(self);
Pin::new(&mut this.inner).poll_read(ctx, buf)
}
Expand Down
Loading

0 comments on commit d946e3f

Please sign in to comment.