diff --git a/Cargo.lock b/Cargo.lock index 0fba31f7..3fe59550 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -337,6 +337,7 @@ dependencies = [ "async-trait", "awc", "futures", + "governor", "http", "k8s-openapi", "kube", @@ -345,6 +346,7 @@ dependencies = [ "maplit", "mockall", "models", + "nonzero_ext", "opentelemetry", "opentelemetry-prometheus", "reqwest", diff --git a/agent/src/agentclient.rs b/agent/src/agentclient.rs index 1f45c058..3383d74d 100644 --- a/agent/src/agentclient.rs +++ b/agent/src/agentclient.rs @@ -26,7 +26,7 @@ use tracing::{event, instrument, Level}; // The reflector uses exponential backoff. // These values configure how long to delay between tries. const RETRY_BASE_DELAY: Duration = Duration::from_millis(1000); -const RETRY_MAX_DELAY: Duration = Duration::from_secs(10); +const RETRY_MAX_DELAY: Duration = Duration::from_secs(30); const NUM_RETRIES: usize = 5; const AGENT_SLEEP_DURATION: Duration = Duration::from_secs(5); @@ -117,15 +117,12 @@ impl BrupopAgent { async fn check_node_shadow_exists( &self, ) -> std::result::Result { - Retry::spawn(retry_strategy(), || async { - let local_cache_has_associated_shadow = !self.brs_reader.is_empty(); - if local_cache_has_associated_shadow { - Ok(true) - } else { - self.query_api_for_shadow().await - } - }) - .await + let local_cache_has_associated_shadow = !self.brs_reader.is_empty(); + if local_cache_has_associated_shadow { + Ok(true) + } else { + self.query_api_for_shadow().await + } } /// Returns whether or not the BottlerocketShadow for this node has a .status. @@ -326,20 +323,26 @@ impl BrupopAgent { #[instrument(skip(self))] async fn create_shadow_if_not_exist(&self) -> Result<()> { - let shadow_exists = self.check_node_shadow_exists().await?; - if !shadow_exists { - self.create_metadata_shadow().await?; - } - Ok(()) + Retry::spawn(retry_strategy(), || async { + let shadow_exists = self.check_node_shadow_exists().await?; + if !shadow_exists { + self.create_metadata_shadow().await?; + } + Ok(()) + }) + .await } #[instrument(skip(self))] async fn initialize_shadow_if_not_initialized(&self) -> Result<()> { - let shadow_status_exists = self.check_shadow_status_exists().await?; - if !shadow_status_exists { - self.initialize_metadata_shadow().await?; - } - Ok(()) + Retry::spawn(retry_strategy(), || async { + let shadow_status_exists = self.check_shadow_status_exists().await?; + if !shadow_status_exists { + self.initialize_metadata_shadow().await?; + } + Ok(()) + }) + .await } #[instrument(skip(self))] diff --git a/agent/src/main.rs b/agent/src/main.rs index fdaaf0d4..43787528 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,5 +1,5 @@ use agent::agentclient::BrupopAgent; -use apiserver::client::K8SAPIServerClient; +use apiserver::client::{K8SAPIServerClient, RateLimitedAPIServerClient}; use futures::StreamExt; use k8s_openapi::api::core::v1::Node; use kube::{ @@ -13,14 +13,12 @@ use kube::{ use models::constants::{AGENT_TOKEN_PATH, AGENT_TOKEN_PROJECTION_MOUNT_PATH}; use models::node::{brs_name_from_node_name, BottlerocketShadow}; use models::telemetry; - use snafu::{OptionExt, ResultExt}; -use tracing::{event, Level}; - use std::convert::TryFrom; use std::env; use std::fs; use std::path::Path; +use tracing::{event, Level}; const TERMINATION_LOG: &str = "/dev/termination-log"; @@ -52,8 +50,10 @@ async fn run_agent() -> Result<()> { let token_path = token_path.to_str().context(agent_error::AssertionSnafu { message: "Token path (defined in models/agent.rs) is not valid unicode.", })?; - let apiserver_client = K8SAPIServerClient::new(token_path.to_string(), &namespace) - .context(agent_error::ApiClientSnafu)?; + let apiserver_client = RateLimitedAPIServerClient::default( + K8SAPIServerClient::new(token_path.to_string(), &namespace) + .context(agent_error::ApiClientSnafu)?, + ); // Get node and BottlerocketShadow names let associated_node_name = env::var("MY_NODE_NAME").context(agent_error::GetNodeNameSnafu)?; diff --git a/apiserver/Cargo.toml b/apiserver/Cargo.toml index a7c7c629..d8a48e04 100644 --- a/apiserver/Cargo.toml +++ b/apiserver/Cargo.toml @@ -32,9 +32,11 @@ kube = { version = "0.84", default-features = false, features = [ "client", "der async-trait = "0.1" futures = "0.3" +governor = "0.6" lazy_static = "1.4" log = "0.4" mockall = { version = "0.11", optional = true } +nonzero_ext = "0.3" reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls" ] } schemars = "0.8.11" serde = { version = "1", features = [ "derive" ] } diff --git a/apiserver/src/client/mock.rs b/apiserver/src/client/mock.rs index 46853ac8..d5d70424 100644 --- a/apiserver/src/client/mock.rs +++ b/apiserver/src/client/mock.rs @@ -1,18 +1,14 @@ /// This module contains client implementations that are useful for testing purposes. -use super::{error::Result, APIServerClient}; -use crate::{ - CordonAndDrainBottlerocketShadowRequest, CreateBottlerocketShadowRequest, - ExcludeNodeFromLoadBalancerRequest, RemoveNodeExclusionFromLoadBalancerRequest, - UncordonBottlerocketShadowRequest, UpdateBottlerocketShadowRequest, -}; -use models::node::{BottlerocketShadow, BottlerocketShadowStatus}; - +use crate::client::prelude::*; use async_trait::async_trait; - use mockall::{mock, predicate::*}; +use models::node::{BottlerocketShadow, BottlerocketShadowStatus}; + +type Result = std::result::Result; mock! { /// A Mock APIServerClient for use in tests. + #[derive(Debug)] pub APIServerClient {} #[async_trait] impl APIServerClient for APIServerClient { diff --git a/apiserver/src/client/mod.rs b/apiserver/src/client/mod.rs index 9e6c23f0..2f99e95b 100644 --- a/apiserver/src/client/mod.rs +++ b/apiserver/src/client/mod.rs @@ -1,8 +1,20 @@ pub mod error; +mod ratelimited; mod webclient; #[cfg(any(feature = "mockall", test))] pub mod mock; pub use error::ClientError; +pub use ratelimited::RateLimitedAPIServerClient; pub use webclient::{APIServerClient, K8SAPIServerClient}; + +pub mod prelude { + pub use super::error::ClientError; + pub use super::APIServerClient; + pub use crate::{ + CordonAndDrainBottlerocketShadowRequest, CreateBottlerocketShadowRequest, + ExcludeNodeFromLoadBalancerRequest, RemoveNodeExclusionFromLoadBalancerRequest, + UncordonBottlerocketShadowRequest, UpdateBottlerocketShadowRequest, + }; +} diff --git a/apiserver/src/client/ratelimited.rs b/apiserver/src/client/ratelimited.rs new file mode 100644 index 00000000..089e753c --- /dev/null +++ b/apiserver/src/client/ratelimited.rs @@ -0,0 +1,140 @@ +//! This module defines an ApiServerClient implementation that wraps another and rate-limits API calls. +use crate::client::prelude::*; +use async_trait::async_trait; +use governor::{ + clock::{DefaultClock, ReasonablyRealtime}, + middleware::NoOpMiddleware, + state::{DirectStateStore, InMemoryState, NotKeyed}, + Jitter, Quota, RateLimiter, +}; +use models::node::{BottlerocketShadow, BottlerocketShadowStatus}; +use nonzero_ext::nonzero; +use std::{fmt::Debug, sync::Arc}; +use std::{num::NonZeroU32, ops::Deref, time::Duration}; + +type Result = std::result::Result; + +#[derive(Debug, Clone)] +pub struct RateLimitedAPIServerClient +where + WC: APIServerClient, + S: DirectStateStore + Debug, + C: ReasonablyRealtime + Debug, + RL: Deref>> + + Send + + Sync + + Debug, +{ + rate_limiter: RL, + wrapped_client: WC, + jitter: Option, +} + +impl RateLimitedAPIServerClient +where + WC: APIServerClient, + S: DirectStateStore + Debug, + C: ReasonablyRealtime + Debug, + RL: Deref>> + + Send + + Sync + + Debug, +{ + pub fn new(wrapped_client: WC, rate_limiter: RL, jitter: Option) -> Self { + Self { + wrapped_client, + rate_limiter, + jitter, + } + } + + async fn rate_limit(&self) { + if let Some(jitter) = self.jitter { + self.rate_limiter.until_ready_with_jitter(jitter).await; + } else { + self.rate_limiter.until_ready().await; + } + } +} + +/// Rate at which request token bucket refills. +const DEFAULT_REQUESTS_PER_MINUTE: NonZeroU32 = nonzero!(2u32); +/// Maximum request tokens that can be stored. +const DEFAULT_BURST_TOKENS: NonZeroU32 = nonzero!(5u32); +/// Maximum jitter between tokens being added to the bucket. +const DEFAULT_MAX_JITTER: Duration = Duration::from_secs(10); + +/// Default rate limiter. +type SimpleRateLimiter = RateLimiter; + +/// Provides a rate-limiter with reasonable default settings. +impl RateLimitedAPIServerClient> +where + WC: APIServerClient, +{ + pub fn default(wrapped_client: WC) -> Self { + let rate_limiter = Arc::new(SimpleRateLimiter::direct( + Quota::per_minute(DEFAULT_REQUESTS_PER_MINUTE).allow_burst(DEFAULT_BURST_TOKENS), + )); + let jitter = Some(Jitter::up_to(DEFAULT_MAX_JITTER)); + Self { + wrapped_client, + rate_limiter, + jitter, + } + } +} + +#[async_trait] +impl APIServerClient for RateLimitedAPIServerClient +where + WC: APIServerClient, + S: DirectStateStore + Sync + Send + Debug, + C: ReasonablyRealtime + Sync + Send + Debug, + RL: Deref>> + + Send + + Sync + + Debug, +{ + async fn create_bottlerocket_shadow( + &self, + req: CreateBottlerocketShadowRequest, + ) -> Result { + self.rate_limit().await; + self.wrapped_client.create_bottlerocket_shadow(req).await + } + + async fn update_bottlerocket_shadow( + &self, + req: UpdateBottlerocketShadowRequest, + ) -> Result { + self.rate_limit().await; + self.wrapped_client.update_bottlerocket_shadow(req).await + } + + async fn cordon_and_drain_node( + &self, + req: CordonAndDrainBottlerocketShadowRequest, + ) -> Result<()> { + self.rate_limit().await; + self.wrapped_client.cordon_and_drain_node(req).await + } + + async fn uncordon_node(&self, req: UncordonBottlerocketShadowRequest) -> Result<()> { + self.rate_limit().await; + self.wrapped_client.uncordon_node(req).await + } + + async fn exclude_node_from_lb(&self, req: ExcludeNodeFromLoadBalancerRequest) -> Result<()> { + self.rate_limit().await; + self.wrapped_client.exclude_node_from_lb(req).await + } + + async fn remove_node_exclusion_from_lb( + &self, + req: RemoveNodeExclusionFromLoadBalancerRequest, + ) -> Result<()> { + self.rate_limit().await; + self.wrapped_client.remove_node_exclusion_from_lb(req).await + } +} diff --git a/apiserver/src/client/webclient.rs b/apiserver/src/client/webclient.rs index 7f47dbb3..810efe4a 100644 --- a/apiserver/src/client/webclient.rs +++ b/apiserver/src/client/webclient.rs @@ -1,23 +1,19 @@ -use super::error::{self, Result}; use crate::{ + client::{error, prelude::*}, constants::{ EXCLUDE_NODE_FROM_LB_ENDPOINT, HEADER_BRUPOP_K8S_AUTH_TOKEN, HEADER_BRUPOP_NODE_NAME, HEADER_BRUPOP_NODE_UID, NODE_CORDON_AND_DRAIN_ENDPOINT, NODE_RESOURCE_ENDPOINT, NODE_UNCORDON_ENDPOINT, REMOVE_NODE_EXCLUSION_TO_LB_ENDPOINT, }, - CordonAndDrainBottlerocketShadowRequest, CreateBottlerocketShadowRequest, - ExcludeNodeFromLoadBalancerRequest, RemoveNodeExclusionFromLoadBalancerRequest, - UncordonBottlerocketShadowRequest, UpdateBottlerocketShadowRequest, }; +use async_trait::async_trait; use models::{ constants::{APISERVER_SERVICE_NAME, CA_NAME, TLS_KEY_MOUNT_PATH}, node::{BottlerocketShadow, BottlerocketShadowSelector, BottlerocketShadowStatus}, }; - -use async_trait::async_trait; use snafu::ResultExt; -use std::io::Read; use std::{env, fs}; +use std::{fmt::Debug, io::Read}; use tokio::time::Duration; use tokio_retry::{ strategy::{jitter, ExponentialBackoff}, @@ -26,6 +22,8 @@ use tokio_retry::{ use tracing::instrument; use tracing::{event, Level}; +type Result = std::result::Result; + // The web client uses exponential backoff. // These values configure how long to delay between tries. const RETRY_BASE_DELAY: Duration = Duration::from_millis(100); @@ -41,7 +39,7 @@ fn retry_strategy() -> impl Iterator { } #[async_trait] -pub trait APIServerClient { +pub trait APIServerClient: Debug + Send + Sync { async fn create_bottlerocket_shadow( &self, req: CreateBottlerocketShadowRequest,