From 9b8861bb2da5217b2fc5ce8857a56c570edbf86b Mon Sep 17 00:00:00 2001 From: Pixelstorm Date: Tue, 27 Feb 2024 18:26:38 +0000 Subject: [PATCH 1/3] Refactor stream priority management The main goal of this refactor is to remove a `RefCell` that is preventing `Connection` from being `Sync`. This is achieved by replacing it with a monotonically decreasing 'recency' counter. See #1769 for more information. --- quinn-proto/src/connection/streams/mod.rs | 97 ++++++++++----------- quinn-proto/src/connection/streams/state.rs | 86 ++++++------------ 2 files changed, 73 insertions(+), 110 deletions(-) diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index 4841a69f2..e5728c751 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -1,7 +1,4 @@ -use std::{ - cell::RefCell, - collections::{hash_map, BinaryHeap, VecDeque}, -}; +use std::collections::{hash_map, BinaryHeap}; use bytes::Bytes; use thiserror::Error; @@ -237,7 +234,7 @@ impl<'a> SendStream<'a> { self.state.unacked_data += written.bytes as u64; trace!(stream = %self.id, "wrote {} bytes", written.bytes); if !was_pending { - push_pending(&mut self.state.pending, self.id, stream.priority); + self.state.pending.push_pending(self.id, stream.priority); } Ok(written) } @@ -268,7 +265,7 @@ impl<'a> SendStream<'a> { let was_pending = stream.is_pending(); stream.finish()?; if !was_pending { - push_pending(&mut self.state.pending, self.id, stream.priority); + self.state.pending.push_pending(self.id, stream.priority); } Ok(()) @@ -335,60 +332,54 @@ impl<'a> SendStream<'a> { } } -fn push_pending(pending: &mut BinaryHeap, id: StreamId, priority: i32) { - for level in pending.iter() { - if priority == level.priority { - level.queue.borrow_mut().push_back(id); - return; - } - } - - // If there is only a single level and it's empty, repurpose it for the - // required priority - if pending.len() == 1 { - if let Some(mut first) = pending.peek_mut() { - let mut queue = first.queue.borrow_mut(); - if queue.is_empty() { - queue.push_back(id); - drop(queue); - first.priority = priority; - return; - } - } - } - - let mut queue = VecDeque::new(); - queue.push_back(id); - pending.push(PendingLevel { - queue: RefCell::new(queue), - priority, - }); +/// A queue of streams with pending outgoing data, sorted by priority +struct PendingStreamsQueue { + streams: BinaryHeap, + /// A monotonically decreasing counter, used to implement round-robin scheduling for streams of the same priority. + /// Underflowing is not a practical concern, as it is initialized to u64::MAX and only decremented by 1 in `push_pending` + recency: u64, } -struct PendingLevel { - // RefCell is needed because BinaryHeap doesn't have an iter_mut() - queue: RefCell>, - priority: i32, -} - -impl PartialEq for PendingLevel { - fn eq(&self, other: &Self) -> bool { - self.priority.eq(&other.priority) +impl PendingStreamsQueue { + fn new() -> Self { + Self { + streams: BinaryHeap::new(), + recency: u64::MAX, + } } -} -impl PartialOrd for PendingLevel { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) + /// Push a pending stream ID with the given priority, queued after any already-queued streams for the priority + fn push_pending(&mut self, id: StreamId, priority: i32) { + // As the recency counter is monotonically decreasing, we know that using its value to sort this stream will queue it + // after all other queued streams of the same priority. + // This is enough to implement round-robin scheduling for streams that are still pending even after being handled, + // as in that case they are removed from the `BinaryHeap`, handled, and then immediately reinserted. + self.recency -= 1; + self.streams.push(PendingStream { + priority, + recency: self.recency, + id, + }); } } -impl Eq for PendingLevel {} - -impl Ord for PendingLevel { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.priority.cmp(&other.priority) - } +/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency +#[derive(PartialEq, Eq, PartialOrd, Ord)] +struct PendingStream { + /// The priority of the stream + // Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct + // (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable) + priority: i32, + /// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling: + /// Larger values are prioritized, so it is initialised to `u64::MAX`, and when a stream writes data, we know + /// that it currently has the highest recency value, so it is deprioritized by setting its recency to 1 less than the + /// previous lowest recency value, such that all other streams of this priority will get processed once before we get back + /// round to this one + recency: u64, + /// The ID of the stream + // The way this type is used ensures that every instance has a unique `recency` value, so this field should be kept below + // the `priority` and `recency` fields, so that it does not interfere with the behaviour of the `Ord` derive + id: StreamId, } /// Application events about streams diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index 9413da75b..e541bc199 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -1,5 +1,5 @@ use std::{ - collections::{binary_heap::PeekMut, hash_map, BinaryHeap, VecDeque}, + collections::{hash_map, VecDeque}, convert::TryFrom, mem, }; @@ -9,7 +9,7 @@ use rustc_hash::FxHashMap; use tracing::{debug, trace}; use super::{ - push_pending, PendingLevel, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent, + PendingStreamsQueue, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent, StreamHalf, ThinRetransmits, }; use crate::{ @@ -52,8 +52,8 @@ pub struct StreamsState { /// This differs from `self.send.len()` in that it does not include streams that the peer is /// permitted to open but which have not yet been opened. pub(super) send_streams: usize, - /// Streams with outgoing data queued - pub(super) pending: BinaryHeap, + /// Streams with outgoing data queued, sorted by priority + pub(super) pending: PendingStreamsQueue, events: VecDeque, /// Streams blocked on connection-level flow control or stream window space @@ -115,7 +115,7 @@ impl StreamsState { opened: [false, false], next_reported_remote: [0, 0], send_streams: 0, - pending: BinaryHeap::new(), + pending: PendingStreamsQueue::new(), events: VecDeque::new(), connection_blocked: Vec::new(), max_data: 0, @@ -192,7 +192,7 @@ impl StreamsState { } } - self.pending.clear(); + self.pending.streams.clear(); self.send_streams = 0; self.data_sent = 0; self.connection_blocked.clear(); @@ -345,13 +345,11 @@ impl StreamsState { /// Whether any stream data is queued, regardless of control frames pub(crate) fn can_send_stream_data(&self) -> bool { // Reset streams may linger in the pending stream list, but will never produce stream frames - self.pending.iter().any(|level| { - level.queue.borrow().iter().any(|id| { - self.send - .get(id) - .and_then(|s| s.as_ref()) - .map_or(false, |s| !s.is_reset()) - }) + self.pending.streams.iter().any(|stream| { + self.send + .get(&stream.id) + .and_then(|s| s.as_ref()) + .map_or(false, |s| !s.is_reset()) }) } @@ -503,26 +501,14 @@ impl StreamsState { break; } - let num_levels = self.pending.len(); - let mut level = match self.pending.peek_mut() { - Some(x) => x, - None => break, - }; - // Poppping data from the front of the queue, storing as much data - // as possible in a single frame, and enqueing sending further - // remaining data at the end of the queue helps with fairness. - // Other streams will have a chance to write data before we touch - // this stream again. - let id = match level.queue.get_mut().pop_front() { - Some(x) => x, - None => { - debug_assert!( - num_levels == 1, - "An empty queue is only allowed for a single level" - ); - break; - } + // Pop the stream of the highest priority that currently has pending data + // If the stream still has some pending data left after writing, it will be reinserted, otherwise not + let Some(stream) = self.pending.streams.pop() else { + break; }; + + let id = stream.id; + let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) { Some(s) => s, // Stream was reset with pending data and the reset was acknowledged @@ -547,24 +533,10 @@ impl StreamsState { } if stream.is_pending() { - if level.priority == stream.priority { - // Enqueue for the same level - level.queue.get_mut().push_back(id); - } else { - // Enqueue for a different level. If the current level is empty, drop it - if level.queue.borrow().is_empty() && num_levels != 1 { - // We keep the last level around even in empty form so that - // the next insert doesn't have to reallocate the queue - PeekMut::pop(level); - } else { - drop(level); - } - push_pending(&mut self.pending, id, stream.priority); - } - } else if level.queue.borrow().is_empty() && num_levels != 1 { - // We keep the last level around even in empty form so that - // the next insert doesn't have to reallocate the queue - PeekMut::pop(level); + // If the stream still has pending data, reinsert it, possibly with an updated priority value + // Fairness with other streams is achieved by implementing round-robin scheduling, + // so that the other streams will have a chance to write data before we touch this stream again. + self.pending.push_pending(id, stream.priority); } let meta = frame::StreamMeta { id, offsets, fin }; @@ -644,7 +616,7 @@ impl StreamsState { Some(x) => x, }; if !stream.is_pending() { - push_pending(&mut self.pending, frame.id, stream.priority); + self.pending.push_pending(frame.id, stream.priority); } stream.fin_pending |= frame.fin; stream.pending.retransmit(frame.offsets); @@ -664,7 +636,7 @@ impl StreamsState { continue; } if !stream.is_pending() { - push_pending(&mut self.pending, id, stream.priority); + self.pending.push_pending(id, stream.priority); } stream.pending.retransmit_all_for_0rtt(); } @@ -1324,7 +1296,7 @@ mod tests { assert_eq!(meta[2].id, id_low); assert!(!server.can_send_stream_data()); - assert_eq!(server.pending.len(), 1); + assert_eq!(server.pending.streams.len(), 0); } #[test] @@ -1353,7 +1325,7 @@ mod tests { conn_state: &state, }; assert_eq!(mid.write(b"mid").unwrap(), 3); - assert_eq!(server.pending.len(), 1); + assert_eq!(server.pending.streams.len(), 1); let mut high = SendStream { id: id_high, @@ -1363,7 +1335,7 @@ mod tests { }; high.set_priority(1).unwrap(); assert_eq!(high.write(&[0; 200]).unwrap(), 200); - assert_eq!(server.pending.len(), 2); + assert_eq!(server.pending.streams.len(), 2); // Requeue the high priority stream to lowest priority. The initial send // still uses high priority since it's queued that way. After that it will @@ -1382,7 +1354,7 @@ mod tests { assert_eq!(meta[0].id, id_high); // After requeuing we should end up with 2 priorities - not 3 - assert_eq!(server.pending.len(), 2); + assert_eq!(server.pending.streams.len(), 2); // Send the remaining data. The initial mid priority one should go first now let meta = server.write_stream_frames(&mut buf, 1000); @@ -1391,7 +1363,7 @@ mod tests { assert_eq!(meta[1].id, id_high); assert!(!server.can_send_stream_data()); - assert_eq!(server.pending.len(), 1); + assert_eq!(server.pending.streams.len(), 0); } #[test] From f4dbada425ee5653332d312cd9863a0c06ff243e Mon Sep 17 00:00:00 2001 From: Pixelstorm Date: Sat, 6 Apr 2024 17:00:37 +0100 Subject: [PATCH 2/3] Add sync bounds to traits --- quinn-proto/src/cid_generator.rs | 2 +- quinn-proto/src/congestion.rs | 2 +- quinn-proto/src/crypto.rs | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/quinn-proto/src/cid_generator.rs b/quinn-proto/src/cid_generator.rs index 0b7f502d3..2b36cd86b 100644 --- a/quinn-proto/src/cid_generator.rs +++ b/quinn-proto/src/cid_generator.rs @@ -6,7 +6,7 @@ use crate::shared::ConnectionId; use crate::MAX_CID_SIZE; /// Generates connection IDs for incoming connections -pub trait ConnectionIdGenerator: Send { +pub trait ConnectionIdGenerator: Send + Sync { /// Generates a new CID /// /// Connection IDs MUST NOT contain any information that can be used by diff --git a/quinn-proto/src/congestion.rs b/quinn-proto/src/congestion.rs index ec6bde82c..fac6e80e5 100644 --- a/quinn-proto/src/congestion.rs +++ b/quinn-proto/src/congestion.rs @@ -14,7 +14,7 @@ pub use cubic::{Cubic, CubicConfig}; pub use new_reno::{NewReno, NewRenoConfig}; /// Common interface for different congestion controllers -pub trait Controller: Send { +pub trait Controller: Send + Sync { /// One or more packets were just sent #[allow(unused_variables)] fn on_sent(&mut self, now: Instant, bytes: u64, last_packet_number: u64) {} diff --git a/quinn-proto/src/crypto.rs b/quinn-proto/src/crypto.rs index 3c2cb5c04..b2d5afdcb 100644 --- a/quinn-proto/src/crypto.rs +++ b/quinn-proto/src/crypto.rs @@ -25,7 +25,7 @@ pub(crate) mod ring; pub mod rustls; /// A cryptographic session (commonly TLS) -pub trait Session: Send + 'static { +pub trait Session: Send + Sync + 'static { /// Create the initial set of keys given the client's initial destination ConnectionId fn initial_keys(&self, dst_cid: &ConnectionId, side: Side) -> Keys; @@ -146,7 +146,7 @@ pub trait ServerConfig: Send + Sync { } /// Keys used to protect packet payloads -pub trait PacketKey: Send { +pub trait PacketKey: Send + Sync { /// Encrypt the packet payload with the given packet number fn encrypt(&self, packet: u64, buf: &mut [u8], header_len: usize); /// Decrypt the packet payload with the given packet number @@ -166,7 +166,7 @@ pub trait PacketKey: Send { } /// Keys used to protect packet headers -pub trait HeaderKey: Send { +pub trait HeaderKey: Send + Sync { /// Decrypt the given packet's header fn decrypt(&self, pn_offset: usize, packet: &mut [u8]); /// Encrypt the given packet's header From 6a92ccc4b1d850066cbb8353fba6be7027a93ba1 Mon Sep 17 00:00:00 2001 From: Pixelstorm Date: Sat, 6 Apr 2024 17:16:28 +0100 Subject: [PATCH 3/3] Add test ensuring send & sync are impled --- quinn-proto/src/tests/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index e631003f0..62b553936 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -2828,3 +2828,10 @@ fn reject_manually() { }) if close.error_code == TransportErrorCode::CONNECTION_REFUSED )); } + +#[test] +fn endpoint_and_connection_impl_send_sync() { + const fn is_send_sync() {} + is_send_sync::(); + is_send_sync::(); +}