Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exec trunk install #217

Merged
merged 13 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion coredb-operator/charts/coredb-operator/templates/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ spec:
type: object
type: array
image:
default: quay.io/coredb/postgres:6e3c4a7
default: quay.io/coredb/postgres:d45c5b1
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update to the image that has trunk cli installed

type: string
pkglibdirStorage:
default: 1Gi
Expand Down
12 changes: 12 additions & 0 deletions coredb-operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::{
};

use crate::{
exec::{ExecCommand, ExecOutput},
psql::{PsqlCommand, PsqlOutput},
service::reconcile_svc,
statefulset::{reconcile_sts, stateful_set_from_cdb},
Expand Down Expand Up @@ -254,6 +255,17 @@ impl CoreDB {
.execute()
.await
}

pub async fn exec(
&self,
pod_name: String,
client: Client,
command: &[String],
) -> Result<ExecOutput, Error> {
ExecCommand::new(pod_name, self.metadata.namespace.clone().unwrap(), client)
.execute(command)
.await
}
}

pub fn is_pod_ready() -> impl Condition<Pod> + 'static {
Expand Down
2 changes: 1 addition & 1 deletion coredb-operator/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub fn default_port() -> i32 {
}

pub fn default_image() -> String {
"quay.io/coredb/postgres:6e3c4a7".to_owned()
"quay.io/coredb/postgres:d45c5b1".to_owned()
}

pub fn default_storage() -> Quantity {
Expand Down
87 changes: 87 additions & 0 deletions coredb-operator/src/exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::Status};
use kube::{api::Api, client::Client, core::subresource::AttachParams};
use tokio::io::AsyncReadExt;

use crate::Error;
use tracing::error;

#[derive(Debug)]
pub struct ExecOutput {
pub stdout: Option<String>,
pub stderr: Option<String>,
pub status: Option<Status>,
}

impl ExecOutput {
pub fn new(stdout: Option<String>, stderr: Option<String>, status: Option<Status>) -> Self {
Self {
stdout,
stderr,
status,
}
}
}

pub struct ExecCommand {
pods_api: Api<Pod>,
pod_name: String,
}

impl ExecCommand {
pub fn new(pod_name: String, namespace: String, client: Client) -> Self {
let pods_api: Api<Pod> = Api::namespaced(client, &namespace);
Self { pod_name, pods_api }
}

pub async fn execute(&self, command: &[String]) -> Result<ExecOutput, Error> {
let attach_params = AttachParams {
container: Some("postgres".to_string()),
tty: false,
stdin: true,
stdout: true,
stderr: true,
max_stdin_buf_size: Some(1024),
max_stdout_buf_size: Some(1024),
max_stderr_buf_size: Some(1024),
};

let mut attached_process = self
.pods_api
.exec(self.pod_name.as_str(), command, &attach_params)
.await?;

let mut stdout_reader = attached_process.stdout().unwrap();
let mut result_stdout = String::new();
stdout_reader.read_to_string(&mut result_stdout).await.unwrap();

let mut stderr_reader = attached_process.stderr().unwrap();
let mut result_stderr = String::new();
stderr_reader.read_to_string(&mut result_stderr).await.unwrap();


let status = attached_process.take_status().unwrap().await.unwrap();
// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
let response = ExecOutput::new(Some(result_stdout), Some(result_stderr), Some(status.clone()));

match status.status.expect("no status reported").as_str() {
"Success" => Ok(response),
"Failure" => {
error!("Error executing command: {:?}. response: {:?}", command, response);
Err(Error::KubeExecError(format!(
"Error executing command: {:?}. response: {:?}",
command, response
)))
}
_ => {
error!(
"Undefined response from kube API {:?}, command: {:?}",
response, command
);
Err(Error::KubeExecError(format!(
"Error executing command: {:?}. response: {:?}",
command, response
)))
}
}
}
}
101 changes: 75 additions & 26 deletions coredb-operator/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,55 @@ name asc,
enabled desc
"#;

