Skip to content

Commit

Permalink
coredb-operator: fix empty string and add tests (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend authored Mar 15, 2023
1 parent edf6557 commit a60546e
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 108 deletions.
1 change: 1 addition & 0 deletions coredb-operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl CoreDB {
return Ok(Action::requeue(Duration::from_secs(1)));
}

// creating exporter role is pre-requisite to the postgres pod becoming "ready"
create_postgres_exporter_role(self, ctx.clone())
.await
.unwrap_or_else(|_| {
Expand Down
288 changes: 183 additions & 105 deletions coredb-operator/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};


#[derive(Debug)]
pub struct ExtRow {
Expand Down Expand Up @@ -76,9 +77,6 @@ pub async fn toggle_extensions(
let client = ctx.client.clone();
let re = Regex::new(r"[a-zA-Z][0-9a-zA-Z_-]*$").unwrap();

// TODO(ianstanton) Some extensions will fail to create. We need to handle and surface any errors.
// Logging result at debug level for now.

// iterate through list of extensions and run CREATE EXTENSION <extension-name> for each
for ext in extensions {
let ext_name = ext.name.as_str();
Expand All @@ -91,44 +89,45 @@ pub async fn toggle_extensions(
// extensions can be installed in multiple databases but only a single schema
for ext_loc in ext.locations.iter() {
let database_name = ext_loc.database.to_owned();

if !re.is_match(&database_name) {
warn!(
"Extension.Database {}.{} is not formatted properly. Skipping operation.",
ext_name, database_name
);
continue;
}
if ext_loc.enabled {
info!("Creating extension: {}, database {}", ext_name, database_name);
let schema_name = ext_loc.schema.to_owned();
if !re.is_match(&schema_name) {
warn!(
"Extension.Database.Schema {}.{}.{} is not formatted properly. Skipping operation.",
ext_name, database_name, schema_name
);
continue;
let command = match ext_loc.enabled {
true => {
info!("Creating extension: {}, database {}", ext_name, database_name);
let schema_name = ext_loc.schema.to_owned();
if !re.is_match(&schema_name) {
warn!(
"Extension.Database.Schema {}.{}.{} is not formatted properly. Skipping operation.",
ext_name, database_name, schema_name
);
continue;
}
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} SCHEMA {schema_name};")
}
false => {
info!("Dropping extension: {}, database {}", ext_name, database_name);
format!("DROP EXTENSION IF EXISTS {ext_name};")
}
};

let result = cdb
.psql(command.clone(), database_name.clone(), client.clone())
.await;

match result {
Ok(result) => {
debug!("Result: {}", result.stdout.clone().unwrap());
}
Err(err) => {
error!("error managing extension");
return Err(err.into());
}
// this will no-op if we've already created the extension
let result = cdb
.psql(
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} SCHEMA {schema_name};"),
database_name,
client.clone(),
)
.await
.unwrap();
debug!("Result: {}", result.stdout.clone().unwrap());
} else {
info!("Dropping extension: {}, database {}", ext_name, database_name);
let result = cdb
.psql(
format!("DROP EXTENSION IF EXISTS {ext_name};"),
database_name,
client.clone(),
)
.await
.unwrap();
debug!("Result: {}", result.stdout.clone().unwrap());
}
}
}
Expand All @@ -148,18 +147,26 @@ pub async fn list_databases(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<Strin
.await
.unwrap();
let result_string = psql_out.stdout.unwrap();
Ok(parse_databases(&result_string))
}

fn parse_databases(psql_str: &str) -> Vec<String> {
let mut databases = vec![];
for line in result_string.lines().skip(2) {
for line in psql_str.lines().skip(2) {
let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
if fields.is_empty() {
if fields.is_empty()
|| fields[0].is_empty()
|| fields[0].contains("rows)")
|| fields[0].contains("row)")
{
debug!("Done:{:?}", fields);
continue;
}
databases.push(fields[0].to_string());
}
let num_databases = databases.len();
info!("Found {} databases", num_databases);
Ok(databases)
databases
}

/// lists all extensions in a single database
Expand All @@ -174,8 +181,12 @@ pub async fn list_extensions(cdb: &CoreDB, ctx: Arc<Context>, database: &str) ->
.await
.unwrap();
let result_string = psql_out.stdout.unwrap();
Ok(parse_extensions(&result_string))
}

fn parse_extensions(psql_str: &str) -> Vec<ExtRow> {
let mut extensions = vec![];
for line in result_string.lines().skip(2) {
for line in psql_str.lines().skip(2) {
let fields: Vec<&str> = line.split('|').map(|s| s.trim()).collect();
if fields.len() < 4 {
debug!("Done:{:?}", fields);
Expand All @@ -191,13 +202,14 @@ pub async fn list_extensions(cdb: &CoreDB, ctx: Arc<Context>, database: &str) ->
}
let num_extensions = extensions.len();
info!("Found {} extensions", num_extensions);
Ok(extensions)
extensions
}

/// list databases then get all extensions from each database
pub async fn get_all_extensions(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<Extension>, Error> {
let databases = list_databases(cdb, ctx.clone()).await?;

debug!("databases: {:?}", databases);
// sleep 10
let mut ext_hashmap: HashMap<String, Vec<ExtensionInstallLocation>> = HashMap::new();
// query every database for extensions
// transform results by extension name, rather than by database
Expand Down Expand Up @@ -253,69 +265,135 @@ fn diff_extensions(desired: &[Extension], actual: &[Extension]) -> Vec<Extension
diff
}

#[test]
fn test_diff() {
let postgis_disabled = Extension {
name: "postgis".to_owned(),
locations: vec![ExtensionInstallLocation {
enabled: false,
database: "postgres".to_owned(),
schema: "public".to_owned(),
version: Some("1.1.1".to_owned()),
}],
};

let pgmq_enabled = Extension {
name: "pgmq".to_owned(),
locations: vec![ExtensionInstallLocation {
enabled: true,
database: "postgres".to_owned(),
schema: "public".to_owned(),
version: Some("1.1.1".to_owned()),
}],
};

let pgmq_disabled = Extension {
name: "pgmq".to_owned(),
locations: vec![ExtensionInstallLocation {
enabled: false,
database: "postgres".to_owned(),
schema: "public".to_owned(),
version: Some("1.1.1".to_owned()),
}],
};

let desired = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_disabled.clone()];
// diff should be that we need to enable pgmq
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 1);
assert_eq!(diff[0], pgmq_enabled);

// order does not matter
let desired = vec![pgmq_enabled.clone(), postgis_disabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_disabled.clone()];
// diff will still be to enable pgmq
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 1);
assert_eq!(diff[0], pgmq_enabled);

let desired = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_disabled.clone()];
// diff should be that we need to enable pgmq
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 1);
assert_eq!(diff[0], pgmq_enabled);

let desired = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
// diff == actual, so diff should be empty
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 0);

let desired = vec![postgis_disabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
// less extensions desired than exist - should be a no op
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 0);

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_diff() {
let postgis_disabled = Extension {
name: "postgis".to_owned(),
locations: vec![ExtensionInstallLocation {
enabled: false,
database: "postgres".to_owned(),
schema: "public".to_owned(),
version: Some("1.1.1".to_owned()),
}],
};

let pgmq_enabled = Extension {
name: "pgmq".to_owned(),
locations: vec![ExtensionInstallLocation {
enabled: true,
database: "postgres".to_owned(),
schema: "public".to_owned(),
version: Some("1.1.1".to_owned()),
}],
};

let pgmq_disabled = Extension {
name: "pgmq".to_owned(),
locations: vec![ExtensionInstallLocation {
enabled: false,
database: "postgres".to_owned(),
schema: "public".to_owned(),
version: Some("1.1.1".to_owned()),
}],
};

let desired = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_disabled.clone()];
// diff should be that we need to enable pgmq
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 1);
assert_eq!(diff[0], pgmq_enabled);

// order does not matter
let desired = vec![pgmq_enabled.clone(), postgis_disabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_disabled.clone()];
// diff will still be to enable pgmq
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 1);
assert_eq!(diff[0], pgmq_enabled);

let desired = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_disabled.clone()];
// diff should be that we need to enable pgmq
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 1);
assert_eq!(diff[0], pgmq_enabled);

