Skip to content

Commit

Permalink
Replace ConnectionError in public API with io::Error
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Jun 3, 2022
1 parent 233199d commit 9e33cd9
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.11.0

- Remove `ConnectionError` from public API in favor `std::io::Error`. See [PR ]

# 0.10.1

- Update `parking_lot` dependency. See [PR 126].
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yamux"
version = "0.10.1"
version = "0.11.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "Apache-2.0 OR MIT"
description = "Multiplexer over reliable, ordered connections"
Expand Down
38 changes: 24 additions & 14 deletions src/connection/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@
// at https://opensource.org/licenses/MIT.

use super::ControlCommand;
use crate::{error::ConnectionError, Stream};
use crate::error::{into_io_error, ConnectionError};
use crate::Stream;
use futures::{
channel::{mpsc, oneshot},
prelude::*,
ready,
};
use std::{
io,
pin::Pin,
task::{Context, Poll},
};

type Result<T> = std::result::Result<T, ConnectionError>;

/// The Yamux `Connection` controller.
///
/// While a Yamux connection makes progress via its `next_stream` method,
Expand All @@ -36,7 +36,7 @@ pub struct Control {
/// Command channel to `Connection`.
sender: mpsc::Sender<ControlCommand>,
/// Pending state of `poll_open_stream`.
pending_open: Option<oneshot::Receiver<Result<Stream>>>,
pending_open: Option<oneshot::Receiver<Result<Stream, ConnectionError>>>,
/// Pending state of `poll_close`.
pending_close: Option<oneshot::Receiver<()>>,
}
Expand All @@ -61,14 +61,19 @@ impl Control {
}

/// Open a new stream to the remote.
pub async fn open_stream(&mut self) -> Result<Stream> {
pub async fn open_stream(&mut self) -> io::Result<Stream> {
let (tx, rx) = oneshot::channel();
self.sender.send(ControlCommand::OpenStream(tx)).await?;
rx.await?
self.sender
.send(ControlCommand::OpenStream(tx))
.await
.map_err(into_io_error)?;
let stream = rx.await.map_err(into_io_error)?.map_err(into_io_error)?;

Ok(stream)
}

/// Close the connection.
pub async fn close(&mut self) -> Result<()> {
pub async fn close(&mut self) -> io::Result<()> {
let (tx, rx) = oneshot::channel();
if self
.sender
Expand All @@ -86,17 +91,22 @@ impl Control {
}

/// [`Poll`] based alternative to [`Control::open_stream`].
pub fn poll_open_stream(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Stream>> {
pub fn poll_open_stream(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<io::Result<Stream>> {
loop {
match self.pending_open.take() {
None => {
ready!(self.sender.poll_ready(cx)?);
ready!(self.sender.poll_ready(cx).map_err(into_io_error)?);
let (tx, rx) = oneshot::channel();
self.sender.start_send(ControlCommand::OpenStream(tx))?;
self.sender
.start_send(ControlCommand::OpenStream(tx))
.map_err(into_io_error)?;
self.pending_open = Some(rx)
}
Some(mut rx) => match rx.poll_unpin(cx)? {
Poll::Ready(result) => return Poll::Ready(result),
Some(mut rx) => match rx.poll_unpin(cx).map_err(into_io_error)? {
Poll::Ready(result) => return Poll::Ready(result.map_err(into_io_error)),
Poll::Pending => {
self.pending_open = Some(rx);
return Poll::Pending;
Expand All @@ -112,7 +122,7 @@ impl Control {
}

/// [`Poll`] based alternative to [`Control::close`].
pub fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
pub fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
loop {
match self.pending_close.take() {
None => {
Expand Down
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// at https://opensource.org/licenses/MIT.

use crate::frame::FrameDecodeError;
use std::io;

/// The various error cases a connection may encounter.
#[non_exhaustive]
Expand Down Expand Up @@ -63,6 +64,15 @@ impl std::error::Error for ConnectionError {
}
}

pub fn into_io_error<E: Into<ConnectionError>>(error: E) -> std::io::Error {
let connection_error = error.into();

match connection_error {
ConnectionError::Io(io) => io,
other => io::Error::new(io::ErrorKind::Other, other),
}
}

impl From<std::io::Error> for ConnectionError {
fn from(e: std::io::Error) -> Self {
ConnectionError::Io(e)
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ mod tests;
pub(crate) mod connection;

pub use crate::connection::{into_stream, Connection, Control, Mode, Packet, Stream};
pub use crate::error::ConnectionError;
pub use crate::frame::{
header::{HeaderDecodeError, StreamId},
FrameDecodeError,
Expand Down
9 changes: 7 additions & 2 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// at https://opensource.org/licenses/MIT.

use crate::WindowUpdateMode;
use crate::{connection::State, Config, Connection, ConnectionError, Control, Mode};
use crate::{connection::State, error::ConnectionError, Config, Connection, Control, Mode};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::executor::LocalPool;
use futures::future::join;
Expand Down Expand Up @@ -164,7 +164,12 @@ fn prop_max_streams() {
for _ in 0..max_streams {
v.push(control.open_stream().await.expect("open_stream"))
}
if let Err(ConnectionError::TooManyStreams) = control.open_stream().await {
if let Err(Some(Ok(ConnectionError::TooManyStreams))) =
control.open_stream().await.map_err(|e| {
e.into_inner()
.map(|inner| inner.downcast::<ConnectionError>().map(|b| *b))
})
{
true
} else {
false
Expand Down
3 changes: 2 additions & 1 deletion tests/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use futures::{channel::mpsc, prelude::*};
use quickcheck::{Arbitrary, Gen, QuickCheck};
use std::{
io,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
};
Expand Down Expand Up @@ -106,7 +107,7 @@ async fn roundtrip(
log::debug!("C: {}: read {} bytes", stream.id(), frame.len());
assert_eq!(&data[..], &frame[..]);
tx.unbounded_send(1).expect("unbounded_send");
Ok::<(), yamux::ConnectionError>(())
Ok::<(), io::Error>(())
});
}
let n = rx
Expand Down

0 comments on commit 9e33cd9

Please sign in to comment.