Skip to content

Commit

Permalink
Merge pull request #506 from cbgbt/watcher-backoff
Browse files Browse the repository at this point in the history
Implement backoff for k8s watcher retries
  • Loading branch information
cbgbt authored Aug 8, 2023
2 parents e4a23de + b229d35 commit 591aab0
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 61 deletions.
78 changes: 33 additions & 45 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ lazy_static = "1"
tracing = "0.1"

# k8s-openapi must match the version required by kube and enable a k8s version feature
k8s-openapi = { version = "0.18", default-features = false, features = ["v1_24"] }
kube = { version = "0.84", default-features = false, features = [ "derive", "runtime", "rustls-tls" ] }
k8s-openapi = { version = "0.19", default-features = false, features = ["v1_24"] }
kube = { version = "0.85", default-features = false, features = [ "derive", "runtime", "rustls-tls" ] }

semver = { version = "1.0", features = [ "serde" ] }
serde = { version = "1", features = [ "derive" ] }
Expand Down
6 changes: 4 additions & 2 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ async fn run_agent() -> Result<()> {
.fields(format!("metadata.name={}", associated_bottlerocketshadow_name).as_str());
let brs_store = reflector::store::Writer::<BottlerocketShadow>::default();
let brs_reader = brs_store.as_reader();
let brs_reflector = reflector::reflector(brs_store, watcher(brss, brs_config));
let brs_reflector =
reflector::reflector(brs_store, watcher(brss, brs_config).default_backoff());
let brs_drainer = brs_reflector
.touched_objects()
.filter_map(|x| async move { std::result::Result::ok(x) })
Expand All @@ -80,7 +81,8 @@ async fn run_agent() -> Result<()> {
let nodes: Api<Node> = Api::all(k8s_client.clone());
let nodes_store = reflector::store::Writer::<Node>::default();
let node_reader = nodes_store.as_reader();
let node_reflector = reflector::reflector(nodes_store, watcher(nodes, node_config));
let node_reflector =
reflector::reflector(nodes_store, watcher(nodes, node_config).default_backoff());
let node_drainer = node_reflector
.touched_objects()
.filter_map(|x| async move { std::result::Result::ok(x) })
Expand Down
4 changes: 2 additions & 2 deletions apiserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ tracing = "0.1"
tracing-actix-web = "0.7"

# k8s-openapi must match the version required by kube and enable a k8s version feature
k8s-openapi = { version = "0.18", default-features = false, features = ["v1_24"] }
kube = { version = "0.84", default-features = false, features = [ "client", "derive", "runtime", "rustls-tls" ] }
k8s-openapi = { version = "0.19", default-features = false, features = ["v1_24"] }
kube = { version = "0.85", default-features = false, features = [ "client", "derive", "runtime", "rustls-tls" ] }

async-trait = "0.1"
futures = "0.3"
Expand Down
3 changes: 2 additions & 1 deletion apiserver/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ pub async fn run_server<T: 'static + BottlerocketShadowClient>(
watcher(
pods,
Config::default().labels(&format!("{}={}", LABEL_COMPONENT, AGENT)),
),
)
.default_backoff(),
);
let drainer = pod_reflector.touched_objects()
.filter_map(|x| async move {
Expand Down
4 changes: 2 additions & 2 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ semver = "1.0"
serde = "1"
serde_plain = "1"
# k8s-openapi must match the version required by kube and enable a k8s version feature
k8s-openapi = { version = "0.18", default-features = false, features = ["v1_24"] }
kube = { version = "0.84", default-features = false, features = [ "derive", "runtime", "rustls-tls" ] }
k8s-openapi = { version = "0.19", default-features = false, features = ["v1_24"] }
kube = { version = "0.85", default-features = false, features = [ "derive", "runtime", "rustls-tls" ] }
models = { path = "../models", version = "0.1.0" }
opentelemetry = { version = "0.18", features = ["rt-tokio-current-thread"] }
opentelemetry-prometheus = "0.11"
Expand Down
14 changes: 14 additions & 0 deletions controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use tracing::{event, instrument, Level};
// Defines the length time after which the controller will take actions.
const ACTION_INTERVAL: Duration = Duration::from_secs(2);

// The interval between control loop polls if no nodes are detected.
const CANNOT_FIND_ANY_NODES_WAIT_INTERVAL: Duration = Duration::from_secs(10);

// Defines environment variable name used to fetch max concurrent update number.
const MAX_CONCURRENT_UPDATE_ENV_VAR: &str = "MAX_CONCURRENT_UPDATE";

Expand Down Expand Up @@ -278,6 +281,17 @@ impl<T: BottlerocketShadowClient> BrupopController<T> {
// On every iteration of the event loop, we reconstruct the state of the controller and determine its
// next actions. This is to ensure that the operator would behave consistently even if suddenly restarted.
loop {
if self.all_brss().is_empty() {
event!(
Level::INFO,
"Nothing to do: The bottlerocket-update-operator is not aware of any BottlerocketShadow objects. \
Is the bottlerocket-shadow CRD installed? Are nodes labelled so that the agent is deployed to them? \
See the project's README for more information.",
);
sleep(CANNOT_FIND_ANY_NODES_WAIT_INTERVAL).await;
continue;
}

let active_set = self.active_brs_set();
event!(Level::TRACE, ?active_set, "Found active set of nodes.");

Expand Down
10 changes: 8 additions & 2 deletions controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ async fn main() -> Result<()> {
let exporter = opentelemetry_prometheus::exporter(controller).init();

// Setup and run a reflector, ensuring that `BottlerocketShadow` updates are reflected to the controller.
let brs_reflector = reflector::reflector(brs_store, watcher(brss, Config::default()));
let brs_reflector = reflector::reflector(
brs_store,
watcher(brss, Config::default()).default_backoff(),
);
let brs_drainer = brs_reflector
.touched_objects()
.filter_map(|x| async move { std::result::Result::ok(x) })
Expand All @@ -80,7 +83,10 @@ async fn main() -> Result<()> {
let nodes: Api<Node> = Api::all(k8s_client.clone());
let nodes_store = reflector::store::Writer::<Node>::default();
let node_reader = nodes_store.as_reader();
let node_reflector = reflector::reflector(nodes_store, watcher(nodes, Config::default()));
let node_reflector = reflector::reflector(
nodes_store,
watcher(nodes, Config::default()).default_backoff(),
);
let node_drainer = node_reflector
.touched_objects()
.filter_map(|x| async move { std::result::Result::ok(x) })
Expand Down
2 changes: 1 addition & 1 deletion deploy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0 OR MIT"
[build-dependencies]
models = { path = "../models", version = "0.1.0" }
dotenv = "0.15"
kube = { version = "0.84", default-features = false, features = [ "derive", "runtime" ] }
kube = { version = "0.85", default-features = false, features = [ "derive", "runtime" ] }
serde_yaml = "0.9"

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions integ/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ tokio-retry = "0.3"
uuid = { version = "0.8", default-features = false, features = ["serde", "v4"] }

# k8s-openapi must match the version required by kube and enable a k8s version feature
k8s-openapi = { version = "0.18", default-features = false, features = ["v1_24"] }
kube = { version = "0.84", default-features = false, features = [ "derive", "runtime" ] }
k8s-openapi = { version = "0.19", default-features = false, features = ["v1_24"] }
kube = { version = "0.85", default-features = false, features = [ "derive", "runtime" ] }


[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions models/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ async-trait = "0.1"
chrono = { version = "0.4", default-features = false, features = ["std"] }
futures = "0.3"
# k8s-openapi must match the version required by kube and enable a k8s version feature
k8s-openapi = { version = "0.18", default-features = false, features = ["v1_24"] }
kube = { version = "0.84", default-features = false, features = [ "client", "derive", "runtime" ] }
k8s-openapi = { version = "0.19", default-features = false, features = ["v1_24"] }
kube = { version = "0.85", default-features = false, features = [ "client", "derive", "runtime" ] }

lazy_static = "1.4"
maplit = "1.0"
Expand Down

0 comments on commit 591aab0

Please sign in to comment.