Skip to content

Commit

Permalink
WIP: Remote port forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
aramperes committed Dec 24, 2023
1 parent 56c950d commit 4abb4f9
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 10 deletions.
1 change: 0 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ const DEFAULT_PORT_FORWARD_SOURCE: &str = "127.0.0.1";
#[derive(Clone)]
pub struct Config {
pub port_forwards: Vec<PortForwardConfig>,
#[allow(dead_code)]
pub remote_port_forwards: Vec<PortForwardConfig>,
pub private_key: Arc<StaticSecret>,
pub endpoint_public_key: Arc<PublicKey>,
Expand Down
35 changes: 34 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ pub async fn start_tunnels(config: Config, bus: Bus) -> anyhow::Result<()> {
.port_forwards
.iter()
.any(|pf| pf.protocol == PortProtocol::Udp)
|| config
.remote_port_forwards
.iter()
.any(|pf| pf.protocol == PortProtocol::Udp)
{
// UDP device
let bus = bus.clone();
Expand All @@ -90,7 +94,13 @@ pub async fn start_tunnels(config: Config, bus: Bus) -> anyhow::Result<()> {

// Start UDP Virtual Interface
let port_forwards = config.port_forwards.clone();
let iface = UdpVirtualInterface::new(port_forwards, bus, config.source_peer_ip);
let remote_port_forwards = config.remote_port_forwards.clone();
let iface = UdpVirtualInterface::new(
port_forwards,
remote_port_forwards,
bus,
config.source_peer_ip,
);
tokio::spawn(async move { iface.poll_loop(device).await });
}

Expand Down Expand Up @@ -118,5 +128,28 @@ pub async fn start_tunnels(config: Config, bus: Bus) -> anyhow::Result<()> {
});
}

{
let remote_port_forwards = config.remote_port_forwards;

remote_port_forwards
.into_iter()
.map(|pf| {
(
pf,
wg.clone(),
tcp_port_pool.clone(),
udp_port_pool.clone(),
bus.clone(),
)
})
.for_each(move |(pf, wg, tcp_port_pool, udp_port_pool, bus)| {
tokio::spawn(async move {
tunnel::remote_port_forward(pf, tcp_port_pool, udp_port_pool, wg, bus)
.await
.unwrap_or_else(|e| error!("Remote port-forward failed for {} : {}", pf, e))
});
});
}

Ok(())
}
18 changes: 18 additions & 0 deletions src/tunnel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,21 @@ pub async fn port_forward(
PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await,
}
}

pub async fn remote_port_forward(
port_forward: PortForwardConfig,
_tcp_port_pool: TcpPortPool,
udp_port_pool: UdpPortPool,
wg: Arc<WireGuardTunnel>,
bus: Bus,
) -> anyhow::Result<()> {
info!(
"Remote Tunneling {} [{}]<-[{}] (via [{}])",
port_forward.protocol, port_forward.destination, port_forward.source, &wg.endpoint,
);

match port_forward.protocol {
PortProtocol::Tcp => Ok(()), // TODO: Remote TCP forwarding
PortProtocol::Udp => udp::udp_proxy_server(port_forward, udp_port_pool, bus).await,
}
}
26 changes: 25 additions & 1 deletion src/tunnel/udp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::{HashMap, VecDeque};
use std::net::{IpAddr, SocketAddr};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

Expand Down Expand Up @@ -35,7 +36,22 @@ pub async fn udp_proxy_server(
bus: Bus,
) -> anyhow::Result<()> {
let mut endpoint = bus.new_endpoint();
let socket = UdpSocket::bind(port_forward.source)

// Remote port forwards bind on localhost. Regular port forwards bind on the given source.
let bind = if port_forward.remote {
port_pool
.reserve(port_forward.source.port(), port_forward.destination)
.await
.with_context(|| "Failed to assign virtual port for remote UDP port forward")?;
match port_forward.source.ip() {
IpAddr::V4(_) => SocketAddr::from((IpAddr::from_str("0.0.0.0").unwrap(), 0)),
IpAddr::V6(_) => SocketAddr::from((IpAddr::from_str("[::]").unwrap(), 0)),
}
} else {
port_forward.source
};

let socket = UdpSocket::bind(bind)
.await
.context("Failed to bind on UDP proxy address")?;

Expand Down Expand Up @@ -156,6 +172,14 @@ impl UdpPortPool {
}
}

