Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect controller from becoming unschedulable #214

Merged
merged 3 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions agent/src/agentclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,15 +362,12 @@ impl<T: APIServerClient> BrupopAgent<T> {
);
}
},
BottlerocketShadowState::StagedUpdate => {
BottlerocketShadowState::StagedAndPerformedUpdate => {
event!(Level::INFO, "Preparing update");
prepare().await.context(agentclient_error::UpdateActions {
action: "Prepare".to_string(),
})?;
cbgbt marked this conversation as resolved.
Show resolved Hide resolved

self.cordon_and_drain().await?;
}
BottlerocketShadowState::PerformedUpdate => {
event!(Level::INFO, "Performing update");
update().await.context(agentclient_error::UpdateActions {
action: "Perform".to_string(),
Expand All @@ -390,7 +387,9 @@ impl<T: APIServerClient> BrupopAgent<T> {
),
)
.await?;
self.handle_recover().await?;
} else {
self.cordon_and_drain().await?;
boot_update()
.await
.context(agentclient_error::UpdateActions {
Expand All @@ -400,9 +399,8 @@ impl<T: APIServerClient> BrupopAgent<T> {
}
BottlerocketShadowState::MonitoringUpdate => {
event!(Level::INFO, "Monitoring node's healthy condition");
self.handle_recover().await?;
// TODO: we need add some criterias here by which we decide to transition
// from MonitoringUpdate to WaitingForUpdate.
// TODO: space is intentionally left here for customers who need to add customized criteria
// that decide when to transition from MonitoringUpdate to WaitingForUpdate.
}
BottlerocketShadowState::ErrorReset => {
// Spec state should never be ErrorReset
Expand Down
1 change: 1 addition & 0 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ actix-web = { version = "4.0.0-beta.9", default-features = false }
chrono = "0.4"
futures = "0.3"
http = "0.2.5"
semver = "1.0"
# k8s-openapi must match the version required by kube and enable a k8s version feature
k8s-openapi = { version = "0.14.0", default-features = false, features = ["v1_20"] }
kube = { version = "0.71.0", default-features = true, features = [ "derive", "runtime", "rustls-tls" ] }
Expand Down
199 changes: 192 additions & 7 deletions controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ use super::{
statemachine::determine_next_node_spec,
};
use models::node::{
BottlerocketShadow, BottlerocketShadowClient, BottlerocketShadowState, Selector,
brs_name_from_node_name, BottlerocketShadow, BottlerocketShadowClient, BottlerocketShadowState,
Selector,
};

use kube::runtime::reflector::Store;
use kube::ResourceExt;
use opentelemetry::global;
use snafu::ResultExt;
use std::collections::{BTreeMap, HashMap};
use std::env;
use tokio::time::{sleep, Duration};
use tracing::{event, instrument, Level};

Expand Down Expand Up @@ -104,16 +106,16 @@ impl<T: BottlerocketShadowClient> BrupopController<T> {
/// The state transition is then attempted. If successful, this node should be detected as part of the active
/// set during the next iteration of the controller's event loop.
#[instrument(skip(self))]
async fn find_and_update_ready_node(&self) -> Option<BottlerocketShadow> {
async fn find_and_update_ready_node(&self) -> Result<Option<BottlerocketShadow>> {
let mut shadows: Vec<BottlerocketShadow> = self.all_nodes();

shadows.sort_by(|a, b| a.compare_crash_count(b));
for brs in shadows {
sort_shadows(&mut shadows, &get_associated_bottlerocketshadow_name()?);
for brs in shadows.drain(..) {
Copy link
Member Author

@gthao313 gthao313 Jul 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update to use drain

// If we determine that the spec should change, this node is a candidate to begin updating.
let next_spec = determine_next_node_spec(&brs);
if next_spec != brs.spec {
match self.progress_node(brs.clone()).await {
Ok(_) => return Some(brs),
Ok(_) => return Ok(Some(brs)),
Err(_) => {
// Errors connecting to the k8s API are ignored (and also logged by `progress_node()`).
// We'll just move on and try a different node.
Expand All @@ -122,7 +124,7 @@ impl<T: BottlerocketShadowClient> BrupopController<T> {
}
}
}
None
Ok(None)
}

#[instrument(skip(self))]
Expand Down Expand Up @@ -189,7 +191,7 @@ impl<T: BottlerocketShadowClient> BrupopController<T> {
}
} else {
// If there's nothing to operate on, check to see if any other nodes are ready for action.
let new_active_node = self.find_and_update_ready_node().await;
let new_active_node = self.find_and_update_ready_node().await?;
if let Some(brs) = new_active_node {
event!(Level::INFO, name = %brs.name(), "Began updating new node.")
}
Expand All @@ -203,3 +205,186 @@ impl<T: BottlerocketShadowClient> BrupopController<T> {
}
}
}

// Get node and BottlerocketShadow names
#[instrument]
fn get_associated_bottlerocketshadow_name() -> Result<String> {
let associated_node_name = env::var("MY_NODE_NAME").context(error::GetNodeName)?;
let associated_bottlerocketshadow_name = brs_name_from_node_name(&associated_node_name);

event!(
Level::INFO,
?associated_bottlerocketshadow_name,
"Found associated bottlerocketshadow name."
);

Ok(associated_bottlerocketshadow_name)
}

/// Orders `shadows` to determine the order in which nodes are considered for update.
///
/// 1. Sorts every shadow with `BottlerocketShadow::compare_crash_count`. The sort is
///    stable, so shadows that compare equal keep their relative order.
/// 2. Moves the shadow named `associated_brs_name` (the shadow of the node hosting the
///    controller pod) to the end of the list, so that node is considered last.
///
/// If no shadow is named `associated_brs_name` (for example because some shadows are not
/// ready yet), only step 2 is skipped: the list is still left in crash-count order and an
/// INFO event is logged.
#[instrument(skip())]
fn sort_shadows(shadows: &mut Vec<BottlerocketShadow>, associated_brs_name: &str) {
    // Step 1: order by crash count.
    shadows.sort_by(|a, b| a.compare_crash_count(b));

    // Step 2: locate the shadow of the controller's node. Comparing through `as_deref`
    // avoids allocating a fresh `String` for every element inspected.
    let associated_brs_node_position = shadows
        .iter()
        .position(|brs| brs.metadata.name.as_deref() == Some(associated_brs_name));

    match associated_brs_node_position {
        Some(position) => {
            // `Vec::remove` returns the owned element, so it can be pushed to the back
            // directly; no clone of the shadow is needed.
            let last_brs = shadows.remove(position);
            shadows.push(last_brs);
        }
        None => {
            event!(
                Level::INFO,
                "Unable to find associated bottlerocketshadow, skip sort."
            )
        }
    }
}

#[cfg(test)]
pub(crate) mod test {
    use super::*;
    use chrono::{TimeZone, Utc};
    use semver::Version;
    use std::str::FromStr;

    use kube::api::ObjectMeta;

    use models::node::{BottlerocketShadow, BottlerocketShadowState, BottlerocketShadowStatus};

    /// Builds a `BottlerocketShadow` fixture with the given name, versions and state.
    ///
    /// Every fixture shares the same fixed timestamp and the same literal `0` for the
    /// fourth status argument (presumably the crash count -- confirm against
    /// `BottlerocketShadowStatus::new`).
    ///
    /// # Panics
    ///
    /// Panics if either version string is not a valid semantic version.
    pub(crate) fn fake_shadow(
        name: String,
        current_version: String,
        target_version: String,
        current_state: BottlerocketShadowState,
    ) -> BottlerocketShadow {
        let current = Version::from_str(&current_version).unwrap();
        let target = Version::from_str(&target_version).unwrap();
        let timestamp = Utc.ymd(2022, 1, 1).and_hms_milli(0, 0, 1, 444);
        let status =
            BottlerocketShadowStatus::new(current, target, current_state, 0, Some(timestamp));

        BottlerocketShadow {
            status: Some(status),
            metadata: ObjectMeta {
                name: Some(name),
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Builds one `Idle` shadow (current version 1.8.0, target version 1.5.0) per name,
    /// preserving the order of `names`.
    fn idle_shadows(names: &[&str]) -> Vec<BottlerocketShadow> {
        names
            .iter()
            .map(|&name| {
                fake_shadow(
                    name.to_string(),
                    "1.8.0".to_string(),
                    "1.5.0".to_string(),
                    BottlerocketShadowState::Idle,
                )
            })
            .collect()
    }

    /// When the associated shadow is present, it must be moved to the end of the list
    /// while the other shadows keep their relative order.
    #[tokio::test]
    async fn test_sort_shadows_find_brs() {
        let mut actual = idle_shadows(&[
            "brs-ip-1.us-west-2.compute.internal",
            "brs-ip-2.us-west-2.compute.internal",
            "brs-ip-3.us-west-2.compute.internal",
        ]);
        let expected = idle_shadows(&[
            "brs-ip-2.us-west-2.compute.internal",
            "brs-ip-3.us-west-2.compute.internal",
            "brs-ip-1.us-west-2.compute.internal",
        ]);

        sort_shadows(&mut actual, "brs-ip-1.us-west-2.compute.internal");

        assert_eq!(actual, expected);
    }

    /// When the associated shadow is absent (some shadows aren't ready), the list order
    /// must be left as it was.
    #[tokio::test]
    async fn test_sort_shadows_not_find_brs() {
        let names = [
            "brs-ip-17.us-west-2.compute.internal",
            "brs-ip-123.us-west-2.compute.internal",
            "brs-ip-321.us-west-2.compute.internal",
        ];
        let mut actual = idle_shadows(&names);
        let expected = idle_shadows(&names);

        sort_shadows(&mut actual, "brs-ip-5.us-west-2.compute.internal");

        assert_eq!(actual, expected);
    }
}
3 changes: 3 additions & 0 deletions controller/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ pub enum Error {

#[snafu(display("Controller failed due to internal assertion issue: '{}'", source))]
Assertion { source: serde_plain::Error },

#[snafu(display("Unable to get host controller pod node name: {}", source))]
GetNodeName { source: std::env::VarError },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note here this GetNodeName didn't show up in the adjust error module, you might want to reimplement this error when you merge two PRs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah! I would like to merge this PR first and then solve the conflict on the PR you mentioned above. Thanks!

}
2 changes: 1 addition & 1 deletion controller/src/statemachine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn determine_next_node_spec(brs: &BottlerocketShadow) -> BottlerocketShadowS
// Or node just start or completed without crashing
if node_allowed_to_update(node_status) {
BottlerocketShadowSpec::new_starting_now(
BottlerocketShadowState::StagedUpdate,
BottlerocketShadowState::StagedAndPerformedUpdate,
Some(target_version.clone()),
)
} else {
Expand Down
4 changes: 2 additions & 2 deletions integ/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ pub(crate) mod test {
"brs-ip-2.us-west-2.compute.internal".to_string(),
"1.6.0".to_string(),
"1.5.0".to_string(),
BottlerocketShadowState::StagedUpdate,
BottlerocketShadowState::StagedAndPerformedUpdate,
),
fake_shadow(
"brs-ip-3.us-west-2.compute.internal".to_string(),
Expand Down Expand Up @@ -531,7 +531,7 @@ pub(crate) mod test {
"brs-ip-2.us-west-2.compute.internal".to_string(),
"1.6.0".to_string(),
"1.5.0".to_string(),
BottlerocketShadowState::PerformedUpdate,
BottlerocketShadowState::StagedAndPerformedUpdate,
),
fake_shadow(
"brs-ip-3.us-west-2.compute.internal".to_string(),
Expand Down
6 changes: 6 additions & 0 deletions models/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ pub const CONTROLLER_DEPLOYMENT_NAME: &str = "brupop-controller-deployment";
pub const CONTROLLER_SERVICE_NAME: &str = "brupop-controller-server"; // The name for the `svc` fronting the controller.
pub const CONTROLLER_INTERNAL_PORT: i32 = 8080; // The internal port on which the the controller service is hosted.
pub const CONTROLLER_SERVICE_PORT: i32 = 80; // The k8s service port hosting the controller service.
// Name of the priority class for the controller (presumably a Kubernetes `PriorityClass`; confirm where it is created).
pub const BRUPOP_CONTROLLER_PRIORITY_CLASS: &str = "brupop-controller-high-priority";
// Preemption policy used together with the controller's priority class.
pub const BRUPOP_CONTROLLER_PREEMPTION_POLICY: &str = "Never";
// The controller's priority value is deliberately set to one million:
// it is high enough for the controller to be scheduled preferentially,
// but it is not a critical value that would take precedence over customers' critical k8s resources.
pub const BRUPOP_CONTROLLER_PRIORITY_VALUE: i32 = 1000000;
Loading