diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index b89683c3a01..7bea0f3f35a 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -3,9 +3,11 @@ - Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691]. - Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707]. +- Replace `Into` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710]. [PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691 [PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707 +[PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710 # 0.33.0 diff --git a/core/src/either.rs b/core/src/either.rs index 55b616d78a6..8ce9046f6f8 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -203,7 +203,7 @@ where { type Substream = EitherOutput; type OutboundSubstream = EitherOutbound; - type Error = io::Error; + type Error = EitherError; fn poll_event( &self, @@ -212,11 +212,11 @@ where match self { EitherOutput::First(inner) => inner .poll_event(cx) - .map_err(|e| e.into()) + .map_err(EitherError::A) .map_ok(|event| event.map_inbound_stream(EitherOutput::First)), EitherOutput::Second(inner) => inner .poll_event(cx) - .map_err(|e| e.into()) + .map_err(EitherError::B) .map_ok(|event| event.map_inbound_stream(EitherOutput::Second)), } } @@ -237,11 +237,11 @@ where (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner .poll_outbound(cx, substream) .map(|p| p.map(EitherOutput::First)) - .map_err(|e| e.into()), + .map_err(EitherError::A), (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner .poll_outbound(cx, substream) .map(|p| p.map(EitherOutput::Second)) - .map_err(|e| e.into()), + .map_err(EitherError::B), _ => panic!("Wrong API usage"), } } @@ -261,8 +261,8 @@ where fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { match self { - EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()), - EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()), + EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A), + EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B), } } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 68e555e167f..050d8d1bd35 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -52,7 +52,6 @@ use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite}; use multiaddr::Multiaddr; -use std::io; pub use self::boxed::StreamMuxerBox; pub use self::boxed::SubstreamBox; @@ -76,7 +75,7 @@ pub trait StreamMuxer { type OutboundSubstream; /// Error type of the muxer - type Error: Into; + type Error: std::error::Error; /// Polls for a connection-wide event. /// diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index f0e2ff10647..ad39ef0532d 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,8 +1,9 @@ use crate::muxing::StreamMuxerEvent; use crate::StreamMuxer; use fnv::FnvHashMap; -use futures::{AsyncRead, AsyncWrite}; +use futures::{ready, AsyncRead, AsyncWrite}; use parking_lot::Mutex; +use std::error::Error; use std::fmt; use std::io; use std::io::{IoSlice, IoSliceMut}; @@ -38,6 +39,7 @@ impl StreamMuxer for Wrap where T: StreamMuxer, T::Substream: Send + Unpin + 'static, + T::Error: Send + Sync + 'static, { type Substream = SubstreamBox; type OutboundSubstream = usize; // TODO: use a newtype @@ -48,18 +50,10 @@ where &self, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let substream = match self.inner.poll_event(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => { - return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) - } - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - }; + let event = ready!(self.inner.poll_event(cx).map_err(into_io_error)?) + .map_inbound_stream(SubstreamBox::new); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(SubstreamBox::new( - substream, - )))) + Poll::Ready(Ok(event)) } #[inline] @@ -77,16 +71,12 @@ where substream: &mut Self::OutboundSubstream, ) -> Poll> { let mut list = self.outbound.lock(); - let substream = match self + let stream = ready!(self .inner .poll_outbound(cx, list.get_mut(substream).unwrap()) - { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(s)) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - }; + .map_err(into_io_error)?); - Poll::Ready(Ok(SubstreamBox::new(substream))) + Poll::Ready(Ok(SubstreamBox::new(stream))) } #[inline] @@ -98,10 +88,17 @@ where #[inline] fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx).map_err(|e| e.into()) + self.inner.poll_close(cx).map_err(into_io_error) } } +fn into_io_error(err: E) -> io::Error +where + E: Error + Send + Sync + 'static, +{ + io::Error::new(io::ErrorKind::Other, err) +} + impl StreamMuxerBox { /// Turns a stream muxer into a `StreamMuxerBox`. pub fn new(muxer: T) -> StreamMuxerBox @@ -109,6 +106,7 @@ impl StreamMuxerBox { T: StreamMuxer + Send + Sync + 'static, T::OutboundSubstream: Send, T::Substream: Send + Unpin + 'static, + T::Error: Send + Sync + 'static, { let wrap = Wrap { inner: muxer, diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 5f303396452..964045ad33f 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -302,6 +302,7 @@ impl Multiplexed { M: StreamMuxer + Send + Sync + 'static, M::Substream: Send + Unpin + 'static, M::OutboundSubstream: Send + 'static, + M::Error: Send + Sync + 'static, { boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))) } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index e53701699e2..b09cb0480bd 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -137,40 +137,38 @@ where ) -> Poll, ConnectionError>> { loop { // Poll the handler for new events. - match self.handler.poll(cx) { + match self.handler.poll(cx)? { Poll::Pending => {} - Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => { + Poll::Ready(handler_wrapper::Event::OutboundSubstreamRequest(user_data)) => { self.muxing.open_substream(user_data); continue; } - Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => { + Poll::Ready(handler_wrapper::Event::Custom(event)) => { return Poll::Ready(Ok(Event::Handler(event))); } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), } // Perform I/O on the connection through the muxer, informing the handler // of new substreams. - match self.muxing.poll(cx) { + match self.muxing.poll(cx)? { Poll::Pending => {} - Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => { + Poll::Ready(SubstreamEvent::InboundSubstream { substream }) => { self.handler .inject_substream(substream, SubstreamEndpoint::Listener); continue; } - Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { + Poll::Ready(SubstreamEvent::OutboundSubstream { user_data, substream, - })) => { + }) => { let endpoint = SubstreamEndpoint::Dialer(user_data); self.handler.inject_substream(substream, endpoint); continue; } - Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => { + Poll::Ready(SubstreamEvent::AddressChange(address)) => { self.handler.inject_address_change(&address); return Poll::Ready(Ok(Event::AddressChange(address))); } - Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))), } return Poll::Pending; diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 4e8d85a8180..8a6d6bbbf00 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -75,6 +75,12 @@ impl From> for ConnectionError< } } +impl From for ConnectionError { + fn from(error: io::Error) -> Self { + ConnectionError::IO(error) + } +} + /// Errors that can occur in the context of a pending outgoing `Connection`. /// /// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs index 50714585366..47d5d315b20 100644 --- a/swarm/src/connection/substream.rs +++ b/swarm/src/connection/substream.rs @@ -23,7 +23,7 @@ use libp2p_core::multiaddr::Multiaddr; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use smallvec::SmallVec; use std::sync::Arc; -use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll}; +use std::{fmt, pin::Pin, task::Context, task::Poll}; /// Endpoint for a received substream. #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -134,7 +134,7 @@ where pub fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, IoError>> { + ) -> Poll, TMuxer::Error>> { // Polling inbound substream. match self.inner.poll_event(cx) { Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { @@ -143,7 +143,7 @@ where Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => {} } @@ -164,7 +164,7 @@ where } Poll::Ready(Err(err)) => { self.inner.destroy_outbound(outbound); - return Poll::Ready(Err(err.into())); + return Poll::Ready(Err(err)); } } } @@ -203,13 +203,13 @@ impl Future for Close where TMuxer: StreamMuxer, { - type Output = Result<(), IoError>; + type Output = Result<(), TMuxer::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.muxer.poll_close(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), } } }