From e5929827a7cc7eb57c4edecd6b7fa264936ec4e4 Mon Sep 17 00:00:00 2001 From: X1r0z Date: Sun, 22 Dec 2024 15:22:06 +0800 Subject: [PATCH] fix: add semaphore support in socks reverse mode add semaphore support in `socks_reverse_client` function --- src/proxy.rs | 49 +++++++++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index ff45cf5..4a672a8 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,6 +1,9 @@ use std::{io::Result, sync::Arc}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{ + join, + net::{TcpListener, TcpStream}, +}; use tracing::{error, info}; use crate::{ @@ -75,25 +78,27 @@ impl Proxy { false => None, }); + // limit the number of concurrent connections + let semaphore = Arc::new(tokio::sync::Semaphore::new(32)); + loop { - match TcpStream::connect(&remote_addr).await { - Ok(stream) => { - info!("Connect to remote {} success", stream.peer_addr()?); - let connector = connector.clone(); - - tokio::spawn(async move { - let stream = tcp::NetStream::from_connector(stream, connector).await; - - if let Err(e) = handle_connection(stream).await { - error!("Failed to handle connection: {}", e); - } - }); - } - Err(e) => { - error!("Failed to establish connection: {}", e); - continue; + let permit = semaphore.clone().acquire_owned().await; + + let stream = TcpStream::connect(&remote_addr).await?; + info!("Connect to remote {} success", stream.peer_addr()?); + + let connector = connector.clone(); + + tokio::spawn(async move { + let stream = tcp::NetStream::from_connector(stream, connector).await; + + if let Err(e) = handle_connection(stream).await { + error!("Failed to handle connection: {}", e); } - } + + // drop the permit to release the semaphore + drop(permit); + }); } } @@ -115,8 +120,10 @@ impl Proxy { }); loop { - let (proxy_stream, proxy_addr) = proxy_listener.accept().await?; - let (control_stream, control_addr) = control_listener.accept().await?; + let (r1, r2) = join!(proxy_listener.accept(), control_listener.accept()); + + let (proxy_stream, proxy_addr) = r1?; + let (control_stream, control_addr) = r2?; info!("Accept connection from {}", proxy_addr); info!("Accept connection from {}", control_addr); @@ -130,9 +137,11 @@ impl Proxy { let control_stream = tcp::NetStream::from_acceptor(control_stream, control_acceptor).await; + info!("Open pipe: {} <=> {}", proxy_addr, control_addr); if let Err(e) = tcp::handle_forward(proxy_stream, control_stream).await { error!("Failed to handle forward: {}", e); } + info!("Close pipe: {} <=> {}", proxy_addr, control_addr); }); } }