Skip to content

Commit

Permalink
support uploading large file through multipart (#81)
Browse files Browse the repository at this point in the history
* add upload file e2e test

* Support upload files(multipart) through http tunnel
  • Loading branch information
sword-jin authored Jul 23, 2024
1 parent f74ed4c commit 8ff964e
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 79 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ Basically, this tunnel is primarily for this purpose. If you want to expose your
- random subdomain if `--random-subdomain` is specified
- random remote port if not specified
- support http/1.1
- Upload file
- [ ] support http/2
21 changes: 16 additions & 5 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use tonic::{transport::Channel, Response, Status, Streaming};
use tracing::{debug, error, info, instrument, span};

use tokio::{
io::{self, AsyncRead, AsyncWrite, AsyncWriteExt},
io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader},
net::{TcpStream, UdpSocket},
select,
sync::{mpsc, oneshot},
};

use crate::{
constant,
io::{StreamingReader, StreamingWriter, TrafficToServerWrapper},
pb::{
self, control::Payload, traffic_to_server, tunnel::Type,
Expand Down Expand Up @@ -469,14 +470,19 @@ async fn handle_work_traffic(
/// 1. remote <=> me
/// 2. me <=> local
async fn forward_traffic_to_local(
mut local_r: impl AsyncRead + Unpin,
local_r: impl AsyncRead + Unpin,
mut local_w: impl AsyncWrite + Unpin,
mut remote_r: StreamingReader<TrafficToClient>,
remote_r: StreamingReader<TrafficToClient>,
mut remote_w: StreamingWriter<TrafficToServer>,
) -> Result<()> {
let remote_to_me_to_local = async {
// read from remote, write to local
match io::copy(&mut remote_r, &mut local_w).await {
match io::copy_buf(
&mut BufReader::with_capacity(constant::DEFAULT_BUF_SIZE, remote_r),
&mut local_w,
)
.await
{
Ok(n) => {
debug!("copied {} bytes from remote to local", n);
let _ = local_w.shutdown().await;
Expand All @@ -489,7 +495,12 @@ async fn forward_traffic_to_local(

let local_to_me_to_remote = async {
// read from local, write to remote
match io::copy(&mut local_r, &mut remote_w).await {
match io::copy_buf(
&mut BufReader::with_capacity(constant::DEFAULT_BUF_SIZE, local_r),
&mut remote_w,
)
.await
{
Ok(n) => {
debug!("copied {} bytes from local to remote", n);
let _ = remote_w.shutdown().await;
Expand Down
2 changes: 2 additions & 0 deletions src/constant.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// use 8K as the default buffer size when transferring io data.
pub(crate) const DEFAULT_BUF_SIZE: usize = 8 * 1024;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod bridge;
pub(crate) mod constant;
pub(crate) mod event;
pub(crate) mod helper;
pub(crate) mod io;
Expand Down
26 changes: 20 additions & 6 deletions src/server/control_server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::event::ClientEventResponse;
use crate::helper::validate_register_req;
use crate::pb::{traffic_to_server, TrafficToServer};
use crate::{bridge, event};
use crate::{bridge, constant, event};
use crate::{
io::CancellableReceiver,
pb::{
Expand Down Expand Up @@ -394,11 +394,25 @@ impl TunnelService for ControlHandler {
tokio::select! {
// server -> client
Some(data) = transfer_rx.recv() => {
outbound_tx
.send(Ok(TrafficToClient { data }))
.await
.context("failed to send traffic to outbound channel")
.unwrap();
if data.len() <= constant::DEFAULT_BUF_SIZE {
outbound_tx
.send(Ok(TrafficToClient { data }))
.await
.context("failed to send traffic to outbound channel")
.unwrap();
} else {
// read data chunk by chunk, then send to the client
// if data is empty, the first chunk will be none.
// which means we have no change to send the data.
// so if the length of data is less than 8192, we can send it directly.
for data in data.chunks(constant::DEFAULT_BUF_SIZE) {
outbound_tx
.send(Ok(TrafficToClient { data: data.to_vec() }))
.await
.context("failed to send traffic to outbound channel")
.unwrap();
}
}
}
_ = close_sender_listener.cancelled() => {
// after connection is removed, this listener will be notified
Expand Down
Loading

0 comments on commit 8ff964e

Please sign in to comment.