/// handles installing extensions
pub async fn install_extension(
cdb: &CoreDB,
extensions: &[Extension],
ctx: Arc<Context>,
) -> Result<(), Error> {
debug!("extensions to install: {:?}", extensions);
let client = ctx.client.clone();

let pod_name = cdb
.primary_pod(client.clone())
.await
.unwrap()
.metadata
.name
.unwrap();

let mut errors: Vec<Error> = Vec::new();
for ext in extensions.iter() {
let version = ext.locations[0].version.clone().unwrap();
let cmd = vec![
"trunk".to_owned(),
"install".to_owned(),
ext.name.clone(),
"--version".to_owned(),
version,
];
let result = cdb.exec(pod_name.clone(), client.clone(), &cmd).await;

match result {
Ok(result) => {
debug!("installed extension: {}", result.stdout.clone().unwrap());
}
Err(err) => {
error!("error installing extension, {}", err);
errors.push(err);
}
}
}
let num_success = extensions.len() - errors.len();
info!(
"Successfully installed {} / {} extensions",
num_success,
extensions.len()
);
Ok(())
}


/// handles create/drop extensions
pub async fn toggle_extensions(
cdb: &CoreDB,
Expand Down Expand Up @@ -302,32 +351,6 @@ pub async fn get_all_extensions(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<E
Ok(ext_spec)
}

/// reconcile extensions between the spec and the database
pub async fn reconcile_extensions(coredb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<Extension>, Error> {
// always get the current state of extensions in the database
// this is due to out of band changes - manual create/drop extension
let actual_extensions = get_all_extensions(coredb, ctx.clone()).await?;
let desired_extensions = coredb.spec.extensions.clone();

// most of the time there will be no changes
let extensions_changed = diff_extensions(&desired_extensions, &actual_extensions);

if extensions_changed.is_empty() {
// no further work when no changes
return Ok(actual_extensions);
}

// otherwise, need to determine the plan to apply
let (changed_extensions, extensions_to_install) = extension_plan(&desired_extensions, &actual_extensions);

toggle_extensions(coredb, &changed_extensions, ctx.clone()).await?;
debug!("extensions to install: {:?}", extensions_to_install);
// TODO: trunk install >extensions_to_install< on container

// return final state of extensions
get_all_extensions(coredb, ctx.clone()).await
}

// returns any elements that are in the desired, and not in actual
// any Extensions returned by this function need either create or drop extension
// cheap way to determine if there have been any sort of changes to extensions
Expand Down Expand Up @@ -367,6 +390,32 @@ fn extension_plan(have_changed: &[Extension], actual: &[Extension]) -> (Vec<Exte
(changed, to_install)
}

/// reconcile extensions between the spec and the database
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this to the bottom of the file so its easier to find. its the "top level" command in the file

pub async fn reconcile_extensions(coredb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<Extension>, Error> {
// always get the current state of extensions in the database
// this is due to out of band changes - manual create/drop extension
let actual_extensions = get_all_extensions(coredb, ctx.clone()).await?;
let desired_extensions = coredb.spec.extensions.clone();

// most of the time there will be no changes
let extensions_changed = diff_extensions(&desired_extensions, &actual_extensions);

if extensions_changed.is_empty() {
// no further work when no changes
return Ok(actual_extensions);
}

// otherwise, need to determine the plan to apply
let (changed_extensions, extensions_to_install) = extension_plan(&desired_extensions, &actual_extensions);

toggle_extensions(coredb, &changed_extensions, ctx.clone()).await?;
install_extension(coredb, &extensions_to_install, ctx.clone()).await?;

// return final state of extensions
get_all_extensions(coredb, ctx.clone()).await
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 4 additions & 0 deletions coredb-operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod apis;
/// Log and trace integrations
pub mod telemetry;

mod exec;
/// Metrics
mod metrics;
pub use metrics::Metrics;
Expand All @@ -21,6 +22,9 @@ use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
#[error("An error occurred in kube-exec: {0}")]
KubeExecError(String),

#[error("SerializationError: {0}")]
SerializationError(#[source] serde_json::Error),

Expand Down