diff --git a/libs/utils/src/fs_ext.rs b/libs/utils/src/fs_ext.rs index 090912d2765d..dfb7d5abbfa4 100644 --- a/libs/utils/src/fs_ext.rs +++ b/libs/utils/src/fs_ext.rs @@ -24,6 +24,20 @@ pub async fn is_directory_empty(path: impl AsRef) -> anyhow::Result Ok(dir.next_entry().await?.is_none()) } +pub async fn list_dir(path: impl AsRef) -> anyhow::Result> { + let mut dir = tokio::fs::read_dir(&path) + .await + .context(format!("read_dir({})", path.as_ref().display()))?; + + let mut content = vec![]; + while let Some(next) = dir.next_entry().await? { + let file_name = next.file_name(); + content.push(file_name.to_string_lossy().to_string()); + } + + Ok(content) +} + pub fn ignore_not_found(e: io::Error) -> io::Result<()> { if e.kind() == io::ErrorKind::NotFound { Ok(()) @@ -43,7 +57,7 @@ where mod test { use std::path::PathBuf; - use crate::fs_ext::is_directory_empty; + use crate::fs_ext::{is_directory_empty, list_dir}; use super::ignore_absent_files; @@ -109,4 +123,25 @@ mod test { assert!(!file_path.exists()); } + + #[tokio::test] + async fn list_dir_works() { + let dir = tempfile::tempdir().unwrap(); + let dir_path = dir.path(); + + assert!(list_dir(dir_path).await.unwrap().is_empty()); + + let file_path: PathBuf = dir_path.join("testfile"); + let _ = std::fs::File::create(&file_path).unwrap(); + + assert_eq!(&list_dir(dir_path).await.unwrap(), &["testfile"]); + + let another_dir_path: PathBuf = dir_path.join("testdir"); + std::fs::create_dir(another_dir_path).unwrap(); + + let expected = &["testdir", "testfile"]; + let mut actual = list_dir(dir_path).await.unwrap(); + actual.sort(); + assert_eq!(actual, expected); + } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 35690431010e..85bb4bfac133 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -353,7 +353,7 @@ fn start_pageserver( let order = pageserver::InitializationOrder { initial_tenant_load: Some(init_done_tx), initial_logical_size_can_start: init_done_rx.clone(), - initial_logical_size_attempt: init_logical_size_done_tx, + initial_logical_size_attempt: Some(init_logical_size_done_tx), background_jobs_can_start: background_jobs_barrier.clone(), }; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index be806c77ec05..f2aa2f365eb4 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -31,7 +31,9 @@ use utils::{ use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig; use crate::tenant::config::TenantConf; use crate::tenant::config::TenantConfOpt; -use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME}; +use crate::tenant::{ + TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME, +}; use crate::{ IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX, @@ -613,6 +615,11 @@ impl PageServerConf { ) } + pub fn tenant_deleted_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf { + self.tenant_path(tenant_id) + .join(TENANT_DELETED_MARKER_FILE_NAME) + } + pub fn traces_path(&self) -> PathBuf { self.workdir.join("traces") } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 290492203a2b..38e07f172d3d 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -93,6 +93,47 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" + delete: + description: | + Attempts to delete specified tenant. 
500 and 409 errors should be retried until 404 is retrieved. + 404 means that deletion successfully finished" + responses: + "400": + description: Error when no tenant id found in path + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" + "404": + description: Tenant not found + content: + application/json: + schema: + $ref: "#/components/schemas/NotFoundError" + "409": + description: Deletion is already in progress, continue polling + content: + application/json: + schema: + $ref: "#/components/schemas/ConflictError" + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" /v1/tenant/{tenant_id}/timeline: parameters: @@ -820,6 +861,7 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" + /v1/tenant/config: put: description: | diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index c10223beedef..1e8dada85eb1 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -187,7 +187,7 @@ impl From for ApiError { format!("Cannot delete timeline which has child timelines: {children:?}") .into_boxed_str(), ), - a @ AlreadyInProgress => ApiError::Conflict(a.to_string()), + a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()), Other(e) => ApiError::InternalServerError(e), } } @@ -208,6 +208,19 @@ impl From for ApiError { } } +impl From for ApiError { + fn from(value: crate::tenant::delete::DeleteTenantError) -> Self { + use crate::tenant::delete::DeleteTenantError::*; + match value { + Get(g) => ApiError::from(g), + e @ AlreadyInProgress => ApiError::Conflict(e.to_string()), + Timeline(t) => ApiError::from(t), + Other(o) => ApiError::InternalServerError(o), + e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()), + } + } +} + // Helper function to construct a TimelineInfo struct for a timeline async fn build_timeline_info( timeline: &Arc, @@ -617,6 +630,23 @@ async fn tenant_status( json_response(StatusCode::OK, tenant_info) } +async fn tenant_delete_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + // TODO openapi spec + let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + check_permission(&request, Some(tenant_id))?; + + let state = get_state(&request); + + mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id) + .instrument(info_span!("tenant_delete_handler", %tenant_id)) + .await?; + + json_response(StatusCode::ACCEPTED, ()) +} + /// HTTP endpoint to query the current tenant_size of a tenant. 
/// /// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used @@ -1345,6 +1375,9 @@ pub fn make_router( .get("/v1/tenant", |r| api_handler(r, tenant_list_handler)) .post("/v1/tenant", |r| api_handler(r, tenant_create_handler)) .get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status)) + .delete("/v1/tenant/:tenant_id", |r| { + api_handler(r, tenant_delete_handler) + }) .get("/v1/tenant/:tenant_id/synthetic_size", |r| { api_handler(r, tenant_size_handler) }) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 1b1d0acaee22..40298b0e5706 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -190,7 +190,7 @@ pub struct InitializationOrder { /// Each timeline owns a clone of this to be consumed on the initial logical size calculation /// attempt. It is important to drop this once the attempt has completed. - pub initial_logical_size_attempt: utils::completion::Completion, + pub initial_logical_size_attempt: Option, /// Barrier for when we can start any background jobs. /// diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 9fd60053303b..36494a14f2cd 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -28,6 +28,7 @@ use std::cmp::min; use std::collections::hash_map::Entry; use std::collections::BTreeSet; use std::collections::HashMap; +use std::fmt::Debug; use std::fs; use std::fs::File; use std::fs::OpenOptions; @@ -46,8 +47,10 @@ use std::sync::{Mutex, RwLock}; use std::time::{Duration, Instant}; use self::config::TenantConf; +use self::delete::DeleteTenantFlow; use self::metadata::LoadMetadataError; use self::metadata::TimelineMetadata; +use self::mgr::TenantsMap; use self::remote_timeline_client::RemoteTimelineClient; use self::timeline::uninit::TimelineUninitMark; use self::timeline::uninit::UninitializedTimeline; @@ -105,6 +108,7 @@ macro_rules! pausable_failpoint { pub mod blob_io; pub mod block_io; + pub mod disk_btree; pub(crate) mod ephemeral_file; pub mod layer_map; @@ -117,6 +121,7 @@ mod remote_timeline_client; pub mod storage_layer; pub mod config; +pub mod delete; pub mod mgr; pub mod tasks; pub mod upload_queue; @@ -144,6 +149,8 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines"; pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching"; +pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted"; + /// /// Tenant consists of multiple timelines. Keep them in a hash table. /// @@ -182,6 +189,8 @@ pub struct Tenant { cached_synthetic_tenant_size: Arc, eviction_task_tenant_state: tokio::sync::Mutex, + + pub(crate) delete_progress: Arc>, } // We should not blindly overwrite local metadata with remote one. 
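Reviewer note: the client-side contract for the new `DELETE /v1/tenant/{tenant_id}` endpoint is spelled out in the OpenAPI hunk above ("500 and 409 errors should be retried until 404 is retrieved") and exercised by the `tenant_delete_wait_completed` fixture added further down. Below is a minimal sketch of that retry/poll loop, assuming a plain `requests` client against the pageserver HTTP API; the base URL, attempt count, and sleep interval are illustrative only and not part of this change.

```python
import time
import requests


def delete_tenant_and_wait(base_url: str, tenant_id: str, attempts: int = 20) -> None:
    """Drive tenant deletion: 409/500 are retryable, 404 means deletion finished."""
    # Issue the DELETE; 202 means the background flow was scheduled,
    # 409 means a previous attempt is already running, 404 means already gone.
    for _ in range(attempts):
        resp = requests.delete(f"{base_url}/v1/tenant/{tenant_id}")
        if resp.status_code in (202, 404, 409):
            break
        time.sleep(0.25)  # e.g. 500: retry the request

    # Poll tenant status until it disappears (GET /v1/tenant/{tenant_id} -> 404).
    for _ in range(attempts):
        if requests.get(f"{base_url}/v1/tenant/{tenant_id}").status_code == 404:
            return
        time.sleep(0.25)
    raise RuntimeError(f"tenant {tenant_id} still present after {attempts} attempts")
```

The test fixtures below implement the same loop via `tenant_delete_wait_completed` / `wait_tenant_status_404`, so production callers only need to mirror this polling behaviour.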
@@ -273,7 +282,7 @@ pub enum LoadLocalTimelineError { ResumeDeletion(#[source] anyhow::Error), } -#[derive(Debug, thiserror::Error)] +#[derive(thiserror::Error)] pub enum DeleteTimelineError { #[error("NotFound")] NotFound, @@ -282,17 +291,37 @@ pub enum DeleteTimelineError { HasChildren(Vec), #[error("Timeline deletion is already in progress")] - AlreadyInProgress, + AlreadyInProgress(Arc>), #[error(transparent)] Other(#[from] anyhow::Error), } +impl Debug for DeleteTimelineError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::NotFound => write!(f, "NotFound"), + Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(), + Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(), + Self::Other(e) => f.debug_tuple("Other").field(e).finish(), + } + } +} + pub enum SetStoppingError { AlreadyStopping(completion::Barrier), Broken, } +impl Debug for SetStoppingError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(), + Self::Broken => write!(f, "Broken"), + } + } +} + struct RemoteStartupData { index_part: IndexPart, remote_metadata: TimelineMetadata, @@ -615,7 +644,7 @@ impl Tenant { // For every timeline, download the metadata file, scan the local directory, // and build a layer map that contains an entry for each remote and local // layer file. - let sorted_timelines = tree_sort_timelines(timeline_ancestors)?; + let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?; for (timeline_id, remote_metadata) in sorted_timelines { let (index_part, remote_client) = remote_index_and_client .remove(&timeline_id) @@ -739,12 +768,13 @@ impl Tenant { /// If the loading fails for some reason, the Tenant will go into Broken /// state. #[instrument(skip_all, fields(tenant_id=%tenant_id))] - pub fn spawn_load( + pub(crate) fn spawn_load( conf: &'static PageServerConf, tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, init_order: Option, + tenants: &'static tokio::sync::RwLock, ctx: &RequestContext, ) -> Arc { span::debug_assert_current_span_has_tenant_id(); @@ -764,7 +794,7 @@ impl Tenant { tenant_conf, wal_redo_manager, tenant_id, - remote_storage, + remote_storage.clone(), ); let tenant = Arc::new(tenant); @@ -780,27 +810,83 @@ impl Tenant { "initial tenant load", false, async move { + let make_broken = |t: &Tenant, err: anyhow::Error| { + error!("load failed, setting tenant state to Broken: {err:?}"); + t.state.send_modify(|state| { + assert!( + matches!(*state, TenantState::Loading | TenantState::Stopping { .. }), + "the loading task owns the tenant state until activation is complete" + ); + *state = TenantState::broken_from_reason(err.to_string()); + }); + }; + let mut init_order = init_order; // take the completion because initial tenant loading will complete when all of // these tasks complete. 
- let _completion = init_order.as_mut().and_then(|x| x.initial_tenant_load.take()); + let _completion = init_order + .as_mut() + .and_then(|x| x.initial_tenant_load.take()); + + // Dont block pageserver startup on figuring out deletion status + let pending_deletion = { + match DeleteTenantFlow::should_resume_deletion( + conf, + remote_storage.as_ref(), + &tenant_clone, + ) + .await + { + Ok(should_resume_deletion) => should_resume_deletion, + Err(err) => { + make_broken(&tenant_clone, anyhow::anyhow!(err)); + return Ok(()); + } + } + }; + + info!("pending deletion {}", pending_deletion.is_some()); + + if let Some(deletion) = pending_deletion { + // as we are no longer loading, signal completion by dropping + // the completion while we resume deletion + drop(_completion); + // do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout + let _ = init_order + .as_mut() + .and_then(|x| x.initial_logical_size_attempt.take()); + + match DeleteTenantFlow::resume( + deletion, + &tenant_clone, + init_order.as_ref(), + tenants, + &ctx, + ) + .await + { + Err(err) => { + make_broken(&tenant_clone, anyhow::anyhow!(err)); + return Ok(()); + } + Ok(()) => return Ok(()), + } + } + + let background_jobs_can_start = + init_order.as_ref().map(|x| &x.background_jobs_can_start); match tenant_clone.load(init_order.as_ref(), &ctx).await { Ok(()) => { - debug!("load finished, activating"); - let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start); + debug!("load finished",); + tenant_clone.activate(broker_client, background_jobs_can_start, &ctx); } - Err(err) => { - error!("load failed, setting tenant state to Broken: {err:?}"); - tenant_clone.state.send_modify(|state| { - assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete"); - *state = TenantState::broken_from_reason(err.to_string()); - }); - } + Err(err) => make_broken(&tenant_clone, err), } - Ok(()) + + Ok(()) } .instrument({ let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id); @@ -876,6 +962,8 @@ impl Tenant { ) })?; + info!("Found deletion mark for timeline {}", timeline_id); + match load_metadata(self.conf, &self.tenant_id, &timeline_id) { Ok(metadata) => { timelines_to_resume_deletion.push((timeline_id, Some(metadata))) @@ -965,9 +1053,11 @@ impl Tenant { // Sort the array of timeline IDs into tree-order, so that parent comes before // all its children. - tree_sort_timelines(timelines_to_load).map(|sorted_timelines| TenantDirectoryScan { - sorted_timelines_to_load: sorted_timelines, - timelines_to_resume_deletion, + tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| { + TenantDirectoryScan { + sorted_timelines_to_load: sorted_timelines, + timelines_to_resume_deletion, + } }) } @@ -1679,7 +1769,7 @@ impl Tenant { // It's mesed up. // we just ignore the failure to stop - match self.set_stopping(shutdown_progress).await { + match self.set_stopping(shutdown_progress, false).await { Ok(()) => {} Err(SetStoppingError::Broken) => { // assume that this is acceptable @@ -1719,18 +1809,25 @@ impl Tenant { /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state. /// /// This function is not cancel-safe! - async fn set_stopping(&self, progress: completion::Barrier) -> Result<(), SetStoppingError> { + /// + /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant. 
+ async fn set_stopping( + &self, + progress: completion::Barrier, + allow_transition_from_loading: bool, + ) -> Result<(), SetStoppingError> { let mut rx = self.state.subscribe(); // cannot stop before we're done activating, so wait out until we're done activating rx.wait_for(|state| match state { - TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + TenantState::Activating(_) | TenantState::Attaching => { info!( "waiting for {} to turn Active|Broken|Stopping", <&'static str>::from(state) ); false } + TenantState::Loading => allow_transition_from_loading, TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true, }) .await @@ -1739,9 +1836,16 @@ impl Tenant { // we now know we're done activating, let's see whether this task is the winner to transition into Stopping let mut err = None; let stopping = self.state.send_if_modified(|current_state| match current_state { - TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + TenantState::Activating(_) | TenantState::Attaching => { unreachable!("we ensured above that we're done with activation, and, there is no re-activation") } + TenantState::Loading => { + if !allow_transition_from_loading { + unreachable!("we ensured above that we're done with activation, and, there is no re-activation") + }; + *current_state = TenantState::Stopping { progress }; + true + } TenantState::Active => { // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines // are created after the transition to Stopping. That's harmless, as the Timelines @@ -1810,6 +1914,10 @@ impl Tenant { .expect("cannot drop self.state while on a &self method"); // we now know we're done activating, let's see whether this task is the winner to transition into Broken + self.set_broken_no_wait(reason) + } + + pub(crate) fn set_broken_no_wait(&self, reason: String) { self.state.send_modify(|current_state| { match *current_state { TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { @@ -1875,22 +1983,28 @@ impl Tenant { /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id), /// perform a topological sort, so that the parent of each timeline comes /// before the children. -fn tree_sort_timelines( - timelines: HashMap, -) -> anyhow::Result> { +/// E extracts the ancestor from T +/// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc. 
+fn tree_sort_timelines( + timelines: HashMap, + extractor: E, +) -> anyhow::Result> +where + E: Fn(&T) -> Option, +{ let mut result = Vec::with_capacity(timelines.len()); let mut now = Vec::with_capacity(timelines.len()); // (ancestor, children) - let mut later: HashMap> = + let mut later: HashMap> = HashMap::with_capacity(timelines.len()); - for (timeline_id, metadata) in timelines { - if let Some(ancestor_id) = metadata.ancestor_timeline() { + for (timeline_id, value) in timelines { + if let Some(ancestor_id) = extractor(&value) { let children = later.entry(ancestor_id).or_default(); - children.push((timeline_id, metadata)); + children.push((timeline_id, value)); } else { - now.push((timeline_id, metadata)); + now.push((timeline_id, value)); } } @@ -2059,7 +2173,7 @@ impl Tenant { remote_client, pg_version, initial_logical_size_can_start.cloned(), - initial_logical_size_attempt.cloned(), + initial_logical_size_attempt.cloned().flatten(), state, ); @@ -2143,6 +2257,7 @@ impl Tenant { cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()), cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()), + delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())), } } @@ -2159,6 +2274,7 @@ impl Tenant { // FIXME If the config file is not found, assume that we're attaching // a detached tenant and config is passed via attach command. // https://github.com/neondatabase/neon/issues/1555 + // OR: we're loading after incomplete deletion that managed to remove config. if !target_config_path.exists() { info!("tenant config not found in {target_config_display}"); return Ok(TenantConfOpt::default()); diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs new file mode 100644 index 000000000000..bdeb117c7953 --- /dev/null +++ b/pageserver/src/tenant/delete.rs @@ -0,0 +1,546 @@ +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::Context; +use pageserver_api::models::TenantState; +use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; +use tokio::sync::OwnedMutexGuard; +use tracing::{error, info, instrument, warn, Instrument, Span}; + +use utils::{ + completion, crashsafe, fs_ext, + id::{TenantId, TimelineId}, +}; + +use crate::{ + config::PageServerConf, + context::RequestContext, + task_mgr::{self, TaskKind}, + InitializationOrder, +}; + +use super::{ + mgr::{GetTenantError, TenantsMap}, + span, + timeline::delete::DeleteTimelineFlow, + tree_sort_timelines, DeleteTimelineError, Tenant, +}; + +const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u8 = 3; + +#[derive(Debug, thiserror::Error)] +pub enum DeleteTenantError { + #[error("GetTenant {0}")] + Get(#[from] GetTenantError), + + #[error("Invalid state {0}. 
Expected Active or Broken")] + InvalidState(TenantState), + + #[error("Tenant deletion is already in progress")] + AlreadyInProgress, + + #[error("Timeline {0}")] + Timeline(#[from] DeleteTimelineError), + + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +type DeletionGuard = tokio::sync::OwnedMutexGuard; + +fn remote_tenant_delete_mark_path( + conf: &PageServerConf, + tenant_id: &TenantId, +) -> anyhow::Result { + let tenant_remote_path = conf + .tenant_path(tenant_id) + .strip_prefix(&conf.workdir) + .context("Failed to strip workdir prefix") + .and_then(RemotePath::new) + .context("tenant path")?; + Ok(tenant_remote_path.join(Path::new("deleted"))) +} + +async fn create_remote_delete_mark( + conf: &PageServerConf, + remote_storage: &GenericRemoteStorage, + tenant_id: &TenantId, +) -> Result<(), DeleteTenantError> { + let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id)?; + + let data: &[u8] = &[]; + remote_storage + .upload(data, 0, &remote_mark_path, None) + .await + .context("mark upload")?; + + Ok(()) +} + +async fn create_local_delete_mark( + conf: &PageServerConf, + tenant_id: &TenantId, +) -> Result<(), DeleteTenantError> { + let marker_path = conf.tenant_deleted_mark_file_path(tenant_id); + + // Note: we're ok to replace existing file. + let _ = std::fs::OpenOptions::new() + .write(true) + .create(true) + .open(&marker_path) + .with_context(|| format!("could not create delete marker file {marker_path:?}"))?; + + crashsafe::fsync_file_and_parent(&marker_path).context("sync_mark")?; + + Ok(()) +} + +async fn schedule_ordered_timeline_deletions( + tenant: &Arc, +) -> Result>, TimelineId)>, DeleteTenantError> { + // Tenant is stopping at this point. We know it will be deleted. + // No new timelines should be created. + // Tree sort timelines to delete from leafs to the root. + // NOTE: by calling clone we release the mutex which creates a possibility for a race: pending deletion + // can complete and remove timeline from the map in between our call to clone + // and `DeleteTimelineFlow::run`, so `run` wont find timeline in `timelines` map. + // timelines.lock is currently synchronous so we cant hold it across await point. + // So just ignore NotFound error if we get it from `run`. + // Beware: in case it becomes async and we try to hold it here, `run` also locks it, which can create a deadlock. + let timelines = tenant.timelines.lock().unwrap().clone(); + let sorted = + tree_sort_timelines(timelines, |t| t.get_ancestor_timeline_id()).context("tree sort")?; + + let mut already_running_deletions = vec![]; + + for (timeline_id, _) in sorted.into_iter().rev() { + if let Err(e) = DeleteTimelineFlow::run(tenant, timeline_id, true).await { + match e { + DeleteTimelineError::NotFound => { + // Timeline deletion finished after call to clone above but before call + // to `DeleteTimelineFlow::run` and removed timeline from the map. + continue; + } + DeleteTimelineError::AlreadyInProgress(guard) => { + already_running_deletions.push((guard, timeline_id)); + continue; + } + e => return Err(DeleteTenantError::Timeline(e)), + } + } + } + + Ok(already_running_deletions) +} + +async fn ensure_timelines_dir_empty(timelines_path: &Path) -> Result<(), DeleteTenantError> { + // Assert timelines dir is empty. + if !fs_ext::is_directory_empty(timelines_path).await? 
{ + // Display first 10 items in directory + let list = &fs_ext::list_dir(timelines_path).await.context("list_dir")?[..10]; + return Err(DeleteTenantError::Other(anyhow::anyhow!( + "Timelines directory is not empty after all timelines deletion: {list:?}" + ))); + } + + Ok(()) +} + +async fn remove_tenant_remote_delete_mark( + conf: &PageServerConf, + remote_storage: Option<&GenericRemoteStorage>, + tenant_id: &TenantId, +) -> Result<(), DeleteTenantError> { + if let Some(remote_storage) = remote_storage { + remote_storage + .delete(&remote_tenant_delete_mark_path(conf, tenant_id)?) + .await?; + } + Ok(()) +} + +// Cleanup fs traces: tenant config, timelines dir local delete mark, tenant dir +async fn cleanup_remaining_fs_traces( + conf: &PageServerConf, + tenant_id: &TenantId, +) -> Result<(), DeleteTenantError> { + let rm = |p: PathBuf, is_dir: bool| async move { + if is_dir { + tokio::fs::remove_dir(&p).await + } else { + tokio::fs::remove_file(&p).await + } + .or_else(fs_ext::ignore_not_found) + .with_context(|| { + let to_display = p.display(); + format!("failed to delete {to_display}") + }) + }; + + rm(conf.tenant_config_path(tenant_id), false).await?; + + fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| { + Err(anyhow::anyhow!( + "failpoint: tenant-delete-before-remove-timelines-dir" + ))? + }); + + rm(conf.timelines_path(tenant_id), true).await?; + + fail::fail_point!("tenant-delete-before-remove-deleted-mark", |_| { + Err(anyhow::anyhow!( + "failpoint: tenant-delete-before-remove-deleted-mark" + ))? + }); + + rm(conf.tenant_deleted_mark_file_path(tenant_id), false).await?; + + fail::fail_point!("tenant-delete-before-remove-tenant-dir", |_| { + Err(anyhow::anyhow!( + "failpoint: tenant-delete-before-remove-tenant-dir" + ))? + }); + + rm(conf.tenant_path(tenant_id), true).await?; + + Ok(()) +} + +/// Orchestrates tenant shut down of all tasks, removes its in-memory structures, +/// and deletes its data from both disk and s3. +/// The sequence of steps: +/// 1. Upload remote deletion mark. +/// 2. Create local mark file. +/// 3. Shutdown tasks +/// 4. Run ordered timeline deletions +/// 5. Wait for timeline deletion operations that were scheduled before tenant deletion was requested +/// 6. Remove remote mark +/// 7. Cleanup remaining fs traces, tenant dir, config, timelines dir, local delete mark +/// It is resumable from any step in case a crash/restart occurs. +/// There are three entrypoints to the process: +/// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler. +/// 2. [`DeleteTenantFlow::resume`] is called during restarts when local or remote deletion marks are still there. +/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function. +#[derive(Default)] +pub enum DeleteTenantFlow { + #[default] + NotStarted, + InProgress, + Finished, +} + +impl DeleteTenantFlow { + // These steps are run in the context of management api request handler. + // Long running steps are continued to run in the background. + // NB: If this fails half-way through, and is retried, the retry will go through + // all the same steps again. Make sure the code here is idempotent, and don't + // error out if some of the shutdown tasks have already been completed! + // NOTE: static needed for background part. + // We assume that calling code sets up the span with tenant_id. 
+ #[instrument(skip_all)] + pub(crate) async fn run( + conf: &'static PageServerConf, + remote_storage: Option, + tenants: &'static tokio::sync::RwLock, + tenant_id: TenantId, + ) -> Result<(), DeleteTenantError> { + span::debug_assert_current_span_has_tenant_id(); + + let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?; + + if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await { + tenant.set_broken(format!("{e:#}")).await; + return Err(e); + } + + Self::schedule_background(guard, conf, remote_storage, tenants, tenant); + + Ok(()) + } + + // Helper function needed to be able to match once on returned error and transition tenant into broken state. + // This is needed because tenant.shutwodn is not idempotent. If tenant state is set to stopping another call to tenant.shutdown + // will result in an error, but here we need to be able to retry shutdown when tenant deletion is retried. + // So the solution is to set tenant state to broken. + async fn run_inner( + guard: &mut OwnedMutexGuard, + conf: &'static PageServerConf, + remote_storage: Option<&GenericRemoteStorage>, + tenant: &Tenant, + ) -> Result<(), DeleteTenantError> { + guard.mark_in_progress()?; + + fail::fail_point!("tenant-delete-before-create-remote-mark", |_| { + Err(anyhow::anyhow!( + "failpoint: tenant-delete-before-create-remote-mark" + ))? + }); + + // IDEA: implement detach as delete without remote storage. Then they would use the same lock (deletion_progress) so wont contend. + // Though sounds scary, different mark name? + // Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state. + if let Some(remote_storage) = &remote_storage { + create_remote_delete_mark(conf, remote_storage, &tenant.tenant_id) + .await + .context("remote_mark")? + } + + fail::fail_point!("tenant-delete-before-create-local-mark", |_| { + Err(anyhow::anyhow!( + "failpoint: tenant-delete-before-create-local-mark" + ))? + }); + + create_local_delete_mark(conf, &tenant.tenant_id) + .await + .context("local delete mark")?; + + fail::fail_point!("tenant-delete-before-background", |_| { + Err(anyhow::anyhow!( + "failpoint: tenant-delete-before-background" + ))? + }); + + Ok(()) + } + + fn mark_in_progress(&mut self) -> anyhow::Result<()> { + match self { + Self::Finished => anyhow::bail!("Bug. Is in finished state"), + Self::InProgress { .. } => { /* We're in a retry */ } + Self::NotStarted => { /* Fresh start */ } + } + + *self = Self::InProgress; + + Ok(()) + } + + pub async fn should_resume_deletion( + conf: &'static PageServerConf, + remote_storage: Option<&GenericRemoteStorage>, + tenant: &Tenant, + ) -> Result, DeleteTenantError> { + let acquire = |t: &Tenant| { + Some( + Arc::clone(&t.delete_progress) + .try_lock_owned() + .expect("we're the only owner during init"), + ) + }; + + let tenant_id = tenant.tenant_id; + // Check local mark first, if its there there is no need to go to s3 to check whether remote one exists. 
+ if conf.tenant_deleted_mark_file_path(&tenant_id).exists() { + return Ok(acquire(tenant)); + } + + // If remote storage is there we rely on it + if let Some(remote_storage) = remote_storage { + let remote_mark_path = remote_tenant_delete_mark_path(conf, &tenant_id)?; + + let attempt = 1; + loop { + match remote_storage.download(&remote_mark_path).await { + Ok(_) => return Ok(acquire(tenant)), + Err(e) => { + if matches!(e, DownloadError::NotFound) { + return Ok(None); + } + if attempt > SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS { + return Err(anyhow::anyhow!(e))?; + } + + warn!( + "failed to fetch tenant deletion mark at {} attempt {}", + &remote_mark_path, attempt + ) + } + } + } + } + + Ok(None) + } + + pub(crate) async fn resume( + guard: DeletionGuard, + tenant: &Arc, + init_order: Option<&InitializationOrder>, + tenants: &'static tokio::sync::RwLock, + ctx: &RequestContext, + ) -> Result<(), DeleteTenantError> { + let (_, progress) = completion::channel(); + + tenant + .set_stopping(progress, true) + .await + .expect("cant be stopping or broken"); + + // Do not consume valuable resources during the load phase, continue deletion once init phase is complete. + let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start); + if let Some(background) = background_jobs_can_start { + info!("waiting for backgound jobs barrier"); + background.clone().wait().await; + info!("ready for backgound jobs barrier"); + } + + // Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e g remove timelines dir) + let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id); + if timelines_path.exists() { + tenant.load(init_order, ctx).await.context("load")?; + } + + Self::background( + guard, + tenant.conf, + tenant.remote_storage.clone(), + tenants, + tenant, + ) + .await + } + + async fn prepare( + tenants: &tokio::sync::RwLock, + tenant_id: TenantId, + ) -> Result<(Arc, tokio::sync::OwnedMutexGuard), DeleteTenantError> { + let m = tenants.read().await; + + let tenant = m + .get(&tenant_id) + .ok_or(GetTenantError::NotFound(tenant_id))?; + + // FIXME: unsure about active only. Our init jobs may not be cancellable properly, + // so at least for now allow deletions only for active tenants. TODO recheck + // Broken and Stopping is needed for retries. + if !matches!( + tenant.current_state(), + TenantState::Active | TenantState::Broken { .. } + ) { + return Err(DeleteTenantError::InvalidState(tenant.current_state())); + } + + let guard = Arc::clone(&tenant.delete_progress) + .try_lock_owned() + .map_err(|_| DeleteTenantError::AlreadyInProgress)?; + + fail::fail_point!("tenant-delete-before-shutdown", |_| { + Err(anyhow::anyhow!("failpoint: tenant-delete-before-shutdown"))? + }); + + // make pageserver shutdown not to wait for our completion + let (_, progress) = completion::channel(); + + // It would be good to only set stopping here and continue shutdown in the background, but shutdown is not idempotent. + // i e it is an error to do: + // tenant.set_stopping + // tenant.shutdown + // Its also bad that we're holding tenants.read here. + // TODO relax set_stopping to be idempotent? 
+ if tenant.shutdown(progress, false).await.is_err() { + return Err(DeleteTenantError::Other(anyhow::anyhow!( + "tenant shutdown is already in progress" + ))); + } + + Ok((Arc::clone(tenant), guard)) + } + + fn schedule_background( + guard: OwnedMutexGuard, + conf: &'static PageServerConf, + remote_storage: Option, + tenants: &'static tokio::sync::RwLock, + tenant: Arc, + ) { + let tenant_id = tenant.tenant_id; + + task_mgr::spawn( + task_mgr::BACKGROUND_RUNTIME.handle(), + TaskKind::TimelineDeletionWorker, + Some(tenant_id), + None, + "tenant_delete", + false, + async move { + if let Err(err) = + Self::background(guard, conf, remote_storage, tenants, &tenant).await + { + error!("Error: {err:#}"); + tenant.set_broken(format!("{err:#}")).await; + }; + Ok(()) + } + .instrument({ + let span = tracing::info_span!(parent: None, "delete_tenant", tenant_id=%tenant_id); + span.follows_from(Span::current()); + span + }), + ); + } + + async fn background( + mut guard: OwnedMutexGuard, + conf: &PageServerConf, + remote_storage: Option, + tenants: &'static tokio::sync::RwLock, + tenant: &Arc, + ) -> Result<(), DeleteTenantError> { + // Tree sort timelines, schedule delete for them. Mention retries from the console side. + // Note that if deletion fails we dont mark timelines as broken, + // the whole tenant will become broken as by `Self::schedule_background` logic + let already_running_timeline_deletions = schedule_ordered_timeline_deletions(tenant) + .await + .context("schedule_ordered_timeline_deletions")?; + + fail::fail_point!("tenant-delete-before-polling-ongoing-deletions", |_| { + Err(anyhow::anyhow!( + "failpoint: tenant-delete-before-polling-ongoing-deletions" + ))? + }); + + // Wait for deletions that were already running at the moment when tenant deletion was requested. + // When we can lock deletion guard it means that corresponding timeline deletion finished. + for (guard, timeline_id) in already_running_timeline_deletions { + let flow = guard.lock().await; + if !flow.is_finished() { + return Err(DeleteTenantError::Other(anyhow::anyhow!( + "already running timeline deletion failed: {timeline_id}" + ))); + } + } + + let timelines_path = conf.timelines_path(&tenant.tenant_id); + // May not exist if we fail in cleanup_remaining_fs_traces after removing it + if timelines_path.exists() { + // sanity check to guard against layout changes + ensure_timelines_dir_empty(&timelines_path) + .await + .context("timelines dir not empty")?; + } + + remove_tenant_remote_delete_mark(conf, remote_storage.as_ref(), &tenant.tenant_id).await?; + + fail::fail_point!("tenant-delete-before-cleanup-remaining-fs-traces", |_| { + Err(anyhow::anyhow!( + "failpoint: tenant-delete-before-cleanup-remaining-fs-traces" + ))? 
+ }); + + cleanup_remaining_fs_traces(conf, &tenant.tenant_id) + .await + .context("cleanup_remaining_fs_traces")?; + + let mut locked = tenants.write().await; + if locked.remove(&tenant.tenant_id).is_none() { + warn!("Tenant got removed from tenants map during deletion"); + }; + + *guard = Self::Finished; + + Ok(()) + } +} diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 2635953e6dab..ae6d237066c4 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -20,17 +20,19 @@ use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; +use crate::tenant::delete::DeleteTenantFlow; use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState}; use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME}; use utils::fs_ext::PathExt; use utils::id::{TenantId, TimelineId}; +use super::delete::DeleteTenantError; use super::timeline::delete::DeleteTimelineFlow; /// The tenants known to the pageserver. /// The enum variants are used to distinguish the different states that the pageserver can be in. -enum TenantsMap { +pub(crate) enum TenantsMap { /// [`init_tenant_mgr`] is not done yet. Initializing, /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded. @@ -42,13 +44,13 @@ enum TenantsMap { } impl TenantsMap { - fn get(&self, tenant_id: &TenantId) -> Option<&Arc> { + pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc> { match self { TenantsMap::Initializing => None, TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id), } } - fn remove(&mut self, tenant_id: &TenantId) -> Option> { + pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option> { match self { TenantsMap::Initializing => None, TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id), @@ -97,7 +99,9 @@ pub async fn init_tenant_mgr( ); } } else { - // This case happens if we crash during attach before creating the attach marker file + // This case happens if we: + // * crash during attach before creating the attach marker file + // * crash during tenant delete before removing tenant directory let is_empty = tenant_dir_path.is_empty_dir().with_context(|| { format!("Failed to check whether {tenant_dir_path:?} is an empty dir") })?; @@ -124,6 +128,7 @@ pub async fn init_tenant_mgr( broker_client.clone(), remote_storage.clone(), Some(init_order.clone()), + &TENANTS, &ctx, ) { Ok(tenant) => { @@ -154,12 +159,13 @@ pub async fn init_tenant_mgr( Ok(()) } -pub fn schedule_local_tenant_processing( +pub(crate) fn schedule_local_tenant_processing( conf: &'static PageServerConf, tenant_path: &Path, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, init_order: Option, + tenants: &'static tokio::sync::RwLock, ctx: &RequestContext, ) -> anyhow::Result> { anyhow::ensure!( @@ -219,6 +225,7 @@ pub fn schedule_local_tenant_processing( broker_client, remote_storage, init_order, + tenants, ctx, ) }; @@ -356,7 +363,7 @@ pub async fn create_tenant( // See https://github.com/neondatabase/neon/issues/4233 let created_tenant = - schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?; + schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, &TENANTS, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. 
// See https://github.com/neondatabase/neon/issues/4233 @@ -417,6 +424,14 @@ pub async fn get_tenant( } } +pub async fn delete_tenant( + conf: &'static PageServerConf, + remote_storage: Option, + tenant_id: TenantId, +) -> Result<(), DeleteTenantError> { + DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await +} + #[derive(Debug, thiserror::Error)] pub enum DeleteTimelineError { #[error("Tenant {0}")] @@ -432,7 +447,7 @@ pub async fn delete_timeline( _ctx: &RequestContext, ) -> Result<(), DeleteTimelineError> { let tenant = get_tenant(tenant_id, true).await?; - DeleteTimelineFlow::run(&tenant, timeline_id).await?; + DeleteTimelineFlow::run(&tenant, timeline_id, false).await?; Ok(()) } @@ -507,7 +522,7 @@ pub async fn load_tenant( .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; } - let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx) + let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, &TENANTS, ctx) .with_context(|| { format!("Failed to schedule tenant processing in path {tenant_path:?}") })?; @@ -588,7 +603,7 @@ pub async fn attach_tenant( .context("check for attach marker file existence")?; anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file"); - let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?; + let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, &TENANTS, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 1e4e7dde2096..08042d6a37bc 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -919,7 +919,7 @@ impl Timeline { pub fn set_state(&self, new_state: TimelineState) { match (self.current_state(), new_state) { (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => { - warn!("Ignoring new state, equal to the existing one: {equal_state_2:?}"); + info!("Ignoring new state, equal to the existing one: {equal_state_2:?}"); } (st, TimelineState::Loading) => { error!("ignoring transition from {st:?} into Loading state"); diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index d99ca2d2927c..dba6475c2758 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -219,27 +219,13 @@ async fn delete_local_layer_files( } }; - let r = if metadata.is_dir() { - // There shouldnt be any directories inside timeline dir as of current layout. + if metadata.is_dir() { + warn!(path=%entry.path().display(), "unexpected directory under timeline dir"); tokio::fs::remove_dir(entry.path()).await } else { tokio::fs::remove_file(entry.path()).await - }; - - if let Err(e) = r { - if e.kind() == std::io::ErrorKind::NotFound { - warn!( - timeline_dir=?local_timeline_directory, - path=?entry.path().display(), - "got not found err while removing timeline dir, proceeding anyway" - ); - continue; - } - anyhow::bail!(anyhow::anyhow!( - "Failed to remove: {}. 
Error: {e}", - entry.path().display() - )); } + .with_context(|| format!("Failed to remove: {}", entry.path().display()))?; } info!("finished deleting layer files, releasing layer_removal_cs.lock()"); @@ -359,10 +345,11 @@ impl DeleteTimelineFlow { // NB: If this fails half-way through, and is retried, the retry will go through // all the same steps again. Make sure the code here is idempotent, and don't // error out if some of the shutdown tasks have already been completed! - #[instrument(skip_all, fields(tenant_id=%tenant.tenant_id, %timeline_id))] + #[instrument(skip(tenant), fields(tenant_id=%tenant.tenant_id))] pub async fn run( tenant: &Arc, timeline_id: TimelineId, + inplace: bool, ) -> Result<(), DeleteTimelineError> { let (timeline, mut guard) = Self::prepare(tenant, timeline_id)?; @@ -380,7 +367,11 @@ impl DeleteTimelineFlow { ))? }); - Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline); + if inplace { + Self::background(guard, tenant.conf, tenant, &timeline).await? + } else { + Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline); + } Ok(()) } @@ -398,6 +389,8 @@ impl DeleteTimelineFlow { } /// Shortcut to create Timeline in stopping state and spawn deletion task. + /// See corresponding parts of [`crate::tenant::delete::DeleteTenantFlow`] + #[instrument(skip_all, fields(%timeline_id))] pub async fn resume_deletion( tenant: Arc, timeline_id: TimelineId, @@ -444,11 +437,15 @@ impl DeleteTimelineFlow { Ok(()) } + #[instrument(skip_all, fields(%timeline_id))] pub async fn cleanup_remaining_timeline_fs_traces( tenant: &Tenant, timeline_id: TimelineId, ) -> anyhow::Result<()> { - cleanup_remaining_timeline_fs_traces(tenant.conf, tenant.tenant_id, timeline_id).await + let r = + cleanup_remaining_timeline_fs_traces(tenant.conf, tenant.tenant_id, timeline_id).await; + info!("Done"); + r } fn prepare( @@ -494,11 +491,17 @@ impl DeleteTimelineFlow { // At the end of the operation we're holding the guard and need to lock timelines map // to remove the timeline from it. // Always if you have two locks that are taken in different order this can result in a deadlock. - let delete_lock_guard = DeletionGuard( - Arc::clone(&timeline.delete_progress) - .try_lock_owned() - .map_err(|_| DeleteTimelineError::AlreadyInProgress)?, - ); + + let delete_progress = Arc::clone(&timeline.delete_progress); + let delete_lock_guard = match delete_progress.try_lock_owned() { + Ok(guard) => DeletionGuard(guard), + Err(_) => { + // Unfortunately if lock fails arc is consumed. 
+ return Err(DeleteTimelineError::AlreadyInProgress(Arc::clone( + &timeline.delete_progress, + ))); + } + }; timeline.set_state(TimelineState::Stopping); @@ -553,10 +556,14 @@ impl DeleteTimelineFlow { remove_timeline_from_tenant(tenant, timeline.timeline_id, &guard).await?; - *guard.0 = Self::Finished; + *guard = Self::Finished; Ok(()) } + + pub(crate) fn is_finished(&self) -> bool { + matches!(self, Self::Finished) + } } struct DeletionGuard(OwnedMutexGuard); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 79c1bb055b49..6f35780cbb10 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -32,6 +32,7 @@ from _pytest.config import Config from _pytest.config.argparsing import Parser from _pytest.fixtures import FixtureRequest +from mypy_boto3_s3 import S3Client # Type-related stuff from psycopg2.extensions import connection as PgConnection @@ -440,7 +441,7 @@ def __init__( self.port_distributor = port_distributor self.remote_storage = remote_storage self.ext_remote_storage: Optional[S3Storage] = None - self.remote_storage_client: Optional[Any] = None + self.remote_storage_client: Optional[S3Client] = None self.remote_storage_users = remote_storage_users self.broker = broker self.run_id = run_id @@ -883,7 +884,14 @@ def get_safekeeper_connstrs(self) -> str: def timeline_dir(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: """Get a timeline directory's path based on the repo directory of the test environment""" - return self.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) + return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id) + + def tenant_dir( + self, + tenant_id: TenantId, + ) -> Path: + """Get a tenant directory's path based on the repo directory of the test environment""" + return self.repo_dir / "tenants" / str(tenant_id) def get_pageserver_version(self) -> str: bin_pageserver = str(self.neon_binpath / "pageserver") diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 8c053c8073ab..a179ebdd09ae 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -210,6 +210,10 @@ def tenant_detach(self, tenant_id: TenantId, detach_ignored=False): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params) self.verbose_error(res) + def tenant_delete(self, tenant_id: TenantId): + res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") + self.verbose_error(res) + def tenant_load(self, tenant_id: TenantId): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load") self.verbose_error(res) diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 119c99bb9684..76777eca0860 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -1,9 +1,11 @@ import time -from typing import Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional from fixtures.log_helper import log from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient +from fixtures.remote_storage import RemoteStorageKind, S3Storage from fixtures.types import Lsn, TenantId, TimelineId +from fixtures.utils import wait_until def assert_tenant_state( @@ -199,20 +201,19 @@ def wait_timeline_detail_404( timeline_id: TimelineId, iterations: int, ): - last_exc = None - for _ in range(iterations): - time.sleep(0.250) + def 
timeline_is_missing(): + data = {} try: data = pageserver_http.timeline_detail(tenant_id, timeline_id) - log.info(f"detail {data}") + log.info(f"timeline detail {data}") except PageserverApiException as e: log.debug(e) if e.status_code == 404: return - last_exc = e + raise RuntimeError(f"Timeline exists state {data.get('state')}") - raise last_exc or RuntimeError(f"Timeline wasnt deleted in time, state: {data['state']}") + wait_until(iterations, interval=0.250, func=timeline_is_missing) def timeline_delete_wait_completed( @@ -224,3 +225,71 @@ def timeline_delete_wait_completed( ): pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args) wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations) + + +if TYPE_CHECKING: + # TODO avoid by combining remote storage related stuff in single type + # and just passing in this type instead of whole builder + from fixtures.neon_fixtures import NeonEnvBuilder + + +def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None): + # For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api. + assert neon_env_builder.remote_storage_kind in ( + RemoteStorageKind.MOCK_S3, + RemoteStorageKind.REAL_S3, + ) + # For mypy + assert isinstance(neon_env_builder.remote_storage, S3Storage) + assert neon_env_builder.remote_storage_client is not None + + # Note that this doesnt use pagination, so list is not guaranteed to be exhaustive. + response = neon_env_builder.remote_storage_client.list_objects_v2( + Bucket=neon_env_builder.remote_storage.bucket_name, + Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "", + ) + objects = response.get("Contents") + assert ( + response["KeyCount"] == 0 + ), f"remote dir with prefix {prefix} is not empty after deletion: {objects}" + + +def wait_tenant_status_404( + pageserver_http: PageserverHttpClient, + tenant_id: TenantId, + iterations: int, +): + def tenant_is_missing(): + data = {} + try: + data = pageserver_http.tenant_status(tenant_id) + log.info(f"tenant status {data}") + except PageserverApiException as e: + log.debug(e) + if e.status_code == 404: + return + + raise RuntimeError(f"Timeline exists state {data.get('state')}") + + wait_until(iterations, interval=0.250, func=tenant_is_missing) + + +def tenant_delete_wait_completed( + pageserver_http: PageserverHttpClient, + tenant_id: TenantId, + iterations: int, +): + pageserver_http.tenant_delete(tenant_id=tenant_id) + wait_tenant_status_404(pageserver_http, tenant_id=tenant_id, iterations=iterations) + + +MANY_SMALL_LAYERS_TENANT_CONFIG = { + "gc_period": "0s", + "compaction_period": "0s", + "checkpoint_distance": f"{1024**2}", + "image_creation_threshold": "100", +} + + +def poll_for_remote_storage_iterations(remote_storage_kind: RemoteStorageKind) -> int: + return 20 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 6 diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 2c8b4f4303ae..81754c95f7a3 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -6,13 +6,16 @@ import tarfile import time from pathlib import Path -from typing import Any, Callable, Dict, List, Tuple, TypeVar +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, TypeVar from urllib.parse import urlencode import allure from psycopg2.extensions import cursor from fixtures.log_helper import log + +if TYPE_CHECKING: + from fixtures.neon_fixtures import PgBin from 
fixtures.types import TimelineId Fn = TypeVar("Fn", bound=Callable[..., Any]) @@ -314,3 +317,15 @@ def wait_while(number_of_iterations: int, interval: float, func): except Exception: return raise Exception("timed out while waiting for %s" % func) + + +def run_pg_bench_small(pg_bin: "PgBin", connstr: str): + """ + Fast way to populate data. + For more layers consider combining with these tenant settings: + { + "checkpoint_distance": 1024 ** 2, + "image_creation_threshold": 100, + } + """ + pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", connstr]) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 68116985d3f0..bfe9046ecabf 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -97,6 +97,11 @@ def test_remote_storage_backup_and_restore( tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + # Thats because of UnreliableWrapper's injected failures + env.pageserver.allowed_errors.append( + f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*" + ) + checkpoint_numbers = range(1, 3) for checkpoint_number in checkpoint_numbers: diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py new file mode 100644 index 000000000000..06189053316d --- /dev/null +++ b/test_runner/regress/test_tenant_delete.py @@ -0,0 +1,250 @@ +import enum +import os + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + PgBin, + last_flush_lsn_upload, + wait_for_last_flush_lsn, +) +from fixtures.pageserver.http import PageserverApiException +from fixtures.pageserver.utils import ( + MANY_SMALL_LAYERS_TENANT_CONFIG, + assert_prefix_empty, + poll_for_remote_storage_iterations, + tenant_delete_wait_completed, + wait_tenant_status_404, + wait_until_tenant_active, + wait_until_tenant_state, +) +from fixtures.remote_storage import RemoteStorageKind, available_remote_storages +from fixtures.types import TenantId +from fixtures.utils import run_pg_bench_small + + +@pytest.mark.parametrize( + "remote_storage_kind", [RemoteStorageKind.NOOP, *available_remote_storages()] +) +def test_tenant_delete_smoke( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, + pg_bin: PgBin, +): + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_tenant_delete_smoke", + ) + + env = neon_env_builder.init_start() + + ps_http = env.pageserver.http_client() + + # first try to delete non existing tenant + tenant_id = TenantId.generate() + env.pageserver.allowed_errors.append(f".*NotFound: tenant {tenant_id}.*") + with pytest.raises(PageserverApiException, match=f"NotFound: tenant {tenant_id}"): + ps_http.tenant_delete(tenant_id=tenant_id) + + env.neon_cli.create_tenant( + tenant_id=tenant_id, + conf=MANY_SMALL_LAYERS_TENANT_CONFIG, + ) + + # create two timelines one being the parent of another + parent = None + for timeline in ["first", "second"]: + timeline_id = env.neon_cli.create_branch( + timeline, tenant_id=tenant_id, ancestor_branch_name=parent + ) + with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint: + run_pg_bench_small(pg_bin, endpoint.connstr()) + wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id) + + parent = timeline + + iterations = 
poll_for_remote_storage_iterations(remote_storage_kind) + + tenant_delete_wait_completed(ps_http, tenant_id, iterations) + + tenant_path = env.tenant_dir(tenant_id=tenant_id) + assert not tenant_path.exists() + + if remote_storage_kind in [RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3]: + assert_prefix_empty( + neon_env_builder, + prefix="/".join( + ( + "tenants", + str(tenant_id), + ) + ), + ) + + +class Check(enum.Enum): + RETRY_WITHOUT_RESTART = enum.auto() + RETRY_WITH_RESTART = enum.auto() + + +FAILPOINTS = [ + "tenant-delete-before-shutdown", + "tenant-delete-before-create-remote-mark", + "tenant-delete-before-create-local-mark", + "tenant-delete-before-background", + "tenant-delete-before-polling-ongoing-deletions", + "tenant-delete-before-cleanup-remaining-fs-traces", + "tenant-delete-before-remove-timelines-dir", + "tenant-delete-before-remove-deleted-mark", + "tenant-delete-before-remove-tenant-dir", + # Some failpoints from timeline deletion + "timeline-delete-before-index-deleted-at", + "timeline-delete-before-rm", + "timeline-delete-before-index-delete", + "timeline-delete-after-rm-dir", +] + +FAILPOINTS_BEFORE_BACKGROUND = [ + "timeline-delete-before-schedule", + "tenant-delete-before-shutdown", + "tenant-delete-before-create-remote-mark", + "tenant-delete-before-create-local-mark", + "tenant-delete-before-background", +] + + +def combinations(): + result = [] + + remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3] + if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"): + remotes.append(RemoteStorageKind.REAL_S3) + + for remote_storage_kind in remotes: + for delete_failpoint in FAILPOINTS: + if remote_storage_kind == RemoteStorageKind.NOOP and delete_failpoint in ( + "timeline-delete-before-index-delete", + ): + # the above failpoint are not relevant for config without remote storage + continue + + result.append((remote_storage_kind, delete_failpoint)) + return result + + +@pytest.mark.parametrize("remote_storage_kind, failpoint", combinations()) +@pytest.mark.parametrize("check", list(Check)) +def test_delete_tenant_exercise_crash_safety_failpoints( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, + failpoint: str, + check: Check, + pg_bin: PgBin, +): + neon_env_builder.enable_remote_storage( + remote_storage_kind, "test_delete_tenant_exercise_crash_safety_failpoints" + ) + + env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG) + + tenant_id = env.initial_tenant + + env.pageserver.allowed_errors.extend( + [ + # From deletion polling + f".*NotFound: tenant {env.initial_tenant}.*", + # allow errors caused by failpoints + f".*failpoint: {failpoint}", + # It appears when we stopped flush loop during deletion (attempt) and then pageserver is stopped + ".*freeze_and_flush_on_shutdown.*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited", + # We may leave some upload tasks in the queue. They're likely deletes. + # For uploads we explicitly wait with `last_flush_lsn_upload` below. + # So by ignoring these instead of waiting for empty upload queue + # we execute more distinct code paths. 
+            '.*stopping left-over name="remote upload".*',
+        ]
+    )
+
+    ps_http = env.pageserver.http_client()
+
+    timeline_id = env.neon_cli.create_timeline("delete", tenant_id=tenant_id)
+    with env.endpoints.create_start("delete", tenant_id=tenant_id) as endpoint:
+        # Generate enough layers
+        run_pg_bench_small(pg_bin, endpoint.connstr())
+        if remote_storage_kind is RemoteStorageKind.NOOP:
+            wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
+        else:
+            last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
+
+    ps_http.configure_failpoints((failpoint, "return"))
+
+    iterations = poll_for_remote_storage_iterations(remote_storage_kind)
+
+    # These failpoints fire before the background task is spawned,
+    # so they result in an API request failure.
+    if failpoint in FAILPOINTS_BEFORE_BACKGROUND:
+        with pytest.raises(PageserverApiException, match=failpoint):
+            ps_http.tenant_delete(tenant_id)
+
+    else:
+        ps_http.tenant_delete(tenant_id)
+        tenant_info = wait_until_tenant_state(
+            pageserver_http=ps_http,
+            tenant_id=tenant_id,
+            expected_state="Broken",
+            iterations=iterations,
+        )
+
+        reason = tenant_info["state"]["data"]["reason"]
+        log.info(f"tenant broken: {reason}")
+
+        # The failpoint may not be the only error in the stack
+        assert reason.endswith(f"failpoint: {failpoint}"), reason
+
+    if check is Check.RETRY_WITH_RESTART:
+        env.pageserver.stop()
+        env.pageserver.start()
+
+        if (
+            remote_storage_kind is RemoteStorageKind.NOOP
+            and failpoint == "tenant-delete-before-create-local-mark"
+        ):
+            tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
+        elif failpoint in (
+            "tenant-delete-before-shutdown",
+            "tenant-delete-before-create-remote-mark",
+        ):
+            wait_until_tenant_active(
+                ps_http, tenant_id=tenant_id, iterations=iterations, period=0.25
+            )
+            tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
+        else:
+            # Pageserver should've resumed deletion after restart.
+            wait_tenant_status_404(ps_http, tenant_id, iterations=iterations + 10)
+    elif check is Check.RETRY_WITHOUT_RESTART:
+        # This should succeed.
+        # It also checks that the delete can be retried even when the tenant is in the Broken state.
+        ps_http.configure_failpoints((failpoint, "off"))
+
+        tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
+
+    # Check that the remote prefix is empty
+    if remote_storage_kind is RemoteStorageKind.MOCK_S3:
+        assert_prefix_empty(
+            neon_env_builder,
+            prefix="/".join(
+                (
+                    "tenants",
+                    str(tenant_id),
+                )
+            ),
+        )
+
+    tenant_dir = env.tenant_dir(tenant_id)
+    # Check that the local tenant directory is gone
+    assert not tenant_dir.exists()
+
+
+# TODO test concurrent deletions with "hang" failpoint
+# TODO test tenant delete continues after attach
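# The helpers tenant_delete_wait_completed and wait_tenant_status_404 used above come
# from fixtures.pageserver.utils; their definitions are not part of this diff. As an
# illustration only, a minimal sketch of the polling contract the tests rely on
# (issue the delete, then poll status until the tenant is reported as not found) could
# look like the hypothetical helper below; the real fixtures may differ in names and details.
import time

from fixtures.pageserver.http import PageserverApiException


def wait_tenant_gone_sketch(ps_http, tenant_id, iterations: int, interval: float = 0.25):
    # Hypothetical helper, not part of this change: poll tenant status until the
    # pageserver answers with a "NotFound" error, which signals that deletion finished.
    # (Matching on the error message here; the real helper may inspect the HTTP status.)
    last_error = None
    for _ in range(iterations):
        try:
            ps_http.tenant_status(tenant_id)
        except PageserverApiException as e:
            last_error = e
            if "NotFound" in str(e):
                return
        time.sleep(interval)
    raise AssertionError(f"tenant {tenant_id} still present after {iterations} iterations: {last_error}")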
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index 932d8219543e..b189510a9e41 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -66,6 +66,10 @@ def test_tenant_reattach(
    env.pageserver.allowed_errors.append(
        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
    )
+    # This is caused by UnreliableWrapper's injected failures
+    env.pageserver.allowed_errors.append(
+        f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
+    )

    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
        with endpoint.cursor() as cur:
diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py
index 79a3b353d458..fee95e5420c9 100644
--- a/test_runner/regress/test_tenants_with_remote_storage.py
+++ b/test_runner/regress/test_tenants_with_remote_storage.py
@@ -146,6 +146,11 @@ def test_tenants_attached_after_download(
    tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
    timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])

+    # This is caused by UnreliableWrapper's injected failures
+    env.pageserver.allowed_errors.append(
+        f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
+    )
+
    for checkpoint_number in range(1, 3):
        with endpoint.cursor() as cur:
            cur.execute(
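# poll_for_remote_storage_iterations is likewise imported from fixtures.pageserver.utils
# and not defined in this diff. Judging by the inline expression it replaces in
# test_timeline_delete.py below (20 iterations for REAL_S3, 4 otherwise), it presumably
# just centralizes that choice; the sketch below is hypothetical and the actual values
# or signature may differ.
from fixtures.remote_storage import RemoteStorageKind


def poll_for_remote_storage_iterations_sketch(remote_storage_kind: RemoteStorageKind) -> int:
    # More iterations for real S3, where requests are slower and occasionally flaky.
    return 20 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 4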
diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py
index 1e8dd3620618..26caeb8ffb68 100644
--- a/test_runner/regress/test_timeline_delete.py
+++ b/test_runner/regress/test_timeline_delete.py
@@ -4,7 +4,6 @@
import shutil
import threading
from pathlib import Path
-from typing import Optional

import pytest
import requests
@@ -18,6 +17,8 @@
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
+    assert_prefix_empty,
+    poll_for_remote_storage_iterations,
    timeline_delete_wait_completed,
    wait_for_last_record_lsn,
    wait_for_upload,
@@ -27,7 +28,6 @@
)
from fixtures.remote_storage import (
    RemoteStorageKind,
-    S3Storage,
    available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
@@ -187,10 +187,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
    8. Retry or restart without the failpoint and check the result.
    """

-    if remote_storage_kind is not None:
-        neon_env_builder.enable_remote_storage(
-            remote_storage_kind, "test_delete_timeline_exercise_crash_safety_failpoints"
-        )
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind, "test_delete_timeline_exercise_crash_safety_failpoints"
+    )

    env = neon_env_builder.init_start(
        initial_tenant_conf={
@@ -231,7 +230,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(

    ps_http.configure_failpoints((failpoint, "return"))

-    iterations = 20 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 4
+    iterations = poll_for_remote_storage_iterations(remote_storage_kind)

    # These failpoints are earlier than background task is spawned.
    # so they result in api request failure.
@@ -280,14 +279,14 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
                    "remote_storage_s3_request_seconds_count",
                    filter={"request_type": "get_object", "result": "err"},
                ).value
-                == 1
+                == 2  # One miss for the tenant deletion mark, the second for the index part
            )
            assert (
                m.query_one(
                    "remote_storage_s3_request_seconds_count",
                    filter={"request_type": "get_object", "result": "ok"},
                ).value
-                == 1
+                == 1  # The index part for the initial timeline
            )
        elif check is Check.RETRY_WITHOUT_RESTART:
            # this should succeed
@@ -413,27 +412,6 @@ def test_timeline_resurrection_on_attach(
    assert all([tl["state"] == "Active" for tl in timelines])


-def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] = None):
-    # For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api.
-    assert neon_env_builder.remote_storage_kind in (
-        RemoteStorageKind.MOCK_S3,
-        RemoteStorageKind.REAL_S3,
-    )
-    # For mypy
-    assert isinstance(neon_env_builder.remote_storage, S3Storage)
-
-    # Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
-    assert neon_env_builder.remote_storage_client is not None
-    response = neon_env_builder.remote_storage_client.list_objects_v2(
-        Bucket=neon_env_builder.remote_storage.bucket_name,
-        Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
-    )
-    objects = response.get("Contents")
-    assert (
-        response["KeyCount"] == 0
-    ), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
-
-
def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder):
    """
    When deleting a timeline, if we succeed in setting the deleted flag remotely