Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support uploading large file through multipart #81

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ Basically, this tunnel is primarily for this purpose. If you want to expose your
- random subdomain if `--random-subdomain` is specified
- random remote port if not specified
- support http/1.1
- Upload file
- [ ] support http/2
21 changes: 16 additions & 5 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use tonic::{transport::Channel, Response, Status, Streaming};
use tracing::{debug, error, info, instrument, span};

use tokio::{
io::{self, AsyncRead, AsyncWrite, AsyncWriteExt},
io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader},
net::{TcpStream, UdpSocket},
select,
sync::{mpsc, oneshot},
};

use crate::{
constant,
io::{StreamingReader, StreamingWriter, TrafficToServerWrapper},
pb::{
self, control::Payload, traffic_to_server, tunnel::Type,
Expand Down Expand Up @@ -469,14 +470,19 @@ async fn handle_work_traffic(
/// 1. remote <=> me
/// 2. me <=> local
async fn forward_traffic_to_local(
mut local_r: impl AsyncRead + Unpin,
local_r: impl AsyncRead + Unpin,
mut local_w: impl AsyncWrite + Unpin,
mut remote_r: StreamingReader<TrafficToClient>,
remote_r: StreamingReader<TrafficToClient>,
mut remote_w: StreamingWriter<TrafficToServer>,
) -> Result<()> {
let remote_to_me_to_local = async {
// read from remote, write to local
match io::copy(&mut remote_r, &mut local_w).await {
match io::copy_buf(
&mut BufReader::with_capacity(constant::DEFAULT_BUF_SIZE, remote_r),
&mut local_w,
)
.await
{
Ok(n) => {
debug!("copied {} bytes from remote to local", n);
let _ = local_w.shutdown().await;
Expand All @@ -489,7 +495,12 @@ async fn forward_traffic_to_local(

let local_to_me_to_remote = async {
// read from local, write to remote
match io::copy(&mut local_r, &mut remote_w).await {
match io::copy_buf(
&mut BufReader::with_capacity(constant::DEFAULT_BUF_SIZE, local_r),
&mut remote_w,
)
.await
{
Ok(n) => {
debug!("copied {} bytes from local to remote", n);
let _ = remote_w.shutdown().await;
Expand Down
2 changes: 2 additions & 0 deletions src/constant.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// use 8K as the default buffer size when transferring io data.
pub(crate) const DEFAULT_BUF_SIZE: usize = 8 * 1024;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod bridge;
pub(crate) mod constant;
pub(crate) mod event;
pub(crate) mod helper;
pub(crate) mod io;
Expand Down
26 changes: 20 additions & 6 deletions src/server/control_server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::event::ClientEventResponse;
use crate::helper::validate_register_req;
use crate::pb::{traffic_to_server, TrafficToServer};
use crate::{bridge, event};
use crate::{bridge, constant, event};
use crate::{
io::CancellableReceiver,
pb::{
Expand Down Expand Up @@ -394,11 +394,25 @@ impl TunnelService for ControlHandler {
tokio::select! {
// server -> client
Some(data) = transfer_rx.recv() => {
outbound_tx
.send(Ok(TrafficToClient { data }))
.await
.context("failed to send traffic to outbound channel")
.unwrap();
if data.len() <= constant::DEFAULT_BUF_SIZE {
outbound_tx
.send(Ok(TrafficToClient { data }))
.await
.context("failed to send traffic to outbound channel")
.unwrap();
} else {
// read data chunk by chunk, then send to the client
// if data is empty, the first chunk will be none.
// which means we have no change to send the data.
// so if the length of data is less than 8192, we can send it directly.
for data in data.chunks(constant::DEFAULT_BUF_SIZE) {
outbound_tx
.send(Ok(TrafficToClient { data: data.to_vec() }))
.await
.context("failed to send traffic to outbound channel")
.unwrap();
}
}
}
_ = close_sender_listener.cancelled() => {
// after connection is removed, this listener will be notified
Expand Down
Loading
Loading