Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: Ratelimit calls to brupop apiserver #505

Merged
merged 4 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 23 additions & 20 deletions agent/src/agentclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::{event, instrument, Level};
// The reflector uses exponential backoff.
// These values configure how long to delay between tries.
const RETRY_BASE_DELAY: Duration = Duration::from_millis(1000);
const RETRY_MAX_DELAY: Duration = Duration::from_secs(10);
const RETRY_MAX_DELAY: Duration = Duration::from_secs(30);
const NUM_RETRIES: usize = 5;

const AGENT_SLEEP_DURATION: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -117,15 +117,12 @@ impl<T: APIServerClient> BrupopAgent<T> {
async fn check_node_shadow_exists(
&self,
) -> std::result::Result<bool, agentclient_error::BottlerocketShadowRWError> {
Retry::spawn(retry_strategy(), || async {
let local_cache_has_associated_shadow = !self.brs_reader.is_empty();
if local_cache_has_associated_shadow {
Ok(true)
} else {
self.query_api_for_shadow().await
}
})
.await
let local_cache_has_associated_shadow = !self.brs_reader.is_empty();
if local_cache_has_associated_shadow {
Ok(true)
} else {
self.query_api_for_shadow().await
}
}

/// Returns whether or not the BottlerocketShadow for this node has a .status.
Expand Down Expand Up @@ -326,20 +323,26 @@ impl<T: APIServerClient> BrupopAgent<T> {

#[instrument(skip(self))]
async fn create_shadow_if_not_exist(&self) -> Result<()> {
let shadow_exists = self.check_node_shadow_exists().await?;
if !shadow_exists {
self.create_metadata_shadow().await?;
}
Ok(())
Retry::spawn(retry_strategy(), || async {
let shadow_exists = self.check_node_shadow_exists().await?;
if !shadow_exists {
self.create_metadata_shadow().await?;
}
Ok(())
})
.await
}

#[instrument(skip(self))]
async fn initialize_shadow_if_not_initialized(&self) -> Result<()> {
let shadow_status_exists = self.check_shadow_status_exists().await?;
if !shadow_status_exists {
self.initialize_metadata_shadow().await?;
}
Ok(())
Retry::spawn(retry_strategy(), || async {
let shadow_status_exists = self.check_shadow_status_exists().await?;
if !shadow_status_exists {
self.initialize_metadata_shadow().await?;
}
Ok(())
})
.await
}

#[instrument(skip(self))]
Expand Down
12 changes: 6 additions & 6 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use agent::agentclient::BrupopAgent;
use apiserver::client::K8SAPIServerClient;
use apiserver::client::{K8SAPIServerClient, RateLimitedAPIServerClient};
use futures::StreamExt;
use k8s_openapi::api::core::v1::Node;
use kube::{
Expand All @@ -13,14 +13,12 @@ use kube::{
use models::constants::{AGENT_TOKEN_PATH, AGENT_TOKEN_PROJECTION_MOUNT_PATH};
use models::node::{brs_name_from_node_name, BottlerocketShadow};
use models::telemetry;

use snafu::{OptionExt, ResultExt};
use tracing::{event, Level};

use std::convert::TryFrom;
use std::env;
use std::fs;
use std::path::Path;
use tracing::{event, Level};

const TERMINATION_LOG: &str = "/dev/termination-log";

Expand Down Expand Up @@ -52,8 +50,10 @@ async fn run_agent() -> Result<()> {
let token_path = token_path.to_str().context(agent_error::AssertionSnafu {
message: "Token path (defined in models/agent.rs) is not valid unicode.",
})?;
let apiserver_client = K8SAPIServerClient::new(token_path.to_string(), &namespace)
.context(agent_error::ApiClientSnafu)?;
let apiserver_client = RateLimitedAPIServerClient::default(
K8SAPIServerClient::new(token_path.to_string(), &namespace)
.context(agent_error::ApiClientSnafu)?,
);

// Get node and BottlerocketShadow names
let associated_node_name = env::var("MY_NODE_NAME").context(agent_error::GetNodeNameSnafu)?;
Expand Down
2 changes: 2 additions & 0 deletions apiserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ kube = { version = "0.84", default-features = false, features = [ "client", "der

async-trait = "0.1"
futures = "0.3"
governor = "0.6"
lazy_static = "1.4"
log = "0.4"
mockall = { version = "0.11", optional = true }
nonzero_ext = "0.3"
reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls" ] }
schemars = "0.8.11"
serde = { version = "1", features = [ "derive" ] }
Expand Down
14 changes: 5 additions & 9 deletions apiserver/src/client/mock.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
//! This module contains client implementations that are useful for testing purposes.
use super::{error::Result, APIServerClient};
use crate::{
CordonAndDrainBottlerocketShadowRequest, CreateBottlerocketShadowRequest,
ExcludeNodeFromLoadBalancerRequest, RemoveNodeExclusionFromLoadBalancerRequest,
UncordonBottlerocketShadowRequest, UpdateBottlerocketShadowRequest,
};
use models::node::{BottlerocketShadow, BottlerocketShadowStatus};

use crate::client::prelude::*;
use async_trait::async_trait;

use mockall::{mock, predicate::*};
use models::node::{BottlerocketShadow, BottlerocketShadowStatus};

type Result<T> = std::result::Result<T, ClientError>;

mock! {
/// A Mock APIServerClient for use in tests.
#[derive(Debug)]
pub APIServerClient {}
#[async_trait]
impl APIServerClient for APIServerClient {
Expand Down
12 changes: 12 additions & 0 deletions apiserver/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
pub mod error;
mod ratelimited;
mod webclient;

#[cfg(any(feature = "mockall", test))]
pub mod mock;

pub use error::ClientError;
pub use ratelimited::RateLimitedAPIServerClient;
pub use webclient::{APIServerClient, K8SAPIServerClient};

/// Convenience re-exports for the client implementations in this module.
///
/// A glob import of this prelude brings into scope the client error type
/// (`ClientError`), the `APIServerClient` trait, and the request types that
/// the trait's methods accept.
pub mod prelude {
pub use super::error::ClientError;
pub use super::APIServerClient;
// Request payload types, re-exported from the crate root.
pub use crate::{
CordonAndDrainBottlerocketShadowRequest, CreateBottlerocketShadowRequest,
ExcludeNodeFromLoadBalancerRequest, RemoveNodeExclusionFromLoadBalancerRequest,
UncordonBottlerocketShadowRequest, UpdateBottlerocketShadowRequest,
};
}
140 changes: 140 additions & 0 deletions apiserver/src/client/ratelimited.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//! This module defines an `APIServerClient` implementation that wraps another client and rate-limits its API calls.
use crate::client::prelude::*;
use async_trait::async_trait;
use governor::{
clock::{DefaultClock, ReasonablyRealtime},
middleware::NoOpMiddleware,
state::{DirectStateStore, InMemoryState, NotKeyed},
Jitter, Quota, RateLimiter,
};
use models::node::{BottlerocketShadow, BottlerocketShadowStatus};
use nonzero_ext::nonzero;
use std::{fmt::Debug, sync::Arc};
use std::{num::NonZeroU32, ops::Deref, time::Duration};

type Result<T> = std::result::Result<T, ClientError>;

/// An `APIServerClient` wrapper that waits on a rate limiter before forwarding
/// each call to the wrapped client.
///
/// Type parameters:
/// * `WC` - the wrapped client that actually performs the API calls.
/// * `S` - the rate limiter's state store.
/// * `C` - the clock used by the rate limiter.
/// * `RL` - a pointer-like type that dereferences to the un-keyed
///   `RateLimiter` (for example an `Arc`, as used by the default constructor).
#[derive(Debug, Clone)]
pub struct RateLimitedAPIServerClient<WC, S, C, RL>
where
WC: APIServerClient,
S: DirectStateStore + Debug,
C: ReasonablyRealtime + Debug,
RL: Deref<Target = RateLimiter<NotKeyed, S, C, NoOpMiddleware<C::Instant>>>
+ Send
+ Sync
+ Debug,
{
// Limiter that is awaited (via `rate_limit`) before every forwarded call.
rate_limiter: RL,
// The client that requests are delegated to once the limiter allows them.
wrapped_client: WC,
// When `Some`, the wait uses `until_ready_with_jitter`; when `None`, plain `until_ready`.
jitter: Option<Jitter>,
}

impl<WC, S, C, RL> RateLimitedAPIServerClient<WC, S, C, RL>
where
    WC: APIServerClient,
    S: DirectStateStore + Debug,
    C: ReasonablyRealtime + Debug,
    RL: Deref<Target = RateLimiter<NotKeyed, S, C, NoOpMiddleware<C::Instant>>>
        + Send
        + Sync
        + Debug,
{
    /// Builds a rate-limited client from its parts.
    ///
    /// * `wrapped_client` - the client that calls are forwarded to.
    /// * `rate_limiter` - the limiter that is awaited before each forwarded call.
    /// * `jitter` - optional jitter handed to the limiter while waiting; `None`
    ///   waits without jitter.
    pub fn new(wrapped_client: WC, rate_limiter: RL, jitter: Option<Jitter>) -> Self {
        Self {
            rate_limiter,
            wrapped_client,
            jitter,
        }
    }

    /// Suspends until the rate limiter permits another request, applying the
    /// configured jitter when one is set.
    async fn rate_limit(&self) {
        match self.jitter {
            Some(max_jitter) => {
                self.rate_limiter.until_ready_with_jitter(max_jitter).await;
            }
            None => {
                self.rate_limiter.until_ready().await;
            }
        }
    }
}

/// Rate at which the request token bucket refills, expressed as requests per
/// minute (this value is handed to `Quota::per_minute`).
const DEFAULT_REQUESTS_PER_MINUTE: NonZeroU32 = nonzero!(2u32);
/// Maximum number of request tokens that can be stored, i.e. the burst size
/// handed to `Quota::allow_burst`.
const DEFAULT_BURST_TOKENS: NonZeroU32 = nonzero!(5u32);
/// Upper bound of the jitter (`Jitter::up_to`) that the default client passes
/// to `until_ready_with_jitter` when waiting on the rate limiter.
const DEFAULT_MAX_JITTER: Duration = Duration::from_secs(10);

/// Default rate limiter: un-keyed, with in-memory state, the default clock and
/// no-op middleware.
type SimpleRateLimiter = RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>;

/// Constructor for a rate-limited client that uses this module's default settings.
impl<WC> RateLimitedAPIServerClient<WC, InMemoryState, DefaultClock, Arc<SimpleRateLimiter>>
where
    WC: APIServerClient,
{
    /// Wraps `wrapped_client` with a direct (un-keyed) rate limiter configured
    /// from `DEFAULT_REQUESTS_PER_MINUTE` and `DEFAULT_BURST_TOKENS`, waiting
    /// with jitter of up to `DEFAULT_MAX_JITTER`.
    pub fn default(wrapped_client: WC) -> Self {
        let quota =
            Quota::per_minute(DEFAULT_REQUESTS_PER_MINUTE).allow_burst(DEFAULT_BURST_TOKENS);
        Self::new(
            wrapped_client,
            Arc::new(SimpleRateLimiter::direct(quota)),
            Some(Jitter::up_to(DEFAULT_MAX_JITTER)),
        )
    }
}

/// `APIServerClient` implementation that rate-limits every call.
///
/// Each method first awaits `rate_limit()` and then forwards the request
/// unchanged to the same-named method on the wrapped client, returning that
/// client's result as-is.
#[async_trait]
impl<WC, S, C, RL> APIServerClient for RateLimitedAPIServerClient<WC, S, C, RL>
where
WC: APIServerClient,
S: DirectStateStore + Sync + Send + Debug,
C: ReasonablyRealtime + Sync + Send + Debug,
RL: Deref<Target = RateLimiter<NotKeyed, S, C, NoOpMiddleware<C::Instant>>>
+ Send
+ Sync
+ Debug,
{
/// Waits for the rate limiter, then delegates to the wrapped client's
/// `create_bottlerocket_shadow`.
async fn create_bottlerocket_shadow(
&self,
req: CreateBottlerocketShadowRequest,
) -> Result<BottlerocketShadow> {
self.rate_limit().await;
self.wrapped_client.create_bottlerocket_shadow(req).await
}

/// Waits for the rate limiter, then delegates to the wrapped client's
/// `update_bottlerocket_shadow`.
async fn update_bottlerocket_shadow(
&self,
req: UpdateBottlerocketShadowRequest,
) -> Result<BottlerocketShadowStatus> {
self.rate_limit().await;
self.wrapped_client.update_bottlerocket_shadow(req).await
}

/// Waits for the rate limiter, then delegates to the wrapped client's
/// `cordon_and_drain_node`.
async fn cordon_and_drain_node(
&self,
req: CordonAndDrainBottlerocketShadowRequest,
) -> Result<()> {
self.rate_limit().await;
self.wrapped_client.cordon_and_drain_node(req).await
}

/// Waits for the rate limiter, then delegates to the wrapped client's
/// `uncordon_node`.
async fn uncordon_node(&self, req: UncordonBottlerocketShadowRequest) -> Result<()> {
self.rate_limit().await;
self.wrapped_client.uncordon_node(req).await
}

/// Waits for the rate limiter, then delegates to the wrapped client's
/// `exclude_node_from_lb`.
async fn exclude_node_from_lb(&self, req: ExcludeNodeFromLoadBalancerRequest) -> Result<()> {
self.rate_limit().await;
self.wrapped_client.exclude_node_from_lb(req).await
}

/// Waits for the rate limiter, then delegates to the wrapped client's
/// `remove_node_exclusion_from_lb`.
async fn remove_node_exclusion_from_lb(
&self,
req: RemoveNodeExclusionFromLoadBalancerRequest,
) -> Result<()> {
self.rate_limit().await;
self.wrapped_client.remove_node_exclusion_from_lb(req).await
}
}
14 changes: 6 additions & 8 deletions apiserver/src/client/webclient.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
use super::error::{self, Result};
use crate::{
client::{error, prelude::*},
constants::{
EXCLUDE_NODE_FROM_LB_ENDPOINT, HEADER_BRUPOP_K8S_AUTH_TOKEN, HEADER_BRUPOP_NODE_NAME,
HEADER_BRUPOP_NODE_UID, NODE_CORDON_AND_DRAIN_ENDPOINT, NODE_RESOURCE_ENDPOINT,
NODE_UNCORDON_ENDPOINT, REMOVE_NODE_EXCLUSION_TO_LB_ENDPOINT,
},
CordonAndDrainBottlerocketShadowRequest, CreateBottlerocketShadowRequest,
ExcludeNodeFromLoadBalancerRequest, RemoveNodeExclusionFromLoadBalancerRequest,
UncordonBottlerocketShadowRequest, UpdateBottlerocketShadowRequest,
};
use async_trait::async_trait;
use models::{
constants::{APISERVER_SERVICE_NAME, CA_NAME, TLS_KEY_MOUNT_PATH},
node::{BottlerocketShadow, BottlerocketShadowSelector, BottlerocketShadowStatus},
};

use async_trait::async_trait;
use snafu::ResultExt;
use std::io::Read;
use std::{env, fs};
use std::{fmt::Debug, io::Read};
use tokio::time::Duration;
use tokio_retry::{
strategy::{jitter, ExponentialBackoff},
Expand All @@ -26,6 +22,8 @@ use tokio_retry::{
use tracing::instrument;
use tracing::{event, Level};

type Result<T> = std::result::Result<T, ClientError>;

// The web client uses exponential backoff.
// These values configure how long to delay between tries.
const RETRY_BASE_DELAY: Duration = Duration::from_millis(100);
Expand All @@ -41,7 +39,7 @@ fn retry_strategy() -> impl Iterator<Item = Duration> {
}

#[async_trait]
pub trait APIServerClient {
pub trait APIServerClient: Debug + Send + Sync {
async fn create_bottlerocket_shadow(
&self,
req: CreateBottlerocketShadowRequest,
Expand Down