Skip to content

Commit

Permalink
fix(jetsocat,dgw): allow bigger JMUX messages (#974)
Browse files Browse the repository at this point in the history
This updates the decoder part of the JMUX codec.

The intention is to increase the maximum message size for JMUX message
in the future.
By increasing the maximum size from 4k to 16k, an improvement of about
77.7% can be achieved when measuring using iperf.

With 4k:

> 0.0000-19.4523 sec  26.6 GBytes  11.7 Gbits/sec

With 16k:

> 0.0000-13.8540 sec  33.6 GBytes  20.8 Gbits/sec
  • Loading branch information
CBenoit authored Aug 14, 2024
1 parent 32de1d5 commit aaf7765
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 30 deletions.
27 changes: 5 additions & 22 deletions crates/jmux-proxy/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ use bytes::BytesMut;
use jmux_proto::{Header, Message};
use tokio_util::codec::{Decoder, Encoder};

/// This is a purely arbitrary number
pub(crate) const MAXIMUM_PACKET_SIZE_IN_BYTES: usize = 4096;

pub(crate) struct JmuxCodec;

impl Decoder for JmuxCodec {
Expand All @@ -15,6 +12,8 @@ impl Decoder for JmuxCodec {
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
const MAX_RESERVE_CHUNK_IN_BYTES: usize = 8 * 1024;

if src.len() < Header::SIZE {
// Not enough data to read length marker.
return Ok(None);
Expand All @@ -25,17 +24,11 @@ impl Decoder for JmuxCodec {
length_bytes.copy_from_slice(&src[1..3]);
let length = u16::from_be_bytes(length_bytes) as usize;

if length > MAXIMUM_PACKET_SIZE_IN_BYTES {
return Err(io::Error::other(format!(
"received JMUX packet is exceeding the maximum packet size: {} (max is {})",
length, MAXIMUM_PACKET_SIZE_IN_BYTES,
)));
}

if src.len() < length {
// The full packet has not arrived yet.
// Reserve more space in the buffer (good performance-wise).
src.reserve(length - src.len());
let additional = core::cmp::min(MAX_RESERVE_CHUNK_IN_BYTES, length - src.len());
src.reserve(additional);

// Inform the Framed that more bytes are required to form the next frame.
return Ok(None);
Expand All @@ -56,17 +49,7 @@ impl Encoder<Message> for JmuxCodec {
type Error = io::Error;

fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
if item.size() > MAXIMUM_PACKET_SIZE_IN_BYTES {
return Err(io::Error::other(format!(
"attempted to send a JMUX packet whose size is too big: {} (max is {})",
item.size(),
MAXIMUM_PACKET_SIZE_IN_BYTES
)));
}

item.encode(dst).map_err(io::Error::other)?;

Ok(())
item.encode(dst).map_err(io::Error::other)
}
}

Expand Down
32 changes: 24 additions & 8 deletions crates/jmux-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pub use jmux_proto::DestinationUrl;

use self::codec::JmuxCodec;
use self::id_allocator::IdAllocator;
use crate::codec::MAXIMUM_PACKET_SIZE_IN_BYTES;
use anyhow::Context as _;
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
Expand All @@ -30,6 +29,18 @@ use tokio::task::JoinHandle;
use tokio_util::codec::{FramedRead, FramedWrite};
use tracing::{Instrument as _, Span};

// PERF/FIXME: changing this parameter to 16 * 1024 greatly improves the throughput,
// but we need to wait until 2025 before making this change.
//
// iperf result for 4 * 1024:
// > 0.0000-19.4523 sec 26.6 GBytes 11.7 Gbits/sec
//
// iperf result for 16 * 1024:
// > 0.0000-13.8540 sec 33.6 GBytes 20.8 Gbits/sec
//
// This is an improvement of 77.7%.
const MAXIMUM_PACKET_SIZE_IN_BYTES: u16 = 4 * 1024;

pub type ApiResponseSender = oneshot::Sender<JmuxApiResponse>;
pub type ApiResponseReceiver = oneshot::Receiver<JmuxApiResponse>;
pub type ApiRequestSender = mpsc::Sender<JmuxApiRequest>;
Expand Down Expand Up @@ -316,7 +327,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
debug!("{} request {}", id, destination_url);
pending_channels.insert(id, (destination_url.clone(), api_response_tx));
msg_to_send_tx
.send(Message::open(id, u16::try_from(MAXIMUM_PACKET_SIZE_IN_BYTES).expect("fits in a u16"), destination_url))
.send(Message::open(id, MAXIMUM_PACKET_SIZE_IN_BYTES, destination_url))
.context("couldn’t send CHANNEL OPEN message through mpsc channel")?;
}
None => warn!("Couldn’t allocate ID for API request: {}", destination_url),
Expand Down Expand Up @@ -586,28 +597,33 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
}
Message::Data(msg) => {
let id = LocalChannelId::from(msg.recipient_channel_id);
let data_length = u32::try_from(msg.transfer_data.len()).expect("MAXIMUM_PACKET_SIZE_IN_BYTES < u32::MAX");
let distant_id = match jmux_ctx.get_channel(id) {
Some(channel) => channel.distant_id,
let data_length = u16::try_from(msg.transfer_data.len()).expect("header.size (u16) <= u16::MAX");
let channel = match jmux_ctx.get_channel(id) {
Some(channel) => channel,
None => {
warn!("Couldn’t find channel with id {}", id);
warn!(channel.id = %id, "Couldn’t find channel");
continue;
},
};

let data_tx = match data_senders.get_mut(&id) {
Some(sender) => sender,
None => {
warn!("Received data but associated data sender is missing");
warn!(channel.id = %id, "Received data but associated data sender is missing");
continue;
}
};

if channel.maximum_packet_size < data_length {
warn!(channel.id = %id, "Packet's size is exceeding the maximum size for this channel and was dropped");
continue;
}

let _ = data_tx.send(msg.transfer_data);

// Simplest flow control logic for now: just send back a WINDOW ADJUST message to
// increase back peer’s window size.
msg_to_send_tx.send(Message::window_adjust(distant_id, data_length))
msg_to_send_tx.send(Message::window_adjust(channel.distant_id, u32::from(data_length)))
.context("couldn’t send WINDOW ADJUST message")?;
}
Message::Eof(msg) => {
Expand Down

0 comments on commit aaf7765

Please sign in to comment.