Skip to content

Commit

Permalink
feat: support SOCKS5 and HTTP proxy (#135)
Browse files Browse the repository at this point in the history
* chore: add comments

* feat: support socks5/http proxy

* fix: clippy

* fix: always validate tcp config

* chore: rename directories
  • Loading branch information
rapiz1 committed Mar 8, 2022
1 parent bec7533 commit 1ef7747
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 28 deletions.
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ base64 = { version = "0.13", optional = true }
notify = { version = "5.0.0-pre.13", optional = true }
console-subscriber = { version = "0.1", optional = true, features = ["parking_lot"] }
atty = "0.2"
async-http-proxy = { version = "1.2", features = ["runtime-tokio", "basic-auth"] }
async-socks5 = "0.5"
url = { version = "2.2", features = ["serde"] }

[build-dependencies]
vergen = { version = "6.0", default-features = false, features = ["build", "git", "cargo"] }
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ default_token = "default_token_if_not_specify" # Optional. The default token of

[client.transport] # The whole block is optional. Specify which transport to use
type = "tcp" # Optional. Possible values: ["tcp", "tls", "noise"]. Default: "tcp"

[client.transport.tcp] # Optional
proxy = "socks5://user:passwd@127.0.0.1:1080" # Optional. Use the proxy to connect to the server
nodelay = false # Optional. Determine whether to enable TCP_NODELAY, if applicable, to improve the latency but decrease the bandwidth. Default: false
keepalive_secs = 10 # Optional. Specify `tcp_keepalive_time` in `tcp(7)`, if applicable. Default: 10 seconds
keepalive_interval = 5 # Optional. Specify `tcp_keepalive_intvl` in `tcp(7)`, if applicable. Default: 5 seconds
Expand Down
15 changes: 15 additions & 0 deletions examples/use_proxy/client.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[client]
remote_addr = "127.0.0.1:2333"
default_token = "123"

[client.services.foo1]
local_addr = "127.0.0.1:80"

[client.transport]
type = "tcp"
[client.transport.tcp]
# `proxy` controls how the client connect to the server
# Use socks5 proxy at 127.0.0.1, with port 1080, username 'myuser' and password 'mypass'
proxy = "socks5://myuser:mypass@127.0.0.1:1080"
# Use http proxy. Similar to socks5 proxy
# proxy = "http://myuser:mypass@127.0.0.1:8080"
46 changes: 29 additions & 17 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::path::Path;
use tokio::fs;
use url::Url;

use crate::transport::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_KEEPALIVE_SECS, DEFAULT_NODELAY};

Expand All @@ -20,7 +21,7 @@ impl Debug for MaskedString {
}

