Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(jetsocat): use a larger buffer for plain forwarding #968

Merged
merged 3 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ exclude = [
"crates/sogar-registry"
]

[profile.profiling]
inherits = "release"
debug = 1

[profile.production]
inherits = "release"
lto = true
Expand Down
36 changes: 0 additions & 36 deletions crates/transport/src/forward.rs

This file was deleted.

2 changes: 0 additions & 2 deletions crates/transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
use anyhow as _;

mod copy_bidirectional;
mod forward;
mod ws;

pub use self::copy_bidirectional::*;
pub use self::forward::*;
pub use self::ws::*;

use tokio::io::{AsyncRead, AsyncWrite};
Expand Down
Empty file.
27 changes: 6 additions & 21 deletions crates/transport/tests/forwarding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,12 @@ async fn node(
port_server: u16,
server_kind: TransportKind,
) -> anyhow::Result<()> {
let (mut client_reader, mut client_writer) =
tokio::io::split(client_kind.accept(port_node).await.context("accept")?);
let mut client_stream = client_kind.accept(port_node).await.context("accept")?;
let mut server_stream = server_kind.connect(port_server).await.context("connect")?;

let (mut server_reader, mut server_writer) =
tokio::io::split(server_kind.connect(port_server).await.context("connect")?);

let client_to_server_fut =
transport::forward(&mut client_reader, &mut server_writer).map(|res| res.context("forward to server"));
let server_to_client_fut =
transport::forward(&mut server_reader, &mut client_writer).map(|res| res.context("forward to client"));

tokio::try_join!(client_to_server_fut, server_to_client_fut)?;

client_writer
.shutdown()
.await
.context("shutdown operation on client_writer")?;
server_writer
.shutdown()
.await
.context("shutdown operation on server_writer")?;
let _ = tokio::io::copy_bidirectional(&mut client_stream, &mut server_stream).await;
let _ = client_stream.shutdown().await;
let _ = server_stream.shutdown().await;

Ok(())
}
Expand Down Expand Up @@ -71,8 +56,8 @@ fn three_points() {
)| {
rt.block_on(async {
let server_fut = server(&payload.0, node_to_server_kind, port_server).map(|res| res.context("server"));
let node_fut = node(port_node, client_to_node_kind, port_server, node_to_server_kind).map(|res| res.context("node"));
let client_fut = client(&payload.0, client_to_node_kind, port_node).map(|res| res.context("client"));
let node_fut = node(port_node, client_to_node_kind, port_server, node_to_server_kind).map(|res| res.context("node"));
tokio::try_join!(server_fut, node_fut, client_fut).unwrap();
});
})
Expand Down
4 changes: 3 additions & 1 deletion jetsocat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ pub async fn jmux_proxy(cfg: JmuxProxyCfg) -> anyhow::Result<()> {
.await
.context("couldn't open pipe")?;

let (reader, writer) = tokio::io::split(pipe.stream);

// Start JMUX proxy over the pipe
let proxy_fut = JmuxProxy::new(pipe.read, pipe.write)
let proxy_fut = JmuxProxy::new(Box::new(reader), Box::new(writer))
.with_config(cfg.jmux_cfg)
.with_requester_api(api_request_rx)
.run();
Expand Down
87 changes: 34 additions & 53 deletions jetsocat/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::proxy::ProxyConfig;
use anyhow::{Context as _, Result};
use std::any::Any;
use std::path::PathBuf;
use transport::{ErasedRead, ErasedWrite};
use transport::ErasedReadWrite;
use uuid::Uuid;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -43,15 +43,13 @@ pub enum PipeMode {

pub struct Pipe {
pub name: &'static str,
pub read: ErasedRead,
pub write: ErasedWrite,
pub stream: ErasedReadWrite,

// Useful when we don't want to drop something before the Pipe
pub _handle: Option<Box<dyn Any + Send>>,
_handle: Option<Box<dyn Any + Send>>,
}

pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result<Pipe> {
use crate::utils::DummyReaderWriter;
use anyhow::Context as _;
use std::process::Stdio;
use tokio::fs;
Expand All @@ -60,8 +58,7 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result
match mode {
PipeMode::Stdio => Ok(Pipe {
name: "stdio",
read: Box::new(tokio::io::stdin()),
write: Box::new(tokio::io::stdout()),
stream: Box::new(tokio::io::join(tokio::io::stdin(), tokio::io::stdout())),
_handle: None,
}),
PipeMode::ProcessCmd { command } => {
Expand Down Expand Up @@ -90,8 +87,7 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result

Ok(Pipe {
name: "process",
read: Box::new(stdout),
write: Box::new(stdin),
stream: Box::new(tokio::io::join(stdout, stdin)),
_handle: Some(Box::new(handle)), // we need to store the handle because of kill_on_drop(true)
})
}
Expand All @@ -111,8 +107,7 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result

Ok(Pipe {
name: "write-file",
read: Box::new(DummyReaderWriter),
write: Box::new(file),
stream: Box::new(file),
_handle: None,
})
}
Expand All @@ -131,8 +126,7 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result

Ok(Pipe {
name: "read-file",
read: Box::new(file),
write: Box::new(DummyReaderWriter),
stream: Box::new(file),
_handle: None,
})
}
Expand All @@ -148,12 +142,9 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result

info!(%peer_addr, "Accepted peer");

let (read, write) = tokio::io::split(socket);

Ok(Pipe {
name: "tcp-listener",
read: Box::new(read),
write: Box::new(write),
stream: Box::new(socket),
_handle: None,
})
}
Expand All @@ -162,16 +153,15 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result

info!(%addr, "TCP connect");

let (read, write) = tcp_connect(addr, proxy_cfg)
let stream = tcp_connect(addr, proxy_cfg)
.await
.with_context(|| "TCP connect failed")?;

debug!("Connected");

Ok(Pipe {
name: "tcp",
read,
write,
stream,
_handle: None,
})
}
Expand All @@ -189,22 +179,21 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result
association_id, candidate_id
);

