Skip to content

Commit

Permalink
exec trunk install (#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend authored Apr 10, 2023
1 parent da701ed commit 9dc5657
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 28 deletions.
2 changes: 1 addition & 1 deletion coredb-operator/charts/coredb-operator/templates/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ spec:
type: object
type: array
image:
default: quay.io/coredb/postgres:6e3c4a7
default: quay.io/coredb/postgres:d45c5b1
type: string
pkglibdirStorage:
default: 1Gi
Expand Down
12 changes: 12 additions & 0 deletions coredb-operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::{
};

use crate::{
exec::{ExecCommand, ExecOutput},
psql::{PsqlCommand, PsqlOutput},
service::reconcile_svc,
statefulset::{reconcile_sts, stateful_set_from_cdb},
Expand Down Expand Up @@ -254,6 +255,17 @@ impl CoreDB {
.execute()
.await
}

pub async fn exec(
&self,
pod_name: String,
client: Client,
command: &[String],
) -> Result<ExecOutput, Error> {
ExecCommand::new(pod_name, self.metadata.namespace.clone().unwrap(), client)
.execute(command)
.await
}
}

pub fn is_pod_ready() -> impl Condition<Pod> + 'static {
Expand Down
2 changes: 1 addition & 1 deletion coredb-operator/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub fn default_port() -> i32 {
}

pub fn default_image() -> String {
"quay.io/coredb/postgres:6e3c4a7".to_owned()
"quay.io/coredb/postgres:d45c5b1".to_owned()
}

pub fn default_storage() -> Quantity {
Expand Down
87 changes: 87 additions & 0 deletions coredb-operator/src/exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::Status};
use kube::{api::Api, client::Client, core::subresource::AttachParams};
use tokio::io::AsyncReadExt;

use crate::Error;
use tracing::error;

#[derive(Debug)]
pub struct ExecOutput {
pub stdout: Option<String>,
pub stderr: Option<String>,
pub status: Option<Status>,
}

impl ExecOutput {
pub fn new(stdout: Option<String>, stderr: Option<String>, status: Option<Status>) -> Self {
Self {
stdout,
stderr,
status,
}
}
}

pub struct ExecCommand {
pods_api: Api<Pod>,
pod_name: String,
}

impl ExecCommand {
pub fn new(pod_name: String, namespace: String, client: Client) -> Self {
let pods_api: Api<Pod> = Api::namespaced(client, &namespace);
Self { pod_name, pods_api }
}

pub async fn execute(&self, command: &[String]) -> Result<ExecOutput, Error> {
let attach_params = AttachParams {
container: Some("postgres".to_string()),
tty: false,
stdin: true,
stdout: true,
stderr: true,
max_stdin_buf_size: Some(1024),
max_stdout_buf_size: Some(1024),
max_stderr_buf_size: Some(1024),
};

let mut attached_process = self
.pods_api
.exec(self.pod_name.as_str(), command, &attach_params)
.await?;

let mut stdout_reader = attached_process.stdout().unwrap();
let mut result_stdout = String::new();
stdout_reader.read_to_string(&mut result_stdout).await.unwrap();

let mut stderr_reader = attached_process.stderr().unwrap();
let mut result_stderr = String::new();
stderr_reader.read_to_string(&mut result_stderr).await.unwrap();


let status = attached_process.take_status().unwrap().await.unwrap();
// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
let response = ExecOutput::new(Some(result_stdout), Some(result_stderr), Some(status.clone()));

match status.status.expect("no status reported").as_str() {
"Success" => Ok(response),
"Failure" => {
error!("Error executing command: {:?}. response: {:?}", command, response);
Err(Error::KubeExecError(format!(
"Error executing command: {:?}. response: {:?}",
command, response
)))
}
_ => {
error!(
"Undefined response from kube API {:?}, command: {:?}",
response, command
);
Err(Error::KubeExecError(format!(
"Error executing command: {:?}. response: {:?}",
command, response
)))
}
}
}
}
101 changes: 75 additions & 26 deletions coredb-operator/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,55 @@ name asc,
enabled desc
"#;

/// handles installing extensions
pub async fn install_extension(
cdb: &CoreDB,
extensions: &[Extension],
ctx: Arc<Context>,
) -> Result<(), Error> {
debug!("extensions to install: {:?}", extensions);
let client = ctx.client.clone();

let pod_name = cdb
.primary_pod(client.clone())
.await
.unwrap()
.metadata
.name
.unwrap();

let mut errors: Vec<Error> = Vec::new();
for ext in extensions.iter() {
let version = ext.locations[0].version.clone().unwrap();
let cmd = vec![
"trunk".to_owned(),
"install".to_owned(),
ext.name.clone(),
"--version".to_owned(),
version,
];
let result = cdb.exec(pod_name.clone(), client.clone(), &cmd).await;

match result {
Ok(result) => {
debug!("installed extension: {}", result.stdout.clone().unwrap());
}
Err(err) => {
error!("error installing extension, {}", err);
errors.push(err);
}
}
}
let num_success = extensions.len() - errors.len();
info!(
"Successfully installed {} / {} extensions",
num_success,
extensions.len()
);
Ok(())
}


/// handles create/drop extensions
pub async fn toggle_extensions(
cdb: &CoreDB,
Expand Down Expand Up @@ -302,32 +351,6 @@ pub async fn get_all_extensions(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<E
Ok(ext_spec)
}

/// reconcile extensions between the spec and the database
pub async fn reconcile_extensions(coredb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<Extension>, Error> {
// always get the current state of extensions in the database
// this is due to out of band changes - manual create/drop extension
let actual_extensions = get_all_extensions(coredb, ctx.clone()).await?;
let desired_extensions = coredb.spec.extensions.clone();

// most of the time there will be no changes
let extensions_changed = diff_extensions(&desired_extensions, &actual_extensions);

if extensions_changed.is_empty() {
// no further work when no changes
return Ok(actual_extensions);
}

// otherwise, need to determine the plan to apply
let (changed_extensions, extensions_to_install) = extension_plan(&desired_extensions, &actual_extensions);

toggle_extensions(coredb, &changed_extensions, ctx.clone()).await?;
debug!("extensions to install: {:?}", extensions_to_install);
// TODO: trunk install >extensions_to_install< on container

// return final state of extensions
get_all_extensions(coredb, ctx.clone()).await
}

// returns any elements that are in the desired, and not in actual
// any Extensions returned by this function need either create or drop extension
// cheap way to determine if there have been any sort of changes to extensions
Expand Down Expand Up @@ -367,6 +390,32 @@ fn extension_plan(have_changed: &[Extension], actual: &[Extension]) -> (Vec<Exte
(changed, to_install)
}

/// reconcile extensions between the spec and the database
pub async fn reconcile_extensions(coredb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<Extension>, Error> {
// always get the current state of extensions in the database
// this is due to out of band changes - manual create/drop extension
let actual_extensions = get_all_extensions(coredb, ctx.clone()).await?;
let desired_extensions = coredb.spec.extensions.clone();

// most of the time there will be no changes
let extensions_changed = diff_extensions(&desired_extensions, &actual_extensions);

if extensions_changed.is_empty() {
// no further work when no changes
return Ok(actual_extensions);
}

// otherwise, need to determine the plan to apply
let (changed_extensions, extensions_to_install) = extension_plan(&desired_extensions, &actual_extensions);

toggle_extensions(coredb, &changed_extensions, ctx.clone()).await?;
install_extension(coredb, &extensions_to_install, ctx.clone()).await?;

// return final state of extensions
get_all_extensions(coredb, ctx.clone()).await
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 4 additions & 0 deletions coredb-operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod apis;
/// Log and trace integrations
pub mod telemetry;

mod exec;
/// Metrics
mod metrics;
pub use metrics::Metrics;
Expand All @@ -21,6 +22,9 @@ use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
#[error("An error occurred in kube-exec: {0}")]
KubeExecError(String),

#[error("SerializationError: {0}")]
SerializationError(#[source] serde_json::Error),

Expand Down

0 comments on commit 9dc5657

Please sign in to comment.