Skip to content

Commit

Permalink
fix: add semaphore support in socks reverse mode
Browse files Browse the repository at this point in the history
add semaphore support in `socks_reverse_client` function
  • Loading branch information
X1r0z committed Dec 22, 2024
1 parent fda96b1 commit e592982
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{io::Result, sync::Arc};

use tokio::net::{TcpListener, TcpStream};
use tokio::{
join,
net::{TcpListener, TcpStream},
};
use tracing::{error, info};

use crate::{
Expand Down Expand Up @@ -75,25 +78,27 @@ impl Proxy {
false => None,
});

// limit the number of concurrent connections
let semaphore = Arc::new(tokio::sync::Semaphore::new(32));

loop {
match TcpStream::connect(&remote_addr).await {
Ok(stream) => {
info!("Connect to remote {} success", stream.peer_addr()?);
let connector = connector.clone();

tokio::spawn(async move {
let stream = tcp::NetStream::from_connector(stream, connector).await;

if let Err(e) = handle_connection(stream).await {
error!("Failed to handle connection: {}", e);
}
});
}
Err(e) => {
error!("Failed to establish connection: {}", e);
continue;
let permit = semaphore.clone().acquire_owned().await;

let stream = TcpStream::connect(&remote_addr).await?;
info!("Connect to remote {} success", stream.peer_addr()?);

let connector = connector.clone();

tokio::spawn(async move {
let stream = tcp::NetStream::from_connector(stream, connector).await;

if let Err(e) = handle_connection(stream).await {
error!("Failed to handle connection: {}", e);
}
}

// drop the permit to release the semaphore
drop(permit);
});
}
}

Expand All @@ -115,8 +120,10 @@ impl Proxy {
});

loop {
let (proxy_stream, proxy_addr) = proxy_listener.accept().await?;
let (control_stream, control_addr) = control_listener.accept().await?;
let (r1, r2) = join!(proxy_listener.accept(), control_listener.accept());

let (proxy_stream, proxy_addr) = r1?;
let (control_stream, control_addr) = r2?;

info!("Accept connection from {}", proxy_addr);
info!("Accept connection from {}", control_addr);
Expand All @@ -130,9 +137,11 @@ impl Proxy {
let control_stream =
tcp::NetStream::from_acceptor(control_stream, control_acceptor).await;

info!("Open pipe: {} <=> {}", proxy_addr, control_addr);
if let Err(e) = tcp::handle_forward(proxy_stream, control_stream).await {
error!("Failed to handle forward: {}", e);
}
info!("Close pipe: {} <=> {}", proxy_addr, control_addr);
});
}
}
Expand Down

0 comments on commit e592982

Please sign in to comment.