Skip to content

Commit

Permalink
Merge pull request #505 from cbgbt/apiserver-ratelimit
Browse files Browse the repository at this point in the history
agent: Ratelimit calls to brupop apiserver
  • Loading branch information
cbgbt committed Aug 8, 2023
2 parents 591aab0 + 4804ab0 commit 2d37c8d
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 43 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 23 additions & 20 deletions agent/src/agentclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::{event, instrument, Level};
// The reflector uses exponential backoff.
// These values configure how long to delay between tries.
const RETRY_BASE_DELAY: Duration = Duration::from_millis(1000);
const RETRY_MAX_DELAY: Duration = Duration::from_secs(10);
const RETRY_MAX_DELAY: Duration = Duration::from_secs(30);
const NUM_RETRIES: usize = 5;

const AGENT_SLEEP_DURATION: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -117,15 +117,12 @@ impl<T: APIServerClient> BrupopAgent<T> {
async fn check_node_shadow_exists(
&self,
) -> std::result::Result<bool, agentclient_error::BottlerocketShadowRWError> {
Retry::spawn(retry_strategy(), || async {
let local_cache_has_associated_shadow = !self.brs_reader.is_empty();
if local_cache_has_associated_shadow {
Ok(true)
} else {
self.query_api_for_shadow().await
}
})
.await
let local_cache_has_associated_shadow = !self.brs_reader.is_empty();
if local_cache_has_associated_shadow {
Ok(true)
} else {
self.query_api_for_shadow().await
}
}

/// Returns whether or not the BottlerocketShadow for this node has a .status.
Expand Down Expand Up @@ -326,20 +323,26 @@ impl<T: APIServerClient> BrupopAgent<T> {

#[instrument(skip(self))]
async fn create_shadow_if_not_exist(&self) -> Result<()> {
let shadow_exists = self.check_node_shadow_exists().await?;
if !shadow_exists {
self.create_metadata_shadow().await?;
}
Ok(())
Retry::spawn(retry_strategy(), || async {
let shadow_exists = self.check_node_shadow_exists().await?;
if !shadow_exists {
self.create_metadata_shadow().await?;
}
Ok(())
})
.await
}

#[instrument(skip(self))]
async fn initialize_shadow_if_not_initialized(&self) -> Result<()> {
let shadow_status_exists = self.check_shadow_status_exists().await?;
if !shadow_status_exists {
self.initialize_metadata_shadow().await?;
}
Ok(())
Retry::spawn(retry_strategy(), || async {
let shadow_status_exists = self.check_shadow_status_exists().await?;
if !shadow_status_exists {
self.initialize_metadata_shadow().await?;
}
Ok(())
})
.await
}

#[instrument(skip(self))]
Expand Down
12 changes: 6 additions & 6 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use agent::agentclient::BrupopAgent;
use apiserver::client::K8SAPIServerClient;
use apiserver::client::{K8SAPIServerClient, RateLimitedAPIServerClient};
use futures::StreamExt;
use k8s_openapi::api::core::v1::Node;
use kube::{
Expand All @@ -13,14 +13,12 @@ use kube::{
use models::constants::{AGENT_TOKEN_PATH, AGENT_TOKEN_PROJECTION_MOUNT_PATH};
use models::node::{brs_name_from_node_name, BottlerocketShadow};
use models::telemetry;

use snafu::{OptionExt, ResultExt};
use tracing::{event, Level};

use std::convert::TryFrom;
use std::env;
use std::fs;
use std::path::Path;
use tracing::{event, Level};

const TERMINATION_LOG: &str = "/dev/termination-log";

Expand Down Expand Up @@ -52,8 +50,10 @@ async fn run_agent() -> Result<()> {
let token_path = token_path.to_str().context(agent_error::AssertionSnafu {
message: "Token path (defined in models/agent.rs) is not valid unicode.",
})?;
let apiserver_client = K8SAPIServerClient::new(token_path.to_string(), &namespace)
.context(agent_error::ApiClientSnafu)?;
let apiserver_client = RateLimitedAPIServerClient::default(
K8SAPIServerClient::new(token_path.to_string(), &namespace)
.context(agent_error::ApiClientSnafu)?,
);

// Get node and BottlerocketShadow names
let associated_node_name = env::var("MY_NODE_NAME").context(agent_error::GetNodeNameSnafu)?;
Expand Down
2 changes: 2 additions & 0 deletions apiserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ kube = { version = "0.85", default-features = false, features = [ "client", "der

async-trait = "0.1"
futures = "0.3"
governor = "0.6"
lazy_static = "1.4"
log = "0.4"
mockall = { version = "0.11", optional = true }
nonzero_ext = "0.3"
reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls" ] }
schemars = "0.8.11"
serde = { version = "1", features = [ "derive" ] }
Expand Down
14 changes: 5 additions & 9 deletions apiserver/src/client/mock.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
/// This module contains client implementations that are useful for testing purposes.
use super::{error::Result, APIServerClient};
use crate::{
CordonAndDrainBottlerocketShadowRequest, CreateBottlerocketShadowRequest,
ExcludeNodeFromLoadBalancerRequest, RemoveNodeExclusionFromLoadBalancerRequest,
UncordonBottlerocketShadowRequest, UpdateBottlerocketShadowRequest,
};
use models::node::{BottlerocketShadow, BottlerocketShadowStatus};

use crate::client::prelude::*;
use async_trait::async_trait;

use mockall::{mock, predicate::*};
use models::node::{BottlerocketShadow, BottlerocketShadowStatus};

type Result<T> = std::result::Result<T, ClientError>;

mock! {
/// A Mock APIServerClient for use in tests.
#[derive(Debug)]
pub APIServerClient {}
#[async_trait]
impl APIServerClient for APIServerClient {
Expand Down
12 changes: 12 additions & 0 deletions apiserver/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
pub mod error;
mod ratelimited;
mod webclient;

#[cfg(any(feature = "mockall", test))]
pub mod mock;

pub use error::ClientError;
pub use ratelimited::RateLimitedAPIServerClient;
pub use webclient::{APIServerClient, K8SAPIServerClient};

pub mod prelude {
pub use super::error::ClientError;
pub use super::APIServerClient;
pub use crate::{
CordonAndDrainBottlerocketShadowRequest, CreateBottlerocketShadowRequest,
ExcludeNodeFromLoadBalancerRequest, RemoveNodeExclusionFromLoadBalancerRequest,
UncordonBottlerocketShadowRequest, UpdateBottlerocketShadowRequest,
};
}
140 changes: 140 additions & 0 deletions apiserver/src/client/ratelimited.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//! This module defines an ApiServerClient implementation that wraps another and rate-limits API calls.
use crate::client::prelude::*;
use async_trait::async_trait;
use governor::{
clock::{DefaultClock, ReasonablyRealtime},
middleware::NoOpMiddleware,
state::{DirectStateStore, InMemoryState, NotKeyed},
Jitter, Quota, RateLimiter,
};
use models::node::{BottlerocketShadow, BottlerocketShadowStatus};
use nonzero_ext::nonzero;
use std::{fmt::Debug, sync::Arc};
use std::{num::NonZeroU32, ops::Deref, time::Duration};

type Result<T> = std::result::Result<T, ClientError>;

#[derive(Debug, Clone)]
pub struct RateLimitedAPIServerClient<WC, S, C, RL>
where
WC: APIServerClient,
S: DirectStateStore + Debug,
C: ReasonablyRealtime + Debug,
RL: Deref<Target = RateLimiter<NotKeyed, S, C, NoOpMiddleware<C::Instant>>>
+ Send
+ Sync
+ Debug,
{
rate_limiter: RL,
wrapped_client: WC,
jitter: Option<Jitter>,
}

impl<WC, S, C, RL> RateLimitedAPIServerClient<WC, S, C, RL>
where
WC: APIServerClient,
S: DirectStateStore + Debug,
C: ReasonablyRealtime + Debug,
RL: Deref<Target = RateLimiter<NotKeyed, S, C, NoOpMiddleware<C::Instant>>>
+ Send
+ Sync
+ Debug,
{
pub fn new(wrapped_client: WC, rate_limiter: RL, jitter: Option<Jitter>) -> Self {
Self {
wrapped_client,
rate_limiter,
jitter,
}
}

async fn rate_limit(&self) {
if let Some(jitter) = self.jitter {
self.rate_limiter.until_ready_with_jitter(jitter).await;
} else {
self.rate_limiter.until_ready().await;
}
}
}

/// Rate at which request token bucket refills.
const DEFAULT_REQUESTS_PER_MINUTE: NonZeroU32 = nonzero!(2u32);
/// Maximum request tokens that can be stored.
const DEFAULT_BURST_TOKENS: NonZeroU32 = nonzero!(5u32);
/// Maximum jitter between tokens being added to the bucket.
const DEFAULT_MAX_JITTER: Duration = Duration::from_secs(10);

/// Default rate limiter.
type SimpleRateLimiter = RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>;

/// Provides a rate-limiter with reasonable default settings.
impl<WC> RateLimitedAPIServerClient<WC, InMemoryState, DefaultClock, Arc<SimpleRateLimiter>>
where
WC: APIServerClient,
{
pub fn default(wrapped_client: WC) -> Self {
let rate_limiter = Arc::new(SimpleRateLimiter::direct(
Quota::per_minute(DEFAULT_REQUESTS_PER_MINUTE).allow_burst(DEFAULT_BURST_TOKENS),
));
let jitter = Some(Jitter::up_to(DEFAULT_MAX_JITTER));
Self {
wrapped_client,
rate_limiter,
jitter,
}
}
}

#[async_trait]
impl<WC, S, C, RL> APIServerClient for RateLimitedAPIServerClient<WC, S, C, RL>
where
WC: APIServerClient,
S: DirectStateStore + Sync + Send + Debug,
C: ReasonablyRealtime + Sync + Send + Debug,
RL: Deref<Target = RateLimiter<NotKeyed, S, C, NoOpMiddleware<C::Instant>>>
+ Send
+ Sync
+ Debug,
{
async fn create_bottlerocket_shadow(
&self,
req: CreateBottlerocketShadowRequest,
) -> Result<BottlerocketShadow> {
self.rate_limit().await;
self.wrapped_client.create_bottlerocket_shadow(req).await
}

async fn update_bottlerocket_shadow(
&self,
req: UpdateBottlerocketShadowRequest,
) -> Result<BottlerocketShadowStatus> {
self.rate_limit().await;
self.wrapped_client.update_bottlerocket_shadow(req).await
}

async fn cordon_and_drain_node(
&self,
req: CordonAndDrainBottlerocketShadowRequest,
) -> Result<()> {
self.rate_limit().await;
self.wrapped_client.cordon_and_drain_node(req).await
}

async fn uncordon_node(&self, req: UncordonBottlerocketShadowRequest) -> Result<()> {
self.rate_limit().await;
self.wrapped_client.uncordon_node(req).await
}

async fn exclude_node_from_lb(&self, req: ExcludeNodeFromLoadBalancerRequest) -> Result<()> {
self.rate_limit().await;
self.wrapped_client.exclude_node_from_lb(req).await
}

async fn remove_node_exclusion_from_lb(
&self,
req: RemoveNodeExclusionFromLoadBalancerRequest,
) -> Result<()> {
self.rate_limit().await;
self.wrapped_client.remove_node_exclusion_from_lb(req).await
}
}
14 changes: 6 additions & 8 deletions apiserver/src/client/webclient.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
use super::error::{self, Result};
use crate::{
client::{error, prelude::*},
constants::{
EXCLUDE_NODE_FROM_LB_ENDPOINT, HEADER_BRUPOP_K8S_AUTH_TOKEN, HEADER_BRUPOP_NODE_NAME,
HEADER_BRUPOP_NODE_UID, NODE_CORDON_AND_DRAIN_ENDPOINT, NODE_RESOURCE_ENDPOINT,
NODE_UNCORDON_ENDPOINT, REMOVE_NODE_EXCLUSION_TO_LB_ENDPOINT,
},
CordonAndDrainBottlerocketShadowRequest, CreateBottlerocketShadowRequest,
ExcludeNodeFromLoadBalancerRequest, RemoveNodeExclusionFromLoadBalancerRequest,
UncordonBottlerocketShadowRequest, UpdateBottlerocketShadowRequest,
};
use async_trait::async_trait;
use models::{
constants::{APISERVER_SERVICE_NAME, CA_NAME, TLS_KEY_MOUNT_PATH},
node::{BottlerocketShadow, BottlerocketShadowSelector, BottlerocketShadowStatus},
};

use async_trait::async_trait;
use snafu::ResultExt;
use std::io::Read;
use std::{env, fs};
use std::{fmt::Debug, io::Read};
use tokio::time::Duration;
use tokio_retry::{
strategy::{jitter, ExponentialBackoff},
Expand All @@ -26,6 +22,8 @@ use tokio_retry::{
use tracing::instrument;
use tracing::{event, Level};

type Result<T> = std::result::Result<T, ClientError>;

// The web client uses exponential backoff.
// These values configure how long to delay between tries.
const RETRY_BASE_DELAY: Duration = Duration::from_millis(100);
Expand All @@ -41,7 +39,7 @@ fn retry_strategy() -> impl Iterator<Item = Duration> {
}

#[async_trait]
pub trait APIServerClient {
pub trait APIServerClient: Debug + Send + Sync {
async fn create_bottlerocket_shadow(
&self,
req: CreateBottlerocketShadowRequest,
Expand Down

0 comments on commit 2d37c8d

Please sign in to comment.