diff --git a/crates/jmux-proxy/src/codec.rs b/crates/jmux-proxy/src/codec.rs index 130693a1..4db04722 100644 --- a/crates/jmux-proxy/src/codec.rs +++ b/crates/jmux-proxy/src/codec.rs @@ -4,9 +4,6 @@ use bytes::BytesMut; use jmux_proto::{Header, Message}; use tokio_util::codec::{Decoder, Encoder}; -/// This is a purely arbitrary number -pub(crate) const MAXIMUM_PACKET_SIZE_IN_BYTES: usize = 4096; - pub(crate) struct JmuxCodec; impl Decoder for JmuxCodec { @@ -15,6 +12,8 @@ impl Decoder for JmuxCodec { type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + const MAX_RESERVE_CHUNK_IN_BYTES: usize = 8 * 1024; + if src.len() < Header::SIZE { // Not enough data to read length marker. return Ok(None); @@ -25,17 +24,11 @@ impl Decoder for JmuxCodec { length_bytes.copy_from_slice(&src[1..3]); let length = u16::from_be_bytes(length_bytes) as usize; - if length > MAXIMUM_PACKET_SIZE_IN_BYTES { - return Err(io::Error::other(format!( - "received JMUX packet is exceeding the maximum packet size: {} (max is {})", - length, MAXIMUM_PACKET_SIZE_IN_BYTES, - ))); - } - if src.len() < length { // The full packet has not arrived yet. // Reserve more space in the buffer (good performance-wise). - src.reserve(length - src.len()); + let additional = core::cmp::min(MAX_RESERVE_CHUNK_IN_BYTES, length - src.len()); + src.reserve(additional); // Inform the Framed that more bytes are required to form the next frame. return Ok(None); @@ -56,17 +49,7 @@ impl Encoder for JmuxCodec { type Error = io::Error; fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { - if item.size() > MAXIMUM_PACKET_SIZE_IN_BYTES { - return Err(io::Error::other(format!( - "attempted to send a JMUX packet whose size is too big: {} (max is {})", - item.size(), - MAXIMUM_PACKET_SIZE_IN_BYTES - ))); - } - - item.encode(dst).map_err(io::Error::other)?; - - Ok(()) + item.encode(dst).map_err(io::Error::other) } } diff --git a/crates/jmux-proxy/src/lib.rs b/crates/jmux-proxy/src/lib.rs index 749e2c2a..3e7c7454 100644 --- a/crates/jmux-proxy/src/lib.rs +++ b/crates/jmux-proxy/src/lib.rs @@ -12,7 +12,6 @@ pub use jmux_proto::DestinationUrl; use self::codec::JmuxCodec; use self::id_allocator::IdAllocator; -use crate::codec::MAXIMUM_PACKET_SIZE_IN_BYTES; use anyhow::Context as _; use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; @@ -30,6 +29,18 @@ use tokio::task::JoinHandle; use tokio_util::codec::{FramedRead, FramedWrite}; use tracing::{Instrument as _, Span}; +// PERF/FIXME: changing this parameter to 16 * 1024 greatly improves the throughput, +// but we need to wait until 2025 before making this change. +// +// iperf result for 4 * 1024: +// > 0.0000-19.4523 sec 26.6 GBytes 11.7 Gbits/sec +// +// iperf result for 16 * 1024: +// > 0.0000-13.8540 sec 33.6 GBytes 20.8 Gbits/sec +// +// This is an improvement of 77.7%. +const MAXIMUM_PACKET_SIZE_IN_BYTES: u16 = 4 * 1024; + pub type ApiResponseSender = oneshot::Sender; pub type ApiResponseReceiver = oneshot::Receiver; pub type ApiRequestSender = mpsc::Sender; @@ -316,7 +327,7 @@ async fn scheduler_task_impl(task: JmuxSc debug!("{} request {}", id, destination_url); pending_channels.insert(id, (destination_url.clone(), api_response_tx)); msg_to_send_tx - .send(Message::open(id, u16::try_from(MAXIMUM_PACKET_SIZE_IN_BYTES).expect("fits in a u16"), destination_url)) + .send(Message::open(id, MAXIMUM_PACKET_SIZE_IN_BYTES, destination_url)) .context("couldn’t send CHANNEL OPEN message through mpsc channel")?; } None => warn!("Couldn’t allocate ID for API request: {}", destination_url), @@ -586,11 +597,11 @@ async fn scheduler_task_impl(task: JmuxSc } Message::Data(msg) => { let id = LocalChannelId::from(msg.recipient_channel_id); - let data_length = u32::try_from(msg.transfer_data.len()).expect("MAXIMUM_PACKET_SIZE_IN_BYTES < u32::MAX"); - let distant_id = match jmux_ctx.get_channel(id) { - Some(channel) => channel.distant_id, + let data_length = u16::try_from(msg.transfer_data.len()).expect("header.size (u16) <= u16::MAX"); + let channel = match jmux_ctx.get_channel(id) { + Some(channel) => channel, None => { - warn!("Couldn’t find channel with id {}", id); + warn!(channel.id = %id, "Couldn’t find channel"); continue; }, }; @@ -598,16 +609,21 @@ async fn scheduler_task_impl(task: JmuxSc let data_tx = match data_senders.get_mut(&id) { Some(sender) => sender, None => { - warn!("Received data but associated data sender is missing"); + warn!(channel.id = %id, "Received data but associated data sender is missing"); continue; } }; + if channel.maximum_packet_size < data_length { + warn!(channel.id = %id, "Packet's size is exceeding the maximum size for this channel and was dropped"); + continue; + } + let _ = data_tx.send(msg.transfer_data); // Simplest flow control logic for now: just send back a WINDOW ADJUST message to // increase back peer’s window size. - msg_to_send_tx.send(Message::window_adjust(distant_id, data_length)) + msg_to_send_tx.send(Message::window_adjust(channel.distant_id, u32::from(data_length))) .context("couldn’t send WINDOW ADJUST message")?; } Message::Eof(msg) => {