Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle create pvc events #222

Merged
merged 12 commits into from
Apr 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 87 additions & 32 deletions coredb-operator/src/statefulset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,24 @@ use k8s_openapi::{
apps::v1::{StatefulSet, StatefulSetSpec},
core::v1::{
Container, ContainerPort, EnvVar, EnvVarSource, ExecAction, PersistentVolumeClaim,
PersistentVolumeClaimSpec, PodSpec, PodTemplateSpec, Probe, ResourceRequirements,
PersistentVolumeClaimSpec, Pod, PodSpec, PodTemplateSpec, Probe, ResourceRequirements,
SecretKeySelector, SecurityContext, VolumeMount,
},
},
apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::LabelSelector},
};
use kube::{
api::{Api, DeleteParams, ObjectMeta, Patch, PatchParams, ResourceExt},
api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, ResourceExt},
Resource,
};
use std::{str, thread, time::Duration};

use k8s_openapi::{
api::core::v1::{EmptyDirVolumeSource, HTTPGetAction, Volume},
apimachinery::pkg::util::intstr::IntOrString,
};
use std::{collections::BTreeMap, sync::Arc};
use tracing::{debug, info};
use tracing::{debug, info, warn};

const PKGLIBDIR: &str = "/usr/lib/postgresql/15/lib";
const SHAREDIR: &str = "/usr/share/postgresql/15";
Expand Down Expand Up @@ -306,79 +307,133 @@ pub fn stateful_set_from_cdb(cdb: &CoreDB) -> StatefulSet {
sts
}

fn diff_pvcs(expected: &[String], actual: &[String]) -> Vec<String> {
let mut to_create = vec![];
for pvc in expected {
if !actual.contains(pvc) {
to_create.push(pvc.to_string());
}
}
to_create
}

async fn list_pvcs(ctx: Arc<Context>, sts_name: &str, sts_namespace: &str) -> Result<Vec<String>, Error> {
let label_selector = format!("statefulset={sts_name}");
let list_params = ListParams::default().labels(&label_selector);
let pvc_api: Api<PersistentVolumeClaim> = Api::namespaced(ctx.client.clone(), sts_namespace);

// list all PVCs in namespace
let all_pvcs = pvc_api.list(&list_params).await?;
Ok(all_pvcs
.into_iter()
.map(|pvc| pvc.metadata.name.unwrap())
.collect())
}

