Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not pre-allocate memory on streams #1682

Closed
wants to merge 9 commits into from
22 changes: 18 additions & 4 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ impl<'a> RecvStream<'a> {
hash_map::Entry::Occupied(s) => s,
hash_map::Entry::Vacant(_) => return Err(UnknownStream { _private: () }),
};
let stream = entry.get_mut();
let stream = entry
.get_mut()
.get_or_insert_with(|| Box::new(Recv::new(self.state.stream_receive_window)));

let (read_credits, stop_sending) = stream.stop()?;
if stop_sending.should_transmit() {
Expand Down Expand Up @@ -207,11 +209,16 @@ impl<'a> SendStream<'a> {
}

let limit = self.state.write_limit();

let max_send_data = self.state.max_send_data(&self.id);
Ralith marked this conversation as resolved.
Show resolved Hide resolved

let stream = self
.state
.send
.get_mut(&self.id)
.map(|s| s.get_or_insert_with(|| Box::new(Send::new(max_send_data))))
.ok_or(WriteError::UnknownStream)?;

if limit == 0 {
trace!(
stream = %self.id, max_data = self.state.max_data, data_sent = self.state.data_sent,
Expand All @@ -237,8 +244,9 @@ impl<'a> SendStream<'a> {

/// Check if this stream was stopped, get the reason if it was
pub fn stopped(&mut self) -> Result<Option<VarInt>, UnknownStream> {
match self.state.send.get(&self.id) {
Some(s) => Ok(s.stop_reason),
match self.state.send.get(&self.id).as_ref() {
Some(Some(s)) => Ok(s.stop_reason),
Some(None) => Ok(None),
None => Err(UnknownStream { _private: () }),
}
}
Expand All @@ -249,10 +257,12 @@ impl<'a> SendStream<'a> {
///
/// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
pub fn finish(&mut self) -> Result<(), FinishError> {
let max_send_data = self.state.max_send_data(&self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(|s| s.get_or_insert_with(|| Box::new(Send::new(max_send_data))))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there are a few recurring patterns here that would make good candidates for moving into a method somewhere to raise the level of abstraction. We could address that after merging this, but would be great if you can take a whack at it.

.ok_or(FinishError::UnknownStream)?;

let was_pending = stream.is_pending();
Expand All @@ -269,10 +279,12 @@ impl<'a> SendStream<'a> {
/// # Panics
/// - when applied to a receive stream
pub fn reset(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
let max_send_data = self.state.max_send_data(&self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(|s| s.get_or_insert_with(|| Box::new(Send::new(max_send_data))))
.ok_or(UnknownStream { _private: () })?;

if matches!(stream.state, SendState::ResetSent) {
Expand All @@ -296,10 +308,12 @@ impl<'a> SendStream<'a> {
/// # Panics
/// - when applied to a receive stream
pub fn set_priority(&mut self, priority: i32) -> Result<(), UnknownStream> {
let max_send_data = self.state.max_send_data(&self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(|s| s.get_or_insert_with(|| Box::new(Send::new(max_send_data))))
.ok_or(UnknownStream { _private: () })?;

stream.priority = priority;
Expand All @@ -317,7 +331,7 @@ impl<'a> SendStream<'a> {
.get(&self.id)
.ok_or(UnknownStream { _private: () })?;

Ok(stream.priority)
Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
}
}

Expand Down
14 changes: 9 additions & 5 deletions quinn-proto/src/connection/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,18 @@ impl<'a> Chunks<'a> {
streams: &'a mut StreamsState,
pending: &'a mut Retransmits,
) -> Result<Self, ReadableError> {
let entry = match streams.recv.entry(id) {
let mut entry = match streams.recv.entry(id) {
Entry::Occupied(entry) => entry,
Entry::Vacant(_) => return Err(ReadableError::UnknownStream),
};

let mut recv = match entry.get().stopped {
let mut recv = match entry
.get_mut()
.get_or_insert_with(|| Box::new(Recv::new(streams.stream_receive_window)))
.stopped
{
true => return Err(ReadableError::UnknownStream),
false => entry.remove(),
false => entry.remove().unwrap(), // this can't fail due to the previous get_or_insert_with
};

recv.assembler.ensure_ordering(ordered)?;
Expand Down Expand Up @@ -313,7 +317,7 @@ impl<'a> Chunks<'a> {
self.pending.max_stream_data.insert(self.id);
}
// Return the stream to storage for future use
self.streams.recv.insert(self.id, rs);
self.streams.recv.insert(self.id, Some(rs));
}

// Issue connection-level flow control credit for any data we read regardless of state
Expand All @@ -331,7 +335,7 @@ impl<'a> Drop for Chunks<'a> {
}

enum ChunksState {
Readable(Recv),
Readable(Box<Recv>),
Reset(VarInt),
Finished,
Finalized,
Expand Down
110 changes: 71 additions & 39 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::{
pub struct StreamsState {
pub(super) side: Side,
// Set of streams that are currently open, or could be immediately opened by the peer
pub(super) send: FxHashMap<StreamId, Send>,
pub(super) recv: FxHashMap<StreamId, Recv>,
pub(super) send: FxHashMap<StreamId, Option<Box<Send>>>,
pub(super) recv: FxHashMap<StreamId, Option<Box<Recv>>>,
pub(super) next: [u64; 2],
/// Maximum number of locally-initiated streams that may be opened over the lifetime of the
/// connection so far, per direction
Expand Down Expand Up @@ -152,8 +152,9 @@ impl StreamsState {
self.received_max_data(params.initial_max_data);
for i in 0..self.max_remote[Dir::Bi as usize] {
let id = StreamId::new(!self.side, Dir::Bi, i);
self.send.get_mut(&id).unwrap().max_data =
params.initial_max_stream_data_bidi_local.into();
if let Some(s) = self.send.get_mut(&id).and_then(|s| s.as_mut()) {
s.max_data = params.initial_max_stream_data_bidi_local.into();
}
}
}

Expand Down Expand Up @@ -205,13 +206,17 @@ impl StreamsState {
frame: frame::Stream,
payload_len: usize,
) -> Result<ShouldTransmit, TransportError> {
let stream = frame.id;
self.validate_receive_id(stream).map_err(|e| {
let id = frame.id;
self.validate_receive_id(id).map_err(|e| {
debug!("received illegal STREAM frame");
e
})?;

let rs = match self.recv.get_mut(&stream) {
let rs = match self
.recv
.get_mut(&id)
.map(|s| s.get_or_insert_with(|| Box::new(Recv::new(self.stream_receive_window))))
{
Some(rs) => rs,
None => {
trace!("dropping frame for closed stream");
Expand All @@ -229,14 +234,14 @@ impl StreamsState {
self.data_recvd = self.data_recvd.saturating_add(new_bytes);

if !rs.stopped {
self.on_stream_frame(true, stream);
self.on_stream_frame(true, id);
return Ok(ShouldTransmit(false));
}

// Stopped streams become closed instantly on FIN, so check whether we need to clean up
if closed {
self.recv.remove(&stream);
self.stream_freed(stream, StreamHalf::Recv);
self.recv.remove(&id);
self.stream_freed(id, StreamHalf::Recv);
}

// We don't buffer data on stopped streams, so issue flow control credit immediately
Expand All @@ -261,7 +266,11 @@ impl StreamsState {
e
})?;

let rs = match self.recv.get_mut(&id) {
let rs = match self
.recv
.get_mut(&id)
.map(|s| s.get_or_insert_with(|| Box::new(Recv::new(self.stream_receive_window))))
{
Some(stream) => stream,
None => {
trace!("received RESET_STREAM on closed stream");
Expand Down Expand Up @@ -304,7 +313,12 @@ impl StreamsState {
/// Process incoming `STOP_SENDING` frame
#[allow(unreachable_pub)] // fuzzing only
pub fn received_stop_sending(&mut self, id: StreamId, error_code: VarInt) {
let stream = match self.send.get_mut(&id) {
let max_send_data = self.max_send_data(&id);
let stream = match self
.send
.get_mut(&id)
.map(|s| s.get_or_insert_with(|| Box::new(Send::new(max_send_data))))
{
Some(ss) => ss,
None => return,
};
Expand All @@ -320,7 +334,7 @@ impl StreamsState {
match self.send.entry(id) {
hash_map::Entry::Vacant(_) => {}
hash_map::Entry::Occupied(e) => {
if let SendState::ResetSent = e.get().state {
if let Some(SendState::ResetSent) = e.get().as_ref().map(|s| s.state) {
e.remove_entry();
self.stream_freed(id, StreamHalf::Send);
}
Expand All @@ -332,18 +346,20 @@ impl StreamsState {
pub(crate) fn can_send_stream_data(&self) -> bool {
// Reset streams may linger in the pending stream list, but will never produce stream frames
self.pending.iter().any(|level| {
level
.queue
.borrow()
.iter()
.any(|id| self.send.get(id).map_or(false, |s| !s.is_reset()))
level.queue.borrow().iter().any(|id| {
self.send
.get(id)
.and_then(|s| s.as_ref())
.map_or(false, |s| !s.is_reset())
})
})
}

/// Whether MAX_STREAM_DATA frames could be sent for stream `id`
pub(crate) fn can_send_flow_control(&self, id: StreamId) -> bool {
self.recv
.get(&id)
.and_then(|s| s.as_ref())
.map_or(false, |s| s.receiving_unknown_size())
}

Expand All @@ -361,7 +377,7 @@ impl StreamsState {
Some(x) => x,
None => break,
};
let stream = match self.send.get_mut(&id) {
let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) {
Some(x) => x,
None => continue,
};
Expand Down Expand Up @@ -428,7 +444,7 @@ impl StreamsState {
None => break,
};
pending.max_stream_data.remove(&id);
let rs = match self.recv.get_mut(&id) {
let rs = match self.recv.get_mut(&id).and_then(|s| s.as_mut()) {
Some(x) => x,
None => continue,
};
Expand Down Expand Up @@ -507,7 +523,7 @@ impl StreamsState {
break;
}
};
let stream = match self.send.get_mut(&id) {
let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) {
Some(s) => s,
// Stream was reset with pending data and the reset was acknowledged
None => continue,
Expand Down Expand Up @@ -593,7 +609,13 @@ impl StreamsState {
hash_map::Entry::Vacant(_) => return,
hash_map::Entry::Occupied(e) => e,
};
let stream = entry.get_mut();

// Because we only call this after sending data on this stream,
// this closure should be unreachable. If we did somehow screw that up,
// then we might hit an underflow below with unpredictable effects down
// the line. Best to short-circuit.
let stream = entry.get_mut().as_mut().unwrap();
jeromegn marked this conversation as resolved.
Show resolved Hide resolved

if stream.is_reset() {
// We account for outstanding data on reset streams at time of reset
return;
Expand All @@ -611,7 +633,7 @@ impl StreamsState {
}

pub(crate) fn retransmit(&mut self, frame: frame::StreamMeta) {
let stream = match self.send.get_mut(&frame.id) {
let stream = match self.send.get_mut(&frame.id).and_then(|s| s.as_mut()) {
// Loss of data on a closed stream is a noop
None => return,
Some(x) => x,
Expand All @@ -627,7 +649,10 @@ impl StreamsState {
for dir in Dir::iter() {
for index in 0..self.next[dir as usize] {
let id = StreamId::new(Side::Client, dir, index);
let stream = self.send.get_mut(&id).unwrap();
let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) {
Some(stream) => stream,
None => continue,
};
if stream.pending.is_fully_acked() && !stream.fin_pending {
// Stream data can't be acked in 0-RTT, so we must not have sent anything on
// this stream
Expand Down Expand Up @@ -679,7 +704,12 @@ impl StreamsState {
}

let write_limit = self.write_limit();
if let Some(ss) = self.send.get_mut(&id) {
let max_send_data = self.max_send_data(&id);
if let Some(ss) = self
.send
.get_mut(&id)
.map(|s| s.get_or_insert_with(|| Box::new(Send::new(max_send_data))))
{
if ss.increase_max_data(offset) {
if write_limit > 0 {
self.events.push_back(StreamEvent::Writable { id });
Expand Down Expand Up @@ -716,7 +746,7 @@ impl StreamsState {

if self.write_limit() > 0 {
while let Some(id) = self.connection_blocked.pop() {
let stream = match self.send.get_mut(&id) {
let stream = match self.send.get_mut(&id).and_then(|s| s.as_mut()) {
None => continue,
Some(s) => s,
};
Expand Down Expand Up @@ -797,24 +827,26 @@ impl StreamsState {
expanded
}

pub(super) fn max_send_data(&self, id: &StreamId) -> VarInt {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to squash all the commits and then extract this as a separate prefix commit.

let remote = self.side != id.initiator();
match id.dir() {
Dir::Uni => self.initial_max_stream_data_uni,
// Remote/local appear reversed here because the transport parameters are named from
// the perspective of the peer.
Dir::Bi if remote => self.initial_max_stream_data_bidi_local,
Dir::Bi => self.initial_max_stream_data_bidi_remote,
}
}

pub(super) fn insert(&mut self, remote: bool, id: StreamId) {
let bi = id.dir() == Dir::Bi;
// bidirectional OR (unidirectional AND NOT remote)
if bi || !remote {
let max_data = match id.dir() {
Dir::Uni => self.initial_max_stream_data_uni,
// Remote/local appear reversed here because the transport parameters are named from
// the perspective of the peer.
Dir::Bi if remote => self.initial_max_stream_data_bidi_local,
Dir::Bi => self.initial_max_stream_data_bidi_remote,
};
let stream = Send::new(max_data);
assert!(self.send.insert(id, stream).is_none());
assert!(self.send.insert(id, None).is_none());
}
// bidirectional OR (unidirectional AND remote)
if bi || remote {
assert!(self
.recv
.insert(id, Recv::new(self.stream_receive_window))
.is_none());
assert!(self.recv.insert(id, None).is_none());
}
}

Expand Down
Loading