Skip to content

Commit

Permalink
fixed ipc leak (#277)
Browse files Browse the repository at this point in the history
* fixed ipc connection leak, closes #275

* fixed indentation

* fixed broken pipe issue in tests

* empirical tests fixes

* fix tests

* fix tests

* fix tests

* move ipc start_signal.send after the incoming.for_each

* log ipc traces on travis

* keep writer in memory as long as possible

* select_with_weak

* remove redundant thread::sleep

* test session end

* fixed race condition in test_session_end
  • Loading branch information
debris committed Jun 12, 2018
1 parent 06bec6a commit a6d5841
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 53 deletions.
1 change: 1 addition & 0 deletions core/src/calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use BoxFuture;
/// Metadata trait
pub trait Metadata: Clone + Send + 'static {}
impl Metadata for () {}
impl<T: Sync + Send + 'static> Metadata for Arc<T> {}

/// Asynchronous Method
pub trait RpcMethodSimple: Send + Sync + 'static {
Expand Down
5 changes: 3 additions & 2 deletions ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ parity-tokio-ipc = { git = "https://github.com/nikvolf/parity-tokio-ipc" }

[dev-dependencies]
env_logger = "0.4"
lazy_static = "0.2"
lazy_static = "1.0"
parking_lot = "0.5"

[target.'cfg(not(windows))'.dev-dependencies]
tokio-uds = "0.1"
tokio-uds = "0.2"

[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
1 change: 1 addition & 0 deletions ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub extern crate jsonrpc_core;
#[cfg(test)] mod logger;

mod server;
mod select_with_weak;
mod meta;

use jsonrpc_core as jsonrpc;
Expand Down
77 changes: 77 additions & 0 deletions ipc/src/select_with_weak.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use jsonrpc::futures::{Poll, Async};
use jsonrpc::futures::stream::{Stream, Fuse};

pub trait SelectWithWeakExt: Stream {
fn select_with_weak<S>(self, other: S) -> SelectWithWeak<Self, S>
where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized;
}

impl<T> SelectWithWeakExt for T where T: Stream {
fn select_with_weak<S>(self, other: S) -> SelectWithWeak<Self, S>
where S: Stream<Item = Self::Item, Error = Self::Error>, Self: Sized {
new(self, other)
}
}

/// An adapter for merging the output of two streams.
///
/// The merged stream produces items from either of the underlying streams as
/// they become available, and the streams are polled in a round-robin fashion.
/// Errors, however, are not merged: you get at most one error at a time.
///
/// Finishes when strong stream finishes
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct SelectWithWeak<S1, S2> {
strong: Fuse<S1>,
weak: Fuse<S2>,
use_strong: bool,
}

fn new<S1, S2>(stream1: S1, stream2: S2) -> SelectWithWeak<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
SelectWithWeak {
strong: stream1.fuse(),
weak: stream2.fuse(),
use_strong: false,
}
}

impl<S1, S2> Stream for SelectWithWeak<S1, S2>
where S1: Stream,
S2: Stream<Item = S1::Item, Error = S1::Error>
{
type Item = S1::Item;
type Error = S1::Error;

fn poll(&mut self) -> Poll<Option<S1::Item>, S1::Error> {
let mut checked_strong = false;
loop {
if self.use_strong {
match self.strong.poll()? {
Async::Ready(Some(item)) => {
self.use_strong = false;
return Ok(Some(item).into())
},
Async::Ready(None) => return Ok(None.into()),
Async::NotReady => {
if !checked_strong {
self.use_strong = false;
} else {
return Ok(Async::NotReady)
}
}
}
checked_strong = true;
} else {
self.use_strong = true;
match self.weak.poll()? {
Async::Ready(Some(item)) => return Ok(Some(item).into()),
Async::Ready(None) | Async::NotReady => (),
}
}
}
}
}
Loading

0 comments on commit a6d5841

Please sign in to comment.