/// Takes the given port out of the pool, marking it with the given peer address, for an unlimited amount of time.
pub async fn reserve(&self, port: u16, peer_addr: SocketAddr) -> anyhow::Result<VirtualPort> {
let mut inner = self.inner.write().await;
inner.port_by_peer_addr.insert(peer_addr, port);
inner.peer_addr_by_port.insert(port, peer_addr);
Ok(VirtualPort::new(port, PortProtocol::Udp))
}

/// Requests a free port from the pool. An error is returned if none is available (exhausted max capacity).
pub async fn next(&self, peer_addr: SocketAddr) -> anyhow::Result<VirtualPort> {
// A port found to be reused. This is outside of the block because the read lock cannot be upgraded to a write lock.
Expand Down
46 changes: 39 additions & 7 deletions src/virtual_iface/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,29 @@ const MAX_PACKET: usize = 65536;
pub struct UdpVirtualInterface {
source_peer_ip: IpAddr,
port_forwards: Vec<PortForwardConfig>,
remote_port_forwards: Vec<PortForwardConfig>,
bus: Bus,
sockets: SocketSet<'static>,
}

impl UdpVirtualInterface {
/// Initialize the parameters for a new virtual interface.
/// Use the `poll_loop()` future to start the virtual interface poll loop.
pub fn new(port_forwards: Vec<PortForwardConfig>, bus: Bus, source_peer_ip: IpAddr) -> Self {
pub fn new(
port_forwards: Vec<PortForwardConfig>,
remote_port_forwards: Vec<PortForwardConfig>,
bus: Bus,
source_peer_ip: IpAddr,
) -> Self {
Self {
port_forwards: port_forwards
.into_iter()
.filter(|f| matches!(f.protocol, PortProtocol::Udp))
.collect(),
remote_port_forwards: remote_port_forwards
.into_iter()
.filter(|f| matches!(f.protocol, PortProtocol::Udp))
.collect(),
source_peer_ip,
bus,
sockets: SocketSet::new([]),
Expand Down Expand Up @@ -129,11 +139,30 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
let mut send_queue: HashMap<VirtualPort, VecDeque<(PortForwardConfig, Bytes)>> =
HashMap::new();

// Create sockets for remote port forwards
for remote_port_forward in self.remote_port_forwards.iter() {
let virtual_port =
VirtualPort::new(remote_port_forward.source.port(), PortProtocol::Udp);
let client_socket = UdpVirtualInterface::new_client_socket(
remote_port_forward.source.ip(),
virtual_port,
)?;
debug!(
"Created remote client socket: {:?}",
client_socket.endpoint()
);
let client_handle = self.sockets.add(client_socket);
port_client_handle_map.insert(virtual_port, client_handle);
send_queue.insert(virtual_port, VecDeque::new());
}

let mut wake = false;

loop {
tokio::select! {
_ = match (next_poll, port_client_handle_map.len()) {
(None, 0) => tokio::time::sleep(Duration::MAX),
(None, _) => tokio::time::sleep(Duration::ZERO),
_ = match (next_poll, wake) {
(None, false) => tokio::time::sleep(Duration::MAX),
(None, true) => tokio::time::sleep(Duration::ZERO),
(Some(until), _) => tokio::time::sleep_until(until),
} => {
let loop_start = smoltcp::time::Instant::now();
Expand All @@ -147,11 +176,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
if client_socket.can_send() {
if let Some(send_queue) = send_queue.get_mut(virtual_port) {
let to_transfer = send_queue.pop_front();
if let Some((port_forward, data)) = to_transfer {
if let Some((destination, data)) = to_transfer {
client_socket
.send_slice(
&data,
UdpMetadata::from(port_forward.destination),
UdpMetadata::from(destination.destination),
)
.unwrap_or_else(|e| {
error!(
Expand All @@ -164,8 +193,9 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
}
if client_socket.can_recv() {
match client_socket.recv() {
Ok((data, _peer)) => {
Ok((data, peer)) => {
if !data.is_empty() {
trace!("notifying remote data from peer: {}", peer);
endpoint.send(Event::RemoteData(*virtual_port, data.to_vec().into()));
}
}
Expand Down Expand Up @@ -204,9 +234,11 @@ impl VirtualInterfacePoll for UdpVirtualInterface {
send_queue.insert(virtual_port, VecDeque::from(vec![(port_forward, data)]));
}
next_poll = None;
wake = true;
}
Event::VirtualDeviceFed(PortProtocol::Udp) => {
next_poll = None;
wake = true;
}
_ => {}
}
Expand Down

0 comments on commit 4abb4f9

Please sign in to comment.