Skip to content

Commit

Permalink
Middle of refactoring: adding a new builder-ish struct to unify with/…
Browse files Browse the repository at this point in the history
…without operator targeting in cli.
  • Loading branch information
meowjesty committed Sep 9, 2024
1 parent 8a122e9 commit cf320e3
Show file tree
Hide file tree
Showing 3 changed files with 299 additions and 5 deletions.
176 changes: 171 additions & 5 deletions mirrord/cli/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,176 @@
use std::{collections::HashSet, time::Duration};
use std::{collections::HashSet, error::Error, time::Duration};

use kube::{Client, Config};
use mirrord_analytics::Reporter;
use mirrord_config::LayerConfig;
use mirrord_intproxy::agent_conn::AgentConnectInfo;
use mirrord_kube::{
api::{kubernetes::KubernetesAPI, wrap_raw_connection},
api::{
kubernetes::{create_kube_config, KubernetesAPI},
wrap_raw_connection,
},
error::KubeApiError,
};
use mirrord_operator::client::{OperatorApi, OperatorSessionConnection};
use mirrord_operator::{
client::{
operator_config, NoClientCert, OperatorApi, OperatorSessionConnection, PreparedClientCert,
},
types::{CLIENT_HOSTNAME_HEADER, CLIENT_NAME_HEADER, MIRRORD_CLI_VERSION_HEADER},
};
use mirrord_progress::{
messages::MULTIPOD_WARNING, IdeAction, IdeMessage, NotificationLevel, Progress,
};
use mirrord_protocol::{ClientMessage, DaemonMessage};
use reqwest::header::{HeaderName, HeaderValue};
use tokio::sync::mpsc;
use tracing::Level;
use tracing::{instrument::WithSubscriber, Level};

use crate::{CliError, Result};

mod operator;

pub const AGENT_CONNECT_INFO_ENV_KEY: &str = "MIRRORD_AGENT_CONNECT_INFO";

pub(crate) struct AgentConnection {
pub sender: mpsc::Sender<ClientMessage>,
pub receiver: mpsc::Receiver<DaemonMessage>,
}

pub(super) struct TargetConnection<'a, P, R>
where
P: Progress,
R: Reporter,
{
config: &'a LayerConfig,
progress: &'a P,
analytics: &'a mut R,
kube_config: Config,
base_kube_client: Client,
operator: Option<OperatorApi<PreparedClientCert>>,
}

impl<'a, P, R> TargetConnection<'a, P, R>
where
P: Progress,
R: Reporter,
{
async fn new(
config: &'a LayerConfig,
progress: &'a P,
analytics: &'a mut R,
) -> Result<Self, Box<dyn Error>> {
let kube_config = create_kube_config(
config.accept_invalid_certificates,
config.kubeconfig.clone(),
config.kube_context.clone(),
)
.await?;

let base_kube_client = Client::try_from(kube_config.clone())?;

Ok(Self {
config,
progress,
analytics,
kube_config,
base_kube_client,
operator: None,
})
}

async fn create_kube_client(self) -> Result<Self, Box<dyn Error>> {
let mut operator_subtask = self.progress.subtask("checking operator");
if self.config.operator == Some(false) {
operator_subtask.success(Some("operator disabled"));
Ok(self)
} else {
let operator_config = operator_config(&self.kube_config);
let operator_kube_client = Client::try_from(operator_config.clone())?;

let operator_api =
OperatorApi::try_new_new(&self.kube_config, self.analytics, &operator_kube_client)
.await?;

match operator_api {
Some(operator_api) => {
let mut version_cmp_subtask =
operator_subtask.subtask("checking version compatibility");
let compatible = operator_api.check_operator_version(&version_cmp_subtask);
if compatible {
version_cmp_subtask.success(Some("operator version compatible"));
} else {
version_cmp_subtask.failure(Some("operator version may not be compatible"));
}

// TODO(alex) [high] 5: Do we have to keep the operator config alive? Nah
self.validated_operator_license(&operator_api, &mut operator_subtask)?
.certified_operator(operator_api, &mut operator_subtask)
.await
// TODO(alex) [high] 6: Now that we have `kube_client` as being the operator
// client, we can move the operator specific checks to mirrord, and if
// `operator: None`, we can just skip these checks, and do line 285 and forwards
// from `create_and_connect` .
}
None if self.config.operator == Some(true) => {
// TODO(alex) [mid] 4: This is an error, we don't proceed!
todo!()
}
None => {
operator_subtask.success(Some("operator not found"));
Ok(self)
}
}
}
}

fn validated_operator_license(
self,
operator_api: &OperatorApi<NoClientCert>,
operator_subtask: &mut P,
) -> Result<Self, Box<dyn Error>> {
let mut license_subtask = operator_subtask.subtask("checking license");
match operator_api.check_license_validity(&license_subtask) {
Ok(()) => {
license_subtask.success(Some("operator license valid"));
Ok(self)
}
Err(fail) => {
license_subtask.failure(Some("operator license expired"));

if self.config.operator == Some(true) {
Err(fail.into())
} else {
operator_subtask.failure(Some("proceeding without operator"));
Ok(Self {
operator: None,
..self
})
}
}
}
}

async fn certified_operator(
self,
operator_api: OperatorApi<NoClientCert>,
operator_subtask: &mut P,
) -> Result<Self, Box<dyn Error>> {
let mut user_cert_subtask = operator_subtask.subtask("preparing user credentials");
let prepared_api = operator_api
.prepare_client_cert(self.analytics)
.await
.into_certified()?;
user_cert_subtask.success(Some("user credentials prepared"));

let kube_client = prepared_api.client().clone();
Ok(Self {
operator: Some(prepared_api),
base_kube_client: kube_client,
..self
})
}
}