let (mut read, mut write) = tcp_connect(addr, proxy_cfg)
let mut stream = tcp_connect(addr, proxy_cfg)
.await
.with_context(|| "TCP connect failed")?;

debug!("Sending JET accept request…");
write_jet_accept_request(&mut write, association_id, candidate_id).await?;
write_jet_accept_request(&mut stream, association_id, candidate_id).await?;
debug!("JET accept request sent, waiting for response…");
read_jet_accept_response(&mut read).await?;
read_jet_accept_response(&mut stream).await?;
debug!("JET accept response received and processed successfully!");

debug!("Connected");

Ok(Pipe {
name: "jet-tcp-accept",
read,
write,
stream,
_handle: None,
})
}
Expand All @@ -221,22 +210,21 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result
"TCP connect with JET connect protocol for {}/{}", association_id, candidate_id
);

let (mut read, mut write) = tcp_connect(addr, proxy_cfg)
let mut stream = tcp_connect(addr, proxy_cfg)
.await
.with_context(|| "TCP connect failed")?;

debug!("Sending JET connect request…");
write_jet_connect_request(&mut write, association_id, candidate_id).await?;
write_jet_connect_request(&mut stream, association_id, candidate_id).await?;
debug!("JET connect request sent, waiting for response…");
read_jet_connect_response(&mut read).await?;
read_jet_connect_response(&mut stream).await?;
debug!("JET connect response received and processed successfully!");

debug!("Connected");

Ok(Pipe {
name: "jet-tcp-connect",
read,
write,
stream,
_handle: None,
})
}
Expand All @@ -253,22 +241,20 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result
}
);

let (read, write, rsp) = ws_connect(url, proxy_cfg)
let (stream, rsp) = ws_connect(url, proxy_cfg)
.await
.with_context(|| "WebSocket connect failed")?;

debug!(?rsp, "Connected");

Ok(Pipe {
name: "websocket",
read,
write,
stream,
_handle: None,
})
}
PipeMode::WebSocketListen { bind_addr } => {
use crate::utils::{websocket_read, websocket_write};
use futures_util::StreamExt as _;
use crate::utils::websocket_compat;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;

Expand All @@ -288,16 +274,11 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result
.await
.with_context(|| "WebSocket handshake failed")?;

// By splitting that way, critical section (protected by lock) is smaller
let (sink, stream) = ws.split();

let read = Box::new(websocket_read(stream));
let write = Box::new(websocket_write(sink));
let stream = Box::new(websocket_compat(ws)) as ErasedReadWrite;

Ok(Pipe {
name: "websocket-listener",
read,
write,
stream,
_handle: None,
})
}
Expand All @@ -306,24 +287,24 @@ pub async fn open_pipe(mode: PipeMode, proxy_cfg: Option<ProxyConfig>) -> Result

#[instrument(skip_all)]
pub async fn pipe(mut a: Pipe, mut b: Pipe) -> Result<()> {
use tokio::io::AsyncWriteExt;
use transport::forward;
use tokio::io::copy_bidirectional_with_sizes;
use tokio::io::AsyncWriteExt as _;

let a_to_b = forward(&mut a.read, &mut b.write);
let b_to_a = forward(&mut b.read, &mut a.write);
const BUF_SIZE: usize = 16 * 1024;

let forward = copy_bidirectional_with_sizes(&mut a.stream, &mut b.stream, BUF_SIZE, BUF_SIZE);

info!(%a.name, %b.name, "Start piping");

let result = tokio::select! {
result = a_to_b => result.context("a to b forward"),
result = b_to_a => result.context("b to a forward"),
}
.map(|_| ());
let result = forward
.await
.map(|_| ())
.context("copy_bidirectional_with_sizes failed");

info!("Ended");

a.write.shutdown().await?;
b.write.shutdown().await?;
a.stream.shutdown().await?;
b.stream.shutdown().await?;

result
}
Loading