pub async fn reconcile_sts(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Error> {
let client = ctx.client.clone();

let sts: StatefulSet = stateful_set_from_cdb(cdb);

let sts_api: Api<StatefulSet> = Api::namespaced(client, &sts.clone().metadata.namespace.unwrap());
let sts_api: Api<StatefulSet> = Api::namespaced(client.clone(), &sts.clone().metadata.namespace.unwrap());

let sts_name = sts.clone().metadata.name.unwrap();
let sts_namespace = sts.clone().metadata.namespace.unwrap();

// If cdb is running and storage is resized in CoreDB Custom Resource then follow steps to resize PVC
// Reference Article: https://itnext.io/resizing-statefulset-persistent-volumes-with-zero-downtime-916ebc65b1d4

// determine pvcs that changed
let pvcs_to_update = match cdb.status.is_some() && cdb.status.clone().unwrap().running {
// determine pvcs that changed or need to be created
let (pvcs_to_update, pvcs_to_create) = match cdb.status.is_some() && cdb.status.clone().unwrap().running {
true => {
let mut pvcs_to_update = Vec::new();
let data_pvc_name = pvc_full_name("data", &sts_name);
let pkglib_pvc_name = pvc_full_name("pkglibdir", &sts_name);
let share_pvc_name = pvc_full_name("sharedir", &sts_name);

// reconcile expected vs actual pvcs, this is what needs to be created
let expected_pvcs = vec![
data_pvc_name.clone(),
pkglib_pvc_name.clone(),
share_pvc_name.clone(),
];
let actual_pvcs = list_pvcs(ctx.clone(), &sts_name, &sts_namespace).await?;
// if there is a diff, it needs to be created. assumes we never delete a pvc.
let pvcs_to_create = diff_pvcs(&expected_pvcs, &actual_pvcs);
debug!("pvcs_to_create: {:?}", pvcs_to_create);

// determine if PVCs changed
if cdb.status.clone().unwrap().storage != cdb.spec.storage {
pvcs_to_update.push(("data".to_string(), cdb.spec.storage.clone()));
pvcs_to_update.push((data_pvc_name, cdb.spec.storage.clone()));
}
if cdb.status.clone().unwrap().sharedirStorage != cdb.spec.sharedirStorage {
pvcs_to_update.push(("sharedir".to_string(), cdb.spec.sharedirStorage.clone()));
pvcs_to_update.push((share_pvc_name, cdb.spec.sharedirStorage.clone()));
}
if cdb.status.clone().unwrap().pkglibdirStorage != cdb.spec.pkglibdirStorage {
pvcs_to_update.push(("pkglibdir".to_string(), cdb.spec.pkglibdirStorage.clone()));
pvcs_to_update.push((pkglib_pvc_name, cdb.spec.pkglibdirStorage.clone()));
}
pvcs_to_update
debug!("pvcs_to_update: {:?}", pvcs_to_update);
(pvcs_to_update, pvcs_to_create)
}
false => {
debug!("cdb is not running");
(vec![], vec![])
}
false => Vec::new(),
};
debug!("pvcs_to_update: {:?}", pvcs_to_update);
if !pvcs_to_update.is_empty() {
let sts_name = sts.clone().metadata.name.unwrap();
let sts_namespace = sts.clone().metadata.namespace.unwrap();

// why do delete first, then update?
delete_sts_no_cascade(&sts_api, &sts_name).await;

if !pvcs_to_update.is_empty() {
delete_sts_no_cascade(&sts_api, &sts_name).await?;
let pvc_api: Api<PersistentVolumeClaim> = Api::namespaced(ctx.client.clone(), &sts_namespace);
for (pvc_name, qty) in pvcs_to_update {
update_pvc(&pvc_api, &sts_name, &pvc_name, qty).await;
for (pvc_full_name, qty) in pvcs_to_update {
update_pvc(&pvc_api, &pvc_full_name, qty).await;
}
}

if !pvcs_to_create.is_empty() {
delete_sts_no_cascade(&sts_api, &sts_name).await?;
let primary_pod = cdb.primary_pod(client.clone()).await?;
let pod_api: Api<Pod> = Api::namespaced(client.clone(), &sts_namespace);

let prim_pod_name = primary_pod.metadata.name.unwrap();
warn!("deleting pod to attach pvc: {}", prim_pod_name);
pod_api.delete(&prim_pod_name, &DeleteParams::default()).await?;
}

let ps = PatchParams::apply("cntrlr").force();
let _o = sts_api
.patch(&sts.clone().metadata.name.unwrap(), &ps, &Patch::Apply(&sts))
.await
.map_err(Error::KubeError)?;

Ok(())
}

async fn delete_sts_no_cascade(sts_api: &Api<StatefulSet>, sts_name: &str) {
async fn delete_sts_no_cascade(sts_api: &Api<StatefulSet>, sts_name: &str) -> Result<(), Error> {
let delete_params: DeleteParams = DeleteParams {
dry_run: false,
grace_period_seconds: None,
propagation_policy: Some(kube::api::PropagationPolicy::Orphan),
preconditions: None,
};

let _o = sts_api
info!("deleting_sts_no_cascade: {}", sts_name);
let _ = sts_api
.delete(sts_name, &delete_params)
.await
.map_err(Error::KubeError);
.map_err(Error::KubeError)?;
thread::sleep(Duration::from_millis(3000));
Ok(())
}

fn pvc_full_name(pvc_name: &str, sts_name: &str) -> String {
format!("{pvc_name}-{sts_name}-0")
}
Comment on lines +427 to 429
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this logic out, so we can create and use the pvc names upstream in the pvc reconcile process


async fn update_pvc(pvc_api: &Api<PersistentVolumeClaim>, sts_name: &str, pvc_name: &str, value: Quantity) {
info!(
"Updating PVC {} for StatefulSet {}, Value: {:?}",
pvc_name, sts_name, value
);
let pvc_full_name = format!("{pvc_name}-{sts_name}-0");
async fn update_pvc(pvc_api: &Api<PersistentVolumeClaim>, pvc_full_name: &str, value: Quantity) {
info!("Updating PVC {}, Value: {:?}", pvc_full_name, value);
let mut pvc_requests: BTreeMap<String, Quantity> = BTreeMap::new();
pvc_requests.insert("storage".to_string(), value);

let mut pvc = pvc_api.get(&pvc_full_name).await.unwrap();
let mut pvc = pvc_api.get(pvc_full_name).await.unwrap();

pvc.metadata.managed_fields = None;

Expand All @@ -399,7 +454,7 @@ async fn update_pvc(pvc_api: &Api<PersistentVolumeClaim>, sts_name: &str, pvc_na
};

let _o = pvc_api
.patch(&pvc_full_name, &patch_params, &Patch::Apply(pvc))
.patch(pvc_full_name, &patch_params, &Patch::Apply(pvc))
.await
.map_err(Error::KubeError);
}
Expand Down