/// 1. If mirrord-operator is explicitly enabled in the given [`LayerConfig`], makes a connection
/// with the target using the mirrord-operator.
/// 2. If mirrord-operator is explicitly disabled in the given [`LayerConfig`], returns [`None`].
Expand Down Expand Up @@ -97,7 +244,6 @@ where
/// mirrord-operator is not found or its license is invalid.
///
/// Here is where we start interactions with the kubernetes API.
// #[tracing::instrument(level = Level::TRACE, skip_all)]
pub(crate) async fn create_and_connect<P, R: Reporter>(
config: &LayerConfig,
progress: &mut P,
Expand All @@ -106,6 +252,26 @@ pub(crate) async fn create_and_connect<P, R: Reporter>(
where
P: Progress + Send + Sync,
{
let newstuff = TargetConnection::new(config, progress, analytics)
.await
.unwrap()
.create_kube_client();

todo!()
}

// #[tracing::instrument(level = Level::TRACE, skip_all)]
pub(crate) async fn create_and_connect2<P, R: Reporter>(
config: &LayerConfig,
progress: &mut P,
analytics: &mut R,
) -> Result<(AgentConnectInfo, AgentConnection)>
where
P: Progress + Send + Sync,
{
// TODO(alex) [high] 3: If this is `Ok(None)`, then the config we use is not the one
// from the operator, it's just the normal user kube config, which will be created after
// this whole thing.
if let Some(connection) = try_connect_using_operator(config, progress, analytics).await? {
return Ok((
AgentConnectInfo::Operator(connection.session),
Expand Down
42 changes: 42 additions & 0 deletions mirrord/cli/src/connection/operator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use mirrord_analytics::Reporter;
use mirrord_config::LayerConfig;
use mirrord_operator::client::OperatorApi;
use mirrord_progress::Progress;

use crate::CliError;

pub(super) struct OperatorConnection<'a, P, R>
where
P: Progress,
R: Reporter,
{
config: &'a LayerConfig,
progress: &'a P,
analytics: &'a mut R,
}

impl<'a, P, R> OperatorConnection<'a, P, R>
where
P: Progress,
R: Reporter,
{
pub(super) fn new(
config: &'a LayerConfig,
progress: &'a P,
analytics: &'a mut R,
) -> Option<Self> {
let mut operator_subtask = progress.subtask("checking operator");
if config.operator == Some(false) {
operator_subtask.success(Some("operator disabled"));
None
} else {
Some(Self {
config,
progress,
analytics,
})
}
}

pub(super) fn create_api(&mut self) {}
}
86 changes: 86 additions & 0 deletions mirrord/operator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,55 @@ impl OperatorApi<NoClientCert> {
})
}

pub async fn try_new_new<R>(
operator_kube_config: &Config,
reporter: &mut R,
client: &Client,
) -> OperatorApiResult<Option<Self>>
where
R: Reporter,
{
let operator: Result<MirrordOperatorCrd, _> =
Api::all(client.clone()).get(OPERATOR_STATUS_NAME).await;

let error = match operator {
Ok(operator) => {
reporter.set_operator_properties(AnalyticsOperatorProperties {
client_hash: None,
license_hash: operator
.spec
.license
.fingerprint
.as_deref()
.map(AnalyticsHash::from_base64),
});

return Ok(Some(Self {
client: client.clone(),
client_cert: NoClientCert {
base_config: operator_kube_config.clone(),
},
operator,
}));
}

Err(error @ kube::Error::Api(..)) => {
match discovery::operator_installed(&client).await {

Check failure on line 369 in mirrord/operator/src/client.rs

View workflow job for this annotation

GitHub Actions / lint

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 369 in mirrord/operator/src/client.rs

View workflow job for this annotation

GitHub Actions / macos_tests

this expression creates a reference which is immediately dereferenced by the compiler
Ok(false) | Err(..) => {
return Ok(None);
}
Ok(true) => error,
}
}

Err(error) => error,
};

Err(OperatorApiError::KubeError {
error,
operation: OperatorOperation::FindingOperator,
})
}
/// Prepares client [`Certificate`] to be sent in all subsequent requests to the operator.
/// In case of failure, state of this API instance does not change.
#[tracing::instrument(level = Level::TRACE, skip(reporter))]
Expand Down Expand Up @@ -407,6 +456,43 @@ impl OperatorApi<MaybeClientCert> {
}
}

pub fn operator_config(base_kube_config: &Config) -> Config {
let mut operator_kube_config = base_kube_config.clone();
operator_kube_config.headers.push((
HeaderName::from_static(MIRRORD_CLI_VERSION_HEADER),
HeaderValue::from_static(env!("CARGO_PKG_VERSION")),
));

let UserIdentity { name, hostname } = UserIdentity::load();

let headers = [
(CLIENT_NAME_HEADER, name),
(CLIENT_HOSTNAME_HEADER, hostname),
];
for (name, raw_value) in headers {
let Some(raw_value) = raw_value else {
continue;
};

// Replace non-ascii (not supported in headers) chars and trim.
let cleaned = raw_value
.replace(|c: char| !c.is_ascii(), "")
.trim()
.to_string();
let value = HeaderValue::from_str(&cleaned);
match value {
Ok(value) => operator_kube_config
.headers
.push((HeaderName::from_static(name), value)),
Err(error) => {
tracing::debug!(%error, %name, raw_value = raw_value, cleaned, "Invalid header value");
}
}
}

operator_kube_config
}

impl<C> OperatorApi<C>
where
C: ClientCertificateState,
Expand Down

0 comments on commit cf320e3

Please sign in to comment.