Skip to content

Commit

Permalink
fix: clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy committed Jun 26, 2024
1 parent 556ad79 commit 25794d8
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{
AgentError,
};

///
pub const IC0_SEED_DOMAIN: &str = "ic0.app";

const MAINNET_ROOT_SUBNET_ID: &str =
Expand All @@ -45,6 +46,7 @@ const HEALTH_CHECK_PERIOD: Duration = Duration::from_secs(1);

const DYNAMIC_ROUTE_PROVIDER: &str = "DynamicRouteProvider";

///
#[derive(Debug)]
pub struct DynamicRouteProvider<S> {
fetcher: Arc<dyn Fetch>,
Expand Down Expand Up @@ -75,6 +77,7 @@ impl<S> DynamicRouteProvider<S>
where
S: RoutingSnapshot + 'static,
{
///
pub fn new(snapshot: S, seeds: Vec<Node>, http_client: Client) -> Self {
let fetcher = Arc::new(NodesFetcher::new(
http_client.clone(),
Expand All @@ -94,21 +97,25 @@ where
}
}

///
pub fn with_fetcher(mut self, fetcher: Arc<dyn Fetch>) -> Self {
self.fetcher = fetcher;
self
}

///
pub fn with_fetch_period(mut self, period: Duration) -> Self {
self.fetch_period = period;
self
}

///
pub fn with_checker(mut self, checker: Arc<dyn HealthCheck>) -> Self {
self.checker = checker;
self
}

///
pub fn with_check_period(mut self, period: Duration) -> Self {
self.check_period = period;
self
Expand Down Expand Up @@ -183,11 +190,11 @@ where
);

(found_healthy_seeds)
.then(|| ())
.ok_or_else(|| anyhow!("No healthy seeds found"))
.then_some(())
.ok_or(anyhow!("No healthy seeds found"))
}

// Kill all running tasks.
/// Kill all running tasks.
pub async fn stop(&self) {
self.token.cancel();
self.tracker.close();
Expand Down Expand Up @@ -517,7 +524,6 @@ mod tests {
// Setup.
setup_tracing();
let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap();
let node_2 = Node::new("api1.com").unwrap();
// Set nodes fetching params: topology, fetching periodicity.
let fetcher = Arc::new(NodesFetcherMock::new());
let fetch_interval = Duration::from_secs(2);
Expand Down
14 changes: 14 additions & 0 deletions ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,43 @@ use crate::agent::http_transport::dynamic_routing::{

const CHANNEL_BUFFER: usize = 128;

///
#[async_trait]
pub trait HealthCheck: Send + Sync + Debug {
///
async fn check(&self, node: &Node) -> anyhow::Result<HealthCheckStatus>;
}

///
#[derive(Clone, PartialEq, Debug, Default)]
pub struct HealthCheckStatus {
///
pub latency: Option<Duration>,
}

///
impl HealthCheckStatus {
///
pub fn new(latency: Option<Duration>) -> Self {
Self { latency }
}

///
pub fn is_healthy(&self) -> bool {
self.latency.is_some()
}
}

///
#[derive(Debug)]
pub struct HealthChecker {
http_client: Client,
timeout: Duration,
}

///
impl HealthChecker {
///
pub fn new(http_client: Client, timeout: Duration) -> Self {
Self {
http_client,
Expand Down Expand Up @@ -133,8 +143,10 @@ impl HealthCheckActor {
}
}

///
pub const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor";

///
pub struct HealthManagerActor<S> {
checker: Arc<dyn HealthCheck>,
period: Duration,
Expand All @@ -153,6 +165,7 @@ impl<S> HealthManagerActor<S>
where
S: RoutingSnapshot,
{
///
pub fn new(
checker: Arc<dyn HealthCheck>,
period: Duration,
Expand All @@ -178,6 +191,7 @@ where
}
}

///
pub async fn run(mut self) {
loop {
tokio::select! {
Expand Down
5 changes: 5 additions & 0 deletions ic-agent/src/agent/http_transport/dynamic_routing/messages.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node};

///
#[derive(Debug, Clone)]
pub struct FetchedNodes {
///
pub nodes: Vec<Node>,
}

///
pub struct NodeHealthState {
///
pub node: Node,
///
pub health: HealthCheckStatus,
}
6 changes: 6 additions & 0 deletions ic-agent/src/agent/http_transport/dynamic_routing/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
pub mod dynamic_route_provider;
///
pub mod health_check;
///
pub mod messages;
///
pub mod node;
///
pub mod nodes_fetch;
///
pub mod snapshot;
#[cfg(test)]
pub mod test_utils;
///
pub mod type_aliases;
5 changes: 5 additions & 0 deletions ic-agent/src/agent/http_transport/dynamic_routing/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use url::Url;
use crate::agent::ApiBoundaryNode;
use anyhow::anyhow;

///
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Node {
domain: String,
}

impl Node {
///
pub fn new(domain: &str) -> anyhow::Result<Self> {
if !is_valid_domain(domain) {
return Err(anyhow!("Invalid domain name {domain}"));
Expand All @@ -18,12 +20,14 @@ impl Node {
})
}

///
pub fn domain(&self) -> String {
self.domain.clone()
}
}

impl Node {
///
pub fn to_routing_url(&self) -> Url {
Url::parse(&format!("https://{}/api/v2/", self.domain)).expect("failed to parse URL")
}
Expand All @@ -44,6 +48,7 @@ impl TryFrom<&ApiBoundaryNode> for Node {
}
}

///
pub fn is_valid_domain<S: AsRef<str>>(domain: S) -> bool {
// Prepend scheme to make it a valid URL
let url_string = format!("http://{}", domain.as_ref());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,22 @@ use crate::agent::{

const NODES_FETCH_ACTOR: &str = "NodesFetchActor";

///
#[async_trait]
pub trait Fetch: Sync + Send + Debug {
///
async fn fetch(&self, url: Url) -> anyhow::Result<Vec<Node>>;
}

///
#[derive(Debug)]
pub struct NodesFetcher {
http_client: Client,
subnet_id: Principal,
}

impl NodesFetcher {
///
pub fn new(http_client: Client, subnet_id: Principal) -> Self {
Self {
http_client,
Expand Down Expand Up @@ -67,6 +71,7 @@ impl Fetch for NodesFetcher {
}
}

///
pub struct NodesFetchActor<S> {
fetcher: Arc<dyn Fetch>,
period: Duration,
Expand All @@ -80,6 +85,7 @@ impl<S> NodesFetchActor<S>
where
S: RoutingSnapshot,
{
///
pub fn new(
fetcher: Arc<dyn Fetch>,
period: Duration,
Expand All @@ -98,6 +104,7 @@ where
}
}

///
pub async fn run(self) {
let mut interval = time::interval(self.period);
loop {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ struct WeightedNode {
weight: f64,
}

///
#[derive(Default, Debug, Clone)]
pub struct LatencyRoutingSnapshot {
weighted_nodes: Vec<WeightedNode>,
existing_nodes: HashSet<Node>,
}

///
impl LatencyRoutingSnapshot {
///
pub fn new() -> Self {
Self {
weighted_nodes: vec![],
Expand All @@ -39,7 +42,7 @@ impl LatencyRoutingSnapshot {
// select weight index based on the input number in range [0, 1]
#[inline(always)]
fn weighted_sample(weights: &[f64], number: f64) -> Option<usize> {
if number < 0.0 || number > 1.0 {
if !(0.0..=1.0).contains(&number) {
return None;
}
let sum: f64 = weights.iter().sum();
Expand Down Expand Up @@ -74,7 +77,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
}

fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool> {
let new_nodes = HashSet::from_iter(nodes.into_iter().cloned());
let new_nodes = HashSet::from_iter(nodes.iter().cloned());
// Find nodes removed from snapshot.
let nodes_removed: Vec<_> = self
.existing_nodes
Expand Down Expand Up @@ -292,7 +295,7 @@ mod tests {
assert_eq!(snapshot.weighted_nodes[0].node, node_2);
// Add node_3 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
node: node_3.clone(),
node: node_3,
latency_mov_avg: LatencyMovAvg::new(),
weight: 0.0,
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
///
pub mod latency_based_routing;
///
pub mod round_robin_routing;
///
pub mod routing_snapshot;
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::agent::http_transport::dynamic_routing::{
health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
};

///
#[derive(Default, Debug, Clone)]
pub struct RoundRobinRoutingSnapshot {
current_idx: Arc<AtomicUsize>,
Expand All @@ -18,6 +19,7 @@ pub struct RoundRobinRoutingSnapshot {
}

impl RoundRobinRoutingSnapshot {
///
pub fn new() -> Self {
Self {
current_idx: Arc::new(AtomicUsize::new(0)),
Expand All @@ -44,7 +46,7 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot {
}

fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool> {
let new_nodes = HashSet::from_iter(nodes.into_iter().cloned());
let new_nodes = HashSet::from_iter(nodes.iter().cloned());
// Find nodes removed from snapshot.
let nodes_removed: Vec<_> = self
.existing_nodes
Expand All @@ -69,13 +71,13 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot {
}

fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result<bool> {
if !self.existing_nodes.contains(&node) {
if !self.existing_nodes.contains(node) {
return Ok(false);
}
if health.latency.is_some() {
Ok(self.healthy_nodes.insert(node.clone()))
} else {
Ok(self.healthy_nodes.remove(&node))
Ok(self.healthy_nodes.remove(node))
}
}
}
Expand Down Expand Up @@ -192,10 +194,7 @@ mod tests {
snapshot.existing_nodes,
HashSet::from_iter(vec![node_1.clone()])
);
assert_eq!(
snapshot.healthy_nodes,
HashSet::from_iter(vec![node_1.clone()])
);
assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_1]));
// Sync with node_2
let node_2 = Node::new("api2.com").unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]).unwrap();
Expand All @@ -218,11 +217,8 @@ mod tests {
snapshot.existing_nodes,
HashSet::from_iter(vec![node_3.clone(), node_2.clone()])
);
assert_eq!(
snapshot.healthy_nodes,
HashSet::from_iter(vec![node_2.clone()])
);
snapshot.healthy_nodes.insert(node_3.clone());
assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_2]));
snapshot.healthy_nodes.insert(node_3);
// Sync with []
let nodes_changed = snapshot.sync_nodes(&[]).unwrap();
assert!(nodes_changed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ use std::fmt::Debug;

use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node};

///
pub trait RoutingSnapshot: Send + Sync + Clone + Debug {
///
fn has_nodes(&self) -> bool;
///
fn next(&self) -> Option<Node>;
///
fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool>;
///
fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result<bool>;
}
Loading

0 comments on commit 25794d8

Please sign in to comment.