Skip to content

Commit

Permalink
Refactor stream priority management
Browse files Browse the repository at this point in the history
The main goal of this refactor is to remove a
`RefCell` that is preventing `Connection` from
being `Sync`. This is achieved by replacing the
`BinaryHeap` with a `BTreeMap`.

See quinn-rs#1769 for more information.
  • Loading branch information
BigWingBeat committed Apr 6, 2024
1 parent 8fbcf08 commit 20f81b2
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 106 deletions.
89 changes: 36 additions & 53 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
cell::RefCell,
collections::{hash_map, BinaryHeap, VecDeque},
collections::{hash_map, BTreeMap},
ops::Bound,
};

use bytes::Bytes;
Expand Down Expand Up @@ -335,60 +335,43 @@ impl<'a> SendStream<'a> {
}
}

fn push_pending(pending: &mut BinaryHeap<PendingLevel>, id: StreamId, priority: i32) {
for level in pending.iter() {
if priority == level.priority {
level.queue.borrow_mut().push_back(id);
return;
}
}

// If there is only a single level and it's empty, repurpose it for the
// required priority
if pending.len() == 1 {
if let Some(mut first) = pending.peek_mut() {
let mut queue = first.queue.borrow_mut();
if queue.is_empty() {
queue.push_back(id);
drop(queue);
first.priority = priority;
return;
}
}
}

let mut queue = VecDeque::new();
queue.push_back(id);
pending.push(PendingLevel {
queue: RefCell::new(queue),
priority,
});
/// Push a pending stream ID with the given priority, queued after all preexisting streams for the priority
fn push_pending(pending: &mut BTreeMap<PendingPriority, StreamId>, id: StreamId, priority: i32) {
    // All preexisting streams of this priority, iterated in ascending recency order
    let mut same_priority = pending.range(
        PendingPriority {
            priority,
            recency: 0,
        }..=PendingPriority {
            priority,
            recency: u64::MAX,
        },
    );

    // Determine the recency value for this stream
    // Setting it to 1 below all preexisting streams of this priority causes it to be sorted after all of those streams in
    // the `BTreeMap`. This implements round-robin scheduling for streams that are still pending even after being handled,
    // as in that case they are removed and then immediately reinserted.
    // If this is the only/first stream for this priority, recency is initialised to u64::MAX instead. As this function is
    // the only place that recency is initialised or mutated, this ensures that it will practically never underflow.
    let recency = same_priority
        .next()
        .map_or(u64::MAX, |(key, _)| key.recency - 1);

    pending.insert(PendingPriority { priority, recency }, id);
}

struct PendingLevel {
// RefCell is needed because BinaryHeap doesn't have an iter_mut()
queue: RefCell<VecDeque<StreamId>>,
/// Key type for a [`BTreeMap`] for streams with pending data, to sort them by priority and recency
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct PendingPriority {
/// The priority of the stream
// Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
// (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable)
priority: i32,
}

impl PartialEq for PendingLevel {
fn eq(&self, other: &Self) -> bool {
self.priority.eq(&other.priority)
}
}

impl PartialOrd for PendingLevel {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Eq for PendingLevel {}

impl Ord for PendingLevel {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.priority.cmp(&other.priority)
}
/// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling:
/// Larger values are prioritized, so it is initialised to `u64::MAX`, and when a stream writes data, we know
/// that it currently has the highest recency value, so it is deprioritized by 'leapfrogging' its recency past
/// the recency values of the other streams, down to 1 less than the previous lowest recency value for that priority
recency: u64,
}

/// Application events about streams
Expand Down
76 changes: 23 additions & 53 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{binary_heap::PeekMut, hash_map, BinaryHeap, VecDeque},
collections::{hash_map, BTreeMap, VecDeque},
convert::TryFrom,
mem,
};
Expand All @@ -9,12 +9,12 @@ use rustc_hash::FxHashMap;
use tracing::{debug, trace};

use super::{
push_pending, PendingLevel, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent,
StreamHalf, ThinRetransmits,
PendingPriority, Recv, Retransmits, Send, SendState, ShouldTransmit, StreamEvent, StreamHalf,
ThinRetransmits,
};
use crate::{
coding::BufMutExt,
connection::stats::FrameStats,
connection::{stats::FrameStats, streams::push_pending},
frame::{self, FrameStruct, StreamMetaVec},
transport_parameters::TransportParameters,
Dir, Side, StreamId, TransportError, VarInt, MAX_STREAM_COUNT,
Expand Down Expand Up @@ -52,8 +52,8 @@ pub struct StreamsState {
/// This differs from `self.send.len()` in that it does not include streams that the peer is
/// permitted to open but which have not yet been opened.
pub(super) send_streams: usize,
/// Streams with outgoing data queued
pub(super) pending: BinaryHeap<PendingLevel>,
/// Streams with outgoing data queued, sorted by priority
pub(super) pending: BTreeMap<PendingPriority, StreamId>,

events: VecDeque<StreamEvent>,
/// Streams blocked on connection-level flow control or stream window space
Expand Down Expand Up @@ -115,7 +115,7 @@ impl StreamsState {
opened: [false, false],
next_reported_remote: [0, 0],
send_streams: 0,
pending: BinaryHeap::new(),
pending: BTreeMap::new(),
events: VecDeque::new(),
connection_blocked: Vec::new(),
max_data: 0,
Expand Down Expand Up @@ -345,13 +345,11 @@ impl StreamsState {
/// Whether any stream data is queued, regardless of control frames
pub(crate) fn can_send_stream_data(&self) -> bool {
// Reset streams may linger in the pending stream list, but will never produce stream frames
self.pending.iter().any(|level| {
level.queue.borrow().iter().any(|id| {
self.send
.get(id)
.and_then(|s| s.as_ref())
.map_or(false, |s| !s.is_reset())
})
self.pending.values().any(|id| {
self.send
.get(id)
.and_then(|s| s.as_ref())
.map_or(false, |s| !s.is_reset())
})
}

Expand Down Expand Up @@ -503,26 +501,12 @@ impl StreamsState {
break;
}

let num_levels = self.pending.len();
let mut level = match self.pending.peek_mut() {
Some(x) => x,
None => break,
};
// Popping data from the front of the queue, storing as much data
// as possible in a single frame, and enqueuing sending further
// remaining data at the end of the queue helps with fairness.
// Other streams will have a chance to write data before we touch
// this stream again.
let id = match level.queue.get_mut().pop_front() {
Some(x) => x,
None => {
debug_assert!(
num_levels == 1,
"An empty queue is only allowed for a single level"
);
break;
}
// Pop the stream of the highest priority that currently has pending data
// If the stream still has some pending data left after writing, it will be reinserted, otherwise not
let Some((_, id)) = self.pending.pop_last() else {
break;
};

let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) {
Some(s) => s,
// Stream was reset with pending data and the reset was acknowledged
Expand All @@ -547,24 +531,10 @@ impl StreamsState {
}

if stream.is_pending() {
if level.priority == stream.priority {
// Enqueue for the same level
level.queue.get_mut().push_back(id);
} else {
// Enqueue for a different level. If the current level is empty, drop it
if level.queue.borrow().is_empty() && num_levels != 1 {
// We keep the last level around even in empty form so that
// the next insert doesn't have to reallocate the queue
PeekMut::pop(level);
} else {
drop(level);
}
push_pending(&mut self.pending, id, stream.priority);
}
} else if level.queue.borrow().is_empty() && num_levels != 1 {
// We keep the last level around even in empty form so that
// the next insert doesn't have to reallocate the queue
PeekMut::pop(level);
// If the stream still has pending data, reinsert it, possibly with an updated priority value
// Fairness with other streams is achieved by implementing round-robin scheduling,
// so that the other streams will have a chance to write data before we touch this stream again.
push_pending(&mut self.pending, id, stream.priority);
}

let meta = frame::StreamMeta { id, offsets, fin };
Expand Down Expand Up @@ -1324,7 +1294,7 @@ mod tests {
assert_eq!(meta[2].id, id_low);

assert!(!server.can_send_stream_data());
assert_eq!(server.pending.len(), 1);
assert_eq!(server.pending.len(), 0);
}

#[test]
Expand Down Expand Up @@ -1391,7 +1361,7 @@ mod tests {
assert_eq!(meta[1].id, id_high);

assert!(!server.can_send_stream_data());
assert_eq!(server.pending.len(), 1);
assert_eq!(server.pending.len(), 0);
}

#[test]
Expand Down

0 comments on commit 20f81b2

Please sign in to comment.