Skip to content

Commit

Permalink
Handle GCP restore URI destination path correctly (#979)
Browse files Browse the repository at this point in the history
Co-authored-by: Ian Stanton <ian@tembo.io>
  • Loading branch information
nhudson and ianstanton authored Sep 26, 2024
1 parent 4ea4227 commit 3741105
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 28 deletions.
1 change: 1 addition & 0 deletions conductor/justfile
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ watch:
BACKUP_ARCHIVE_BUCKET=cdb-plat-use1-dev-instance-backups \
STORAGE_ARCHIVE_BUCKET=cdb-plat-use1-dev-instance-storage \
IS_CLOUD_FORMATION=false \
IS_GCP=false \
cargo watch -x run

run-postgres:
Expand Down
62 changes: 62 additions & 0 deletions conductor/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Add this enum at the top of your file or in a separate module
pub struct CloudProviderBuilder {
gcp: bool,
aws: bool,
}

impl CloudProviderBuilder {
fn new() -> Self {
CloudProviderBuilder {
gcp: false,
aws: false,
}
}

pub fn gcp(mut self, value: bool) -> Self {
self.gcp = value;
self
}

pub fn aws(mut self, value: bool) -> Self {
self.aws = value;
self
}

pub fn build(self) -> CloudProvider {
if self.gcp {
CloudProvider::GCP
} else if self.aws {
CloudProvider::AWS
} else {
CloudProvider::Unknown
}
}
}

pub enum CloudProvider {
AWS,
GCP,
Unknown,
}

impl CloudProvider {
pub fn as_str(&self) -> &'static str {
match self {
CloudProvider::AWS => "aws",
CloudProvider::GCP => "gcp",
CloudProvider::Unknown => "unknown",
}
}

pub fn prefix(&self) -> &'static str {
match self {
CloudProvider::AWS => "s3://",
CloudProvider::GCP => "gs://",
CloudProvider::Unknown => "",
}
}