let desired = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
// diff == actual, so diff should be empty
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 0);

let desired = vec![postgis_disabled.clone()];
let actual = vec![postgis_disabled.clone(), pgmq_enabled.clone()];
// less extensions desired than exist - should be a no op
let diff = diff_extensions(&desired, &actual);
assert_eq!(diff.len(), 0);
}


#[test]
fn test_parse_databases() {
let three_db = " datname
----------
postgres
cat
dog
(3 rows)
";

let rows = parse_databases(three_db);
println!("{:?}", rows);
assert_eq!(rows.len(), 3);
assert_eq!(rows[0], "postgres");
assert_eq!(rows[1], "cat");
assert_eq!(rows[2], "dog");

let one_db = " datname
----------
postgres
(1 row)
";

let rows = parse_databases(one_db);
println!("{:?}", rows);
assert_eq!(rows.len(), 1);
assert_eq!(rows[0], "postgres");
}

#[test]
fn test_parse_extensions() {
let ext_psql = " name | version | enabled | schema
--------------------+---------+---------+------------
adminpack | 2.1 | f | public
amcheck | 1.3 | f | public
autoinc | 1.0 | f | public
bloom | 1.0 | f | public
btree_gin | 1.3 | f | public
btree_gist | 1.7 | f | public
citext | 1.6 | f | public
cube | 1.5 | f | public
dblink | 1.2 | f | public";


let ext = parse_extensions(ext_psql);
assert_eq!(ext.len(), 9);
assert_eq!(ext[0].name, "adminpack");
assert_eq!(ext[0].enabled, false);
assert_eq!(ext[0].version, "2.1".to_owned());
assert_eq!(ext[0].schema, "public".to_owned());

assert_eq!(ext[8].name, "dblink");
assert_eq!(ext[8].enabled, false);
assert_eq!(ext[8].version, "1.2".to_owned());
assert_eq!(ext[8].schema, "public".to_owned());
}
}
2 changes: 1 addition & 1 deletion coredb-operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum Error {
SerializationError(#[source] serde_json::Error),

#[error("Kube Error: {0}")]
KubeError(#[source] kube::Error),
KubeError(#[from] kube::Error),

#[error("Finalizer Error: {0}")]
// NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
Expand Down
3 changes: 1 addition & 2 deletions coredb-operator/src/psql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ impl PsqlCommand {
let mut attached_process = self
.pods_api
.exec(self.pod_name.as_str(), &psql_command, &attach_params)
.await
.unwrap();
.await?;

// https://docs.rs/tokio/latest/tokio/io/trait.AsyncReadExt.html#method.read_to_string
// Since waiting for EOF to be reached, a join is not needed and the attached_process will
Expand Down

0 comments on commit a60546e

Please sign in to comment.