diff --git a/Cargo.lock b/Cargo.lock index 537863b44b..8ea8d8ef51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8059,6 +8059,7 @@ dependencies = [ "tokio 1.32.0", "tokio-stream", "tokio-util 0.7.8", + "trust-dns-resolver", "url", "winapi 0.3.9", "ya-agreement-utils 0.4.1", diff --git a/exe-unit/Cargo.toml b/exe-unit/Cargo.toml index 78dd11810f..07c3240eed 100644 --- a/exe-unit/Cargo.toml +++ b/exe-unit/Cargo.toml @@ -77,6 +77,7 @@ tokio-util = { version = "0.7.2", features = ["codec", "net"] } tokio-stream = "0.1.6" url = "2.1" yansi = "0.5.0" +trust-dns-resolver = { workspace = true } [dev-dependencies] ya-runtime-api = { version = "0.7", path = "runtime-api", features = ["codec", "server"] } diff --git a/exe-unit/src/dns.rs b/exe-unit/src/dns.rs new file mode 100644 index 0000000000..286f80abed --- /dev/null +++ b/exe-unit/src/dns.rs @@ -0,0 +1,113 @@ +use std::collections::HashSet; +use std::net::IpAddr; +use std::str::FromStr; +use trust_dns_resolver::config; +use trust_dns_resolver::TokioAsyncResolver; + +#[cfg(test)] +use std::time::Duration; + +#[derive(Clone)] +pub struct StableResolver { + stable_dns: IpAddr, + resolver: TokioAsyncResolver, +} + +impl StableResolver { + pub async fn ips(&self, host_name: &str) -> anyhow::Result> { + if let Ok(ip_addr) = IpAddr::from_str(host_name) { + return Ok(HashSet::from([ip_addr])); + } + log::debug!("Resolving IP addresses of '{}'", host_name); + + let response = self.resolver.lookup_ip(host_name).await?; + + Ok(response.into_iter().collect()) + } + + pub fn stable_dns(&self) -> IpAddr { + self.stable_dns + } + + #[cfg(test)] + fn clear_cache(&self) { + self.resolver.clear_cache(); + } +} + +#[cfg(test)] +async fn google_resolver() -> anyhow::Result { + let mut options: config::ResolverOpts = Default::default(); + options.use_hosts_file = false; + options.cache_size = 0; + let config = config::ResolverConfig::default(); + let resolver = TokioAsyncResolver::tokio(config, options)?; + let stable_dns = config::GOOGLE_IPS[0]; + + Ok(StableResolver { + stable_dns, + resolver, + }) +} + +pub async fn resolver() -> anyhow::Result { + let default_resolver = TokioAsyncResolver::tokio(Default::default(), Default::default())?; + let response = default_resolver + .lookup_ip("stable-dns.dev.golem.network") + .await?; + let stable_dns = response.into_iter().next().unwrap_or(config::GOOGLE_IPS[0]); + let mut config = config::ResolverConfig::new(); + config.add_name_server(config::NameServerConfig::new( + (stable_dns, 53).into(), + trust_dns_resolver::config::Protocol::Udp, + )); + let resolver = TokioAsyncResolver::tokio(config, Default::default())?; + Ok(StableResolver { + stable_dns, + resolver, + }) +} + +pub const DNS_PORT: u16 = 53; + +pub fn dns_servers() -> impl Iterator { + use trust_dns_resolver::config::*; + + GOOGLE_IPS + .iter() + .cloned() + .chain(CLOUDFLARE_IPS.iter().cloned()) + .chain(QUAD9_IPS.iter().cloned()) +} + +// Do not use it in CI +#[cfg(test)] +#[ignore] +#[actix_rt::test] +async fn test_resolver() { + let name = "accounts.google.com"; + let r = resolver().await.unwrap(); + let ac = r.ips(name).await.unwrap(); + for _i in 1..5 { + actix_rt::time::sleep(Duration::from_secs(30)).await; + let ac2 = r.ips(name).await.unwrap(); + assert_eq!(ac, ac2); + } +} + +// Do not use it in CI +#[cfg(test)] +#[ignore] +#[should_panic] +#[actix_rt::test] +async fn test_fail_resolver() { + let name = "accounts.google.com"; + let r = google_resolver().await.unwrap(); + let ac = r.ips(name).await.unwrap(); + for _i in 1..5 { + actix_rt::time::sleep(Duration::from_secs(15)).await; + r.clear_cache(); + let ac2 = r.ips(name).await.unwrap(); + assert_eq!(ac, ac2); + } +} diff --git a/exe-unit/src/lib.rs b/exe-unit/src/lib.rs index 01814f8c49..16a563ced3 100644 --- a/exe-unit/src/lib.rs +++ b/exe-unit/src/lib.rs @@ -47,6 +47,7 @@ pub mod service; pub mod state; pub mod util; +mod dns; pub type Result = std::result::Result; lazy_static::lazy_static! { diff --git a/exe-unit/src/manifest.rs b/exe-unit/src/manifest.rs index 5e48cbc18a..15aa919e49 100644 --- a/exe-unit/src/manifest.rs +++ b/exe-unit/src/manifest.rs @@ -1,22 +1,21 @@ use std::any::Any; use std::collections::{HashMap, HashSet}; -use std::net::{IpAddr, Ipv4Addr}; +use std::net::IpAddr; use std::ops::Not; use std::str::FromStr; use std::sync::{Arc, RwLock}; use anyhow::Context; -use futures::future::LocalBoxFuture; -use futures::{FutureExt, StreamExt, TryStreamExt}; +use futures::prelude::*; use serde_json::{Map, Value}; use structopt::StructOpt; use url::Url; +use crate::dns::{StableResolver, DNS_PORT}; use ya_agreement_utils::AgreementView; use ya_client_model::activity::ExeScriptCommand; use ya_manifest_utils::{read_manifest, AppManifest, ArgMatch, Command, Feature, Script}; use ya_manifest_utils::{Policy, PolicyConfig}; -use ya_utils_networking::resolver::resolve_domain_name; use ya_utils_networking::vpn::Protocol; type ValidatorMap = HashMap>; @@ -67,7 +66,7 @@ impl ManifestContext { .and_then(|m| m.find_payload(std::env::consts::ARCH, std::env::consts::OS)) } - pub fn build_validators<'a>(&self) -> LocalBoxFuture<'a, anyhow::Result> { + pub fn build_validators<'a>(&self) -> future::LocalBoxFuture<'a, anyhow::Result> { if self.manifest.is_none() || self .policy @@ -151,7 +150,7 @@ pub trait ManifestValidator: Clone + Sized { fn build<'a>( manifest: &AppManifest, policy: &PolicyConfig, - ) -> LocalBoxFuture<'a, anyhow::Result>>; + ) -> future::LocalBoxFuture<'a, anyhow::Result>>; } pub trait ManifestValidatorExt: Sized { @@ -196,7 +195,7 @@ impl ManifestValidator for ScriptValidator { fn build<'a>( manifest: &AppManifest, policy: &PolicyConfig, - ) -> LocalBoxFuture<'a, anyhow::Result>> { + ) -> future::LocalBoxFuture<'a, anyhow::Result>> { if policy .policy_set() .contains(&Policy::ManifestScriptCompliance) @@ -389,6 +388,7 @@ impl FromStr for ScriptValidator { #[derive(Clone)] pub struct UrlValidator { inner: Arc, + resolver: Option>, } enum AllowedAccess { @@ -402,7 +402,7 @@ impl ManifestValidator for UrlValidator { fn build<'a>( manifest: &AppManifest, policy: &PolicyConfig, - ) -> LocalBoxFuture<'a, anyhow::Result>> { + ) -> future::LocalBoxFuture<'a, anyhow::Result>> { if policy .policy_set() .contains(&Policy::ManifestInetUrlCompliance) @@ -417,21 +417,25 @@ impl ManifestValidator for UrlValidator { if let Some(access) = access { match access { ya_manifest_utils::OutboundAccess::Urls(urls) => { - let mut set = Self::DEFAULT_ADDRESSES - .iter() - .map(|(proto, ip, port)| (*proto, IpAddr::from(*ip), *port)) + let resolver = crate::dns::resolver().await?; + + // by default we whitelist well known dns servers. + let mut set = crate::dns::dns_servers() + .map(|ip| (Protocol::Udp, ip, DNS_PORT)) .collect::>(); - let ips = resolve_ips(urls.iter()).await?; + let ips = resolve_ips(&resolver, urls.iter()).await?; set.extend(ips.into_iter()); Ok(Some(Self { inner: Arc::new(AllowedAccess::Urls(set)), + resolver: Some(Arc::new(resolver)), })) } ya_manifest_utils::OutboundAccess::Unrestricted => Ok(Some(Self { inner: Arc::new(AllowedAccess::Unrestricted), + resolver: None, })), } } else { @@ -443,15 +447,6 @@ impl ManifestValidator for UrlValidator { } impl UrlValidator { - const DEFAULT_ADDRESSES: [(Protocol, Ipv4Addr, u16); 6] = [ - (Protocol::Udp, Ipv4Addr::new(1, 0, 0, 1), 53), - (Protocol::Udp, Ipv4Addr::new(1, 1, 1, 1), 53), - (Protocol::Udp, Ipv4Addr::new(8, 8, 4, 4), 53), - (Protocol::Udp, Ipv4Addr::new(8, 8, 8, 8), 53), - (Protocol::Udp, Ipv4Addr::new(9, 9, 9, 9), 53), - (Protocol::Udp, Ipv4Addr::new(149, 112, 112, 112), 53), - ]; - pub fn validate(&self, proto: Protocol, ip: IpAddr, port: u16) -> Result<(), ValidationError> { match self.inner.as_ref() { AllowedAccess::Urls(urls) => urls @@ -466,9 +461,14 @@ impl UrlValidator { AllowedAccess::Unrestricted => Ok(()), } } + + pub fn stable_dns(&self) -> Option { + self.resolver.as_ref().map(|r| r.stable_dns()) + } } async fn resolve_ips<'a>( + resolver: &StableResolver, urls: impl Iterator, ) -> anyhow::Result> { futures::stream::iter(urls) @@ -485,14 +485,7 @@ async fn resolve_ips<'a>( .host_str() .ok_or_else(|| anyhow::anyhow!("invalid url: {}", url))?; - let ips: HashSet = match IpAddr::from_str(host) { - Ok(ip) => [ip].into(), - Err(_) => { - log::debug!("Resolving IP addresses of '{}'", host); - resolve_domain_name(host).await? - } - }; - + let ips: HashSet = resolver.ips(host).await?; set.extend(ips.into_iter().map(|ip| (protocol, ip, port))); Ok(set) }) diff --git a/exe-unit/src/network/inet.rs b/exe-unit/src/network/inet.rs index dbc7594aef..0de3417d59 100644 --- a/exe-unit/src/network/inet.rs +++ b/exe-unit/src/network/inet.rs @@ -44,10 +44,11 @@ use ya_utils_networking::vpn::{ EtherFrame, EtherType, IpPacket, PeekPacket, SocketEndpoint, TcpPacket, UdpPacket, }; +use crate::dns::DNS_PORT; use crate::manifest::UrlValidator; use crate::message::Shutdown; use crate::network::Endpoint; -use crate::{Error, Result}; +use crate::{dns, Error, Result}; // 10.0.0.0/8 is a reserved private address space const IP4_ADDRESS: Ipv4Addr = Ipv4Addr::new(10, 42, 42, 1); @@ -608,7 +609,7 @@ impl Proxy { log::debug!("[inet] connect to {desc:?}, using handle: {handle}"); - let (ip, port) = ( + let (mut ip, port) = ( conv_ip_addr(meta.local.addr).map_err(|e| ProxyingError::routeable(conn, e))?, meta.local.port, ); @@ -632,6 +633,16 @@ impl Proxy { let proxy2 = proxy.clone(); let network2 = network.clone(); + + if let Some(stable_dns) = self.filter.as_ref().and_then(|f| f.stable_dns()) { + if port == DNS_PORT + && meta.protocol == Protocol::Udp + && dns::dns_servers().any(|dns_ip| ip == dns_ip) + { + ip = stable_dns; + } + } + tokio::task::spawn_local(async move { let maybe_tx_rx = match meta.protocol { Protocol::Tcp => inet_tcp_proxy(ip, port).await,