pub fn builder() -> CloudProviderBuilder {
CloudProviderBuilder::new()
}
}
4 changes: 4 additions & 0 deletions conductor/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ pub enum ConductorError {
/// Google Cloud Storage error
#[error("Google Cloud Storage error: {0}")]
GcsError(#[from] GcsError),

/// Dataplane error
#[error("Dataplane not found error: {0}")]
DataplaneError(String),
}
144 changes: 125 additions & 19 deletions conductor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod aws;
pub mod cloud;
pub mod errors;
pub mod extensions;
pub mod gcp;
Expand All @@ -7,7 +8,10 @@ pub mod monitoring;
pub mod routes;
pub mod types;

use crate::aws::cloudformation::{AWSConfigState, CloudFormationParams};
use crate::{
aws::cloudformation::{AWSConfigState, CloudFormationParams},
cloud::CloudProvider,
};
use aws_sdk_cloudformation::config::Region;
use controller::apis::coredb_types::{CoreDB, CoreDBSpec};
use errors::ConductorError;
Expand All @@ -18,7 +22,7 @@ use kube::api::{DeleteParams, ListParams, Patch, PatchParams};

use chrono::{DateTime, SecondsFormat, Utc};
use kube::{Api, Client, ResourceExt};
use log::{debug, info};
use log::{debug, info, warn};
use serde_json::{from_str, to_string, Value};
use std::{
collections::hash_map::DefaultHasher,
Expand All @@ -27,6 +31,7 @@ use std::{

pub type Result<T, E = ConductorError> = std::result::Result<T, E>;

#[allow(clippy::too_many_arguments)]
pub async fn generate_spec(
org_id: &str,
entity_name: &str,
Expand All @@ -35,18 +40,38 @@ pub async fn generate_spec(
namespace: &str,
backups_bucket: &str,
spec: &CoreDBSpec,
) -> Value {
cloud_provider: &CloudProvider,
) -> Result<Value, ConductorError> {
let mut spec = spec.clone();
// Add the bucket name into the backups_path if it's not already there
if let Some(restore) = &mut spec.restore {
if let Some(backups_path) = &mut restore.backups_path {
if !backups_path.starts_with(&format!("s3://{}", backups_bucket)) {
let path_suffix = backups_path.trim_start_matches("s3://");
*backups_path = format!("s3://{}/{}", backups_bucket, path_suffix);

match cloud_provider {
CloudProvider::AWS | CloudProvider::GCP => {
let prefix = cloud_provider.prefix();

// Format the backups_path with the correct prefix
if let Some(restore) = &mut spec.restore {
if let Some(backups_path) = &mut restore.backups_path {
let clean_path = remove_known_prefixes(backups_path);
if clean_path.starts_with(backups_bucket) {
// If the path already includes the bucket, just add the prefix
*backups_path = format!("{}{}", prefix, clean_path);
} else {
// If the path doesn't include the bucket, add both prefix and bucket
*backups_path = format!("{}{}/{}", prefix, backups_bucket, clean_path);
}
}
}
}
CloudProvider::Unknown => {
warn!(
"Unknown cloud provider or cloud provider is disabled, restore spec removed from Spec value",
);
// Remove the restore information if the cloud provider is unknown
spec.restore = None;
}
}
serde_json::json!({

Ok(serde_json::json!({
"apiVersion": "coredb.io/v1alpha1",
"kind": "CoreDB",
"metadata": {
Expand All @@ -59,7 +84,18 @@ pub async fn generate_spec(
}
},
"spec": spec,
})
}))
}

// Remove known prefixes from the backup path
fn remove_known_prefixes(path: &str) -> &str {
let known_prefixes = ["s3://", "gs://", "https://"];
for prefix in &known_prefixes {
if let Some(stripped) = path.strip_prefix(prefix) {
return stripped;
}
}
path
}

pub fn get_data_plane_id_from_coredb(coredb: &CoreDB) -> Result<String, Box<ConductorError>> {
Expand Down Expand Up @@ -636,16 +672,19 @@ mod tests {
}),
..CoreDBSpec::default()
};
let cloud_provider = CloudProvider::AWS;
let result = generate_spec(
"org-id",
"entity-name",
"instance-id",
"data-plane-id",
"aws_data_1_use1",
"namespace",
"my-bucket",
&spec,
&cloud_provider,
)
.await;
.await
.expect("Failed to generate spec");
let expected_backups_path = "s3://my-bucket/coredb/coredb/org-coredb-inst-pgtrunkio-dev";
assert_eq!(
result["spec"]["restore"]["backupsPath"].as_str().unwrap(),
Expand All @@ -664,16 +703,19 @@ mod tests {
}),
..CoreDBSpec::default()
};
let cloud_provider = CloudProvider::AWS;
let result = generate_spec(
"org-id",
"entity-name",
"instance-id",
"data-plane-id",
"aws_data_1_use1",
"namespace",
"my-bucket",
&spec,
&cloud_provider,
)
.await;
.await
.expect("Failed to generate spec");
let expected_backups_path = "s3://my-bucket/coredb/coredb/org-coredb-inst-pgtrunkio-dev";
assert_eq!(
result["spec"]["restore"]["backupsPath"].as_str().unwrap(),
Expand All @@ -690,16 +732,19 @@ mod tests {
}),
..CoreDBSpec::default()
};
let cloud_provider = CloudProvider::AWS;
let result = generate_spec(
"org-id",
"entity-name",
"instance-id",
"data-plane-id",
"aws_data_1_use1",
"namespace",
"my-bucket",
&spec,
&cloud_provider,
)
.await;
.await
.expect("Failed to generate spec");
assert!(result["spec"]["restore"]["backupsPath"].is_null());
}

Expand All @@ -709,16 +754,77 @@ mod tests {
restore: None,
..CoreDBSpec::default()
};
let cloud_provider = CloudProvider::AWS;
let result = generate_spec(
"org-id",
"entity-name",
"instance-id",
"data-plane-id",
"aws_data_1_use1",
"namespace",
"my-bucket",
&spec,
&cloud_provider,
)
.await;
.await
.expect("Failed to generate spec");
assert!(result["spec"]["restore"].is_null());
}

#[tokio::test]
async fn test_generate_spec_with_non_matching_gcp_bucket() {
let spec = CoreDBSpec {
restore: Some(Restore {
backups_path: Some("gs://v2/test-instance".to_string()),
..Restore::default()
}),
..CoreDBSpec::default()
};
let cloud_provider = CloudProvider::GCP;
let result = generate_spec(
"org-id",
"entity-name",
"instance-id",
"gcp_data_1_usc1",
"namespace",
"my-bucket",
&spec,
&cloud_provider,
)
.await
.expect("Failed to generate spec");
let expected_backups_path = "gs://my-bucket/v2/test-instance";
assert_eq!(
result["spec"]["restore"]["backupsPath"].as_str().unwrap(),
expected_backups_path
);
}

#[tokio::test]
async fn test_generate_spec_with_gcp_bucket() {
let spec = CoreDBSpec {
restore: Some(Restore {
backups_path: Some("gs://my-bucket/v2/test-instance".to_string()),
..Restore::default()
}),
..CoreDBSpec::default()
};
let cloud_provider = CloudProvider::GCP;
let result = generate_spec(
"org-id",
"entity-name",
"instance-id",
"gcp_data_1_usc1",
"namespace",
"my-bucket",
&spec,
&cloud_provider,
)
.await
.expect("Failed to generate spec");
let expected_backups_path = "gs://my-bucket/v2/test-instance";
assert_eq!(
result["spec"]["restore"]["backupsPath"].as_str().unwrap(),
expected_backups_path
);
}
}
18 changes: 13 additions & 5 deletions conductor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use actix_web_opentelemetry::{PrometheusMetricsHandler, RequestTracing};
use conductor::errors::ConductorError;
use conductor::monitoring::CustomMetrics;
use conductor::{
create_cloudformation, create_gcp_storage_workload_identity_binding, create_namespace,
create_or_update, delete, delete_cloudformation, delete_gcp_storage_workload_identity_binding,
delete_namespace, generate_cron_expression, generate_spec, get_coredb_error_without_status,
get_one, get_pg_conn, lookup_role_arn, restart_coredb, types,
cloud::CloudProvider, create_cloudformation, create_gcp_storage_workload_identity_binding,
create_namespace, create_or_update, delete, delete_cloudformation,
delete_gcp_storage_workload_identity_binding, delete_namespace, generate_cron_expression,
generate_spec, get_coredb_error_without_status, get_one, get_pg_conn, lookup_role_arn,
restart_coredb, types,
};

use crate::metrics_reporter::run_metrics_reporter;
Expand Down Expand Up @@ -137,6 +138,12 @@ async fn run(metrics: CustomMetrics) -> Result<(), ConductorError> {

log::info!("Database migrations have been successfully applied.");

// Determine the cloud provider using the builder
let cloud_provider = CloudProvider::builder()
.gcp(is_gcp)
.aws(is_cloud_formation)
.build();

loop {
// Read from queue (check for new message)
// messages that dont fit a CRUDevent will error
Expand Down Expand Up @@ -357,8 +364,9 @@ async fn run(metrics: CustomMetrics) -> Result<(), ConductorError> {
&namespace,
&backup_archive_bucket,
&coredb_spec,
&cloud_provider,
)
.await;
.await?;

info!("{}: Creating or updating spec", read_msg.msg_id);
// create or update CoreDB
Expand Down
Loading

0 comments on commit 3741105

Please sign in to comment.