Skip to content

Commit

Permalink
Use UDP for vpn packets (#1802)
Browse files Browse the repository at this point in the history
Use UDP for VPN packets

Co-authored-by: mfranciszkiewicz <marek@golem.network>
  • Loading branch information
jiivan and mfranciszkiewicz authored Feb 1, 2022
1 parent 7b8449e commit b14b587
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 59 deletions.
1 change: 1 addition & 0 deletions core/model/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use ya_client_model::NodeId;
use ya_service_bus::typed as bus;

pub const BUS_ID: &str = "/net";
pub const BUS_ID_UDP: &str = "/u/net";

// TODO: replace with dedicated endpoint/service descriptor with enum for visibility
pub const PUBLIC_PREFIX: &str = "/public";
Expand Down
116 changes: 63 additions & 53 deletions core/net/src/hybrid/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use anyhow::{anyhow, Context as AnyhowContext};
use futures::channel::mpsc;
use futures::stream::LocalBoxStream;
use futures::{FutureExt, SinkExt, Stream, StreamExt, TryStreamExt};
use tokio::time::{self};
use tokio::time;
use url::Url;

use ya_core_model::net::{self, net_service};
Expand Down Expand Up @@ -121,36 +121,41 @@ pub async fn start_network(
BCAST_SENDER.lock().unwrap().replace(btx);

let receiver = client.forward_receiver().await.unwrap();
let services = ids.iter().map(|id| net_service(id)).collect();
let mut services: HashSet<_> = Default::default();
ids.iter().for_each(|id| {
let service = net_service(id);
services.insert(format!("/u{}", service));
services.insert(service);
});
let state = State::new(ids, services);

// outbound traffic
let state_ = state.clone();
bind_local_bus(net::BUS_ID, state.clone(), move |_, addr| {
let from_node = default_id.clone();
let (to_node, addr) = match parse_net_to_addr(addr) {
Ok(id) => id,
Err(err) => anyhow::bail!("invalid address: {}", err),
};

if !state_.inner.borrow().ids.contains(&from_node) {
anyhow::bail!("unknown identity: {:?}", from_node);
let net_handler = || {
let default_id = default_id.clone();
move |_: &str, addr: &str| {
let from = default_id.clone();
match parse_net_to_addr(addr) {
Ok((to, addr)) => Ok((from, to, addr)),
Err(err) => anyhow::bail!("invalid address: {}", err),
}
}
Ok((from_node, to_node, addr))
});

let state_ = state.clone();
bind_local_bus("/from", state.clone(), move |_, addr| {
let (from_node, to_node, addr) = match parse_from_to_addr(addr) {
Ok(tup) => tup,
Err(err) => anyhow::bail!("invalid address: {}", err),
};

if !state_.inner.borrow().ids.contains(&from_node) {
anyhow::bail!("unknown identity: {:?}", from_node);
};
bind_local_bus(net::BUS_ID, state.clone(), true, net_handler());
bind_local_bus(net::BUS_ID_UDP, state.clone(), false, net_handler());

let from_handler = || {
let state_from = state.clone();
move |_: &str, addr: &str| {
let (from, to, addr) =
parse_from_to_addr(addr).map_err(|e| anyhow::anyhow!("invalid address: {}", e))?;
if !state_from.inner.borrow().ids.contains(&from) {
anyhow::bail!("unknown identity: {:?}", from);
}
Ok((from, to, addr))
}
Ok((from_node, to_node, addr))
});
};
bind_local_bus("/from", state.clone(), true, from_handler());
bind_local_bus("/u/from", state.clone(), false, from_handler());

tokio::task::spawn_local(broadcast_handler(brx));
tokio::task::spawn_local(forward_handler(receiver, state.clone()));
Expand All @@ -175,7 +180,7 @@ pub async fn start_network(
Ok(())
}

fn bind_local_bus<F>(address: &'static str, state: State, resolver: F)
fn bind_local_bus<F>(address: &'static str, state: State, reliable: bool, resolver: F)
where
F: Fn(&str, &str) -> anyhow::Result<(NodeId, NodeId, String)> + 'static,
{
Expand Down Expand Up @@ -206,7 +211,7 @@ where
forward_bus_to_local(&caller_id.to_string(), addr, msg, &state_, tx);
rx
} else {
forward_bus_to_net(caller_id, remote_id, address, msg, &state_)
forward_bus_to_net(caller_id, remote_id, address, msg, &state_, reliable)
};

async move {
Expand Down Expand Up @@ -248,7 +253,7 @@ where
forward_bus_to_local(&caller_id.to_string(), addr, msg, &state, tx);
rx
} else {
forward_bus_to_net(caller_id, remote_id, address, msg, &state)
forward_bus_to_net(caller_id, remote_id, address, msg, &state, reliable)
};

let eos = Rc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -311,6 +316,7 @@ fn forward_bus_to_net(
address: impl ToString,
msg: &[u8],
state: &State,
reliable: bool,
) -> BusReceiver {
let address = address.to_string();
let state = state.clone();
Expand Down Expand Up @@ -346,7 +352,7 @@ fn forward_bus_to_net(
msg.len()
);

match state.forward_sink(remote_id, true).await {
match state.forward_sink(remote_id, reliable).await {
Ok(mut session) => {
let _ = session.send(msg).await.map_err(|_| {
let err = format!("error sending message: session closed");
Expand Down Expand Up @@ -762,36 +768,40 @@ fn chunk_err(request_id: impl ToString, err: impl ToString) -> Result<ResponseCh
}

fn parse_net_to_addr(addr: &str) -> anyhow::Result<(NodeId, String)> {
let mut it = addr.split("/").fuse();
if let (Some(""), Some("net"), Some(to_node_id)) = (it.next(), it.next(), it.next()) {
let to_id = to_node_id.parse::<NodeId>()?;
const ADDR_CONST: usize = 6;

let prefix = 6 + to_node_id.len();
let service_id = &addr[prefix..];
let mut it = addr.split("/").fuse().skip(1).peekable();
let (prefix, to) = match (it.next(), it.next(), it.next()) {
(Some("net"), Some(to), Some(_)) => ("", to),
(Some("u"), Some("net"), Some(to)) if it.peek().is_some() => ("/u", to),
_ => anyhow::bail!("invalid net-to destination: {}", addr),
};

if let Some(_) = it.next() {
return Ok((to_id, net_service(format!("{}/{}", to_node_id, service_id))));
}
}
anyhow::bail!("invalid net-to destination: {}", addr)
let to_id = to.parse::<NodeId>()?;
let skip = prefix.len() + ADDR_CONST + to.len();
let addr = net_service(format!("{}/{}", to, &addr[skip..]));

Ok((to_id, format!("{}{}", prefix, addr)))
}

fn parse_from_to_addr(addr: &str) -> anyhow::Result<(NodeId, NodeId, String)> {
let mut it = addr.split("/").fuse();
if let (Some(""), Some("from"), Some(from_node_id), Some("to"), Some(to_node_id)) =
(it.next(), it.next(), it.next(), it.next(), it.next())
{
let from_id = from_node_id.parse::<NodeId>()?;
let to_id = to_node_id.parse::<NodeId>()?;

let prefix = 10 + from_node_id.len();
let service_id = &addr[prefix..];
const ADDR_CONST: usize = 10;

if let Some(_) = it.next() {
return Ok((from_id, to_id, net_service(service_id)));
let mut it = addr.split("/").fuse().skip(1).peekable();
let (prefix, from, to) = match (it.next(), it.next(), it.next(), it.next(), it.next()) {
(Some("from"), Some(from), Some("to"), Some(to), Some(_)) => ("", from, to),
(Some("u"), Some("from"), Some(from), Some("to"), Some(to)) if it.peek().is_some() => {
("/u", from, to)
}
}
anyhow::bail!("invalid net-from-to destination: {}", addr)
_ => anyhow::bail!("invalid net-from-to destination: {}", addr),
};

let from_id = from.parse::<NodeId>()?;
let to_id = to.parse::<NodeId>()?;
let skip = prefix.len() + ADDR_CONST + from.len();
let addr = net_service(&addr[skip..]);

Ok((from_id, to_id, format!("{}{}", prefix, addr)))
}

fn gen_id() -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion core/vpn/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ fn gsb_local_url(net_id: &str) -> String {
}

fn gsb_remote_url(node_id: &str, net_id: &str) -> Endpoint {
typed::service(format!("/net/{}/vpn/{}", node_id, net_id))
typed::service(format!("/u/net/{}/vpn/{}", node_id, net_id))
}

trait ArbiterExt {
Expand Down
13 changes: 9 additions & 4 deletions exe-unit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,13 @@ impl<T> Default for Channel<T> {
}
}

pub(crate) async fn report<M: RpcMessage + Unpin + 'static>(url: String, msg: M) -> bool {
match ya_service_bus::typed::service(&url).send(msg).await {
pub(crate) async fn report<S, M>(url: S, msg: M) -> bool
where
M: RpcMessage + Unpin + 'static,
S: AsRef<str>,
{
let url = url.as_ref();
match ya_service_bus::typed::service(url).send(msg).await {
Err(ya_service_bus::Error::Timeout(msg)) => {
log::warn!("Timed out reporting to {}: {}", url, msg);
true
Expand Down Expand Up @@ -490,9 +495,9 @@ async fn report_usage<R: Runtime>(
},
timeout: None,
};
if !report(report_url, msg).await {
if !report(&report_url, msg).await {
exe_unit.do_send(Shutdown(ShutdownReason::Error(error::Error::RuntimeError(
"Reporting endpoint is not available".to_string(),
format!("Reporting endpoint '{}' is not available", report_url),
))));
}
}
Expand Down
2 changes: 1 addition & 1 deletion exe-unit/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ fn write_prefix(dst: &mut Vec<u8>) {
}

fn gsb_endpoint(node_id: &str, net_id: &str) -> GsbEndpoint {
typed::service(format!("/net/{}/vpn/{}", node_id, net_id))
typed::service(format!("/u/net/{}/vpn/{}", node_id, net_id))
}

#[cfg(test)]
Expand Down

0 comments on commit b14b587

Please sign in to comment.