Skip to content

Commit

Permalink
fixing http tunnel (#80)
Browse files Browse the repository at this point in the history
* try to fixing http tunnel

* fix http can't close

* add a todo
  • Loading branch information
sword-jin authored Jul 22, 2024
1 parent b4a38b8 commit f74ed4c
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 17 deletions.
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# if you want to enable tokio console, you can make TARGET ENABLE_TOKIO_CONSOLE=1
ENABLE_TOKIO_CONSOLE ?= 0
RUST_LOG = INFO

RUSTFLAGS =
FEATURES =
Expand Down Expand Up @@ -33,11 +34,11 @@ build-docker:

.PHONY: run-server
run-server: build
RUST_LOG=INFO ./target/debug/castled --domain localhost --ip 127.0.0.1
RUST_LOG=$(RUST_LOG) ./target/debug/castled --domain localhost --ip 127.0.0.1

.PHONY: run-client
run-client: build
TOKIO_CONSOLE_BIND=127.0.0.1:6670 RUST_LOG=debug ./target/debug/castle tcp 12345 --remote-port 9991
TOKIO_CONSOLE_BIND=127.0.0.1:6670 RUST_LOG=$(RUST_LOG) ./target/debug/castle tcp 12345 --remote-port 9991

.PHONY: e2e
e2e: build
Expand Down
25 changes: 17 additions & 8 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ async fn handle_work_traffic(
// then forward_traffic_to_local can read the data from transfer_rx
let (transfer_tx, mut transfer_rx) = mpsc::channel::<TrafficToClient>(64);

let (local_conn_established_tx, mut local_conn_established_rx) = mpsc::channel(1);
let (local_conn_established_tx, local_conn_established_rx) = mpsc::channel::<()>(1);
let mut local_conn_established_rx = Some(local_conn_established_rx);
tokio::spawn(async move {
let mut streaming_response = rpc_client
.data(streaming_to_server)
Expand All @@ -347,28 +348,35 @@ async fn handle_work_traffic(

// if the local connection is not established, we should wait until it's established
if !local_conn_established {
if let Some(v) = local_conn_established_rx.recv().await {
local_conn_established = v;
} else {
if local_conn_established_rx
.take()
.unwrap()
.recv()
.await
.is_none()
{
debug!("connecting to local endpoint failed");
return;
} else {
info!("local connection established");
local_conn_established = true;
}
}

match result {
Some(Ok(traffic)) => {
transfer_tx.send(traffic).await.unwrap();
continue;
}
Some(Err(status)) => {
error!("received error status: {:?}", status);
return;
}
None => {
// when the server finished traffic, it will close the data streaming
debug!("data streaming closed by the server");
return;
}
}
return;
}
});

Expand Down Expand Up @@ -404,7 +412,7 @@ async fn handle_work_traffic(
if let Err(err) = result {
error!(err = ?err, "failed to connect to local endpoint, so let's notify the server to close the user connection");
}
local_conn_established_tx.send(true).await.unwrap();
local_conn_established_tx.send(()).await.unwrap();

let buf = transfer_rx.recv().await.unwrap();
socket.send(&buf.data).await.unwrap();
Expand All @@ -415,6 +423,7 @@ async fn handle_work_traffic(
});
} else {
tokio::spawn(async move {
// TODO(sword): use a connection pool to reuse the tcp connection
let local_conn = TcpStream::connect(local_endpoint).await;
if local_conn.is_err() {
error!("failed to connect to local endpoint {}, so let's notify the server to close the user connection", local_endpoint);
Expand All @@ -433,7 +442,7 @@ async fn handle_work_traffic(

let mut local_conn = local_conn.unwrap();
let (local_r, local_w) = local_conn.split();
local_conn_established_tx.send(true).await.unwrap();
local_conn_established_tx.send(()).await.unwrap();

if let Err(err) = forward_traffic_to_local(
local_r,
Expand Down
2 changes: 1 addition & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl<T: Send + Debug> StreamingWriter<T> {
&self,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
debug!("flushing streaming writer");
debug!("flushing streaming");
Poll::Ready(Ok(()))
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/control_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ impl TunnelService for ControlHandler {
let bridge_id = Bytes::copy_from_slice(bridge_id_str.as_bytes());
let bridge = bridges
.get(&bridge_id)
.context("connection not found")
.context(format!("connection not found, traffic action: {}", traffic.action))
.unwrap();
let bridge = bridge.value();

Expand Down
27 changes: 22 additions & 5 deletions src/server/tunnel/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl Http {
let http1_builder = Arc::new(http1::Builder::new());
let vhttp_handler = async move {
loop {
let shutdown = shutdown.clone();
tokio::select! {
_ = shutdown.cancelled() => {
break;
Expand All @@ -89,6 +90,7 @@ impl Http {

tokio::spawn(async move {
let io = TokioIo::new(stream);

let handler = async move {
let new_service = service_fn(move |req| {
let http_tunnel = this.clone();
Expand All @@ -98,7 +100,13 @@ impl Http {
)
}
});
http1_builder.serve_connection(io, new_service).await

tokio::select! {
_ = shutdown.cancelled() => {}
_ = http1_builder.serve_connection(io, new_service) => {
info!("http1 connection closed");
}
}
}.instrument(info_span!("vhttp_handler"));
tokio::task::spawn(handler);
});
Expand Down Expand Up @@ -140,6 +148,7 @@ impl Http {
let data_sender = bridge.data_sender.clone();
let remove_bridge_sender = bridge.remove_bridge_sender.clone();
let client_cancel_receiver = bridge.client_cancel_receiver.clone();

tokio::spawn(async move {
data_sender
.send(headers)
Expand All @@ -153,15 +162,23 @@ impl Http {
loop {
tokio::select! {
_ = client_cancel_receiver.cancelled() => {
break;
return;
}
data = body_stream.try_next() => {
match data {
Ok(Some(data)) => {
let _ = data_sender.send(data.to_vec()).await;
data_sender.send(data.to_vec()).await.unwrap();
}
Ok(None) => {
// TODO(sword): reafctor this logic into io module
// sending a empty vec to indicate the end of the body
// then io::copy() will finish the transfer on the client side.
data_sender.send(vec![]).await.unwrap();
return;
}
_ => {
break;
Err(err) => {
error!(err = ?err, "failed to read body stream");
return;
}
}
}
Expand Down

0 comments on commit f74ed4c

Please sign in to comment.