From cf320e3d4d6a8f65c18200f63b288702065d3c0b Mon Sep 17 00:00:00 2001 From: meowjesty Date: Mon, 9 Sep 2024 19:11:12 -0300 Subject: [PATCH] Middle of refactoring: adding a new builder-ish struct to unify with/without operator targeting in cli. --- mirrord/cli/src/connection.rs | 176 ++++++++++++++++++++++++- mirrord/cli/src/connection/operator.rs | 42 ++++++ mirrord/operator/src/client.rs | 86 ++++++++++++ 3 files changed, 299 insertions(+), 5 deletions(-) create mode 100644 mirrord/cli/src/connection/operator.rs diff --git a/mirrord/cli/src/connection.rs b/mirrord/cli/src/connection.rs index 16d076f12e6..fffe9a301b5 100644 --- a/mirrord/cli/src/connection.rs +++ b/mirrord/cli/src/connection.rs @@ -1,22 +1,34 @@ -use std::{collections::HashSet, time::Duration}; +use std::{collections::HashSet, error::Error, time::Duration}; +use kube::{Client, Config}; use mirrord_analytics::Reporter; use mirrord_config::LayerConfig; use mirrord_intproxy::agent_conn::AgentConnectInfo; use mirrord_kube::{ - api::{kubernetes::KubernetesAPI, wrap_raw_connection}, + api::{ + kubernetes::{create_kube_config, KubernetesAPI}, + wrap_raw_connection, + }, error::KubeApiError, }; -use mirrord_operator::client::{OperatorApi, OperatorSessionConnection}; +use mirrord_operator::{ + client::{ + operator_config, NoClientCert, OperatorApi, OperatorSessionConnection, PreparedClientCert, + }, + types::{CLIENT_HOSTNAME_HEADER, CLIENT_NAME_HEADER, MIRRORD_CLI_VERSION_HEADER}, +}; use mirrord_progress::{ messages::MULTIPOD_WARNING, IdeAction, IdeMessage, NotificationLevel, Progress, }; use mirrord_protocol::{ClientMessage, DaemonMessage}; +use reqwest::header::{HeaderName, HeaderValue}; use tokio::sync::mpsc; -use tracing::Level; +use tracing::{instrument::WithSubscriber, Level}; use crate::{CliError, Result}; +mod operator; + pub const AGENT_CONNECT_INFO_ENV_KEY: &str = "MIRRORD_AGENT_CONNECT_INFO"; pub(crate) struct AgentConnection { @@ -24,6 +36,141 @@ pub(crate) struct AgentConnection { pub receiver: mpsc::Receiver, } +pub(super) struct TargetConnection<'a, P, R> +where + P: Progress, + R: Reporter, +{ + config: &'a LayerConfig, + progress: &'a P, + analytics: &'a mut R, + kube_config: Config, + base_kube_client: Client, + operator: Option>, +} + +impl<'a, P, R> TargetConnection<'a, P, R> +where + P: Progress, + R: Reporter, +{ + async fn new( + config: &'a LayerConfig, + progress: &'a P, + analytics: &'a mut R, + ) -> Result> { + let kube_config = create_kube_config( + config.accept_invalid_certificates, + config.kubeconfig.clone(), + config.kube_context.clone(), + ) + .await?; + + let base_kube_client = Client::try_from(kube_config.clone())?; + + Ok(Self { + config, + progress, + analytics, + kube_config, + base_kube_client, + operator: None, + }) + } + + async fn create_kube_client(self) -> Result> { + let mut operator_subtask = self.progress.subtask("checking operator"); + if self.config.operator == Some(false) { + operator_subtask.success(Some("operator disabled")); + Ok(self) + } else { + let operator_config = operator_config(&self.kube_config); + let operator_kube_client = Client::try_from(operator_config.clone())?; + + let operator_api = + OperatorApi::try_new_new(&self.kube_config, self.analytics, &operator_kube_client) + .await?; + + match operator_api { + Some(operator_api) => { + let mut version_cmp_subtask = + operator_subtask.subtask("checking version compatibility"); + let compatible = operator_api.check_operator_version(&version_cmp_subtask); + if compatible { + version_cmp_subtask.success(Some("operator version compatible")); + } else { + version_cmp_subtask.failure(Some("operator version may not be compatible")); + } + + // TODO(alex) [high] 5: Do we have to keep the operator config alive? Nah + self.validated_operator_license(&operator_api, &mut operator_subtask)? + .certified_operator(operator_api, &mut operator_subtask) + .await + // TODO(alex) [high] 6: Now that we have `kube_client` as being the operator + // client, we can move the operator specific checks to mirrord, and if + // `operator: None`, we can just skip these checks, and do line 285 and forwards + // from `create_and_connect` . + } + None if self.config.operator == Some(true) => { + // TODO(alex) [mid] 4: This is an error, we don't proceed! + todo!() + } + None => { + operator_subtask.success(Some("operator not found")); + Ok(self) + } + } + } + } + + fn validated_operator_license( + self, + operator_api: &OperatorApi, + operator_subtask: &mut P, + ) -> Result> { + let mut license_subtask = operator_subtask.subtask("checking license"); + match operator_api.check_license_validity(&license_subtask) { + Ok(()) => { + license_subtask.success(Some("operator license valid")); + Ok(self) + } + Err(fail) => { + license_subtask.failure(Some("operator license expired")); + + if self.config.operator == Some(true) { + Err(fail.into()) + } else { + operator_subtask.failure(Some("proceeding without operator")); + Ok(Self { + operator: None, + ..self + }) + } + } + } + } + + async fn certified_operator( + self, + operator_api: OperatorApi, + operator_subtask: &mut P, + ) -> Result> { + let mut user_cert_subtask = operator_subtask.subtask("preparing user credentials"); + let prepared_api = operator_api + .prepare_client_cert(self.analytics) + .await + .into_certified()?; + user_cert_subtask.success(Some("user credentials prepared")); + + let kube_client = prepared_api.client().clone(); + Ok(Self { + operator: Some(prepared_api), + base_kube_client: kube_client, + ..self + }) + } +} + /// 1. If mirrord-operator is explicitly enabled in the given [`LayerConfig`], makes a connection /// with the target using the mirrord-operator. /// 2. If mirrord-operator is explicitly disabled in the given [`LayerConfig`], returns [`None`]. @@ -97,7 +244,6 @@ where /// mirrord-operator is not found or its license is invalid. /// /// Here is where we start interactions with the kubernetes API. -// #[tracing::instrument(level = Level::TRACE, skip_all)] pub(crate) async fn create_and_connect( config: &LayerConfig, progress: &mut P, @@ -106,6 +252,26 @@ pub(crate) async fn create_and_connect( where P: Progress + Send + Sync, { + let newstuff = TargetConnection::new(config, progress, analytics) + .await + .unwrap() + .create_kube_client(); + + todo!() +} + +// #[tracing::instrument(level = Level::TRACE, skip_all)] +pub(crate) async fn create_and_connect2( + config: &LayerConfig, + progress: &mut P, + analytics: &mut R, +) -> Result<(AgentConnectInfo, AgentConnection)> +where + P: Progress + Send + Sync, +{ + // TODO(alex) [high] 3: If this is `Ok(None)`, then the config we use is not the one + // from the operator, it's just the normal user kube config, which will be created after + // this whole thing. if let Some(connection) = try_connect_using_operator(config, progress, analytics).await? { return Ok(( AgentConnectInfo::Operator(connection.session), diff --git a/mirrord/cli/src/connection/operator.rs b/mirrord/cli/src/connection/operator.rs new file mode 100644 index 00000000000..88adeaf3a1d --- /dev/null +++ b/mirrord/cli/src/connection/operator.rs @@ -0,0 +1,42 @@ +use mirrord_analytics::Reporter; +use mirrord_config::LayerConfig; +use mirrord_operator::client::OperatorApi; +use mirrord_progress::Progress; + +use crate::CliError; + +pub(super) struct OperatorConnection<'a, P, R> +where + P: Progress, + R: Reporter, +{ + config: &'a LayerConfig, + progress: &'a P, + analytics: &'a mut R, +} + +impl<'a, P, R> OperatorConnection<'a, P, R> +where + P: Progress, + R: Reporter, +{ + pub(super) fn new( + config: &'a LayerConfig, + progress: &'a P, + analytics: &'a mut R, + ) -> Option { + let mut operator_subtask = progress.subtask("checking operator"); + if config.operator == Some(false) { + operator_subtask.success(Some("operator disabled")); + None + } else { + Some(Self { + config, + progress, + analytics, + }) + } + } + + pub(super) fn create_api(&mut self) {} +} diff --git a/mirrord/operator/src/client.rs b/mirrord/operator/src/client.rs index bb2cf9e1dc3..b3c12af13c0 100644 --- a/mirrord/operator/src/client.rs +++ b/mirrord/operator/src/client.rs @@ -333,6 +333,55 @@ impl OperatorApi { }) } + pub async fn try_new_new( + operator_kube_config: &Config, + reporter: &mut R, + client: &Client, + ) -> OperatorApiResult> + where + R: Reporter, + { + let operator: Result = + Api::all(client.clone()).get(OPERATOR_STATUS_NAME).await; + + let error = match operator { + Ok(operator) => { + reporter.set_operator_properties(AnalyticsOperatorProperties { + client_hash: None, + license_hash: operator + .spec + .license + .fingerprint + .as_deref() + .map(AnalyticsHash::from_base64), + }); + + return Ok(Some(Self { + client: client.clone(), + client_cert: NoClientCert { + base_config: operator_kube_config.clone(), + }, + operator, + })); + } + + Err(error @ kube::Error::Api(..)) => { + match discovery::operator_installed(&client).await { + Ok(false) | Err(..) => { + return Ok(None); + } + Ok(true) => error, + } + } + + Err(error) => error, + }; + + Err(OperatorApiError::KubeError { + error, + operation: OperatorOperation::FindingOperator, + }) + } /// Prepares client [`Certificate`] to be sent in all subsequent requests to the operator. /// In case of failure, state of this API instance does not change. #[tracing::instrument(level = Level::TRACE, skip(reporter))] @@ -407,6 +456,43 @@ impl OperatorApi { } } +pub fn operator_config(base_kube_config: &Config) -> Config { + let mut operator_kube_config = base_kube_config.clone(); + operator_kube_config.headers.push(( + HeaderName::from_static(MIRRORD_CLI_VERSION_HEADER), + HeaderValue::from_static(env!("CARGO_PKG_VERSION")), + )); + + let UserIdentity { name, hostname } = UserIdentity::load(); + + let headers = [ + (CLIENT_NAME_HEADER, name), + (CLIENT_HOSTNAME_HEADER, hostname), + ]; + for (name, raw_value) in headers { + let Some(raw_value) = raw_value else { + continue; + }; + + // Replace non-ascii (not supported in headers) chars and trim. + let cleaned = raw_value + .replace(|c: char| !c.is_ascii(), "") + .trim() + .to_string(); + let value = HeaderValue::from_str(&cleaned); + match value { + Ok(value) => operator_kube_config + .headers + .push((HeaderName::from_static(name), value)), + Err(error) => { + tracing::debug!(%error, %name, raw_value = raw_value, cleaned, "Invalid header value"); + } + } + } + + operator_kube_config +} + impl OperatorApi where C: ClientCertificateState,