diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 8a061f3ae15ce..913d45d63c6dc 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -74,6 +74,7 @@ use crate::tenant::size::ModelInputs;
 use crate::tenant::storage_layer::LayerAccessStatsReset;
 use crate::tenant::storage_layer::LayerName;
 use crate::tenant::timeline::CompactFlags;
+use crate::tenant::timeline::CompactionError;
 use crate::tenant::timeline::Timeline;
 use crate::tenant::GetTimelineError;
 use crate::tenant::SpawnMode;
@@ -1813,11 +1814,22 @@ async fn timeline_checkpoint_handler(
         timeline
             .freeze_and_flush()
             .await
-            .map_err(ApiError::InternalServerError)?;
+            .map_err(|e| {
+                match e {
+                    tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
+                    other => ApiError::InternalServerError(other.into()),
+
+                }
+            })?;
         timeline
             .compact(&cancel, flags, &ctx)
             .await
-            .map_err(|e| ApiError::InternalServerError(e.into()))?;
+            .map_err(|e|
+                match e {
+                    CompactionError::ShuttingDown => ApiError::ShuttingDown,
+                    CompactionError::Other(e) => ApiError::InternalServerError(e)
+                }
+            )?;
 
         if wait_until_uploaded {
             timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?;
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index e9651165b1068..35150b210e355 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -66,6 +66,7 @@ use crate::tenant::mgr::GetTenantError;
 use crate::tenant::mgr::ShardResolveResult;
 use crate::tenant::mgr::ShardSelector;
 use crate::tenant::mgr::TenantManager;
+use crate::tenant::timeline::FlushLayerError;
 use crate::tenant::timeline::WaitLsnError;
 use crate::tenant::GetTimelineError;
 use crate::tenant::PageReconstructError;
@@ -830,7 +831,10 @@ impl PageServerHandler {
         // We only want to persist the data, and it doesn't matter if it's in the
         // shape of deltas or images.
         info!("flushing layers");
-        timeline.freeze_and_flush().await?;
+        timeline.freeze_and_flush().await.map_err(|e| match e {
+            FlushLayerError::Cancelled => QueryError::Shutdown,
+            other => QueryError::Other(other.into()),
+        })?;
 
         info!("done");
         Ok(())
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 4480c7df6e688..0fc846e5f3919 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -78,11 +78,19 @@ pub enum LsnForTimestamp {
 }
 
 #[derive(Debug, thiserror::Error)]
-pub enum CalculateLogicalSizeError {
+pub(crate) enum CalculateLogicalSizeError {
     #[error("cancelled")]
     Cancelled,
+
+    /// Something went wrong while reading the metadata we use to calculate logical size
+    /// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
+    /// in the `From` implementation for this variant.
     #[error(transparent)]
-    Other(#[from] anyhow::Error),
+    PageRead(PageReconstructError),
+
+    /// Something went wrong deserializing metadata that we read to calculate logical size
+    #[error("decode error: {0}")]
+    Decode(#[from] DeserializeError),
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -110,7 +118,7 @@ impl From<PageReconstructError> for CalculateLogicalSizeError {
             PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
                 Self::Cancelled
             }
-            _ => Self::Other(pre.into()),
+            _ => Self::PageRead(pre),
         }
     }
 }
@@ -763,7 +771,7 @@ impl Timeline {
     /// # Cancel-Safety
     ///
     /// This method is cancellation-safe.
-    pub async fn get_current_logical_size_non_incremental(
+    pub(crate) async fn get_current_logical_size_non_incremental(
         &self,
         lsn: Lsn,
         ctx: &RequestContext,
@@ -772,7 +780,7 @@ impl Timeline {
 
         // Fetch list of database dirs and iterate them
         let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
-        let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
+        let dbdir = DbDirectory::des(&buf)?;
 
         let mut total_size: u64 = 0;
         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index e6bfd57a447a9..311338554c6ab 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -4154,7 +4154,7 @@ mod tests {
                 .await?;
             writer.finish_write(lsn);
         }
-        tline.freeze_and_flush().await
+        tline.freeze_and_flush().await.map_err(|e| e.into())
     }
 
     #[tokio::test]
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index 3ac799c69a219..1ec13882da8c2 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -366,7 +366,10 @@ impl Layer {
             .0
             .get_or_maybe_download(true, Some(ctx))
             .await
-            .map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?;
+            .map_err(|err| match err {
+                DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
+                other => GetVectoredError::Other(anyhow::anyhow!(other)),
+            })?;
 
         self.0
             .access_stats
@@ -1158,6 +1161,11 @@ impl LayerInner {
 
         let consecutive_failures = 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
 
+        if timeline.cancel.is_cancelled() {
+            // If we're shutting down, drop out before logging the error
+            return Err(e);
+        }
+
         tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
 
         let backoff = utils::backoff::exponential_backoff_duration_seconds(
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index d4f6e25843b7b..d07e2352fa8c7 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -138,7 +138,7 @@ use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe
 use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
 
 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
-pub(super) enum FlushLoopState {
+pub(crate) enum FlushLoopState {
     NotStarted,
     Running {
         #[cfg(test)]
@@ -577,7 +577,7 @@ impl PageReconstructError {
 }
 
 #[derive(thiserror::Error, Debug)]
-enum CreateImageLayersError {
+pub(crate) enum CreateImageLayersError {
     #[error("timeline shutting down")]
     Cancelled,
 
@@ -591,17 +591,35 @@
     Other(#[from] anyhow::Error),
 }
 
-#[derive(thiserror::Error, Debug)]
-enum FlushLayerError {
+#[derive(thiserror::Error, Debug, Clone)]
+pub(crate) enum FlushLayerError {
     /// Timeline cancellation token was cancelled
     #[error("timeline shutting down")]
     Cancelled,
 
+    /// We tried to flush a layer while the Timeline is in an unexpected state
+    #[error("cannot flush frozen layers when flush_loop is not running, state is {0:?}")]
+    NotRunning(FlushLoopState),
+
+    // Arc<> the following non-clonable error types: we must be Clone-able because the flush error is propagated from the flush
+    // loop via a watch channel, where we can only borrow it.
     #[error(transparent)]
-    CreateImageLayersError(CreateImageLayersError),
+    CreateImageLayersError(Arc<CreateImageLayersError>),
 
     #[error(transparent)]
-    Other(#[from] anyhow::Error),
+    Other(#[from] Arc<anyhow::Error>),
+}
+
+impl FlushLayerError {
+    // When crossing from generic anyhow errors to this error type, we explicitly check
+    // for timeline cancellation to avoid logging inoffensive shutdown errors as warn/err.
+    fn from_anyhow(timeline: &Timeline, err: anyhow::Error) -> Self {
+        if timeline.cancel.is_cancelled() {
+            Self::Cancelled
+        } else {
+            Self::Other(Arc::new(err))
+        }
+    }
 }
 
 #[derive(thiserror::Error, Debug)]
@@ -696,7 +714,7 @@ impl From<CreateImageLayersError> for FlushLayerError {
     fn from(e: CreateImageLayersError) -> Self {
         match e {
             CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
-            any => FlushLayerError::CreateImageLayersError(any),
+            any => FlushLayerError::CreateImageLayersError(Arc::new(any)),
         }
     }
 }
@@ -1547,13 +1565,13 @@ impl Timeline {
 
     /// Flush to disk all data that was written with the put_* functions
     #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
-    pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
+    pub(crate) async fn freeze_and_flush(&self) -> Result<(), FlushLayerError> {
         self.freeze_and_flush0().await
     }
 
     // This exists to provide a non-span creating version of `freeze_and_flush` we can call without
     // polluting the span hierarchy.
-    pub(crate) async fn freeze_and_flush0(&self) -> anyhow::Result<()> {
+    pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
         let to_lsn = self.freeze_inmem_layer(false).await;
         self.flush_frozen_layers_and_wait(to_lsn).await
     }
@@ -2735,11 +2753,6 @@ impl Timeline {
             self.current_logical_size.initialized.add_permits(1);
         }
 
-        enum BackgroundCalculationError {
-            Cancelled,
-            Other(anyhow::Error),
-        }
-
         let try_once = |attempt: usize| {
             let background_ctx = &background_ctx;
             let self_ref = &self;
@@ -2757,10 +2770,10 @@
                     (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
                 }
                 _ = self_ref.cancel.cancelled() => {
-                    return Err(BackgroundCalculationError::Cancelled);
+                    return Err(CalculateLogicalSizeError::Cancelled);
                 }
                 _ = cancel.cancelled() => {
-                    return Err(BackgroundCalculationError::Cancelled);
+                    return Err(CalculateLogicalSizeError::Cancelled);
                 },
                 () = skip_concurrency_limiter.cancelled() => {
                     // Some action that is part of a end user interaction requested logical size
@@ -2787,18 +2800,7 @@
                 .await
             {
                 Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
-                Err(CalculateLogicalSizeError::Cancelled) => {
-                    Err(BackgroundCalculationError::Cancelled)
-                }
-                Err(CalculateLogicalSizeError::Other(err)) => {
-                    if let Some(PageReconstructError::AncestorStopping(_)) =
-                        err.root_cause().downcast_ref()
-                    {
-                        Err(BackgroundCalculationError::Cancelled)
-                    } else {
-                        Err(BackgroundCalculationError::Other(err))
-                    }
-                }
+                Err(e) => Err(e),
             }
         }
     };
@@ -2810,8 +2812,11 @@
 
         match try_once(attempt).await {
             Ok(res) => return ControlFlow::Continue(res),
-            Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
-            Err(BackgroundCalculationError::Other(e)) => {
+            Err(CalculateLogicalSizeError::Cancelled) => return ControlFlow::Break(()),
+            Err(
+                e @ (CalculateLogicalSizeError::Decode(_)
+                | CalculateLogicalSizeError::PageRead(_)),
+            ) => {
                 warn!(attempt, "initial size calculation failed: {e:?}");
                 // exponential back-off doesn't make sense at these long intervals;
                 // use fixed retry interval with generous jitter instead
@@ -3717,7 +3722,9 @@
                     return;
                 }
                 err @ Err(
-                    FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
+                    FlushLayerError::NotRunning(_)
+                    | FlushLayerError::Other(_)
+                    | FlushLayerError::CreateImageLayersError(_),
                 ) => {
                     error!("could not flush frozen layer: {err:?}");
                     break err.map(|_| ());
@@ -3763,7 +3770,10 @@
     /// `last_record_lsn` may be higher than the highest LSN of a frozen layer: if this is the case,
     /// it means no data will be written between the top of the highest frozen layer and to_lsn,
     /// e.g. because this tenant shard has ingested up to to_lsn and not written any data locally for that part of the WAL.
-    async fn flush_frozen_layers_and_wait(&self, last_record_lsn: Lsn) -> anyhow::Result<()> {
+    async fn flush_frozen_layers_and_wait(
+        &self,
+        last_record_lsn: Lsn,
+    ) -> Result<(), FlushLayerError> {
         let mut rx = self.layer_flush_done_tx.subscribe();
 
         // Increment the flush cycle counter and wake up the flush task.
@@ -3774,7 +3784,7 @@
 
         let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
         if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
-            anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
+            return Err(FlushLayerError::NotRunning(flush_loop_state));
         }
 
         self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
@@ -3787,14 +3797,11 @@
         {
             let (last_result_counter, last_result) = &*rx.borrow();
             if *last_result_counter >= my_flush_request {
-                if let Err(_err) = last_result {
+                if let Err(err) = last_result {
                     // We already logged the original error in
                     // flush_loop. We cannot propagate it to the caller
                     // here, because it might not be Cloneable
-                    anyhow::bail!(
-                        "Could not flush frozen layer. Request id: {}",
-                        my_flush_request
-                    );
+                    return Err(err.clone());
                 } else {
                     return Ok(());
                 }
@@ -3803,7 +3810,7 @@
             trace!("waiting for flush to complete");
             tokio::select! {
                 rx_e = rx.changed() => {
-                    rx_e?;
+                    rx_e.map_err(|_| FlushLayerError::NotRunning(*self.flush_loop_state.lock().unwrap()))?;
                 },
                 // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
                 // the notification from [`flush_loop`] that it completed.
@@ -3875,7 +3882,8 @@
                 EnumSet::empty(),
                 ctx,
             )
-            .await?;
+            .await
+            .map_err(|e| FlushLayerError::from_anyhow(self, e))?;
 
             if self.cancel.is_cancelled() {
                 return Err(FlushLayerError::Cancelled);
@@ -3899,7 +3907,8 @@
                     Some(metadata_keyspace.0.ranges[0].clone()),
                     ctx,
                 )
-                .await?
+                .await
+                .map_err(|e| FlushLayerError::from_anyhow(self, e))?
             } else {
                 None
             };
@@ -3926,7 +3935,11 @@
             // Normal case, write out a L0 delta layer file.
             // `create_delta_layer` will not modify the layer map.
             // We will remove frozen layer and add delta layer in one atomic operation later.
-            let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
+            let Some(layer) = self
+                .create_delta_layer(&frozen_layer, None, ctx)
+                .await
+                .map_err(|e| FlushLayerError::from_anyhow(self, e))?
+            else {
                 panic!("delta layer cannot be empty if no filter is applied");
             };
             (
@@ -3959,7 +3972,8 @@
 
             if self.set_disk_consistent_lsn(disk_consistent_lsn) {
                 // Schedule remote uploads that will reflect our new disk_consistent_lsn
-                self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
+                self.schedule_uploads(disk_consistent_lsn, layers_to_upload)
+                    .map_err(|e| FlushLayerError::from_anyhow(self, e))?;
             }
             // release lock on 'layers'
         };
diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs
index e6ddabe5b5c65..4fc89330ba429 100644
--- a/pageserver/src/tenant/timeline/detach_ancestor.rs
+++ b/pageserver/src/tenant/timeline/detach_ancestor.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use super::{layer_manager::LayerManager, Timeline};
+use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
 use crate::{
     context::{DownloadBehavior, RequestContext},
     task_mgr::TaskKind,
@@ -23,7 +23,7 @@ pub(crate) enum Error {
     #[error("shutting down, please retry later")]
     ShuttingDown,
     #[error("flushing failed")]
-    FlushAncestor(#[source] anyhow::Error),
+    FlushAncestor(#[source] FlushLayerError),
     #[error("layer download failed")]
     RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
     #[error("copying LSN prefix locally failed")]
diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py
index b137fb3a5c2a7..6fe23846c7339 100644
--- a/test_runner/regress/test_ondemand_download.py
+++ b/test_runner/regress/test_ondemand_download.py
@@ -402,7 +402,7 @@ def get_resident_physical_size():
     env.pageserver.allowed_errors.extend(
         [
             ".*download failed: downloading evicted layer file failed.*",
-            f".*initial_size_calculation.*{tenant_id}.*{timeline_id}.*initial size calculation failed: downloading evicted layer file failed",
+            f".*initial_size_calculation.*{tenant_id}.*{timeline_id}.*initial size calculation failed.*downloading evicted layer file failed",
         ]
     )
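
Note for reviewers on the `FlushLayerError` changes above: the new variants wrap their non-clonable payloads in `Arc` because the flush loop publishes its result through a `tokio::sync::watch` channel, and `flush_frozen_layers_and_wait` can only borrow the value stored in the channel, so handing the error back to the caller requires `Clone` (hence `return Err(err.clone())`). The snippet below is a minimal sketch of that pattern, assuming only the `tokio` and `anyhow` crates; `FlushResultError` and the `(u64, Result<...>)` payload are illustrative stand-ins, not the pageserver's actual types.

    use std::sync::Arc;
    use tokio::sync::watch;

    // Illustrative stand-in for FlushLayerError: Clone is derivable because the
    // non-clonable anyhow::Error payload is shared behind an Arc.
    #[derive(Debug, Clone)]
    enum FlushResultError {
        Cancelled,
        Other(Arc<anyhow::Error>),
    }

    #[tokio::main]
    async fn main() {
        // The flush loop owns the sender; every waiter subscribes to the receiver.
        // The payload pairs a flush-request counter with the last flush result.
        let (tx, mut rx) =
            watch::channel::<(u64, Result<(), FlushResultError>)>((0, Ok(())));

        // Flush-loop side: publish the outcome of flush request #1.
        let outcome = Err(FlushResultError::Other(Arc::new(anyhow::anyhow!("disk full"))));
        tx.send((1, outcome)).unwrap();

        // Waiter side: the watch channel only lets us borrow the stored value,
        // so propagating the error to our caller means cloning it.
        rx.changed().await.unwrap();
        let propagated = {
            let (counter, last_result) = &*rx.borrow();
            assert_eq!(*counter, 1);
            last_result.clone()
        };
        assert!(matches!(propagated, Err(FlushResultError::Other(_))));
        let _ = FlushResultError::Cancelled; // keep the unused variant from warning in this sketch
    }

The same constraint is why `CreateImageLayersError` and `anyhow::Error` are stored as `Arc<...>` inside `FlushLayerError` rather than by value.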