impl Deref for MaskedString {
type Target = String;
type Target = str;
fn deref(&self) -> &Self::Target {
&self.0
}
Expand Down Expand Up @@ -142,36 +143,38 @@ fn default_keepalive_interval() -> u64 {
DEFAULT_KEEPALIVE_INTERVAL
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct TransportConfig {
#[serde(rename = "type")]
pub transport_type: TransportType,
pub struct TcpConfig {
#[serde(default = "default_nodelay")]
pub nodelay: bool,
#[serde(default = "default_keepalive_secs")]
pub keepalive_secs: u64,
#[serde(default = "default_keepalive_interval")]
pub keepalive_interval: u64,
pub tls: Option<TlsConfig>,
pub noise: Option<NoiseConfig>,
pub proxy: Option<Url>,
}

impl Default for TransportConfig {
fn default() -> TransportConfig {
TransportConfig {
transport_type: Default::default(),
impl Default for TcpConfig {
fn default() -> Self {
Self {
nodelay: default_nodelay(),
keepalive_secs: default_keepalive_secs(),
keepalive_interval: default_keepalive_interval(),
tls: None,
noise: None,
proxy: None,
}
}
}

fn default_transport() -> TransportConfig {
Default::default()
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct TransportConfig {
#[serde(rename = "type")]
pub transport_type: TransportType,
#[serde(default)]
pub tcp: TcpConfig,
pub tls: Option<TlsConfig>,
pub noise: Option<NoiseConfig>,
}

#[derive(Debug, Serialize, Deserialize, Default, PartialEq, Clone)]
Expand All @@ -180,7 +183,7 @@ pub struct ClientConfig {
pub remote_addr: String,
pub default_token: Option<MaskedString>,
pub services: HashMap<String, ClientServiceConfig>,
#[serde(default = "default_transport")]
#[serde(default)]
pub transport: TransportConfig,
}

Expand All @@ -190,7 +193,7 @@ pub struct ServerConfig {
pub bind_addr: String,
pub default_token: Option<MaskedString>,
pub services: HashMap<String, ServerServiceConfig>,
#[serde(default = "default_transport")]
#[serde(default)]
pub transport: TransportConfig,
}

Expand Down Expand Up @@ -255,6 +258,15 @@ impl Config {
}

fn validate_transport_config(config: &TransportConfig, is_server: bool) -> Result<()> {
config
.tcp
.proxy
.as_ref()
.map_or(Ok(()), |u| match u.scheme() {
"socks5" => Ok(()),
"http" => Ok(()),
_ => Err(anyhow!(format!("Unknown proxy scheme: {}", u.scheme()))),
})?;
match config.transport_type {
TransportType::Tcp => Ok(()),
TransportType::Tls => {
Expand Down
65 changes: 61 additions & 4 deletions src/helper.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, Result};
use async_http_proxy::{http_connect_tokio, http_connect_tokio_with_basic_auth};
use backoff::{backoff::Backoff, Notify};
use socket2::{SockRef, TcpKeepalive};
use std::{future::Future, net::SocketAddr, time::Duration};
Expand All @@ -7,6 +8,7 @@ use tokio::{
sync::broadcast,
};
use tracing::trace;
use url::Url;

// Tokio hesitates to expose this option...So we have to do it on our own :(
// The good news is that using socket2 it can be easily done, without losing portability.
Expand Down Expand Up @@ -38,12 +40,21 @@ pub fn feature_not_compile(feature: &str) -> ! {
)
}

/// Create a UDP socket and connect to `addr`
pub async fn udp_connect<A: ToSocketAddrs>(addr: A) -> Result<UdpSocket> {
let addr = lookup_host(addr)
async fn to_socket_addr<A: ToSocketAddrs>(addr: A) -> Result<SocketAddr> {
lookup_host(addr)
.await?
.next()
.ok_or(anyhow!("Failed to lookup the host"))?;
.ok_or(anyhow!("Failed to lookup the host"))
}

pub fn host_port_pair(s: &str) -> Result<(&str, u16)> {
let semi = s.rfind(':').expect("missing semicolon");
Ok((&s[..semi], s[semi + 1..].parse()?))
}

/// Create a UDP socket and connect to `addr`
pub async fn udp_connect<A: ToSocketAddrs>(addr: A) -> Result<UdpSocket> {
let addr = to_socket_addr(addr).await?;

let bind_addr = match addr {
SocketAddr::V4(_) => "0.0.0.0:0",
Expand All @@ -55,6 +66,52 @@ pub async fn udp_connect<A: ToSocketAddrs>(addr: A) -> Result<UdpSocket> {
Ok(s)
}

/// Create a TcpStream using a proxy
/// e.g. socks5://user:pass@127.0.0.1:1080 http://127.0.0.1:8080
pub async fn tcp_connect_with_proxy(addr: &str, proxy: Option<&Url>) -> Result<TcpStream> {
if let Some(url) = proxy {
let mut s = TcpStream::connect((
url.host_str().expect("proxy url should have host field"),
url.port().expect("proxy url should have port field"),
))
.await?;

let auth = if !url.username().is_empty() || url.password().is_some() {
Some(async_socks5::Auth {
username: url.username().into(),
password: url.password().unwrap_or("").into(),
})
} else {
None
};
match url.scheme() {
"socks5" => {
async_socks5::connect(&mut s, host_port_pair(addr)?, auth).await?;
}
"http" => {
let (host, port) = host_port_pair(addr)?;
match auth {
Some(auth) => {
http_connect_tokio_with_basic_auth(
&mut s,
host,
port,
&auth.username,
&auth.password,
)
.await?
}
None => http_connect_tokio(&mut s, host, port).await?,
}
}
_ => panic!("unknown proxy scheme"),
}
Ok(s)
} else {
Ok(TcpStream::connect(addr).await?)
}
}

// Wrapper of retry_notify
pub async fn retry_notify_with_deadline<I, E, Fn, Fut, B, N>(
backoff: B,
Expand Down
5 changes: 3 additions & 2 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::{ClientServiceConfig, ServerServiceConfig, TransportConfig};
use crate::config::{ClientServiceConfig, ServerServiceConfig, TcpConfig, TransportConfig};
use crate::helper::try_set_tcp_keepalive;
use anyhow::{Context, Result};
use async_trait::async_trait;
Expand Down Expand Up @@ -27,6 +27,7 @@ pub trait Transport: Debug + Send + Sync {
/// Provide the transport with socket options, which can be handled at the need of the transport
fn hint(conn: &Self::Stream, opts: SocketOpts);
async fn bind<T: ToSocketAddrs + Send + Sync>(&self, addr: T) -> Result<Self::Acceptor>;
/// accept must be cancel safe
async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>;
async fn handshake(&self, conn: Self::RawStream) -> Result<Self::Stream>;
async fn connect(&self, addr: &str) -> Result<Self::Stream>;
Expand Down Expand Up @@ -78,7 +79,7 @@ impl SocketOpts {
}

impl SocketOpts {
pub fn from_transport_cfg(cfg: &TransportConfig) -> SocketOpts {
pub fn from_cfg(cfg: &TcpConfig) -> SocketOpts {
SocketOpts {
nodelay: Some(cfg.nodelay),
keepalive: Some(Keepalive {
Expand Down
11 changes: 8 additions & 3 deletions src/transport/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::config::TransportConfig;
use crate::{
config::{TcpConfig, TransportConfig},
helper::tcp_connect_with_proxy,
};

use super::{SocketOpts, Transport};
use anyhow::Result;
Expand All @@ -9,6 +12,7 @@ use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
#[derive(Debug)]
pub struct TcpTransport {
socket_opts: SocketOpts,
cfg: TcpConfig,
}

#[async_trait]
Expand All @@ -19,7 +23,8 @@ impl Transport for TcpTransport {

fn new(config: &TransportConfig) -> Result<Self> {
Ok(TcpTransport {
socket_opts: SocketOpts::from_transport_cfg(config),
socket_opts: SocketOpts::from_cfg(&config.tcp),
cfg: config.tcp.clone(),
})
}

Expand All @@ -42,7 +47,7 @@ impl Transport for TcpTransport {
}

async fn connect(&self, addr: &str) -> Result<Self::Stream> {
let s = TcpStream::connect(addr).await?;
let s = tcp_connect_with_proxy(addr, self.cfg.proxy.as_ref()).await?;
self.socket_opts.apply(&s);
Ok(s)
}
Expand Down
5 changes: 3 additions & 2 deletions src/transport/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::net::SocketAddr;

use super::{SocketOpts, TcpTransport, Transport};
use crate::config::{TlsConfig, TransportConfig};
use crate::helper::host_port_pair;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use std::fs;
Expand Down Expand Up @@ -94,8 +95,8 @@ impl Transport for TlsTransport {
.connect(
self.config
.hostname
.as_ref()
.unwrap_or(&String::from(addr.split(':').next().unwrap())),
.as_deref()
.unwrap_or(host_port_pair(addr)?.0),
conn,
)
.await?)
Expand Down

0 comments on commit 1ef7747

Please sign in to comment.