diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 555f87829c4efa..0d4563360a08e8 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1084,64 +1084,58 @@ async fn handle_connection( stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); stats.total_streams.fetch_add(1, Ordering::Relaxed); stats.total_new_streams.fetch_add(1, Ordering::Relaxed); - let cancel = cancel.clone(); - let stats = stats.clone(); - let packet_sender = params.packet_sender.clone(); - let last_update = last_update.clone(); - let stream_load_ema = stream_load_ema.clone(); - tokio::spawn(async move { - let mut maybe_batch = None; - loop { - // Read the next chunk, waiting up to `wait_for_chunk_timeout`. If we don't get a - // chunk before then, we assume the stream is dead and stop the stream task. This - // can only happen if there's severe packet loss or the peer stop sending for - // whatever reason. - let chunk = match tokio::select! { - chunk = tokio::time::timeout( - wait_for_chunk_timeout, - stream.read_chunk(PACKET_DATA_SIZE, true)) => chunk, - - // If the peer gets disconnected stop the task right away. - _ = cancel.cancelled() => break, - } { - // read_chunk returned success - Ok(Ok(chunk)) => chunk, - // read_chunk returned error - Ok(Err(e)) => { - debug!("Received stream error: {:?}", e); - stats - .total_stream_read_errors - .fetch_add(1, Ordering::Relaxed); - break; - } - // timeout elapsed - Err(_) => { - debug!("Timeout in receiving on stream"); - stats - .total_stream_read_timeouts - .fetch_add(1, Ordering::Relaxed); - break; - } - }; - - if handle_chunk( - chunk, - &mut maybe_batch, - &remote_addr, - &packet_sender, - stats.clone(), - params.peer_type, - ) - .await - { - last_update.store(timing::timestamp(), Ordering::Relaxed); + let packet_sender = ¶ms.packet_sender; + let mut maybe_batch = None; + loop { + // Read the next chunk, waiting up to `wait_for_chunk_timeout`. If we don't get a + // chunk before then, we assume the stream is dead and stop the stream task. This + // can only happen if there's severe packet loss or the peer stop sending for + // whatever reason. + let chunk = match tokio::select! { + chunk = tokio::time::timeout( + wait_for_chunk_timeout, + stream.read_chunk(PACKET_DATA_SIZE, true)) => chunk, + + // If the peer gets disconnected stop the task right away. + _ = cancel.cancelled() => break, + } { + // read_chunk returned success + Ok(Ok(chunk)) => chunk, + // read_chunk returned error + Ok(Err(e)) => { + debug!("Received stream error: {:?}", e); + stats + .total_stream_read_errors + .fetch_add(1, Ordering::Relaxed); break; } + // timeout elapsed + Err(_) => { + debug!("Timeout in receiving on stream"); + stats + .total_stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); + break; + } + }; + + if handle_chunk( + chunk, + &mut maybe_batch, + &remote_addr, + packet_sender, + stats.clone(), + params.peer_type, + ) + .await + { + last_update.store(timing::timestamp(), Ordering::Relaxed); + break; } + } - stats.total_streams.fetch_sub(1, Ordering::Relaxed); - stream_load_ema.update_ema_if_needed(); - }); + stats.total_streams.fetch_sub(1, Ordering::Relaxed); + stream_load_ema.update_ema_if_needed(); } let removed_connection_count = connection_table.lock().await.remove_connection(