Skip to content

Commit

Permalink
fix!: Fix WireMsg visibility
Browse files Browse the repository at this point in the history
This was marked `pub` and appeared in some APIs (`SendStream::send`,
`Error::UnexpectedMessageType`) even though the type itself is
unreachable (`wire_msg` mod is private and `WireMsg` is not
re-exported).

The type itself has been marked `pub(crate)`, along with its associated
functions, and functions that use it. The `Error::UnexpectedMessageType`
variant has been changed to include a public newtype, which for users
would behave like an opaque error. This assumes the intention was for
the type not to be exposed.

BREAKING CHANGE: `SendStream::send` is no longer public.
`Error::UnexpectedMessageType` now contains a simple error struct.
  • Loading branch information
Chris Connelly authored and connec committed Aug 27, 2021
1 parent db74509 commit 33e9b3b
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 24 deletions.
16 changes: 10 additions & 6 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,18 @@ impl RecvStream {
Self { quinn_recv_stream }
}

/// Read next message from the stream
/// Read next user message from the stream
pub async fn next(&mut self) -> Result<Bytes> {
match read_bytes(&mut self.quinn_recv_stream).await {
Ok(WireMsg::UserMsg(bytes)) => Ok(bytes),
Ok(msg) => Err(Error::UnexpectedMessageType(msg)),
Err(error) => Err(error),
match self.next_wire_msg().await? {
WireMsg::UserMsg(bytes) => Ok(bytes),
msg => Err(Error::UnexpectedMessageType(msg.into())),
}
}

/// Read next wire msg from the stream
pub(crate) async fn next_wire_msg(&mut self) -> Result<WireMsg> {
read_bytes(&mut self.quinn_recv_stream).await
}
}

impl Debug for RecvStream {
Expand Down Expand Up @@ -149,7 +153,7 @@ impl SendStream {
}

/// Send a wire message
pub async fn send(&mut self, msg: WireMsg) -> Result<()> {
pub(crate) async fn send(&mut self, msg: WireMsg) -> Result<()> {
msg.write_to_stream(&mut self.quinn_send_stream).await
}

Expand Down
17 changes: 5 additions & 12 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl<I: ConnId> Endpoint<I> {
"Unexpected message when verifying public endpoint: {}",
other
);
return Err(Error::UnexpectedMessageType(other));
return Err(Error::UnexpectedMessageType(other.into()));
}
Ok(Err(err)) => {
error!("Error while verifying Public IP Address");
Expand Down Expand Up @@ -416,25 +416,18 @@ impl<I: ConnId> Endpoint<I> {

match timeout(
Duration::from_secs(ECHO_SERVICE_QUERY_TIMEOUT),
recv_stream.next(),
recv_stream.next_wire_msg(),
)
.await
{
Ok(Err(Error::UnexpectedMessageType(WireMsg::EndpointEchoResp(_)))) => Ok(()),
Ok(Err(Error::UnexpectedMessageType(other))) => {
Ok(Ok(WireMsg::EndpointEchoResp(_))) => Ok(()),
Ok(Ok(other)) => {
info!(
"Unexpected message type when verifying reachability: {}",
&other
);
Ok(())
}
Ok(Ok(bytes)) => {
info!(
"Unexpected message type when verifying reachability: {}",
WireMsg::UserMsg(bytes)
);
Ok(())
}
Ok(Err(err)) => {
info!("Unable to contact peer: {:?}", err);
Err(err)
Expand Down Expand Up @@ -527,7 +520,7 @@ impl<I: ConnId> Endpoint<I> {
send_stream.send(WireMsg::EndpointEchoReq).await?;
match WireMsg::read_from_stream(&mut recv_stream.quinn_recv_stream).await {
Ok(WireMsg::EndpointEchoResp(socket_addr)) => Ok(socket_addr),
Ok(msg) => Err(Error::UnexpectedMessageType(msg)),
Ok(msg) => Err(Error::UnexpectedMessageType(msg.into())),
Err(err) => Err(err),
}
});
Expand Down
15 changes: 13 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ pub enum Error {
#[error("Empty response message received from peer")]
EmptyResponse,
/// The type of message received is not the expected one.
#[error("Type of the message received was not the expected one: {0}")]
UnexpectedMessageType(WireMsg),
#[error(transparent)]
UnexpectedMessageType(#[from] UnexpectedMessageType),
/// The message exceeds the maximum message length allowed.
#[error("Maximum data length exceeded, length: {0}")]
MaxLengthExceeded(usize),
Expand Down Expand Up @@ -300,3 +300,14 @@ impl fmt::Display for TransportErrorCode {
write!(f, "{}", self.0)
}
}

/// The type of message received is not the expected one.
#[derive(Debug, Error)]
#[error("The of the message received was not the expected one: {0}")]
pub struct UnexpectedMessageType(WireMsg);

impl From<WireMsg> for UnexpectedMessageType {
fn from(msg: WireMsg) -> Self {
UnexpectedMessageType(msg)
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub use connections::{DisconnectionEvents, RecvStream, SendStream};
pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages};
pub use error::{
ClientEndpointError, Close, ConnectionError, Error, InternalConfigError, Result,
TransportErrorCode,
TransportErrorCode, UnexpectedMessageType,
};

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions src/wire_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const MSG_PROTOCOL_VERSION: u16 = 0x0001;

/// Final type serialised and sent on the wire by `QuicP2p`
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum WireMsg {
pub(crate) enum WireMsg {
EndpointEchoReq,
EndpointEchoResp(SocketAddr),
EndpointVerificationReq(SocketAddr),
Expand All @@ -34,7 +34,7 @@ const ECHO_SRVC_MSG_FLAG: u8 = 0x01;

impl WireMsg {
// Read a message's bytes from the provided stream
pub async fn read_from_stream(recv: &mut quinn::RecvStream) -> Result<Self> {
pub(crate) async fn read_from_stream(recv: &mut quinn::RecvStream) -> Result<Self> {
let mut header_bytes = [0; MSG_HEADER_LEN];
recv.read_exact(&mut header_bytes).await?;

Expand Down Expand Up @@ -64,7 +64,7 @@ impl WireMsg {
}

// Helper to write WireMsg bytes to the provided stream.
pub async fn write_to_stream(&self, send_stream: &mut quinn::SendStream) -> Result<()> {
pub(crate) async fn write_to_stream(&self, send_stream: &mut quinn::SendStream) -> Result<()> {
// Let's generate the message bytes
let (msg_bytes, msg_flag) = match self {
WireMsg::UserMsg(ref m) => (m.clone(), USER_MSG_FLAG),
Expand Down

0 comments on commit 33e9b3b

Please sign in to comment.