Skip to content

Commit

Permalink
feat: take three Bytes for each send
Browse files Browse the repository at this point in the history
BREAKING CHANGE: changes bytes interface for message send/read

This more closely reflects msg header, destintation and payload
parts, and allows for greater flexibility and deduplication when
sending repeat headers/payloads to different peers
  • Loading branch information
joshuef committed Sep 5, 2022
1 parent 5be24ee commit dcc35e5
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 101 deletions.
Binary file added .DS_Store
Binary file not shown.
8 changes: 5 additions & 3 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<()> {
let msg = Bytes::from(MSG_MARCO);
println!("Sending to {:?} --> {:?}\n", peer, msg);
let (conn, mut incoming) = node.connect_to(&peer).await?;
conn.send(msg.clone()).await?;
conn.send((Bytes::new(), Bytes::new(), msg.clone())).await?;
// `Endpoint` no longer having `connection_pool` to hold established connection.
// Which means the connection get closed immediately when it reaches end of life span.
// And causes the receiver side a sending error when reply via the in-coming connection.
Expand All @@ -75,11 +75,13 @@ async fn main() -> Result<()> {
let src = connection.remote_address();

// loop over incoming messages
while let Some(bytes) = incoming_messages.next().await? {
while let Some((_, _, bytes)) = incoming_messages.next().await? {
println!("Received from {:?} --> {:?}", src, bytes);
if bytes == *MSG_MARCO {
let reply = Bytes::from(MSG_POLO);
connection.send(reply.clone()).await?;
connection
.send((Bytes::new(), Bytes::new(), reply.clone()))
.await?;
println!("Replied to {:?} --> {:?}", src, reply);
}
println!();
Expand Down
56 changes: 31 additions & 25 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
use crate::{
config::{RetryConfig, SERVER_NAME},
error::{ConnectionError, RecvError, RpcError, SendError, SerializationError, StreamError},
wire_msg::WireMsg,
wire_msg::{UsrMsgBytes, WireMsg},
};
use bytes::Bytes;
use futures::{
future,
stream::{self, Stream, StreamExt, TryStream, TryStreamExt},
Expand Down Expand Up @@ -96,24 +95,24 @@ impl Connection {
/// [`Config`](crate::Config) that was used to construct the [`Endpoint`] this connection
/// belongs to. See [`send_with`](Self::send_with) if you want to send a message with specific
/// configuration.
pub async fn send(&self, msg: Bytes) -> Result<(), SendError> {
self.send_with(msg, 0, None).await
pub async fn send(&self, bytes: UsrMsgBytes) -> Result<(), SendError> {
self.send_with(bytes, 0, None).await
}

/// Send a message to the peer using the given configuration.
///
/// See [`send`](Self::send) if you want to send with the default configuration.
pub async fn send_with(
&self,
msg: Bytes,
user_msg_bytes: UsrMsgBytes,
priority: i32,
retry_config: Option<&RetryConfig>,
) -> Result<(), SendError> {
match retry_config.or(self.default_retry_config.as_deref()) {
Some(retry_config) => {
retry_config
.retry(|| async {
self.send_uni(msg.clone(), priority)
self.send_uni(user_msg_bytes.clone(), priority)
.await
.map_err(|error| match &error {
// don't retry on connection loss, since we can't recover that from here
Expand All @@ -124,7 +123,7 @@ impl Connection {
.await?;
}
None => {
self.send_uni(msg, priority).await?;
self.send_uni(user_msg_bytes, priority).await?;
}
}
Ok(())
Expand Down Expand Up @@ -160,11 +159,11 @@ impl Connection {
}

/// Opens a uni directional stream and sends message on this stream
async fn send_uni(&self, msg: Bytes, priority: i32) -> Result<(), SendError> {
async fn send_uni(&self, user_msg_bytes: UsrMsgBytes, priority: i32) -> Result<(), SendError> {
let mut send_stream = self.open_uni().await.map_err(SendError::ConnectionLost)?;
send_stream.set_priority(priority);

send_stream.send_user_msg(msg.clone()).await?;
send_stream.send_user_msg(user_msg_bytes).await?;

// We try to make sure the stream is gracefully closed and the bytes get sent, but if it
// was already closed (perhaps by the peer) then we ignore the error.
Expand Down Expand Up @@ -214,8 +213,10 @@ impl SendStream {
/// Send a message over the stream to the peer.
///
/// Messages sent over the stream will arrive at the peer in the order they were sent.
pub async fn send_user_msg(&mut self, msg: Bytes) -> Result<(), SendError> {
WireMsg::UserMsg(msg).write_to_stream(&mut self.inner).await
pub async fn send_user_msg(&mut self, user_msg_bytes: UsrMsgBytes) -> Result<(), SendError> {
WireMsg::UserMsg(user_msg_bytes)
.write_to_stream(&mut self.inner)
.await
}

/// Shut down the send stream gracefully.
Expand Down Expand Up @@ -248,7 +249,7 @@ impl RecvStream {
}

/// Get the next message sent by the peer over this stream.
pub async fn next(&mut self) -> Result<Bytes, RecvError> {
pub async fn next(&mut self) -> Result<UsrMsgBytes, RecvError> {
match self.next_wire_msg().await? {
Some(WireMsg::UserMsg(msg)) => Ok(msg),
msg => Err(SerializationError::unexpected(&msg).into()),
Expand All @@ -269,7 +270,7 @@ impl fmt::Debug for RecvStream {
/// The receiving API for a connection.
#[derive(Debug)]
pub struct ConnectionIncoming {
message_rx: mpsc::Receiver<Result<(Bytes, Option<ResponseStream>), RecvError>>,
message_rx: mpsc::Receiver<Result<(UsrMsgBytes, Option<ResponseStream>), RecvError>>,
_alive_tx: Arc<watch::Sender<()>>,
}

Expand Down Expand Up @@ -302,7 +303,7 @@ impl ConnectionIncoming {
}

/// Get the next message sent by the peer, over any stream.
pub async fn next(&mut self) -> Result<Option<Bytes>, RecvError> {
pub async fn next(&mut self) -> Result<Option<UsrMsgBytes>, RecvError> {
if let Some((bytes, _opt)) = self.next_with_stream().await? {
Ok(Some(bytes))
} else {
Expand All @@ -313,7 +314,7 @@ impl ConnectionIncoming {
/// Get the next message sent by the peer, over any stream along with the stream to respond with.
pub async fn next_with_stream(
&mut self,
) -> Result<Option<(Bytes, Option<ResponseStream>)>, RecvError> {
) -> Result<Option<(UsrMsgBytes, Option<ResponseStream>)>, RecvError> {
self.message_rx.recv().await.transpose()
}
}
Expand All @@ -329,7 +330,7 @@ fn start_message_listeners(
uni_streams: quinn::IncomingUniStreams,
bi_streams: quinn::IncomingBiStreams,
alive_rx: watch::Receiver<()>,
message_tx: mpsc::Sender<Result<(Bytes, Option<ResponseStream>), RecvError>>,
message_tx: mpsc::Sender<Result<(UsrMsgBytes, Option<ResponseStream>), RecvError>>,
) {
let _ = tokio::spawn(listen_on_uni_streams(
peer_addr,
Expand All @@ -351,7 +352,7 @@ async fn listen_on_uni_streams(
peer_addr: SocketAddr,
uni_streams: FilterBenignClose<quinn::IncomingUniStreams>,
mut alive_rx: watch::Receiver<()>,
message_tx: mpsc::Sender<Result<(Bytes, Option<ResponseStream>), RecvError>>,
message_tx: mpsc::Sender<Result<(UsrMsgBytes, Option<ResponseStream>), RecvError>>,
) {
trace!(
"Started listener for incoming uni-streams from {}",
Expand Down Expand Up @@ -425,7 +426,7 @@ async fn listen_on_bi_streams(
peer_addr: SocketAddr,
bi_streams: FilterBenignClose<quinn::IncomingBiStreams>,
mut alive_rx: watch::Receiver<()>,
message_tx: mpsc::Sender<Result<(Bytes, Option<ResponseStream>), RecvError>>,
message_tx: mpsc::Sender<Result<(UsrMsgBytes, Option<ResponseStream>), RecvError>>,
) {
trace!(
"Started listener for incoming bi-streams from {}",
Expand Down Expand Up @@ -461,8 +462,10 @@ async fn listen_on_bi_streams(
Ok(None) => {
break;
}
Ok(Some(WireMsg::UserMsg(msg))) => {
if let Err(msg) = message_tx.send(Ok((msg, Some(arc_mutex.clone())))).await
Ok(Some(WireMsg::UserMsg((header, dst, payload)))) => {
if let Err(msg) = message_tx
.send(Ok(((header, dst, payload), Some(arc_mutex.clone()))))
.await
{
// if we can't send the result, the receiving end is closed so we should stop
trace!("Receiver gone, dropping message: {:?}", msg);
Expand Down Expand Up @@ -669,10 +672,10 @@ mod tests {
p1_tx
.open_uni()
.await?
.send_user_msg(Bytes::from_static(b"hello"))
.send_user_msg((Bytes::new(), Bytes::new(), Bytes::from_static(b"hello")))
.await?;

if let Some(msg) = timeout(p2_rx.next()).await?? {
if let Some((_, _, msg)) = timeout(p2_rx.next()).await?? {
assert_eq!(&msg[..], b"hello");
} else {
bail!("did not receive message when one was expected");
Expand All @@ -681,10 +684,10 @@ mod tests {
p2_tx
.open_uni()
.await?
.send_user_msg(Bytes::from_static(b"world"))
.send_user_msg((Bytes::new(), Bytes::new(), Bytes::from_static(b"world")))
.await?;

if let Some(msg) = timeout(p1_rx.next()).await?? {
if let Some((_, _, msg)) = timeout(p1_rx.next()).await?? {
assert_eq!(&msg[..], b"world");
} else {
bail!("did not receive message when one was expected");
Expand Down Expand Up @@ -730,7 +733,10 @@ mod tests {
tokio::time::sleep(Duration::from_secs(2)).await;

// trying to send a message should fail with an error
match p1_tx.send(b"hello"[..].into()).await {
match p1_tx
.send((Bytes::new(), Bytes::new(), b"hello"[..].into()))
.await
{
Err(SendError::ConnectionLost(ConnectionError::TimedOut)) => {}
res => bail!("unexpected send result: {:?}", res),
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub use error::{
RpcError, SendError, SerializationError, StreamError, TransportErrorCode,
UnsupportedStreamOperation,
};
pub use wire_msg::UsrMsgBytes;

#[cfg(test)]
mod tests;
Loading

0 comments on commit dcc35e5

Please sign in to comment.