Skip to content

Commit

Permalink
feat(priority): allow setting priority of a stream
Browse files Browse the repository at this point in the history
Sending uni directional messages can now be done with a priority.
The initial value for any stream is 0, and an i32 is used to go above or
 below that value, while keeping an intuitive default.
We also expose this API on the public SendStream type, as it allows
downstream consumers more freedom in the usage of the feat.
  • Loading branch information
oetyng committed Aug 11, 2021
1 parent d7ca7b7 commit a0cf893
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 30 deletions.
4 changes: 2 additions & 2 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn main() -> Result<()> {
let msg = Bytes::from(MSG_MARCO);
println!("Sending to {:?} --> {:?}\n", peer, msg);
node.connect_to(&peer).await?;
node.send_message(msg.clone(), &peer).await?;
node.send_message(msg.clone(), &peer, 0).await?;
}
}

Expand All @@ -72,7 +72,7 @@ async fn main() -> Result<()> {
println!("Received from {:?} --> {:?}", socket_addr, bytes);
if bytes == *MSG_MARCO {
let reply = Bytes::from(MSG_POLO);
node.send_message(reply.clone(), &socket_addr).await?;
node.send_message(reply.clone(), &socket_addr, 0).await?;
println!("Replied to {:?} --> {:?}", socket_addr, reply);
}
println!();
Expand Down
39 changes: 34 additions & 5 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,22 @@ impl<I: ConnId> Connection<I> {
Self { quic_conn, remover }
}

pub(crate) async fn open_bi(&self) -> Result<(SendStream, RecvStream)> {
/// Priority default is 0. Both lower and higher can be passed in.
pub(crate) async fn open_bi(&self, priority: i32) -> Result<(SendStream, RecvStream)> {
let (send_stream, recv_stream) = self.handle_error(self.quic_conn.open_bi().await).await?;
Ok((SendStream::new(send_stream), RecvStream::new(recv_stream)))
let send_stream = SendStream::new(send_stream);
send_stream.set_priority(priority)?;
Ok((send_stream, RecvStream::new(recv_stream)))
}

/// Send message to peer using a uni-directional stream.
pub(crate) async fn send_uni(&self, msg: Bytes) -> Result<()> {
/// Priority default is 0. Both lower and higher can be passed in.
pub(crate) async fn send_uni(&self, msg: Bytes, priority: i32) -> Result<()> {
let mut send_stream = self.handle_error(self.quic_conn.open_uni().await).await?;
send_stream
.set_priority(priority)
.map_err(|_| Error::UnknownStream)?;

self.handle_error(send_msg(&mut send_stream, msg).await)
.await?;

Expand Down Expand Up @@ -112,6 +120,27 @@ impl SendStream {
Self { quinn_send_stream }
}

/// Set the priority of the send stream
///
/// Every send stream has an initial priority of 0. Locally buffered data from streams with
/// higher priority will be transmitted before data from streams with lower priority. Changing
/// the priority of a stream with pending data may only take effect after that data has been
/// transmitted. Using many different priority levels per connection may have a negative
/// impact on performance.
pub fn set_priority(&self, priority: i32) -> Result<()> {
self.quinn_send_stream
.set_priority(priority)
.map_err(|_| Error::UnknownStream)?;
Ok(())
}

/// Get the priority of the send stream
pub fn priority(&self) -> Result<i32> {
self.quinn_send_stream
.priority()
.map_err(|_| Error::UnknownStream)
}

/// Send a message using the stream created by the initiator
pub async fn send_user_msg(&mut self, msg: Bytes) -> Result<()> {
send_msg(&mut self.quinn_send_stream, msg).await
Expand Down Expand Up @@ -374,7 +403,7 @@ async fn handle_endpoint_verification_req<I: ConnId>(
peer_addr
);
// Verify if the peer's endpoint is reachable via EchoServiceReq
let (mut temp_send, mut temp_recv) = endpoint.open_bidirectional_stream(&addr_sent).await?;
let (mut temp_send, mut temp_recv) = endpoint.open_bidirectional_stream(&addr_sent, 0).await?;
let message = WireMsg::EndpointEchoReq;
message
.write_to_stream(&mut temp_send.quinn_send_stream)
Expand Down Expand Up @@ -434,7 +463,7 @@ mod tests {
.get_connection(&peer2_addr)
.await
.ok_or(Error::MissingConnection)?;
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
let (mut send_stream, mut recv_stream) = connection.open_bi(0).await?;
let message = WireMsg::EndpointEchoReq;
message
.write_to_stream(&mut send_stream.quinn_send_stream)
Expand Down
29 changes: 21 additions & 8 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl<I: ConnId> Endpoint<I> {
.get_connection(contact)
.await
.ok_or(Error::MissingConnection)?;
let (mut send, mut recv) = connection.open_bi().await?;
let (mut send, mut recv) = connection.open_bi(0).await?;
send.send(WireMsg::EndpointVerificationReq(addr)).await?;
let response = timeout(
Duration::from_secs(ECHO_SERVICE_QUERY_TIMEOUT),
Expand Down Expand Up @@ -495,39 +495,52 @@ impl<I: ConnId> Endpoint<I> {
}

/// Open a bi-directional peer with a given peer
/// Priority default is 0. Both lower and higher can be passed in.
pub async fn open_bidirectional_stream(
&self,
peer_addr: &SocketAddr,
priority: i32,
) -> Result<(SendStream, RecvStream)> {
self.connect_to(peer_addr).await?;
let connection = self
.get_connection(peer_addr)
.await
.ok_or(Error::MissingConnection)?;
connection.open_bi().await
connection.open_bi(priority).await
}

/// Sends a message to a peer. This will attempt to use an existing connection
/// to the destination peer. If a connection does not exist, this will fail with `Error::MissingConnection`
pub async fn try_send_message(&self, msg: Bytes, dest: &SocketAddr) -> Result<()> {
/// Priority default is 0. Both lower and higher can be passed in.
pub async fn try_send_message(
&self,
msg: Bytes,
dest: &SocketAddr,
priority: i32,
) -> Result<()> {
let connection = self
.get_connection(dest)
.await
.ok_or(Error::MissingConnection)?;
connection.send_uni(msg).await?;
connection.send_uni(msg, priority).await?;
Ok(())
}

/// Sends a message to a peer. This will attempt to use an existing connection
/// to the peer first. If this connection is broken or doesn't exist
/// a new connection is created and the message is sent.
pub async fn send_message(&self, msg: Bytes, dest: &SocketAddr) -> Result<()> {
if self.try_send_message(msg.clone(), dest).await.is_ok() {
/// Priority default is 0. Both lower and higher can be passed in.
pub async fn send_message(&self, msg: Bytes, dest: &SocketAddr, priority: i32) -> Result<()> {
if self
.try_send_message(msg.clone(), dest, priority)
.await
.is_ok()
{
return Ok(());
}
self.connect_to(dest).await?;

self.retry(|| async { Ok(self.try_send_message(msg.clone(), dest).await?) })
self.retry(|| async { Ok(self.try_send_message(msg.clone(), dest, priority).await?) })
.await
}

Expand All @@ -554,7 +567,7 @@ impl<I: ConnId> Endpoint<I> {
.get_connection(&node)
.await
.ok_or(Error::MissingConnection)?;
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
let (mut send_stream, mut recv_stream) = connection.open_bi(0).await?;
send_stream.send(WireMsg::EndpointEchoReq).await?;
match WireMsg::read_from_stream(&mut recv_stream.quinn_recv_stream).await {
Ok(WireMsg::EndpointEchoResp(socket_addr)) => Ok(socket_addr),
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub enum Error {
/// Failed to create a new endpoint.
#[error("Creating endpoint")]
Endpoint(#[from] quinn::EndpointError),
/// Failed to set/get priority of stream.
#[error("Unknown stream, cannot set/get priority.")]
UnknownStream,
/// Certificate for secure communication couldn't be parsed.
#[error("Cannot parse certificate ")]
CertificateParse,
Expand Down
30 changes: 15 additions & 15 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn single_message() -> Result<()> {
peer2.connect_to(&peer1_addr).await?;
let msg_from_peer2 = random_msg(1024);
peer2
.send_message(msg_from_peer2.clone(), &peer1_addr)
.send_message(msg_from_peer2.clone(), &peer1_addr, 0)
.await?;

// Peer 1 gets an incoming connection
Expand Down Expand Up @@ -84,7 +84,7 @@ async fn reuse_outgoing_connection() -> Result<()> {
// Connect for the first time and send a message.
alice.connect_to(&bob_addr).await?;
let msg0 = random_msg(1024);
alice.send_message(msg0.clone(), &bob_addr).await?;
alice.send_message(msg0.clone(), &bob_addr, 0).await?;

// Bob should recieve an incoming connection and message
if let Some(connecting_peer) = bob_incoming_connections.next().await {
Expand All @@ -103,7 +103,7 @@ async fn reuse_outgoing_connection() -> Result<()> {
// Try connecting again and send a message
alice.connect_to(&bob_addr).await?;
let msg1 = random_msg(1024);
alice.send_message(msg1.clone(), &bob_addr).await?;
alice.send_message(msg1.clone(), &bob_addr, 0).await?;

// Bob *should not* get an incoming connection since there is already a connection established
if let Ok(Some(connecting_peer)) =
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn reuse_incoming_connection() -> Result<()> {
// Connect for the first time and send a message.
alice.connect_to(&bob_addr).await?;
let msg0 = random_msg(1024);
alice.send_message(msg0.clone(), &bob_addr).await?;
alice.send_message(msg0.clone(), &bob_addr, 0).await?;

// Bob should recieve an incoming connection and message
if let Some(connecting_peer) = bob_incoming_connections.next().await {
Expand All @@ -154,7 +154,7 @@ async fn reuse_incoming_connection() -> Result<()> {
// Bob tries to connect to alice and sends a message
bob.connect_to(&alice_addr).await?;
let msg1 = random_msg(1024);
bob.send_message(msg1.clone(), &alice_addr).await?;
bob.send_message(msg1.clone(), &alice_addr, 0).await?;

// Alice *will not* get an incoming connection since there is already a connection established
// However, Alice will still get the incoming message
Expand Down Expand Up @@ -254,10 +254,10 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
}

let msg0 = random_msg(1024);
alice.send_message(msg0.clone(), &bob_addr).await?;
alice.send_message(msg0.clone(), &bob_addr, 0).await?;

let msg1 = random_msg(1024);
bob.send_message(msg1.clone(), &alice_addr).await?;
bob.send_message(msg1.clone(), &alice_addr, 0).await?;

if let Some((src, message)) = alice_incoming_messages.next().await {
assert_eq!(src, bob_addr);
Expand Down Expand Up @@ -294,7 +294,7 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
}

let msg2 = random_msg(1024);
bob.send_message(msg2.clone(), &alice_addr).await?;
bob.send_message(msg2.clone(), &alice_addr, 0).await?;

if let Some((src, message)) = alice_incoming_messages.next().await {
assert_eq!(src, bob_addr);
Expand Down Expand Up @@ -333,10 +333,10 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {

// Send two messages, one from each end
let msg0 = random_msg(1024);
alice.send_message(msg0.clone(), &bob_addr).await?;
alice.send_message(msg0.clone(), &bob_addr, 0).await?;

let msg1 = random_msg(1024);
bob.send_message(msg1.clone(), &alice_addr).await?;
bob.send_message(msg1.clone(), &alice_addr, 0).await?;

// Both messages are received at the other end
if let Some((src, message)) = alice_incoming_messages.next().await {
Expand Down Expand Up @@ -395,7 +395,7 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
// Send the hash result back.
sending_endpoint.connect_to(&src).await?;
sending_endpoint
.send_message(hash_result.to_vec().into(), &src)
.send_message(hash_result.to_vec().into(), &src, 0)
.await?;

Ok::<_, anyhow::Error>(())
Expand Down Expand Up @@ -428,7 +428,7 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
let _ = hash_results.insert(hash(message));
info!("sender #{} sending message #{}", id, index);
send_endpoint
.send_message(message.clone(), &server_addr)
.send_message(message.clone(), &server_addr, 0)
.await?;
}

Expand Down Expand Up @@ -502,7 +502,7 @@ async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<(

// Send the hash result back.
sending_endpoint
.send_message(hash_result.to_vec().into(), &src)
.send_message(hash_result.to_vec().into(), &src, 0)
.await?;

assert!(!logs_contain("error"));
Expand Down Expand Up @@ -538,7 +538,7 @@ async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<(

info!("sender #{} sending message #{}", id, index);
send_endpoint
.send_message(message.clone(), &server_addr)
.send_message(message.clone(), &server_addr, 0)
.await?;
}

Expand Down Expand Up @@ -604,7 +604,7 @@ async fn many_messages() -> Result<()> {
info!("sending {}", id);
let msg = id.to_le_bytes().to_vec().into();
endpoint.connect_to(&recv_addr).await?;
endpoint.send_message(msg, &recv_addr).await?;
endpoint.send_message(msg, &recv_addr, 0).await?;
info!("sent {}", id);

Ok::<_, anyhow::Error>(())
Expand Down

0 comments on commit a0cf893

Please sign in to comment.