diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs index c4cdebd55..acb3ae9bc 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs @@ -32,6 +32,7 @@ use crate::{ AgentError, }; +/// pub const IC0_SEED_DOMAIN: &str = "ic0.app"; const MAINNET_ROOT_SUBNET_ID: &str = @@ -45,6 +46,7 @@ const HEALTH_CHECK_PERIOD: Duration = Duration::from_secs(1); const DYNAMIC_ROUTE_PROVIDER: &str = "DynamicRouteProvider"; +/// #[derive(Debug)] pub struct DynamicRouteProvider { fetcher: Arc, @@ -75,6 +77,7 @@ impl DynamicRouteProvider where S: RoutingSnapshot + 'static, { + /// pub fn new(snapshot: S, seeds: Vec, http_client: Client) -> Self { let fetcher = Arc::new(NodesFetcher::new( http_client.clone(), @@ -94,21 +97,25 @@ where } } + /// pub fn with_fetcher(mut self, fetcher: Arc) -> Self { self.fetcher = fetcher; self } + /// pub fn with_fetch_period(mut self, period: Duration) -> Self { self.fetch_period = period; self } + /// pub fn with_checker(mut self, checker: Arc) -> Self { self.checker = checker; self } + /// pub fn with_check_period(mut self, period: Duration) -> Self { self.check_period = period; self @@ -183,11 +190,11 @@ where ); (found_healthy_seeds) - .then(|| ()) - .ok_or_else(|| anyhow!("No healthy seeds found")) + .then_some(()) + .ok_or(anyhow!("No healthy seeds found")) } - // Kill all running tasks. + /// Kill all running tasks. pub async fn stop(&self) { self.token.cancel(); self.tracker.close(); @@ -517,7 +524,6 @@ mod tests { // Setup. setup_tracing(); let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); - let node_2 = Node::new("api1.com").unwrap(); // Set nodes fetching params: topology, fetching periodicity. let fetcher = Arc::new(NodesFetcherMock::new()); let fetch_interval = Duration::from_secs(2); diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs index a9cb7a971..9ce88b99f 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs @@ -21,33 +21,43 @@ use crate::agent::http_transport::dynamic_routing::{ const CHANNEL_BUFFER: usize = 128; +/// #[async_trait] pub trait HealthCheck: Send + Sync + Debug { + /// async fn check(&self, node: &Node) -> anyhow::Result; } +/// #[derive(Clone, PartialEq, Debug, Default)] pub struct HealthCheckStatus { + /// pub latency: Option, } +/// impl HealthCheckStatus { + /// pub fn new(latency: Option) -> Self { Self { latency } } + /// pub fn is_healthy(&self) -> bool { self.latency.is_some() } } +/// #[derive(Debug)] pub struct HealthChecker { http_client: Client, timeout: Duration, } +/// impl HealthChecker { + /// pub fn new(http_client: Client, timeout: Duration) -> Self { Self { http_client, @@ -133,8 +143,10 @@ impl HealthCheckActor { } } +/// pub const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor"; +/// pub struct HealthManagerActor { checker: Arc, period: Duration, @@ -153,6 +165,7 @@ impl HealthManagerActor where S: RoutingSnapshot, { + /// pub fn new( checker: Arc, period: Duration, @@ -178,6 +191,7 @@ where } } + /// pub async fn run(mut self) { loop { tokio::select! { diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs b/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs index 0e3d39350..90e25cee9 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs @@ -1,11 +1,16 @@ use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; +/// #[derive(Debug, Clone)] pub struct FetchedNodes { + /// pub nodes: Vec, } +/// pub struct NodeHealthState { + /// pub node: Node, + /// pub health: HealthCheckStatus, } diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs b/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs index 42dc4e082..d450ce678 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs @@ -1,9 +1,15 @@ pub mod dynamic_route_provider; +/// pub mod health_check; +/// pub mod messages; +/// pub mod node; +/// pub mod nodes_fetch; +/// pub mod snapshot; #[cfg(test)] pub mod test_utils; +/// pub mod type_aliases; diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/node.rs b/ic-agent/src/agent/http_transport/dynamic_routing/node.rs index 1cadc543f..d74a7cd09 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/node.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/node.rs @@ -3,12 +3,14 @@ use url::Url; use crate::agent::ApiBoundaryNode; use anyhow::anyhow; +/// #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Node { domain: String, } impl Node { + /// pub fn new(domain: &str) -> anyhow::Result { if !is_valid_domain(domain) { return Err(anyhow!("Invalid domain name {domain}")); @@ -18,12 +20,14 @@ impl Node { }) } + /// pub fn domain(&self) -> String { self.domain.clone() } } impl Node { + /// pub fn to_routing_url(&self) -> Url { Url::parse(&format!("https://{}/api/v2/", self.domain)).expect("failed to parse URL") } @@ -44,6 +48,7 @@ impl TryFrom<&ApiBoundaryNode> for Node { } } +/// pub fn is_valid_domain>(domain: S) -> bool { // Prepend scheme to make it a valid URL let url_string = format!("http://{}", domain.as_ref()); diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs b/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs index 0c04fc0e4..e10b5f55b 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs @@ -22,11 +22,14 @@ use crate::agent::{ const NODES_FETCH_ACTOR: &str = "NodesFetchActor"; +/// #[async_trait] pub trait Fetch: Sync + Send + Debug { + /// async fn fetch(&self, url: Url) -> anyhow::Result>; } +/// #[derive(Debug)] pub struct NodesFetcher { http_client: Client, @@ -34,6 +37,7 @@ pub struct NodesFetcher { } impl NodesFetcher { + /// pub fn new(http_client: Client, subnet_id: Principal) -> Self { Self { http_client, @@ -67,6 +71,7 @@ impl Fetch for NodesFetcher { } } +/// pub struct NodesFetchActor { fetcher: Arc, period: Duration, @@ -80,6 +85,7 @@ impl NodesFetchActor where S: RoutingSnapshot, { + /// pub fn new( fetcher: Arc, period: Duration, @@ -98,6 +104,7 @@ where } } + /// pub async fn run(self) { let mut interval = time::interval(self.period); loop { diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs index b8328724d..70710cce6 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs @@ -21,13 +21,16 @@ struct WeightedNode { weight: f64, } +/// #[derive(Default, Debug, Clone)] pub struct LatencyRoutingSnapshot { weighted_nodes: Vec, existing_nodes: HashSet, } +/// impl LatencyRoutingSnapshot { + /// pub fn new() -> Self { Self { weighted_nodes: vec![], @@ -39,7 +42,7 @@ impl LatencyRoutingSnapshot { // select weight index based on the input number in range [0, 1] #[inline(always)] fn weighted_sample(weights: &[f64], number: f64) -> Option { - if number < 0.0 || number > 1.0 { + if !(0.0..=1.0).contains(&number) { return None; } let sum: f64 = weights.iter().sum(); @@ -74,7 +77,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { } fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result { - let new_nodes = HashSet::from_iter(nodes.into_iter().cloned()); + let new_nodes = HashSet::from_iter(nodes.iter().cloned()); // Find nodes removed from snapshot. let nodes_removed: Vec<_> = self .existing_nodes @@ -292,7 +295,7 @@ mod tests { assert_eq!(snapshot.weighted_nodes[0].node, node_2); // Add node_3 to weighted_nodes manually snapshot.weighted_nodes.push(WeightedNode { - node: node_3.clone(), + node: node_3, latency_mov_avg: LatencyMovAvg::new(), weight: 0.0, }); diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs index 1c63df8bf..3695f1d3a 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs @@ -1,3 +1,6 @@ +/// pub mod latency_based_routing; +/// pub mod round_robin_routing; +/// pub mod routing_snapshot; diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs index d7dc4995b..21216c5ce 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs @@ -10,6 +10,7 @@ use crate::agent::http_transport::dynamic_routing::{ health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot, }; +/// #[derive(Default, Debug, Clone)] pub struct RoundRobinRoutingSnapshot { current_idx: Arc, @@ -18,6 +19,7 @@ pub struct RoundRobinRoutingSnapshot { } impl RoundRobinRoutingSnapshot { + /// pub fn new() -> Self { Self { current_idx: Arc::new(AtomicUsize::new(0)), @@ -44,7 +46,7 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot { } fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result { - let new_nodes = HashSet::from_iter(nodes.into_iter().cloned()); + let new_nodes = HashSet::from_iter(nodes.iter().cloned()); // Find nodes removed from snapshot. let nodes_removed: Vec<_> = self .existing_nodes @@ -69,13 +71,13 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot { } fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result { - if !self.existing_nodes.contains(&node) { + if !self.existing_nodes.contains(node) { return Ok(false); } if health.latency.is_some() { Ok(self.healthy_nodes.insert(node.clone())) } else { - Ok(self.healthy_nodes.remove(&node)) + Ok(self.healthy_nodes.remove(node)) } } } @@ -192,10 +194,7 @@ mod tests { snapshot.existing_nodes, HashSet::from_iter(vec![node_1.clone()]) ); - assert_eq!( - snapshot.healthy_nodes, - HashSet::from_iter(vec![node_1.clone()]) - ); + assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_1])); // Sync with node_2 let node_2 = Node::new("api2.com").unwrap(); let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]).unwrap(); @@ -218,11 +217,8 @@ mod tests { snapshot.existing_nodes, HashSet::from_iter(vec![node_3.clone(), node_2.clone()]) ); - assert_eq!( - snapshot.healthy_nodes, - HashSet::from_iter(vec![node_2.clone()]) - ); - snapshot.healthy_nodes.insert(node_3.clone()); + assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_2])); + snapshot.healthy_nodes.insert(node_3); // Sync with [] let nodes_changed = snapshot.sync_nodes(&[]).unwrap(); assert!(nodes_changed); diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs index f4f2cc5f8..ef88b8df1 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs @@ -2,9 +2,14 @@ use std::fmt::Debug; use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; +/// pub trait RoutingSnapshot: Send + Sync + Clone + Debug { + /// fn has_nodes(&self) -> bool; + /// fn next(&self) -> Option; + /// fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result; + /// fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result; } diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs b/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs index 92f922d4a..33ef77ea2 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs @@ -3,10 +3,15 @@ use std::sync::Arc; use arc_swap::ArcSwap; use tokio::sync::{mpsc, watch}; +/// pub type SenderWatch = watch::Sender>; +/// pub type ReceiverWatch = watch::Receiver>; +/// pub type SenderMpsc = mpsc::Sender; +/// pub type ReceiverMpsc = mpsc::Receiver; +/// pub type GlobalShared = Arc>; diff --git a/ic-agent/src/agent/http_transport/mod.rs b/ic-agent/src/agent/http_transport/mod.rs index f938a90d5..44b4b14f4 100644 --- a/ic-agent/src/agent/http_transport/mod.rs +++ b/ic-agent/src/agent/http_transport/mod.rs @@ -30,5 +30,7 @@ const ICP0_SUB_DOMAIN: &str = ".icp0.io"; const ICP_API_SUB_DOMAIN: &str = ".icp-api.io"; #[allow(dead_code)] const LOCALHOST_SUB_DOMAIN: &str = ".localhost"; +/// +#[cfg(feature = "reqwest")] pub mod dynamic_routing; pub mod route_provider;