quic: don't create one task for each tx, create one per connection
Before quinn-rs/quinn#2002 we could receive streams fragmented and out
of order (due to stream concurrency). Now streams always arrive in
order, so there's no longer any reason to spawn multiple tasks to read
them.

Before we could have:

[s1][s2][s3][s2 fin][s3 fin][s1 fin]

So spawning multiple tasks led to overall faster ingestion, since to
complete s1 we didn't have to wait for all the other streams to arrive.
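
As a rough illustration, a minimal sketch of the old shape, assuming
quinn's accept_uni/read_chunk API (illustrative names and the 1232-byte
chunk size, mirroring PACKET_DATA_SIZE, are not the actual agave code):

// Old shape (simplified sketch): one task per incoming stream, so
// interleaved streams could be ingested concurrently.
async fn read_streams_old(connection: quinn::Connection) -> Result<(), quinn::ConnectionError> {
    loop {
        let mut stream = connection.accept_uni().await?;
        tokio::spawn(async move {
            // Read ordered chunks of at most 1232 bytes until the stream
            // finishes or errors out.
            while let Ok(Some(chunk)) = stream.read_chunk(1232, true).await {
                // ... assemble chunk.bytes into a packet ...
                let _ = chunk;
            }
        });
    }
}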

Now we always have:

[s1 fin][s2 fin][s3 fin]

So there's no reason to spawn a task per stream: each task would be
created, read all of its stream's chunks, and exit before the next
stream arrives.

This change removes the per-stream task and instead uses the connection
task to read all the streams. This removes the CPU cost of creating
tasks and the corresponding memory allocations.
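
For contrast, a minimal sketch of the new shape (again assuming quinn's
API; the 10s timeout, constant, and function names are illustrative, not
the actual agave code):

// New shape (simplified sketch): the connection task reads each stream to
// completion before accepting the next one; no per-stream task is spawned.
const MAX_CHUNK: usize = 1232; // mirrors PACKET_DATA_SIZE

async fn read_streams_new(connection: quinn::Connection) -> Result<(), quinn::ConnectionError> {
    loop {
        let mut stream = connection.accept_uni().await?;
        loop {
            // Wait up to 10s for the next chunk; otherwise give up on the stream.
            match tokio::time::timeout(
                std::time::Duration::from_secs(10),
                stream.read_chunk(MAX_CHUNK, true),
            )
            .await
            {
                Ok(Ok(Some(chunk))) => {
                    // ... assemble chunk.bytes into a packet batch ...
                    let _ = chunk;
                }
                Ok(Ok(None)) => break, // stream finished cleanly
                Ok(Err(_)) => break,   // stream error
                Err(_) => break,       // timed out waiting for a chunk
            }
        }
    }
}
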
alessandrod committed Oct 24, 2024
1 parent 537e438 commit 4c2c4ef
Showing 1 changed file with 48 additions and 54 deletions.
102 changes: 48 additions & 54 deletions streamer/src/nonblocking/quic.rs
@@ -1084,64 +1084,58 @@ async fn handle_connection(
stream_counter.stream_count.fetch_add(1, Ordering::Relaxed);
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let cancel = cancel.clone();
let stats = stats.clone();
let packet_sender = params.packet_sender.clone();
let last_update = last_update.clone();
let stream_load_ema = stream_load_ema.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
loop {
// Read the next chunk, waiting up to `wait_for_chunk_timeout`. If we don't get a
// chunk before then, we assume the stream is dead and stop the stream task. This
// can only happen if there's severe packet loss or the peer stops sending for
// whatever reason.
let chunk = match tokio::select! {
chunk = tokio::time::timeout(
wait_for_chunk_timeout,
stream.read_chunk(PACKET_DATA_SIZE, true)) => chunk,

// If the peer gets disconnected stop the task right away.
_ = cancel.cancelled() => break,
} {
// read_chunk returned success
Ok(Ok(chunk)) => chunk,
// read_chunk returned error
Ok(Err(e)) => {
debug!("Received stream error: {:?}", e);
stats
.total_stream_read_errors
.fetch_add(1, Ordering::Relaxed);
break;
}
// timeout elapsed
Err(_) => {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
}
};

if handle_chunk(
chunk,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
params.peer_type,
)
.await
{
last_update.store(timing::timestamp(), Ordering::Relaxed);
let packet_sender = &params.packet_sender;
let mut maybe_batch = None;
loop {
// Read the next chunk, waiting up to `wait_for_chunk_timeout`. If we don't get a
// chunk before then, we assume the stream is dead and stop reading it. This
// can only happen if there's severe packet loss or the peer stops sending for
// whatever reason.
let chunk = match tokio::select! {
chunk = tokio::time::timeout(
wait_for_chunk_timeout,
stream.read_chunk(PACKET_DATA_SIZE, true)) => chunk,

// If the peer gets disconnected stop the task right away.
_ = cancel.cancelled() => break,
} {
// read_chunk returned success
Ok(Ok(chunk)) => chunk,
// read_chunk returned error
Ok(Err(e)) => {
debug!("Received stream error: {:?}", e);
stats
.total_stream_read_errors
.fetch_add(1, Ordering::Relaxed);
break;
}
// timeout elapsed
Err(_) => {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
}
};

if handle_chunk(
chunk,
&mut maybe_batch,
&remote_addr,
packet_sender,
stats.clone(),
params.peer_type,
)
.await
{
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
}

stats.total_streams.fetch_sub(1, Ordering::Relaxed);
stream_load_ema.update_ema_if_needed();
});
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
stream_load_ema.update_ema_if_needed();
}

let removed_connection_count = connection_table.lock().await.remove_connection(