From ba7077ae95e4bf38c717eea87dd45200f3731f76 Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Thu, 24 Aug 2023 18:35:35 +0300
Subject: [PATCH] review fixes, few refactorings

review comment: xref tested string
doc: address review comment by jcsp
drop TODO about better load time api
LayerE::drop comments
refactor: fix residency and metrics to layermanager
refactor: LayerManager, remove arc blanket
rename
reorder, get rid of TODO
move Layer and all to storage_layer::layer
refactor: Layer initialization
test: migrate to Layer::for_resident
test: use guard_against_eviction from outside
fix: rename the written out file in Layer ctor
try to apply backoff *after* download
  might not work as we could get cancelled, but doing it right before seems
  wrong as well. We already retry the download.
doc: few touches
refactor: split get_or_maybe_download
refactor: split evicting
refactor: minor cleanup, doc
test: fix test_timeline_deletion_with_files_stuck_in_upload_queue
  string change
fix: delete and only then report evicted
test: fix allowed error typo
doc: minor fixes
restore Layer::dump
layer: remove dead comment and code
fix: allow dropping from UploadQueue by spawn_blocking
rename: garbage_collect => &_on_drop
refactor: split guard_against_eviction into three
  - download
  - keep_resident
  - download_and_keep_resident
  No need to bool enum.
botched rebase: lost impl AsRef
doc: fix typo
doc: remove obsolete comment
doc: link to inmemorylayer
doc: cleanup, add missing "the"
doc: link to LayerMap::search
doc: delete fixme about gentlemans agreements and strings
doc: inmemlayer: cleanup comments
eviction_task: remove confusing drop(candidates)
doc: add cancellation safe comment
refactor: simplify schedule upload and tests (rebase conflicts)
  lot of the non-conflicting changes were fixed in rebase here.
refactor: Result<(), NeedsDownload>
doc: typo
doc: explain what the consecutive failures are for
doc: another pass on LayerInner
fix: subscribe before evicting
reorder: get and get_or_apply_evictedness
doc: check_expected_download
info: stop using stat
  we no longer need to use it because in the latter versions we initialize to
  correct on-filesystem state with Layer::for_{resident,evicted}.
doc: residentlayer vs. downloadedlayer and eviction
doc: cancellation safety with evict_and_wait
doc: note running without remote storage again
reorder: 1. DownloadedLayer, 2. ResidentLayer
doc: explain DownloadedLayer::get owner param
doc: add validation
doc: drop comment in favor of drop_eviction_guard
image/deltalayer: shuffle comments around
doc: simplify comment
doc: create guard => new download has been started
doc: when => while
doc: remove comment about backoff
doc: adjust while in queue
doc: adjust more LayerInner::on_drop
doc: assert &Arc and DownloadedLayer::owner
fixup residentlayer comment
test: allow witnessing stopping before broken
doc: fix link
refactor: rename LayerInner::on_drop to on_downloaded_layer_drop
doc: fix outdated commit
doc: fix broken link
doc: comment about chance of both evictions selecting same layer
refactor: move all metrics updates to layer
  this fixes some missing increments for num_persistent_files_created,
  persistent_bytes_written and removes double entries for residence events.
refactor: rename for_written_tempfile -> finish_creating --- libs/utils/src/sync/heavier_once_cell.rs | 2 +- pageserver/src/disk_usage_eviction_task.rs | 5 +- pageserver/src/tenant.rs | 14 +- pageserver/src/tenant/layer_map.rs | 4 +- .../src/tenant/remote_timeline_client.rs | 134 +- .../tenant/remote_timeline_client/upload.rs | 2 + pageserver/src/tenant/storage_layer.rs | 1082 +-------------- .../src/tenant/storage_layer/delta_layer.rs | 143 +- .../src/tenant/storage_layer/image_layer.rs | 68 +- .../tenant/storage_layer/inmemory_layer.rs | 5 - pageserver/src/tenant/storage_layer/layer.rs | 1210 +++++++++++++++++ .../src/tenant/storage_layer/layer_desc.rs | 43 +- pageserver/src/tenant/timeline.rs | 132 +- .../src/tenant/timeline/eviction_task.rs | 17 +- .../src/tenant/timeline/layer_manager.rs | 60 +- test_runner/fixtures/neon_fixtures.py | 2 +- test_runner/regress/test_remote_storage.py | 2 +- 17 files changed, 1473 insertions(+), 1452 deletions(-) create mode 100644 pageserver/src/tenant/storage_layer/layer.rs diff --git a/libs/utils/src/sync/heavier_once_cell.rs b/libs/utils/src/sync/heavier_once_cell.rs index 634128d675dc..03011f701600 100644 --- a/libs/utils/src/sync/heavier_once_cell.rs +++ b/libs/utils/src/sync/heavier_once_cell.rs @@ -220,7 +220,7 @@ mod tests { #[tokio::test(start_paused = true)] async fn reinit_waits_for_deinit() { - // witht he tokio::time paused, we will "sleep" for 1s while holding the reinitialization + // with he tokio::time paused, we will "sleep" for 1s while holding the reinitialization let sleep_for = Duration::from_secs(1); let initial = 42; let reinit = 1; diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 12de754c2bf7..105879d93602 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -62,8 +62,7 @@ use crate::{ task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ self, - storage_layer::{AsLayerDesc, LayerE}, - timeline::EvictionError, + storage_layer::{AsLayerDesc, EvictionError, Layer}, Timeline, }, }; @@ -481,7 +480,7 @@ pub async fn disk_usage_eviction_task_iteration_impl( #[derive(Clone)] struct EvictionCandidate { timeline: Arc, - layer: Arc, + layer: Layer, last_activity_ts: SystemTime, } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index b79fbb77a950..bcdb9ee4ec15 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3979,8 +3979,7 @@ mod tests { Ok(()) } - /// FIXME: I don't want to add dump to LayerE, it should be in the ctl - /*#[tokio::test] + #[tokio::test] async fn delta_layer_dumping() -> anyhow::Result<()> { use storage_layer::AsLayerDesc; let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await; @@ -3990,22 +3989,23 @@ mod tests { make_some_layers(tline.as_ref(), Lsn(0x20)).await?; let layer_map = tline.layers.read().await; - let level0_deltas = layer_map.layer_map().get_level0_deltas()?; + let level0_deltas = layer_map + .layer_map() + .get_level0_deltas()? 
+ .into_iter() + .map(|desc| layer_map.get_from_desc(&desc)) + .collect::>(); assert!(!level0_deltas.is_empty()); for delta in level0_deltas { - let delta = layer_map.get_from_desc(&delta); // Ensure we are dumping a delta layer here assert!(delta.layer_desc().is_delta); - - delta.dump(false, &ctx).await.unwrap(); delta.dump(true, &ctx).await.unwrap(); } Ok(()) } - */ #[tokio::test] async fn corrupt_metadata() -> anyhow::Result<()> { diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 23d753f535f1..9b6225501fdb 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -639,8 +639,8 @@ impl LayerMap { } println!("historic_layers:"); - for layer in self.iter_historic_layers() { - layer.dump(verbose, ctx)?; + for desc in self.iter_historic_layers() { + desc.dump(); } println!("End dump LayerMap"); Ok(()) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index c40a15d96461..e2b8705a7bf8 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -231,7 +231,7 @@ use crate::metrics::{ }; use crate::task_mgr::shutdown_token; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; -use crate::tenant::remote_timeline_client::index::LayerFileMetadata; +pub(crate) use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::AsLayerDesc; use crate::tenant::upload_queue::Delete; use crate::tenant::TIMELINES_SEGMENT_NAME; @@ -610,21 +610,19 @@ impl RemoteTimelineClient { pub(crate) fn schedule_layer_file_upload( self: &Arc, layer: ResidentLayer, - layer_metadata: &LayerFileMetadata, ) -> anyhow::Result<()> { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - // FIXME: we might be still including no longer existing files in the index_part because - // that consistency is built on strings and gentleman agreements, not Weak which - // could be upgraded at the time of rendering of index_part. 
+ let metadata = layer.metadata(); + upload_queue .latest_files - .insert(layer.layer_desc().filename(), layer_metadata.clone()); + .insert(layer.layer_desc().filename(), metadata.clone()); upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1; info!("scheduled layer file upload {layer}"); - let op = UploadOp::UploadLayer(layer, layer_metadata.clone()); + let op = UploadOp::UploadLayer(layer, metadata); self.calls_unfinished_metric_begin(&op); upload_queue.queued_operations.push_back(op); @@ -1461,7 +1459,7 @@ mod tests { context::RequestContext, tenant::{ harness::{TenantHarness, TIMELINE_ID}, - storage_layer::{LayerE, PersistentLayerDesc}, + storage_layer::Layer, Generation, Tenant, Timeline, }, DEFAULT_PG_VERSION, @@ -1599,73 +1597,29 @@ mod tests { let generation = harness.generation; // Create a couple of dummy files, schedule upload for them - let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); - let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(); - let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(); - let content_1 = dummy_contents("foo"); - let content_2 = dummy_contents("bar"); - let content_3 = dummy_contents("baz"); - - for (filename, content) in [ - (&layer_file_name_1, &content_1), - (&layer_file_name_2, &content_2), - (&layer_file_name_3, &content_3), - ] { - std::fs::write(timeline_path.join(filename.file_name()), content).unwrap(); - } - let layer_file_1 = LayerE::for_written( - harness.conf, - &timeline, - PersistentLayerDesc::from_filename( - timeline.tenant_id, - timeline.timeline_id, - layer_file_name_1.clone(), - content_1.len() as u64, - ), - ) - .unwrap(); - - // FIXME: need that api for local files - assert!(layer_file_1.needs_download_blocking().unwrap().is_none()); - - let layer_file_2 = LayerE::for_written( - harness.conf, - &timeline, - PersistentLayerDesc::from_filename( - timeline.tenant_id, - timeline.timeline_id, - layer_file_name_2.clone(), - content_2.len() as u64, - ), - ) - .unwrap(); - assert!(layer_file_2.needs_download_blocking().unwrap().is_none()); - - let layer_file_3 = LayerE::for_written( - harness.conf, - &timeline, - PersistentLayerDesc::from_filename( - timeline.tenant_id, - timeline.timeline_id, - layer_file_name_3.clone(), - content_3.len() as u64, - ), - ) - .unwrap(); - assert!(layer_file_3.needs_download_blocking().unwrap().is_none()); + let layers = [ + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz")) + ] + .into_iter() + .map(|(name, contents): (LayerFileName, Vec)| { + std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap(); + + Layer::for_resident( + harness.conf, + &timeline, + name, + LayerFileMetadata::new(contents.len() as u64, generation), + ) + }).collect::>(); client - .schedule_layer_file_upload( - layer_file_1.clone(), - 
&LayerFileMetadata::new(content_1.len() as u64, generation), - ) + .schedule_layer_file_upload(layers[0].clone()) .unwrap(); client - .schedule_layer_file_upload( - layer_file_2.clone(), - &LayerFileMetadata::new(content_2.len() as u64, generation), - ) + .schedule_layer_file_upload(layers[1].clone()) .unwrap(); // Check that they are started immediately, not queued @@ -1719,21 +1673,18 @@ mod tests { .collect(), &[ &initial_layer.file_name(), - &layer_file_name_1.file_name(), - &layer_file_name_2.file_name(), + &layers[0].layer_desc().filename().file_name(), + &layers[1].layer_desc().filename().file_name(), ], ); assert_eq!(index_part.metadata, metadata); // Schedule upload and then a deletion. Check that the deletion is queued client - .schedule_layer_file_upload( - layer_file_3.clone(), - &LayerFileMetadata::new(content_3.len() as u64, generation), - ) + .schedule_layer_file_upload(layers[2].clone()) .unwrap(); client - .schedule_layer_file_deletion(&[layer_file_name_1.clone()]) + .schedule_layer_file_deletion(&[layers[0].layer_desc().filename()]) .unwrap(); { let mut guard = client.upload_queue.lock().unwrap(); @@ -1749,8 +1700,8 @@ mod tests { assert_remote_files( &[ &initial_layer.file_name(), - &layer_file_name_1.file_name(), - &layer_file_name_2.file_name(), + &layers[0].layer_desc().filename().file_name(), + &layers[1].layer_desc().filename().file_name(), "index_part.json", ], &remote_timeline_dir, @@ -1763,8 +1714,8 @@ mod tests { assert_remote_files( &[ &initial_layer.file_name(), - &layer_file_name_2.file_name(), - &layer_file_name_3.file_name(), + &layers[1].layer_desc().filename().file_name(), + &layers[2].layer_desc().filename().file_name(), "index_part.json", ], &remote_timeline_dir, @@ -1793,20 +1744,12 @@ mod tests { ) .unwrap(); - let layer_file_1 = LayerE::for_written( + let layer_file_1 = Layer::for_resident( harness.conf, &timeline, - PersistentLayerDesc::from_filename( - timeline.tenant_id, - timeline.timeline_id, - layer_file_name_1.clone(), - content_1.len() as u64, - ), - ) - .unwrap(); - - // FIXME: need that api for local files that actually exist - assert!(layer_file_1.needs_download_blocking().unwrap().is_none()); + layer_file_name_1.clone(), + LayerFileMetadata::new(content_1.len() as u64, harness.generation), + ); #[derive(Debug, PartialEq, Clone, Copy)] struct BytesStartedFinished { @@ -1843,10 +1786,7 @@ mod tests { let actual_a = get_bytes_started_stopped(); client - .schedule_layer_file_upload( - layer_file_1.clone(), - &LayerFileMetadata::new(content_1.len() as u64, harness.generation), - ) + .schedule_layer_file_upload(layer_file_1.clone()) .unwrap(); let actual_b = get_bytes_started_stopped(); diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index c442c4f4456e..83dae2a01fba 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -68,6 +68,8 @@ pub(super) async fn upload_timeline_layer<'a>( // upload. However, a nonexistent file can also be indicative of // something worse, like when a file is scheduled for upload before // it has been written to disk yet. + // + // This is tested against `test_compaction_delete_before_upload` info!(path = %source_path.display(), "File to upload doesn't exist. 
Likely the file has been deleted and an upload is not required any more."); return Ok(()); } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 07b951773803..3b2a61dcbaa1 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -4,30 +4,23 @@ pub mod delta_layer; mod filename; mod image_layer; mod inmemory_layer; +mod layer; mod layer_desc; -use crate::config::PageServerConf; use crate::context::{AccessStatsBehavior, RequestContext}; -use crate::repository::Key; use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; -use anyhow::Context; -use anyhow::Result; use bytes::Bytes; use enum_map::EnumMap; use enumset::EnumSet; use once_cell::sync::Lazy; -use pageserver_api::models::LayerAccessKind; use pageserver_api::models::{ - HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, + LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, }; use std::ops::Range; -use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::Mutex; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tracing::warn; -use tracing::Instrument; use utils::history_buffer::HistoryBufferWithDropCounter; use utils::rate_limit::RateLimit; @@ -42,11 +35,7 @@ pub use image_layer::{ImageLayer, ImageLayerWriter}; pub use inmemory_layer::InMemoryLayer; pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; -use self::delta_layer::DeltaEntry; -use super::remote_timeline_client::index::LayerFileMetadata; -use super::remote_timeline_client::RemoteTimelineClient; -use super::Timeline; -use utils::sync::heavier_once_cell; +pub(crate) use layer::{EvictionError, Layer, ResidentLayer}; pub fn range_overlaps(a: &Range, b: &Range) -> bool where @@ -81,7 +70,7 @@ pub struct ValueReconstructState { pub img: Option<(Lsn, Bytes)>, } -/// Return value from [`LayerE::get_value_reconstruct_data`] +/// Return value from [`Layer::get_value_reconstruct_data`] #[derive(Clone, Copy, Debug)] pub enum ValueReconstructResult { /// Got all the data needed to reconstruct the requested page @@ -308,1067 +297,6 @@ impl LayerAccessStats { } } -/// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted. -/// -/// However when we want something evicted, we cannot evict it right away as there might be current -/// reads happening on it. It has been for example searched from [`LayerMap`] but not yet -/// [`LayerE::get_value_reconstruct_data`]. -/// -/// [`LayerMap`]: crate::tenant::layer_map::LayerMap -enum ResidentOrWantedEvicted { - Resident(Arc), - WantedEvicted(Weak), -} - -impl ResidentOrWantedEvicted { - fn get(&self) -> Option> { - match self { - ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()), - ResidentOrWantedEvicted::WantedEvicted(weak) => weak.upgrade(), - } - } - /// When eviction is first requested, drop down to holding a [`Weak`]. - /// - /// Returns `true` if this was the first time eviction was requested. 
- fn downgrade(&mut self) -> &Weak { - let _was_first = match self { - ResidentOrWantedEvicted::Resident(strong) => { - let weak = Arc::downgrade(strong); - *self = ResidentOrWantedEvicted::WantedEvicted(weak); - // returning the weak is not useful, because the drop could had already ran with - // the replacement above, and that will take care of cleaning the Option we are in - true - } - ResidentOrWantedEvicted::WantedEvicted(_) => false, - }; - - match self { - ResidentOrWantedEvicted::WantedEvicted(ref weak) => weak, - _ => unreachable!("just wrote wanted evicted"), - } - } -} - -/// A Layer contains all data in a "rectangle" consisting of a range of keys and -/// range of LSNs. -/// -/// There are two kinds of layers, in-memory and on-disk layers. In-memory -/// layers are used to ingest incoming WAL, and provide fast access to the -/// recent page versions. On-disk layers are stored as files on disk, and are -/// immutable. This trait presents the common functionality of in-memory and -/// on-disk layers. -/// -/// Furthermore, there are two kinds of on-disk layers: delta and image layers. -/// A delta layer contains all modifications within a range of LSNs and keys. -/// An image layer is a snapshot of all the data in a key-range, at a single -/// LSN. -/// -/// This type models the on-disk layers, which can be evicted and on-demand downloaded. -// TODO: -// - internal arc, because I've now worked away majority of external wrapping -// - load time api which checks that files are present, fixmes in load time, remote timeline -// client tests -pub(crate) struct LayerE { - // do we really need this? - conf: &'static PageServerConf, - path: PathBuf, - - desc: PersistentLayerDesc, - - timeline: Weak, - - access_stats: LayerAccessStats, - - /// This custom OnceCell is backed by std mutex, but only held for short time periods. - /// Initialization and deinitialization is done while holding a permit. - inner: heavier_once_cell::OnceCell, - - /// Do we want to garbage collect this when `LayerE` is dropped, where garbage collection - /// means: - /// - schedule remote deletion - /// - instant local deletion - wanted_garbage_collected: AtomicBool, - - /// Accessed using `Ordering::Acquire` or `Ordering::Release` to have happens before together - /// to allow wait-less `evict` - /// - /// FIXME: this is likely bogus assumption, there is still time for us to set the flag in - /// `evict` after the task holding the lock has made the check and is dropping the mutex guard. - /// - /// However eviction will try to evict this again, so maybe it's fine? - wanted_evicted: AtomicBool, - - /// Version is to make sure we will in fact only evict a file if no new guard has been created - /// for it. - version: AtomicUsize, - - have_remote_client: bool, - - /// Allow subscribing to when the layer actually gets evicted. - /// - /// This might never come unless eviction called periodically. 
- status: tokio::sync::broadcast::Sender, -} - -#[derive(Debug, Clone, Copy)] -enum Status { - Evicted, - Downloaded, -} - -impl std::fmt::Display for LayerE { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.layer_desc().short_id()) - } -} - -impl std::fmt::Debug for LayerE { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self) - } -} - -impl AsLayerDesc for LayerE { - fn layer_desc(&self) -> &PersistentLayerDesc { - &self.desc - } -} - -impl Drop for LayerE { - fn drop(&mut self) { - if !*self.wanted_garbage_collected.get_mut() { - // should we try to evict if the last wish was for eviction? - // feels like there's some hazard of overcrowding near shutdown near by, but we don't - // run drops during shutdown (yet) - return; - } - - // TODO: spawn_blocking? - let span = tracing::info_span!(parent: None, "layer_drop", tenant_id = %self.layer_desc().tenant_id, timeline_id = %self.layer_desc().timeline_id, layer = %self); - - // SEMITODO: yes, this is sync, could spawn as well.. - let _g = span.entered(); - - let mut removed = false; - match std::fs::remove_file(&self.path) { - Ok(()) => { - removed = true; - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - // FIXME: unsure how to handle this. there are no deleted by mistake here, but it - // feels like the downloadedness state tracking, and so knowing here if the file - // should be present or not, requires load_layer_map fixing. - } - Err(e) => { - // FIXME: it is possible, that we've just evicted the layer or it was always remote - tracing::error!(layer = %self, "failed to remove garbage collected layer: {e}"); - } - } - - if let Some(timeline) = self.timeline.upgrade() { - if removed { - timeline - .metrics - .resident_physical_size_gauge - .sub(self.layer_desc().file_size); - } - if let Some(remote_client) = timeline.remote_client.as_ref() { - let res = - remote_client.schedule_layer_file_deletion(&[self.layer_desc().filename()]); - - if let Err(e) = res { - if !timeline.is_active() { - // downgrade the warning to info maybe? 
- } - tracing::warn!(layer=%self, "scheduling deletion on drop failed: {e:#}"); - } - } - } else { - // no need to nag that timeline is gone - } - } -} - -impl LayerE { - pub(crate) fn for_evicted( - conf: &'static PageServerConf, - timeline: &Arc, - file_name: LayerFileName, - metadata: LayerFileMetadata, - ) -> Arc { - let path = conf - .timeline_path(&timeline.tenant_id, &timeline.timeline_id) - .join(file_name.file_name()); - - let desc = PersistentLayerDesc::from_filename( - timeline.tenant_id, - timeline.timeline_id, - file_name, - metadata.file_size(), - ); - - let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted); - - let outer = Arc::new(LayerE { - conf, - path, - desc, - timeline: Arc::downgrade(timeline), - access_stats, - inner: heavier_once_cell::OnceCell::default(), - wanted_garbage_collected: AtomicBool::default(), - wanted_evicted: AtomicBool::default(), - version: AtomicUsize::default(), - have_remote_client: timeline.remote_client.is_some(), - status: tokio::sync::broadcast::channel(1).0, - }); - - debug_assert!(outer.needs_download_blocking().unwrap().is_some()); - - outer - } - - pub(crate) fn for_resident( - conf: &'static PageServerConf, - timeline: &Arc, - file_name: LayerFileName, - metadata: LayerFileMetadata, - ) -> ResidentLayer { - let path = conf - .timeline_path(&timeline.tenant_id, &timeline.timeline_id) - .join(file_name.file_name()); - - let desc = PersistentLayerDesc::from_filename( - timeline.tenant_id, - timeline.timeline_id, - file_name, - metadata.file_size(), - ); - - let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident); - - let mut resident = None; - - let outer = Arc::new_cyclic(|owner| { - let inner = Arc::new(DownloadedLayer { - owner: owner.clone(), - kind: tokio::sync::OnceCell::default(), - }); - resident = Some(inner.clone()); - LayerE { - conf, - path, - desc, - timeline: Arc::downgrade(timeline), - have_remote_client: timeline.remote_client.is_some(), - access_stats, - wanted_garbage_collected: AtomicBool::new(false), - wanted_evicted: AtomicBool::new(false), - inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)), - version: AtomicUsize::new(0), - status: tokio::sync::broadcast::channel(1).0, - } - }); - - debug_assert!(outer.needs_download_blocking().unwrap().is_none()); - - let downloaded = resident.expect("just initialized"); - - ResidentLayer { - downloaded, - owner: outer, - } - } - - pub(crate) fn for_written( - conf: &'static PageServerConf, - timeline: &Arc, - desc: PersistentLayerDesc, - ) -> anyhow::Result { - let path = conf - .timeline_path(&desc.tenant_id, &desc.timeline_id) - .join(desc.filename().to_string()); - - let mut resident = None; - - let outer = Arc::new_cyclic(|owner| { - let inner = Arc::new(DownloadedLayer { - owner: owner.clone(), - kind: tokio::sync::OnceCell::default(), - }); - resident = Some(inner.clone()); - LayerE { - conf, - path, - desc, - timeline: Arc::downgrade(timeline), - have_remote_client: timeline.remote_client.is_some(), - access_stats: LayerAccessStats::empty_will_record_residence_event_later(), - wanted_garbage_collected: AtomicBool::new(false), - wanted_evicted: AtomicBool::new(false), - inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)), - version: AtomicUsize::new(0), - status: tokio::sync::broadcast::channel(1).0, - } - }); - - // FIXME: ugly, but if we don't do this check here, any error will pop up at read time - // but we cannot check it because DeltaLayerWriter 
and ImageLayerWriter create the - // instances *before* renaming the file to final destination - // anyhow::ensure!( - // outer.needs_download_blocking()?.is_none(), - // "should not need downloading if it was just written" - // ); - - // FIXME: because we can now do garbage collection on drop, should we mark these files as - // garbage collected until they get really get added to LayerMap? consider that files are - // written out to disk, fsynced, renamed by `{Delta,Image}LayerWriter`, then waiting for - // remaining files to be generated (compaction, create_image_layers) before being added to - // LayerMap. We could panic or just error out during that time, even for unrelated reasons, - // but the files would be left. - - Ok(ResidentLayer { - downloaded: resident.expect("just wrote Some"), - owner: outer, - }) - } - - pub(crate) async fn evict_and_wait( - self: &Arc, - _: &RemoteTimelineClient, - ) -> Result<(), super::timeline::EvictionError> { - use tokio::sync::broadcast::error::RecvError; - - assert!(self.have_remote_client); - - self.wanted_evicted.store(true, Ordering::Release); - - let mut rx = self.status.subscribe(); - - // why call get instead of looking at the watch? because get will downgrade any - // Arc<_> it finds, because we set the wanted_evicted - if self.get().is_none() { - // it was not evictable in the first place - // our store to the wanted_evicted does not matter; it will be reset by next download - return Err(super::timeline::EvictionError::NotFound); - } - - match rx.recv().await { - Ok(Status::Evicted) => Ok(()), - Ok(Status::Downloaded) => Err(super::timeline::EvictionError::Downloaded), - Err(RecvError::Closed) => { - unreachable!("sender cannot be dropped while we are in &self method") - } - Err(RecvError::Lagged(_)) => { - // this is quite unlikely, but we are blocking a lot in the async context, so - // we might be missing this because we are stuck on a LIFO slot on a thread - // which is busy blocking for a 1TB database create_image_layers. - // - // use however late (compared to the initial expressing of wanted) as the - // "outcome" now - match self.get() { - Some(_) => Err(super::timeline::EvictionError::Downloaded), - None => Ok(()), - } - } - } - } - - /// Access the current state without waiting for the file to be downloaded. - /// - /// Used by eviction only. Requires that we've initialized to state which is respective to the - /// actual residency state. - fn get(&self) -> Option> { - let locked = self.inner.get(); - Self::get_or_apply_evictedness(locked, &self.wanted_evicted) - } - - /// Delete the layer file when the `self` gets dropped, also schedule a remote index upload - /// then perhaps. - pub(crate) fn garbage_collect(&self) { - self.wanted_garbage_collected.store(true, Ordering::Release); - } - - /// Return data needed to reconstruct given page at LSN. - /// - /// It is up to the caller to collect more data from previous layer and - /// perform WAL redo, if necessary. - /// - /// See PageReconstructResult for possible return values. The collected data - /// is appended to reconstruct_data; the caller should pass an empty struct - /// on first call, or a struct with a cached older image of the page if one - /// is available. If this returns ValueReconstructResult::Continue, look up - /// the predecessor layer and call again with the same 'reconstruct_data' to - /// collect more data. 
- pub(crate) async fn get_value_reconstruct_data( - self: &Arc, - key: Key, - lsn_range: Range, - reconstruct_data: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> anyhow::Result { - use anyhow::ensure; - - let layer = self.get_or_maybe_download(true, Some(ctx)).await?; - self.access_stats - .record_access(LayerAccessKind::GetValueReconstructData, ctx); - - if self.layer_desc().is_delta { - ensure!(lsn_range.start >= self.layer_desc().lsn_range.start); - ensure!(self.layer_desc().key_range.contains(&key)); - } else { - ensure!(self.layer_desc().key_range.contains(&key)); - ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn()); - ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn()); - } - - layer - .get_value_reconstruct_data(key, lsn_range, reconstruct_data, self) - .await - } - - /// Creates a guard object which prohibit evicting this layer as long as the value is kept - /// around. - pub(crate) async fn guard_against_eviction( - self: &Arc, - allow_download: bool, - ) -> anyhow::Result { - let downloaded = self.get_or_maybe_download(allow_download, None).await?; - - Ok(ResidentLayer { - downloaded, - owner: self.clone(), - }) - } - - pub(crate) async fn get_or_download(self: &Arc) -> anyhow::Result<()> { - self.get_or_maybe_download(true, None).await?; - Ok(()) - } - - fn get_or_apply_evictedness( - guard: Option>, - wanted_evicted: &AtomicBool, - ) -> Option> { - if let Some(mut x) = guard { - if let Some(won) = x.get() { - // there are no guarantees that we will always get to observe a concurrent call - // to evict - if wanted_evicted.load(Ordering::Acquire) { - x.downgrade(); - } - return Some(won); - } - } - - None - } - - /// Cancellation safe. - async fn get_or_maybe_download( - self: &Arc, - allow_download: bool, - ctx: Option<&RequestContext>, - ) -> Result, DownloadError> { - let download = move || async move { - // disable any scheduled but not yet running eviction deletions for this - self.version.fetch_add(1, Ordering::Relaxed); - - // no need to make the evict_and_wait wait for the actual download to complete - drop(self.status.send(Status::Downloaded)); - - // technically the mutex could be dropped here. - let timeline = self - .timeline - .upgrade() - .ok_or_else(|| DownloadError::TimelineShutdown)?; - - let can_ever_evict = timeline.remote_client.as_ref().is_some(); - - // check if we really need to be downloaded; could have been already downloaded by a - // cancelled previous attempt. - let needs_download = self - .needs_download() - .await - .map_err(DownloadError::PreStatFailed)?; - - if let Some(reason) = needs_download { - // only reset this after we've decided we really need to download. otherwise it'd - // be impossible to mark cancelled downloads for eviction, like one could imagine - // we would like to do for prefetching which was not needed. - self.wanted_evicted.store(false, Ordering::Release); - - if !can_ever_evict { - return Err(DownloadError::NoRemoteStorage); - } - - if self.wanted_garbage_collected.load(Ordering::Acquire) { - // it will fail because we should had already scheduled a delete and an - // index update - tracing::info!(%reason, "downloading a wanted garbage collected layer, this might fail"); - // FIXME: we probably do not gc delete until the file goes away...? 
unsure - } else { - tracing::debug!(%reason, "downloading layer"); - } - - if let Some(ctx) = ctx { - use crate::context::DownloadBehavior::*; - let b = ctx.download_behavior(); - match b { - Download => {} - Warn | Error => { - warn!( - "unexpectedly on-demand downloading remote layer {self} for task kind {:?}", - ctx.task_kind() - ); - crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc(); - - let really_error = matches!(b, Error) - && !self.conf.ondemand_download_behavior_treat_error_as_warn; - - if really_error { - // this check is only probablistic, seems like flakyness footgun - return Err(DownloadError::ContextAndConfigReallyDeniesDownloads); - } - } - } - } - - if !allow_download { - // this does look weird, but for LayerE the "downloading" means also changing - // internal once related state ... - return Err(DownloadError::DownloadRequired); - } - - let task_name = format!("download layer {}", self); - - let (tx, rx) = tokio::sync::oneshot::channel(); - // this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot - // block tenant::mgr::remove_tenant_from_memory. - - let this = self.clone(); - crate::task_mgr::spawn( - &tokio::runtime::Handle::current(), - TaskKind::RemoteDownloadTask, - Some(self.desc.tenant_id), - Some(self.desc.timeline_id), - &task_name, - false, - async move { - let client = timeline - .remote_client - .as_ref() - .expect("checked above with have_remote_client"); - - let result = client.download_layer_file( - &this.desc.filename(), - &crate::tenant::remote_timeline_client::index::LayerFileMetadata::new( - this.desc.file_size, - ), - ) - .await; - - let result = match result { - Ok(size) => { - timeline.metrics.resident_physical_size_gauge.add(size); - Ok(()) - } - Err(e) => { - Err(e) - } - }; - - if let Err(res) = tx.send(result) { - match res { - Ok(()) => { - // our caller is cancellation safe so this is fine; if someone - // else requests the layer, they'll find it already downloaded - // or redownload. - // - // however, could be that we should consider marking the layer - // for eviction? alas, cannot: because only DownloadedLayer - // will handle that. - }, - Err(e) => { - // our caller is cancellation safe, but we might be racing with - // another attempt to reinitialize. before we have cancellation - // token support: these attempts should converge regardless of - // their completion order. - tracing::error!("layer file download failed, and additionally failed to communicate this to caller: {e:?}"); - } - } - } - - Ok(()) - } - .in_current_span(), - ); - match rx.await { - Ok(Ok(())) => { - if let Some(reason) = self - .needs_download() - .await - .map_err(DownloadError::PostStatFailed)? 
- { - // this is really a bug in needs_download or remote timeline client - panic!("post-condition failed: needs_download returned {reason:?}"); - } - } - Ok(Err(e)) => { - tracing::error!("layer file download failed: {e:#}"); - return Err(DownloadError::DownloadFailed); - // FIXME: we need backoff here so never spiral to download loop, maybe, - // because remote timeline client already retries - } - Err(_gone) => { - return Err(DownloadError::DownloadCancelled); - } - } - } else { - // the file is present locally and we could even be running without remote - // storage - } - - let res = Arc::new(DownloadedLayer { - owner: Arc::downgrade(self), - kind: tokio::sync::OnceCell::default(), - }); - - self.access_stats.record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::ResidenceChange, - ); - - Ok(if self.wanted_evicted.load(Ordering::Acquire) { - // because we reset wanted_evictness earlier, this most likely means when we were downloading someone - // wanted to evict this layer. - ResidentOrWantedEvicted::WantedEvicted(Arc::downgrade(&res)) - } else { - ResidentOrWantedEvicted::Resident(res.clone()) - }) - }; - - let locked = self.inner.get_or_init(download).await?; - - Ok( - Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted) - .expect("It is not none, we just received it"), - ) - } - - pub(crate) fn local_path(&self) -> &std::path::Path { - // maybe it does make sense to have this or maybe not - &self.path - } - - async fn needs_download(&self) -> Result, std::io::Error> { - match tokio::fs::metadata(self.local_path()).await { - Ok(m) => Ok(self.is_file_present_and_good_size(&m)), - Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)), - Err(e) => Err(e), - } - } - - pub(crate) fn needs_download_blocking(&self) -> Result, std::io::Error> { - match self.local_path().metadata() { - Ok(m) => Ok(self.is_file_present_and_good_size(&m)), - Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)), - Err(e) => Err(e), - } - } - - fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Option { - // in future, this should include sha2-256 the file, hopefully rarely, because info uses - // this as well - if !m.is_file() { - Some(NeedsDownload::NotFile) - } else if m.len() != self.desc.file_size { - Some(NeedsDownload::WrongSize { - actual: m.len(), - expected: self.desc.file_size, - }) - } else { - None - } - } - - pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { - let layer_file_name = self.desc.filename().file_name(); - - let remote = self - .needs_download_blocking() - .map(|maybe| maybe.is_some()) - .unwrap_or(false); - let access_stats = self.access_stats.as_api_model(reset); - - if self.desc.is_delta { - let lsn_range = &self.desc.lsn_range; - - HistoricLayerInfo::Delta { - layer_file_name, - layer_file_size: self.desc.file_size, - lsn_start: lsn_range.start, - lsn_end: lsn_range.end, - remote, - access_stats, - } - } else { - let lsn = self.desc.image_layer_lsn(); - - HistoricLayerInfo::Image { - layer_file_name, - layer_file_size: self.desc.file_size, - lsn_start: lsn, - remote, - access_stats, - } - } - } - - pub(crate) fn access_stats(&self) -> &LayerAccessStats { - &self.access_stats - } - - /// Our resident layer has been dropped, we might hold the lock elsewhere. 
- fn on_drop(self: Arc) { - let gc = self.wanted_garbage_collected.load(Ordering::Acquire); - let evict = self.wanted_evicted.load(Ordering::Acquire); - let can_evict = self.have_remote_client; - - if gc { - // do nothing now, only when the whole layer is dropped. gc will end up deleting the - // whole layer, in case there is no reference cycle. - } else if can_evict && evict { - // we can remove this right now, but ... we really should not block or do anything. - // spawn a task which first does a version check, and that version is also incremented - // on get_or_download, so we will not collide? - let version = self.version.load(Ordering::Relaxed); - - let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self); - - // downgrade in case there's a queue backing up, or we are just tearing stuff down, and - // would soon delete anyways. - let this = Arc::downgrade(&self); - drop(self); - - let eviction = { - let span = tracing::info_span!(parent: span.clone(), "blocking"); - async move { - // the layer is already gone, don't do anything. LayerE drop has already ran. - let Some(this) = this.upgrade() else { return; }; - - // deleted or detached timeline, don't do anything. - let Some(timeline) = this.timeline.upgrade() else { return; }; - - // to avoid starting a new download while we evict, keep holding on to the - // permit. note that we will not close the semaphore when done, because it will - // be used by the re-download. - let _permit = { - let maybe_downloaded = this.inner.get(); - // relaxed ordering: we dont have any other atomics pending - if version != this.version.load(Ordering::Relaxed) { - // downloadness-state has advanced, we might no longer be the latest eviction - // work; don't do anything. - return; - } - - // free the DownloadedLayer allocation - match maybe_downloaded.map(|mut g| g.take_and_deinit()) { - Some((taken, permit)) => { - assert!(matches!(taken, ResidentOrWantedEvicted::WantedEvicted(_))); - permit - } - None => { - unreachable!("we do the version checking for this exact reason") - } - } - }; - - if !this.wanted_evicted.load(Ordering::Acquire) { - // if there's already interest, should we just early exit? this is not - // currently *cleared* on interest, maybe it shouldn't? - // FIXME: wanted_evicted cannot be unset right now - // - // NOTE: us holding the permit prevents a new round of download happening - // right now - return; - } - - let path = this.path.to_owned(); - - let capture_mtime_and_delete = tokio::task::spawn_blocking({ - let span = span.clone(); - move || { - let _e = span.entered(); - // FIXME: we can now initialize the mtime during first get_or_download, - // and track that in-memory for the following? does that help? 
- let m = path.metadata()?; - let local_layer_mtime = m.modified()?; - std::fs::remove_file(&path)?; - Ok::<_, std::io::Error>(local_layer_mtime) - } - }); - - let res = capture_mtime_and_delete.await; - - this.access_stats.record_residence_event(LayerResidenceStatus::Evicted, LayerResidenceEventReason::ResidenceChange); - - drop(this.status.send(Status::Evicted)); - - match res { - Ok(Ok(local_layer_mtime)) => { - let duration = - std::time::SystemTime::now().duration_since(local_layer_mtime); - match duration { - Ok(elapsed) => { - timeline - .metrics - .evictions_with_low_residence_duration - .read() - .unwrap() - .observe(elapsed); - tracing::info!( - residence_millis = elapsed.as_millis(), - "evicted layer after known residence period" - ); - } - Err(_) => { - tracing::info!("evicted layer after unknown residence period"); - } - } - timeline - .metrics - .resident_physical_size_gauge - .sub(this.desc.file_size); - } - Ok(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => { - tracing::info!("failed to evict file from disk, it was already gone"); - } - Ok(Err(e)) => { - tracing::warn!("failed to evict file from disk: {e:#}"); - } - Err(je) if je.is_cancelled() => unreachable!("unsupported"), - Err(je) if je.is_panic() => { /* already logged */ } - Err(je) => { - tracing::warn!(error = ?je, "unexpected join_error while evicting the file") - } - } - } - } - .instrument(span); - - crate::task_mgr::BACKGROUND_RUNTIME.spawn(eviction); - } - } -} - -#[derive(Debug, thiserror::Error)] -enum DownloadError { - #[error("timeline has already shutdown")] - TimelineShutdown, - #[error("no remote storage configured")] - NoRemoteStorage, - #[error("context denies downloading")] - ContextAndConfigReallyDeniesDownloads, - #[error("downloading is really required but not allowed by this method")] - DownloadRequired, - /// Why no error here? Because it will be reported by page_service. We should had also done - /// retries already. - #[error("downloading evicted layer file failed")] - DownloadFailed, - #[error("downloading failed, possibly for shutdown")] - DownloadCancelled, - #[error("pre-condition: stat before download failed")] - PreStatFailed(#[source] std::io::Error), - #[error("post-condition: stat after download failed")] - PostStatFailed(#[source] std::io::Error), -} - -#[derive(Debug, PartialEq)] -pub(crate) enum NeedsDownload { - NotFound, - NotFile, - WrongSize { actual: u64, expected: u64 }, -} - -impl std::fmt::Display for NeedsDownload { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - NeedsDownload::NotFound => write!(f, "file was not found"), - NeedsDownload::NotFile => write!(f, "path is not a file"), - NeedsDownload::WrongSize { actual, expected } => { - write!(f, "file size mismatch {actual} vs. {expected}") - } - } - } -} - -/// Holds both Arc requriring that both components stay resident while holding this alive and no evictions -/// or garbage collection happens. -#[derive(Clone)] -pub(crate) struct ResidentLayer { - owner: Arc, - downloaded: Arc, -} - -impl std::fmt::Display for ResidentLayer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.owner) - } -} - -impl std::fmt::Debug for ResidentLayer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.owner) - } -} - -impl ResidentLayer { - pub(crate) fn drop_eviction_guard(self) -> Arc { - self.into() - } - - /// Loads all keys stored in the layer. Returns key, lsn and value size. 
- pub(crate) async fn load_keys( - &self, - ctx: &RequestContext, - ) -> anyhow::Result>> { - use LayerKind::*; - - match self.downloaded.get(&self.owner).await? { - Delta(d) => { - self.owner - .access_stats - .record_access(LayerAccessKind::KeyIter, ctx); - - // this is valid because the DownloadedLayer::kind is a OnceCell, not a - // Mutex, so we cannot go and deinitialize the value with OnceCell::take - // while it's being held. - d.load_keys().await.context("Layer index is corrupted") - } - Image(_) => anyhow::bail!("cannot load_keys on a image layer"), - } - } -} - -impl AsLayerDesc for ResidentLayer { - fn layer_desc(&self) -> &PersistentLayerDesc { - self.owner.layer_desc() - } -} - -impl AsRef> for ResidentLayer { - fn as_ref(&self) -> &Arc { - &self.owner - } -} - -/// Allow slimming down if we don't want the `2*usize` with eviction candidates? -impl From for Arc { - fn from(value: ResidentLayer) -> Self { - value.owner - } -} - -impl std::ops::Deref for ResidentLayer { - type Target = LayerE; - - fn deref(&self) -> &Self::Target { - &self.owner - } -} - -#[derive(Debug, thiserror::Error)] -#[error("Layer has been removed from LayerMap already")] -pub(crate) struct RemovedFromLayerMap; - -/// Holds the actual downloaded layer, and handles evicting the file on drop. -pub(crate) struct DownloadedLayer { - owner: Weak, - kind: tokio::sync::OnceCell>, -} - -impl std::fmt::Debug for DownloadedLayer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DownloadedLayer") - // FIXME: this is not useful, always "Weak" - .field("owner", &self.owner) - .field("kind", &self.kind) - .finish() - } -} - -impl Drop for DownloadedLayer { - fn drop(&mut self) { - if let Some(owner) = self.owner.upgrade() { - owner.on_drop(); - } else { - // no need to do anything, we are shutting down - } - } -} - -impl DownloadedLayer { - async fn get(&self, owner: &LayerE) -> anyhow::Result<&LayerKind> { - // the owner is required so that we don't have to upgrade the self.owner, which will only - // be used on drop. this way, initializing a DownloadedLayer without an owner is statically - // impossible, so we can just not worry about it. - let init = || async { - // there is nothing async here, but it should be async - if owner.desc.is_delta { - let summary = Some(delta_layer::Summary::expected( - owner.desc.tenant_id, - owner.desc.timeline_id, - owner.desc.key_range.clone(), - owner.desc.lsn_range.clone(), - )); - delta_layer::DeltaLayerInner::load(&owner.path, summary).map(LayerKind::Delta) - } else { - let lsn = owner.desc.image_layer_lsn(); - let summary = Some(image_layer::Summary::expected( - owner.desc.tenant_id, - owner.desc.timeline_id, - owner.desc.key_range.clone(), - lsn, - )); - image_layer::ImageLayerInner::load(&owner.path, lsn, summary).map(LayerKind::Image) - } - // this will be a permanent failure - .context("load layer") - }; - self.kind.get_or_init(init).await.as_ref().map_err(|e| { - // errors are not clonabled, cannot but stringify - // test_broken_timeline matches this string - anyhow::anyhow!("layer loading failed: {e:#}") - }) - } - - async fn get_value_reconstruct_data( - &self, - key: Key, - lsn_range: Range, - reconstruct_data: &mut ValueReconstructState, - owner: &LayerE, - ) -> anyhow::Result { - use LayerKind::*; - - match self.get(owner).await? 
{ - Delta(d) => { - d.get_value_reconstruct_data(key, lsn_range, reconstruct_data) - .await - } - Image(i) => i.get_value_reconstruct_data(key, reconstruct_data).await, - } - } -} - -/// Wrapper around an actual layer implementation. -#[derive(Debug)] -enum LayerKind { - Delta(delta_layer::DeltaLayerInner), - Image(image_layer::ImageLayerInner), -} - /// Get a layer descriptor from a layer. pub trait AsLayerDesc { /// Get the layer descriptor. diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index e48b25f1047d..ecfa667681be 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -34,7 +34,7 @@ use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; -use crate::tenant::storage_layer::{LayerE, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::tenant::Timeline; use crate::virtual_file::VirtualFile; use crate::{walrecord, TEMP_FILE_SUFFIX}; @@ -179,20 +179,12 @@ impl DeltaKey { } } -/// DeltaLayer is the in-memory data structure associated with an on-disk delta -/// file. -/// -/// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer -/// is in "loaded" state, we have a copy of the index in memory, in 'inner'. -/// Otherwise the struct is just a placeholder for a file that exists on disk, -/// and it needs to be loaded before using it in queries. +/// This is used only from `pagectl`. Within pageserver, all layers are +/// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`]. pub struct DeltaLayer { path: PathBuf, - pub desc: PersistentLayerDesc, - access_stats: LayerAccessStats, - inner: OnceCell>, } @@ -209,6 +201,8 @@ impl std::fmt::Debug for DeltaLayer { } } +/// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta +/// file. 
pub struct DeltaLayerInner { // values copied from summary index_start_blk: u32, @@ -242,16 +236,7 @@ impl AsLayerDesc for DeltaLayer { impl DeltaLayer { pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { - println!( - "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----", - self.desc.tenant_id, - self.desc.timeline_id, - self.desc.key_range.start, - self.desc.key_range.end, - self.desc.lsn_range.start, - self.desc.lsn_range.end, - self.desc.file_size, - ); + self.desc.dump(); if !verbose { return Ok(()); @@ -259,55 +244,7 @@ impl DeltaLayer { let inner = self.load(LayerAccessKind::Dump, ctx).await?; - println!( - "index_start_blk: {}, root {}", - inner.index_start_blk, inner.index_root_blk - ); - - let file = &inner.file; - let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( - inner.index_start_blk, - inner.index_root_blk, - file, - ); - - tree_reader.dump().await?; - - let keys = DeltaLayerInner::load_keys(&inner).await?; - - // A subroutine to dump a single blob - async fn dump_blob(val: ValueRef<'_>) -> Result { - let buf = val.reader.read_blob(val.blob_ref.pos()).await?; - let val = Value::des(&buf)?; - let desc = match val { - Value::Image(img) => { - format!(" img {} bytes", img.len()) - } - Value::WalRecord(rec) => { - let wal_desc = walrecord::describe_wal_record(&rec)?; - format!( - " rec {} bytes will_init: {} {}", - buf.len(), - rec.will_init(), - wal_desc - ) - } - }; - Ok(desc) - } - - for entry in keys { - let DeltaEntry { key, lsn, val, .. } = entry; - let desc = match dump_blob(val).await { - Ok(desc) => desc, - Err(err) => { - format!("ERROR: {err}") - } - }; - println!(" key {key} at {lsn}: {desc}"); - } - - Ok(()) + inner.dump().await } fn temp_path_for( @@ -353,7 +290,7 @@ impl DeltaLayer { async fn load_inner(&self) -> Result> { let path = self.path(); - let loaded = DeltaLayerInner::load(&path, None)?; + let loaded = DeltaLayerInner::load(&path, None).await?; // not production code @@ -561,12 +498,7 @@ impl DeltaLayerWriterInner { // fsync the file file.sync_all()?; - let layer = LayerE::for_written(self.conf, timeline, desc)?; - // Rename the file to its final name - // - // Note: This overwrites any existing file. There shouldn't be any. - // FIXME: throw an error instead? - std::fs::rename(self.path, layer.local_path())?; + let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?; trace!("created delta layer {}", layer.local_path().display()); @@ -834,6 +766,57 @@ impl DeltaLayerInner { } Ok(all_keys) } + + pub(super) async fn dump(&self) -> anyhow::Result<()> { + println!( + "index_start_blk: {}, root {}", + self.index_start_blk, self.index_root_blk + ); + + let file = &self.file; + let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( + self.index_start_blk, + self.index_root_blk, + file, + ); + + tree_reader.dump().await?; + + let keys = self.load_keys().await?; + + async fn dump_blob(val: ValueRef<'_>) -> anyhow::Result { + let buf = val.reader.read_blob(val.blob_ref.pos()).await?; + let val = Value::des(&buf)?; + let desc = match val { + Value::Image(img) => { + format!(" img {} bytes", img.len()) + } + Value::WalRecord(rec) => { + let wal_desc = walrecord::describe_wal_record(&rec)?; + format!( + " rec {} bytes will_init: {} {}", + buf.len(), + rec.will_init(), + wal_desc + ) + } + }; + Ok(desc) + } + + for entry in keys { + let DeltaEntry { key, lsn, val, .. 
} = entry; + let desc = match dump_blob(val).await { + Ok(desc) => desc, + Err(err) => { + format!("ERROR: {err}") + } + }; + println!(" key {key} at {lsn}: {desc}"); + } + + Ok(()) + } } /// A set of data associated with a delta layer key and its value @@ -869,3 +852,9 @@ impl> Adapter { self.0.as_ref().file.read_blk(blknum).await } } + +impl AsRef for DeltaLayerInner { + fn as_ref(&self) -> &DeltaLayerInner { + self + } +} diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 7865a9da8f6e..495db1fbf628 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -59,7 +59,7 @@ use utils::{ }; use super::filename::ImageFileName; -use super::{AsLayerDesc, LayerE, PersistentLayerDesc, ResidentLayer}; +use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer}; /// /// Header stored in the beginning of the file @@ -117,22 +117,14 @@ impl Summary { } } -/// ImageLayer is the in-memory data structure associated with an on-disk image -/// file. -/// -/// We keep an ImageLayer in memory for each file, in the LayerMap. If a layer -/// is in "loaded" state, we have a copy of the index in memory, in 'inner'. -/// Otherwise the struct is just a placeholder for a file that exists on disk, -/// and it needs to be loaded before using it in queries. +/// This is used only from `pagectl`. Within pageserver, all layers are +/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`]. pub struct ImageLayer { path: PathBuf, - pub desc: PersistentLayerDesc, // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn pub lsn: Lsn, - access_stats: LayerAccessStats, - inner: OnceCell, } @@ -149,6 +141,8 @@ impl std::fmt::Debug for ImageLayer { } } +/// ImageLayer is the in-memory data structure associated with an on-disk image +/// file. pub struct ImageLayerInner { // values copied from summary index_start_blk: u32, @@ -169,6 +163,25 @@ impl std::fmt::Debug for ImageLayerInner { } } +impl ImageLayerInner { + pub(super) async fn dump(&self) -> anyhow::Result<()> { + let file = &self.file; + let tree_reader = + DiskBtreeReader::<_, KEY_SIZE>::new(self.index_start_blk, self.index_root_blk, file); + + tree_reader.dump().await?; + + tree_reader + .visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| { + println!("key: {} offset {}", hex::encode(key), value); + true + }) + .await?; + + Ok(()) + } +} + /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. 
impl std::fmt::Display for ImageLayer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -184,34 +197,15 @@ impl AsLayerDesc for ImageLayer { impl ImageLayer { pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { - println!( - "----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----", - self.desc.tenant_id, - self.desc.timeline_id, - self.desc.key_range.start, - self.desc.key_range.end, - self.lsn, - self.desc.is_incremental(), - self.desc.file_size - ); + self.desc.dump(); if !verbose { return Ok(()); } let inner = self.load(LayerAccessKind::Dump, ctx).await?; - let file = &inner.file; - let tree_reader = - DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file); - tree_reader.dump().await?; - - tree_reader - .visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| { - println!("key: {} offset {}", hex::encode(key), value); - true - }) - .await?; + inner.dump().await?; Ok(()) } @@ -251,7 +245,7 @@ impl ImageLayer { async fn load_inner(&self) -> Result { let path = self.path(); - let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None)?; + let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None).await?; // not production code let actual_filename = self.path.file_name().unwrap().to_str().unwrap().to_owned(); @@ -495,13 +489,7 @@ impl ImageLayerWriterInner { // fsync the file file.sync_all()?; - let layer = LayerE::for_written(self.conf, timeline, desc)?; - - // Rename the file to its final name - // - // Note: This overwrites any existing file. There shouldn't be any. - // FIXME: throw an error instead? - std::fs::rename(self.path, layer.local_path())?; + let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?; trace!("created image layer {}", layer.local_path().display()); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index f2a2e830a976..876c7bb59da6 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -31,7 +31,6 @@ use tokio::sync::RwLock; use super::{DeltaLayerWriter, ResidentLayer}; -/// InMemoryLayer is always incremental. pub struct InMemoryLayer { conf: &'static PageServerConf, tenant_id: TenantId, @@ -213,17 +212,13 @@ impl std::fmt::Display for InMemoryLayer { } impl InMemoryLayer { - /// /// Get layer size. 
- /// pub async fn size(&self) -> Result { let inner = self.inner.read().await; Ok(inner.file.len()) } - /// /// Create a new, empty, in-memory layer - /// pub fn create( conf: &'static PageServerConf, timeline_id: TimelineId, diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs new file mode 100644 index 000000000000..4cb0a024343d --- /dev/null +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -0,0 +1,1210 @@ +use anyhow::Context; +use pageserver_api::models::{ + HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus, +}; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, Weak}; +use std::time::SystemTime; +use tracing::Instrument; +use utils::lsn::Lsn; +use utils::sync::heavier_once_cell; + +use crate::config::PageServerConf; +use crate::context::RequestContext; +use crate::repository::Key; +use crate::tenant::{remote_timeline_client::LayerFileMetadata, RemoteTimelineClient, Timeline}; + +use super::delta_layer::{self, DeltaEntry}; +use super::image_layer; +use super::{ + AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerFileName, PersistentLayerDesc, + ValueReconstructResult, ValueReconstructState, +}; + +use utils::generation::Generation; + +/// A Layer contains all data in a "rectangle" consisting of a range of keys and +/// range of LSNs. +/// +/// There are two kinds of layers, in-memory and on-disk layers. In-memory +/// layers are used to ingest incoming WAL, and provide fast access to the +/// recent page versions. On-disk layers are stored as files on disk, and are +/// immutable. This type represents the on-disk kind while in-memory kind are represented by +/// [`InMemoryLayer`]. +/// +/// Furthermore, there are two kinds of on-disk layers: delta and image layers. +/// A delta layer contains all modifications within a range of LSNs and keys. +/// An image layer is a snapshot of all the data in a key-range, at a single +/// LSN. +/// +/// This type models the on-disk layers, which can be evicted and on-demand downloaded. +/// +/// [`InMemoryLayer`]: super::inmemory_layer::InMemoryLayer +#[derive(Clone)] +pub(crate) struct Layer(Arc); + +impl std::fmt::Display for Layer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.layer_desc().short_id()) + } +} + +impl std::fmt::Debug for Layer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self) + } +} + +impl AsLayerDesc for Layer { + fn layer_desc(&self) -> &PersistentLayerDesc { + self.0.layer_desc() + } +} + +impl Layer { + /// Creates a layer value for a file we know to not be resident. + pub(crate) fn for_evicted( + conf: &'static PageServerConf, + timeline: &Arc, + file_name: LayerFileName, + metadata: LayerFileMetadata, + ) -> Self { + let desc = PersistentLayerDesc::from_filename( + timeline.tenant_id, + timeline.timeline_id, + file_name, + metadata.file_size(), + ); + + let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted); + + let owner = Layer(Arc::new(LayerInner::new( + conf, + timeline, + access_stats, + desc, + None, + metadata.generation, + ))); + + debug_assert!(owner.0.needs_download_blocking().unwrap().is_some()); + + owner + } + + /// Creates a Layer value for a file we know to be resident in timeline directory. 
+ pub(crate) fn for_resident( + conf: &'static PageServerConf, + timeline: &Arc, + file_name: LayerFileName, + metadata: LayerFileMetadata, + ) -> ResidentLayer { + let desc = PersistentLayerDesc::from_filename( + timeline.tenant_id, + timeline.timeline_id, + file_name, + metadata.file_size(), + ); + + let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident); + + let mut resident = None; + + let owner = Layer(Arc::new_cyclic(|owner| { + let inner = Arc::new(DownloadedLayer { + owner: owner.clone(), + kind: tokio::sync::OnceCell::default(), + }); + resident = Some(inner.clone()); + + LayerInner::new( + conf, + timeline, + access_stats, + desc, + Some(inner), + metadata.generation, + ) + })); + + let downloaded = resident.expect("just initialized"); + + debug_assert!(owner.0.needs_download_blocking().unwrap().is_none()); + + timeline + .metrics + .resident_physical_size_gauge + .add(metadata.file_size()); + + ResidentLayer { downloaded, owner } + } + + /// Creates a Layer value for freshly written out new layer file by renaming it from a + /// temporary path. + pub(crate) fn finish_creating( + conf: &'static PageServerConf, + timeline: &Arc, + desc: PersistentLayerDesc, + temp_path: &Path, + ) -> anyhow::Result { + let mut resident = None; + + let owner = Layer(Arc::new_cyclic(|owner| { + let inner = Arc::new(DownloadedLayer { + owner: owner.clone(), + kind: tokio::sync::OnceCell::default(), + }); + resident = Some(inner.clone()); + let access_stats = LayerAccessStats::empty_will_record_residence_event_later(); + access_stats.record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + LayerInner::new( + conf, + timeline, + access_stats, + desc, + Some(inner), + timeline.generation, + ) + })); + + let downloaded = resident.expect("just initialized"); + + // if the rename works, the path is as expected + std::fs::rename(temp_path, owner.local_path()) + .context("rename temporary file as correct path for {owner}")?; + + { + let metrics = &timeline.metrics; + let file_size = owner.layer_desc().file_size; + + metrics.resident_physical_size_gauge.add(file_size); + metrics.num_persistent_files_created.inc_by(1); + metrics.persistent_bytes_written.inc_by(file_size); + } + + Ok(ResidentLayer { downloaded, owner }) + } + + /// Requests the layer to be evicted and waits for this to be done. + /// + /// If the file is not resident, an [`EvictionError::NotFound`] is returned. + /// + /// If for a bad luck or blocking of the executor, we miss the actual eviction and the layer is + /// re-downloaded, [`EvictionError::Downloaded`] is returned. + /// + /// Technically cancellation safe, but cancelling might shift the viewpoint of what generation + /// of download-evict cycle on retry. + pub(crate) async fn evict_and_wait( + &self, + rtc: &RemoteTimelineClient, + ) -> Result<(), EvictionError> { + self.0.evict_and_wait(rtc).await + } + + /// Delete the layer file when the `self` gets dropped, also try to schedule a remote index upload + /// then. + pub(crate) fn garbage_collect_on_drop(&self) { + self.0.garbage_collect_on_drop(); + } + + /// Return data needed to reconstruct given page at LSN. + /// + /// It is up to the caller to collect more data from the previous layer and + /// perform WAL redo, if necessary. 
+ pub(crate) async fn get_value_reconstruct_data( + &self, + key: Key, + lsn_range: Range, + reconstruct_data: &mut ValueReconstructState, + ctx: &RequestContext, + ) -> anyhow::Result { + use anyhow::ensure; + + let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?; + self.0 + .access_stats + .record_access(LayerAccessKind::GetValueReconstructData, ctx); + + if self.layer_desc().is_delta { + ensure!(lsn_range.start >= self.layer_desc().lsn_range.start); + ensure!(self.layer_desc().key_range.contains(&key)); + } else { + ensure!(self.layer_desc().key_range.contains(&key)); + ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn()); + ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn()); + } + + layer + .get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0) + .await + } + + /// Download the layer if evicted. + /// + /// Will not error when the layer is already downloaded. + pub(crate) async fn download(&self) -> anyhow::Result<()> { + self.0.get_or_maybe_download(true, None).await?; + Ok(()) + } + + /// Assuming the layer is already downloaded, returns a guard which will prohibit eviction + /// while the guard exists. + /// + /// Returns None if the layer is currently evicted. + pub(crate) async fn keep_resident(&self) -> anyhow::Result> { + let downloaded = match self.0.get_or_maybe_download(false, None).await { + Ok(d) => d, + // technically there are a lot of possible errors, but in practice it should only be + // DownloadRequired which is tripped up. could work to improve this situation + // statically later. + Err(DownloadError::DownloadRequired) => return Ok(None), + Err(e) => return Err(e.into()), + }; + + Ok(Some(ResidentLayer { + downloaded, + owner: self.clone(), + })) + } + + /// Downloads if necessary and creates a guard, which will keep this layer from being evicted. + pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result { + let downloaded = self.0.get_or_maybe_download(true, None).await?; + + Ok(ResidentLayer { + downloaded, + owner: self.clone(), + }) + } + + pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { + self.0.info(reset) + } + + pub(crate) fn access_stats(&self) -> &LayerAccessStats { + &self.0.access_stats + } + + pub(crate) fn local_path(&self) -> &Path { + &self.0.path + } + + pub(crate) fn metadata(&self) -> LayerFileMetadata { + self.0.metadata() + } + + /// Traditional debug dumping facility + #[allow(unused)] + pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> { + self.0.desc.dump(); + + if verbose { + // for now, unconditionally download everything, even if that might not be wanted. + let l = self.0.get_or_maybe_download(true, Some(ctx)).await?; + l.dump(&self.0).await? + } + + Ok(()) + } +} + +/// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted. +/// +/// However when we want something evicted, we cannot evict it right away as there might be current +/// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet +/// read with [`Layer::get_value_reconstruct_data`]. 
+/// +/// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search +enum ResidentOrWantedEvicted { + Resident(Arc), + WantedEvicted(Weak), +} + +impl ResidentOrWantedEvicted { + fn get(&self) -> Option> { + match self { + ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()), + ResidentOrWantedEvicted::WantedEvicted(weak) => weak.upgrade(), + } + } + /// When eviction is first requested, drop down to holding a [`Weak`]. + /// + /// Returns `true` if this was the first time eviction was requested. + fn downgrade(&mut self) -> &Weak { + let _was_first = match self { + ResidentOrWantedEvicted::Resident(strong) => { + let weak = Arc::downgrade(strong); + *self = ResidentOrWantedEvicted::WantedEvicted(weak); + // returning the weak is not useful, because the drop could had already ran with + // the replacement above, and that will take care of cleaning the Option we are in + true + } + ResidentOrWantedEvicted::WantedEvicted(_) => false, + }; + + match self { + ResidentOrWantedEvicted::WantedEvicted(ref weak) => weak, + _ => unreachable!("just wrote wanted evicted"), + } + } +} + +struct LayerInner { + /// Only needed to check ondemand_download_behavior_treat_error_as_warn and creation of + /// [`Self::path`]. + conf: &'static PageServerConf, + + /// Full path to the file; unclear if this should exist anymore. + path: PathBuf, + + desc: PersistentLayerDesc, + + /// Timeline access is needed for remote timeline client and metrics. + timeline: Weak, + + /// Cached knowledge of [`Timeline::remote_client`] being `Some`. + have_remote_client: bool, + + access_stats: LayerAccessStats, + + /// This custom OnceCell is backed by std mutex, but only held for short time periods. + /// Initialization and deinitialization are done while holding a permit. + inner: heavier_once_cell::OnceCell, + + /// Do we want to garbage collect this when `LayerInner` is dropped + wanted_garbage_collected: AtomicBool, + + /// Do we want to evict this layer as soon as possible? After being set to `true`, all accesses + /// will try to downgrade [`ResidentOrWantedEvicted`], which will eventually trigger + /// [`LayerInner::on_downloaded_layer_drop`]. + wanted_evicted: AtomicBool, + + /// Version is to make sure we will in fact only evict a file if no new download has been + /// started. + version: AtomicUsize, + + /// Allow subscribing to when the layer actually gets evicted. + status: tokio::sync::broadcast::Sender, + + /// Counter for exponential backoff with the download + consecutive_failures: AtomicUsize, + + /// The generation of this Layer. + /// + /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`], + /// for created layers from [`Timeline::generation`]. + generation: Generation, +} + +impl std::fmt::Display for LayerInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.layer_desc().short_id()) + } +} + +impl AsLayerDesc for LayerInner { + fn layer_desc(&self) -> &PersistentLayerDesc { + &self.desc + } +} + +#[derive(Debug, Clone, Copy)] +enum Status { + Evicted, + Downloaded, +} + +impl Drop for LayerInner { + fn drop(&mut self) { + if !*self.wanted_garbage_collected.get_mut() { + // should we try to evict if the last wish was for eviction? 
+ // feels like there's some hazard of overcrowding near shutdown near by, but we don't + // run drops during shutdown (yet) + return; + } + + let span = tracing::info_span!(parent: None, "layer_gc", tenant_id = %self.layer_desc().tenant_id, timeline_id = %self.layer_desc().timeline_id, layer = %self); + + let path = std::mem::take(&mut self.path); + let file_name = self.layer_desc().filename(); + let file_size = self.layer_desc().file_size; + let timeline = self.timeline.clone(); + + crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || { + let _g = span.entered(); + + let mut removed = false; + match std::fs::remove_file(path) { + Ok(()) => { + removed = true; + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // until we no longer do detaches by removing all local files before removing the + // tenant from the global map, we will always get these errors even if we knew what + // is the latest state. + // + // we currently do not track the latest state, so we'll also end up here on evicted + // layers. + } + Err(e) => { + tracing::error!("failed to remove garbage collected layer: {e}"); + } + } + + if let Some(timeline) = timeline.upgrade() { + if removed { + timeline.metrics.resident_physical_size_gauge.sub(file_size); + } + if let Some(remote_client) = timeline.remote_client.as_ref() { + let res = remote_client.schedule_layer_file_deletion(&[file_name]); + + if let Err(e) = res { + // test_timeline_deletion_with_files_stuck_in_upload_queue is good at + // demonstrating this deadlock (without spawn_blocking): stop will drop + // queued items, which will have ResidentLayer's, and those drops would try + // to re-entrantly lock the RemoteTimelineClient inner state. + if !timeline.is_active() { + tracing::info!("scheduling deletion on drop failed: {e:#}"); + } else { + tracing::warn!("scheduling deletion on drop failed: {e:#}"); + } + } + } + } else { + // no need to nag that timeline is gone: under normal situation on + // task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped. 
+ } + }); + } +} + +impl LayerInner { + fn new( + conf: &'static PageServerConf, + timeline: &Arc, + access_stats: LayerAccessStats, + desc: PersistentLayerDesc, + downloaded: Option>, + generation: Generation, + ) -> Self { + let path = conf + .timeline_path(&timeline.tenant_id, &timeline.timeline_id) + .join(desc.filename().to_string()); + + LayerInner { + conf, + path, + desc, + timeline: Arc::downgrade(timeline), + have_remote_client: timeline.remote_client.is_some(), + access_stats, + wanted_garbage_collected: AtomicBool::new(false), + wanted_evicted: AtomicBool::new(false), + inner: if let Some(inner) = downloaded { + heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)) + } else { + heavier_once_cell::OnceCell::default() + }, + version: AtomicUsize::new(0), + status: tokio::sync::broadcast::channel(1).0, + consecutive_failures: AtomicUsize::new(0), + generation, + } + } + + fn garbage_collect_on_drop(&self) { + self.wanted_garbage_collected.store(true, Ordering::Release); + } + + pub(crate) async fn evict_and_wait( + &self, + _: &RemoteTimelineClient, + ) -> Result<(), EvictionError> { + use tokio::sync::broadcast::error::RecvError; + + assert!(self.have_remote_client); + + let mut rx = self.status.subscribe(); + + self.wanted_evicted.store(true, Ordering::Release); + + if self.get().is_none() { + // it was not evictable in the first place + // our store to the wanted_evicted does not matter; it will be reset by next download + return Err(EvictionError::NotFound); + } + + match rx.recv().await { + Ok(Status::Evicted) => Ok(()), + Ok(Status::Downloaded) => Err(EvictionError::Downloaded), + Err(RecvError::Closed) => { + unreachable!("sender cannot be dropped while we are in &self method") + } + Err(RecvError::Lagged(_)) => { + // this is quite unlikely, but we are blocking a lot in the async context, so + // we might be missing this because we are stuck on a LIFO slot on a thread + // which is busy blocking for a 1TB database create_image_layers. + // + // use however late (compared to the initial expressing of wanted) as the + // "outcome" now + match self.get() { + Some(_) => Err(EvictionError::Downloaded), + None => Ok(()), + } + } + } + } + + /// Should be cancellation safe, but cancellation is troublesome together with the spawned + /// download. + async fn get_or_maybe_download( + self: &Arc, + allow_download: bool, + ctx: Option<&RequestContext>, + ) -> Result, DownloadError> { + let download = move || async move { + // disable any scheduled but not yet running eviction deletions for this + self.version.fetch_add(1, Ordering::Relaxed); + + // no need to make the evict_and_wait wait for the actual download to complete + drop(self.status.send(Status::Downloaded)); + + let timeline = self + .timeline + .upgrade() + .ok_or_else(|| DownloadError::TimelineShutdown)?; + + let can_ever_evict = timeline.remote_client.as_ref().is_some(); + + // check if we really need to be downloaded; could have been already downloaded by a + // cancelled previous attempt. + let needs_download = self + .needs_download() + .await + .map_err(DownloadError::PreStatFailed)?; + + if let Some(reason) = needs_download { + // only reset this after we've decided we really need to download. otherwise it'd + // be impossible to mark cancelled downloads for eviction, like one could imagine + // we would like to do for prefetching which was not needed. 
+ self.wanted_evicted.store(false, Ordering::Release); + + if !can_ever_evict { + return Err(DownloadError::NoRemoteStorage); + } + + tracing::debug!(%reason, "downloading layer"); + + if let Some(ctx) = ctx { + self.check_expected_download(ctx)?; + } + + if !allow_download { + // this does look weird, but for LayerInner the "downloading" means also changing + // internal once related state ... + return Err(DownloadError::DownloadRequired); + } + + self.spawn_download_and_wait(timeline).await?; + } else { + // the file is present locally, probably by a previous but cancelled call to + // get_or_maybe_download. alternatively we might be running without remote storage. + } + + let res = Arc::new(DownloadedLayer { + owner: Arc::downgrade(self), + kind: tokio::sync::OnceCell::default(), + }); + + self.access_stats.record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::ResidenceChange, + ); + + Ok(if self.wanted_evicted.load(Ordering::Acquire) { + // because we reset wanted_evictness earlier, this most likely means while we were + // downloading someone wanted to evict this layer. + ResidentOrWantedEvicted::WantedEvicted(Arc::downgrade(&res)) + } else { + ResidentOrWantedEvicted::Resident(res.clone()) + }) + }; + + let locked = self.inner.get_or_init(download).await?; + + Ok( + Self::get_or_apply_evictedness(Some(locked), &self.wanted_evicted) + .expect("It is not none, we just received it"), + ) + } + + /// Nag or fail per RequestContext policy + fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> { + use crate::context::DownloadBehavior::*; + let b = ctx.download_behavior(); + match b { + Download => Ok(()), + Warn | Error => { + tracing::warn!( + "unexpectedly on-demand downloading remote layer {self} for task kind {:?}", + ctx.task_kind() + ); + crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc(); + + let really_error = + matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn; + + if really_error { + // this check is only probablistic, seems like flakyness footgun + Err(DownloadError::ContextAndConfigReallyDeniesDownloads) + } else { + Ok(()) + } + } + } + } + + /// Actual download, at most one is executed at the time. + async fn spawn_download_and_wait( + self: &Arc, + timeline: Arc, + ) -> Result<(), DownloadError> { + let task_name = format!("download layer {}", self); + + let (tx, rx) = tokio::sync::oneshot::channel(); + // this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot + // block tenant::mgr::remove_tenant_from_memory. + + let this: Arc = self.clone(); + crate::task_mgr::spawn( + &tokio::runtime::Handle::current(), + crate::task_mgr::TaskKind::RemoteDownloadTask, + Some(self.desc.tenant_id), + Some(self.desc.timeline_id), + &task_name, + false, + async move { + let client = timeline + .remote_client + .as_ref() + .expect("checked above with have_remote_client"); + + let result = client.download_layer_file( + &this.desc.filename(), + &this.metadata(), + ) + .await; + + let result = match result { + Ok(size) => { + timeline.metrics.resident_physical_size_gauge.add(size); + Ok(()) + } + Err(e) => { + Err(e) + } + }; + + if let Err(res) = tx.send(result) { + match res { + Ok(()) => { + // our caller is cancellation safe so this is fine; if someone + // else requests the layer, they'll find it already downloaded + // or redownload. + // + // however, could be that we should consider marking the layer + // for eviction? 
alas, cannot: because only DownloadedLayer + // will handle that. + }, + Err(e) => { + // our caller is cancellation safe, but we might be racing with + // another attempt to initialize. before we have cancellation + // token support: these attempts should converge regardless of + // their completion order. + tracing::error!("layer file download failed, and additionally failed to communicate this to caller: {e:?}"); + } + } + } + + Ok(()) + } + .in_current_span(), + ); + match rx.await { + Ok(Ok(())) => { + if let Some(reason) = self + .needs_download() + .await + .map_err(DownloadError::PostStatFailed)? + { + // this is really a bug in needs_download or remote timeline client + panic!("post-condition failed: needs_download returned {reason:?}"); + } + + self.consecutive_failures.store(0, Ordering::Relaxed); + + Ok(()) + } + Ok(Err(e)) => { + let consecutive_failures = + self.consecutive_failures.fetch_add(1, Ordering::Relaxed); + tracing::error!(consecutive_failures, "layer file download failed: {e:#}"); + let backoff = utils::backoff::exponential_backoff_duration_seconds( + consecutive_failures.min(u32::MAX as usize) as u32, + 1.5, + 60.0, + ); + let backoff = std::time::Duration::from_secs_f64(backoff); + + tokio::time::sleep(backoff).await; + Err(DownloadError::DownloadFailed) + } + Err(_gone) => Err(DownloadError::DownloadCancelled), + } + } + + /// Access the current state without waiting for the file to be downloaded. + /// + /// Requires that we've initialized to state which is respective to the + /// actual residency state. + fn get(&self) -> Option> { + let locked = self.inner.get(); + Self::get_or_apply_evictedness(locked, &self.wanted_evicted) + } + + fn get_or_apply_evictedness( + guard: Option>, + wanted_evicted: &AtomicBool, + ) -> Option> { + if let Some(mut x) = guard { + if let Some(won) = x.get() { + // there are no guarantees that we will always get to observe a concurrent call + // to evict + if wanted_evicted.load(Ordering::Acquire) { + x.downgrade(); + } + return Some(won); + } + } + + None + } + + async fn needs_download(&self) -> Result, std::io::Error> { + match tokio::fs::metadata(&self.path).await { + Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)), + Err(e) => Err(e), + } + } + + fn needs_download_blocking(&self) -> Result, std::io::Error> { + match self.path.metadata() { + Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)), + Err(e) => Err(e), + } + } + + fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Result<(), NeedsDownload> { + // in future, this should include sha2-256 validation of the file. 
+ if !m.is_file() { + Err(NeedsDownload::NotFile) + } else if m.len() != self.desc.file_size { + Err(NeedsDownload::WrongSize { + actual: m.len(), + expected: self.desc.file_size, + }) + } else { + Ok(()) + } + } + + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { + let layer_file_name = self.desc.filename().file_name(); + + let remote = self.get().is_none(); + + let access_stats = self.access_stats.as_api_model(reset); + + if self.desc.is_delta { + let lsn_range = &self.desc.lsn_range; + + HistoricLayerInfo::Delta { + layer_file_name, + layer_file_size: self.desc.file_size, + lsn_start: lsn_range.start, + lsn_end: lsn_range.end, + remote, + access_stats, + } + } else { + let lsn = self.desc.image_layer_lsn(); + + HistoricLayerInfo::Image { + layer_file_name, + layer_file_size: self.desc.file_size, + lsn_start: lsn, + remote, + access_stats, + } + } + } + + /// `DownloadedLayer` is being dropped, so it calls this method. + fn on_downloaded_layer_drop(self: Arc) { + let gc = self.wanted_garbage_collected.load(Ordering::Acquire); + let evict = self.wanted_evicted.load(Ordering::Acquire); + let can_evict = self.have_remote_client; + + if gc { + // do nothing now, only in LayerInner::drop + } else if can_evict && evict { + let version = self.version.load(Ordering::Relaxed); + + let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self); + + // downgrade for queueing, in case there's a tear down already ongoing we should not + // hold it alive. + let this = Arc::downgrade(&self); + drop(self); + + crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || { + let _g = span.entered(); + + // if LayerInner is already dropped here, do nothing because the garbage collection + // has already ran while we were in queue + let Some(this) = this.upgrade() else { return; }; + this.evict_blocking(version); + }); + } + } + + fn evict_blocking(&self, version: usize) { + // deleted or detached timeline, don't do anything. + let Some(timeline) = self.timeline.upgrade() else { return; }; + + // to avoid starting a new download while we evict, keep holding on to the + // permit. + let _permit = { + let maybe_downloaded = self.inner.get(); + + if version != self.version.load(Ordering::Relaxed) { + // downloadness-state has advanced, we might no longer be the latest eviction + // work; don't do anything. 
+ return; + } + + // free the DownloadedLayer allocation + match maybe_downloaded.map(|mut g| g.take_and_deinit()) { + Some((taken, permit)) => { + assert!(matches!(taken, ResidentOrWantedEvicted::WantedEvicted(_))); + permit + } + None => { + unreachable!("we do the version checking for this exact reason") + } + } + }; + + self.access_stats.record_residence_event( + LayerResidenceStatus::Evicted, + LayerResidenceEventReason::ResidenceChange, + ); + + match capture_mtime_and_remove(&self.path) { + Ok(local_layer_mtime) => { + let duration = SystemTime::now().duration_since(local_layer_mtime); + match duration { + Ok(elapsed) => { + timeline + .metrics + .evictions_with_low_residence_duration + .read() + .unwrap() + .observe(elapsed); + tracing::info!( + residence_millis = elapsed.as_millis(), + "evicted layer after known residence period" + ); + } + Err(_) => { + tracing::info!("evicted layer after unknown residence period"); + } + } + timeline + .metrics + .resident_physical_size_gauge + .sub(self.desc.file_size); + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + tracing::info!("failed to evict file from disk, it was already gone"); + } + Err(e) => { + tracing::warn!("failed to evict file from disk: {e:#}"); + } + } + + // we are still holding the permit, so no new spawn_download_and_wait can happen + drop(self.status.send(Status::Evicted)); + } + + fn metadata(&self) -> LayerFileMetadata { + LayerFileMetadata::new(self.desc.file_size, self.generation) + } +} + +fn capture_mtime_and_remove(path: &Path) -> Result { + let m = path.metadata()?; + let local_layer_mtime = m.modified()?; + std::fs::remove_file(path)?; + Ok(local_layer_mtime) +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum EvictionError { + #[error("layer was already evicted")] + NotFound, + + /// Evictions must always lose to downloads in races, and this time it happened. + #[error("layer was downloaded instead")] + Downloaded, +} + +/// Error internal to the [`LayerInner::get_or_maybe_download`] +#[derive(Debug, thiserror::Error)] +enum DownloadError { + #[error("timeline has already shutdown")] + TimelineShutdown, + #[error("no remote storage configured")] + NoRemoteStorage, + #[error("context denies downloading")] + ContextAndConfigReallyDeniesDownloads, + #[error("downloading is really required but not allowed by this method")] + DownloadRequired, + /// Why no error here? Because it will be reported by page_service. We should had also done + /// retries already. + #[error("downloading evicted layer file failed")] + DownloadFailed, + #[error("downloading failed, possibly for shutdown")] + DownloadCancelled, + #[error("pre-condition: stat before download failed")] + PreStatFailed(#[source] std::io::Error), + #[error("post-condition: stat after download failed")] + PostStatFailed(#[source] std::io::Error), +} + +#[derive(Debug, PartialEq)] +pub(crate) enum NeedsDownload { + NotFound, + NotFile, + WrongSize { actual: u64, expected: u64 }, +} + +impl std::fmt::Display for NeedsDownload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + NeedsDownload::NotFound => write!(f, "file was not found"), + NeedsDownload::NotFile => write!(f, "path is not a file"), + NeedsDownload::WrongSize { actual, expected } => { + write!(f, "file size mismatch {actual} vs. {expected}") + } + } + } +} + +/// Existence of `DownloadedLayer` means that we have the file locally, and can later evict it. 
+pub(crate) struct DownloadedLayer { + owner: Weak, + kind: tokio::sync::OnceCell>, +} + +impl std::fmt::Debug for DownloadedLayer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DownloadedLayer") + // FIXME: this is not useful, always "Weak" + .field("owner", &self.owner) + .field("kind", &self.kind) + .finish() + } +} + +impl Drop for DownloadedLayer { + fn drop(&mut self) { + if let Some(owner) = self.owner.upgrade() { + owner.on_downloaded_layer_drop(); + } else { + // no need to do anything, we are shutting down + } + } +} + +impl DownloadedLayer { + /// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`], or fails to + /// initialize it permanently. + /// + /// `owner` parameter is a strong reference at the same `LayerInner` as the + /// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called, + /// we will always have the LayerInner on the callstack, so we can just use it. + async fn get(&self, owner: &Arc) -> anyhow::Result<&LayerKind> { + let init = || async { + assert_eq!( + Weak::as_ptr(&self.owner), + Arc::as_ptr(owner), + "these are the same, just avoiding the upgrade" + ); + + // there is nothing async here, but it should be async + if owner.desc.is_delta { + let summary = Some(delta_layer::Summary::expected( + owner.desc.tenant_id, + owner.desc.timeline_id, + owner.desc.key_range.clone(), + owner.desc.lsn_range.clone(), + )); + delta_layer::DeltaLayerInner::load(&owner.path, summary) + .await + .map(LayerKind::Delta) + } else { + let lsn = owner.desc.image_layer_lsn(); + let summary = Some(image_layer::Summary::expected( + owner.desc.tenant_id, + owner.desc.timeline_id, + owner.desc.key_range.clone(), + lsn, + )); + image_layer::ImageLayerInner::load(&owner.path, lsn, summary) + .await + .map(LayerKind::Image) + } + // this will be a permanent failure + .context("load layer") + }; + self.kind.get_or_init(init).await.as_ref().map_err(|e| { + // errors are not clonabled, cannot but stringify + // test_broken_timeline matches this string + anyhow::anyhow!("layer loading failed: {e:#}") + }) + } + + async fn get_value_reconstruct_data( + &self, + key: Key, + lsn_range: Range, + reconstruct_data: &mut ValueReconstructState, + owner: &Arc, + ) -> anyhow::Result { + use LayerKind::*; + + match self.get(owner).await? { + Delta(d) => { + d.get_value_reconstruct_data(key, lsn_range, reconstruct_data) + .await + } + Image(i) => i.get_value_reconstruct_data(key, reconstruct_data).await, + } + } + + async fn dump(&self, owner: &Arc) -> anyhow::Result<()> { + use LayerKind::*; + match self.get(owner).await? { + Delta(d) => d.dump().await?, + Image(i) => i.dump().await?, + } + + Ok(()) + } +} + +/// Wrapper around an actual layer implementation. +#[derive(Debug)] +enum LayerKind { + Delta(delta_layer::DeltaLayerInner), + Image(image_layer::ImageLayerInner), +} + +/// Guard for forcing a layer be resident while it exists. +#[derive(Clone)] +pub(crate) struct ResidentLayer { + owner: Layer, + downloaded: Arc, +} + +impl std::fmt::Display for ResidentLayer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.owner) + } +} + +impl std::fmt::Debug for ResidentLayer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.owner) + } +} + +impl ResidentLayer { + /// Release the eviction guard, converting back into a plain [`Layer`]. 
+ pub(crate) fn drop_eviction_guard(self) -> Layer { + self.into() + } + + /// Loads all keys stored in the layer. Returns key, lsn and value size. + pub(crate) async fn load_keys( + &self, + ctx: &RequestContext, + ) -> anyhow::Result>> { + use LayerKind::*; + + let owner = &self.owner.0; + + match self.downloaded.get(owner).await? { + Delta(d) => { + owner + .access_stats + .record_access(LayerAccessKind::KeyIter, ctx); + + // this is valid because the DownloadedLayer::kind is a OnceCell, not a + // Mutex, so we cannot go and deinitialize the value with OnceCell::take + // while it's being held. + d.load_keys().await.context("Layer index is corrupted") + } + Image(_) => anyhow::bail!("cannot load_keys on a image layer"), + } + } + + pub(crate) fn local_path(&self) -> &Path { + &self.owner.0.path + } + + pub(crate) fn access_stats(&self) -> &LayerAccessStats { + self.owner.access_stats() + } + + pub(crate) fn metadata(&self) -> LayerFileMetadata { + self.owner.metadata() + } +} + +impl AsLayerDesc for ResidentLayer { + fn layer_desc(&self) -> &PersistentLayerDesc { + self.owner.layer_desc() + } +} + +impl AsRef for ResidentLayer { + fn as_ref(&self) -> &Layer { + &self.owner + } +} + +/// Allow slimming down if we don't want the `2*usize` with eviction candidates? +impl From for Layer { + fn from(value: ResidentLayer) -> Self { + value.owner + } +} diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs index ce55b3e018b7..2e0b0b3e645c 100644 --- a/pageserver/src/tenant/storage_layer/layer_desc.rs +++ b/pageserver/src/tenant/storage_layer/layer_desc.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use core::fmt::Display; use std::ops::Range; use utils::{ @@ -6,7 +5,7 @@ use utils::{ lsn::Lsn, }; -use crate::{context::RequestContext, repository::Key}; +use crate::repository::Key; use super::{DeltaFileName, ImageFileName, LayerFileName}; @@ -189,21 +188,31 @@ impl PersistentLayerDesc { self.is_delta } - pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { - println!( - "----- layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----", - self.tenant_id, - self.timeline_id, - self.key_range.start, - self.key_range.end, - self.lsn_range.start, - self.lsn_range.end, - self.is_delta, - self.is_incremental(), - self.file_size, - ); - - Ok(()) + pub fn dump(&self) { + if self.is_delta { + println!( + "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} is_incremental {} size {} ----", + self.tenant_id, + self.timeline_id, + self.key_range.start, + self.key_range.end, + self.lsn_range.start, + self.lsn_range.end, + self.is_incremental(), + self.file_size, + ); + } else { + println!( + "----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----", + self.tenant_id, + self.timeline_id, + self.key_range.start, + self.key_range.end, + self.image_layer_lsn(), + self.is_incremental(), + self.file_size + ); + } } pub fn file_size(&self) -> u64 { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 47280b310914..b4c66df8c8d8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -12,8 +12,7 @@ use bytes::Bytes; use fail::fail_point; use itertools::Itertools; use pageserver_api::models::{ - DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo, - LayerResidenceEventReason, LayerResidenceStatus, TimelineState, + DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, 
LayerMapInfo, TimelineState, }; use serde_with::serde_as; use storage_broker::BrokerClientChannel; @@ -34,11 +33,11 @@ use std::time::{Duration, Instant, SystemTime}; use crate::context::{ AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, }; -use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::delta_layer::DeltaEntry; use crate::tenant::storage_layer::{ - AsLayerDesc, DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStatsReset, LayerE, - LayerFileName, ResidentLayer, ValueReconstructResult, ValueReconstructState, + AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer, + LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult, + ValueReconstructState, }; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ @@ -149,7 +148,7 @@ pub struct Timeline { /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects. /// Never changes for the lifetime of this [`Timeline`] object. - generation: Generation, + pub(crate) generation: Generation, pub pg_version: u32, @@ -736,10 +735,7 @@ impl Timeline { .map_err(anyhow::Error::from)?; if let Some(remote_client) = &self.remote_client { for layer in layers { - let m = - LayerFileMetadata::new(layer.layer_desc().file_size, self.generation); - - remote_client.schedule_layer_file_upload(layer, &m)?; + remote_client.schedule_layer_file_upload(layer)?; } } @@ -1015,7 +1011,7 @@ impl Timeline { return Ok(Some(false)); } - layer.guard_against_eviction(true).await?; + layer.download().await?; Ok(Some(true)) } @@ -1027,9 +1023,9 @@ impl Timeline { return Ok(None); }; - let Ok(local_layer) = local_layer.guard_against_eviction(false).await else { return Ok(Some(false)); }; + let Some(local_layer) = local_layer.keep_resident().await? else { return Ok(Some(false)); }; - let local_layer: Arc = local_layer.into(); + let local_layer: Layer = local_layer.into(); let remote_client = self .remote_client @@ -1052,7 +1048,7 @@ impl Timeline { /// Evict a batch of layers. pub(crate) async fn evict_layers( &self, - layers_to_evict: &[Arc], + layers_to_evict: &[Layer], cancel: &CancellationToken, ) -> anyhow::Result>>> { let remote_client = self @@ -1081,7 +1077,7 @@ impl Timeline { async fn evict_layer_batch( &self, remote_client: &Arc, - layers_to_evict: &[Arc], + layers_to_evict: &[Layer], cancel: &CancellationToken, ) -> anyhow::Result>>> { // ensure that the layers have finished uploading @@ -1139,16 +1135,6 @@ impl Timeline { } } -#[derive(Debug, thiserror::Error)] -pub(crate) enum EvictionError { - #[error("layer was already evicted")] - NotFound, - - /// Evictions must always lose to downloads in races, and this time it happened. - #[error("layer was downloaded instead")] - Downloaded, -} - /// Number of times we will compute partition within a checkpoint distance. 
const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10; @@ -1569,17 +1555,17 @@ impl Timeline { let layer = match decision { NeedsUpload(m) => { total_physical_size += m.file_size(); - let resident = LayerE::for_resident(conf, &this, name, m.clone()); + let resident = Layer::for_resident(conf, &this, name, m.clone()); let layer = resident.as_ref().clone(); - needs_upload.push((resident, m)); + needs_upload.push(resident); layer } UseLocal(m) => { total_physical_size += m.file_size(); - LayerE::for_resident(conf, &this, name, m).drop_eviction_guard() + Layer::for_resident(conf, &this, name, m).drop_eviction_guard() } Evicted(remote) | UseRemote { remote, .. } => { - LayerE::for_evicted(conf, &this, name, remote) + Layer::for_evicted(conf, &this, name, remote) } }; @@ -1602,8 +1588,8 @@ impl Timeline { if let Some(rtc) = self.remote_client.as_ref() { let (needs_upload, needs_cleanup) = to_sync; - for (layer, m) in needs_upload { - rtc.schedule_layer_file_upload(layer, &m)?; + for layer in needs_upload { + rtc.schedule_layer_file_upload(layer)?; } rtc.schedule_layer_file_deletion(&needs_cleanup)?; rtc.schedule_index_upload_for_file_changes()?; @@ -1615,9 +1601,6 @@ impl Timeline { "loaded layer map with {} layers at {}, total physical size: {}", num_layers, disk_consistent_lsn, total_physical_size ); - self.metrics - .resident_physical_size_gauge - .set(total_physical_size); timer.stop_and_record(); Ok(()) @@ -1941,7 +1924,7 @@ impl Timeline { } } - async fn find_layer(&self, layer_file_name: &str) -> Option> { + async fn find_layer(&self, layer_file_name: &str) -> Option { let guard = self.layers.read().await; for historic_layer in guard.layer_map().iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); @@ -1960,7 +1943,7 @@ trait TraversalLayerExt { fn traversal_id(&self) -> TraversalId; } -impl TraversalLayerExt for Arc { +impl TraversalLayerExt for Layer { fn traversal_id(&self) -> TraversalId { self.local_path().display().to_string() } @@ -2207,7 +2190,7 @@ impl Timeline { result, cont_lsn, Box::new({ - let layer = Arc::clone(&layer); + let layer = layer.to_owned(); move || layer.traversal_id() }), )); @@ -2452,7 +2435,7 @@ impl Timeline { // the mapping in `create_delta_layer`. 
{ let mut guard = self.layers.write().await; - guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics); + guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer); } // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, @@ -2531,8 +2514,7 @@ impl Timeline { if let Some(remote_client) = &self.remote_client { for layer in layer_paths_to_upload { - let m = LayerFileMetadata::new(layer.layer_desc().file_size, self.generation); - remote_client.schedule_layer_file_upload(layer, &m)?; + remote_client.schedule_layer_file_upload(layer)?; } remote_client.schedule_index_upload_for_metadata_update(&metadata)?; } @@ -2810,16 +2792,6 @@ impl Timeline { let mut guard = self.layers.write().await; - for l in &image_layers { - // FIXME: these should be in guard - self.metrics - .resident_physical_size_gauge - .add(l.layer_desc().file_size); - l.access_stats().record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); - } guard.track_new_image_layers(&image_layers); drop_wlock(guard); timer.stop_and_record(); @@ -2831,7 +2803,7 @@ impl Timeline { #[derive(Default)] struct CompactLevel0Phase1Result { new_layers: Vec, - deltas_to_compact: Vec>, + deltas_to_compact: Vec, } /// Top-level failure to compact. @@ -3016,7 +2988,6 @@ impl Timeline { // // This failpoint is a superset of both of the cases. if cfg!(feature = "testing") { - // FIXME: utils does not depend on `fail` so there's no non-macro answer to this let active = (|| { ::fail::fail_point!("compact-level0-phase1-return-same", |_| true); false @@ -3028,7 +2999,7 @@ impl Timeline { // we are just faking these layers as being produced again for this failpoint new_layers.push( delta - .guard_against_eviction(true) + .download_and_keep_resident() .await .context("download layer for failpoint")?, ); @@ -3061,14 +3032,17 @@ impl Timeline { let first_level0_delta = level0_deltas_iter.next().unwrap(); let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end; let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len()); - deltas_to_compact.push(first_level0_delta.guard_against_eviction(true).await?); + + // FIXME: downloading while holding layer_removal_cs is not great, but we will remove that + // soon + deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?); for l in level0_deltas_iter { let lsn_range = &l.layer_desc().lsn_range; if lsn_range.start != prev_lsn_end { break; } - deltas_to_compact.push(l.guard_against_eviction(true).await?); + deltas_to_compact.push(l.download_and_keep_resident().await?); prev_lsn_end = lsn_range.end; } let lsn_range = Range { @@ -3463,9 +3437,8 @@ impl Timeline { // on-demand download overwriting the overwritten layer before it's uploaded. 
for l in new_layers { if let Some(remote_client) = &self.remote_client { - let m = LayerFileMetadata::new(l.layer_desc().file_size, self.generation); // upload even if duplicated, because we may have changed the contents - remote_client.schedule_layer_file_upload(l.clone(), &m)?; + remote_client.schedule_layer_file_upload(l.clone())?; } if guard.contains(l.as_ref()) { duplicated_layers.insert(l.layer_desc().key()); @@ -3485,12 +3458,7 @@ impl Timeline { // deletion will happen later, the layer file manager sets wanted_garbage_collected - guard.finish_compact_l0( - &layer_removal_cs, - remove_layers, - &insert_layers, - &self.metrics, - )?; + guard.finish_compact_l0(&layer_removal_cs, remove_layers, &insert_layers)?; drop_wlock(guard); @@ -3812,7 +3780,7 @@ impl Timeline { let gc_layers = layers_to_remove .iter() .map(|x| guard.get_from_desc(x)) - .collect::>>(); + .collect::>(); result.layers_removed = gc_layers.len() as u64; @@ -4035,7 +4003,7 @@ impl Timeline { js.spawn( async move { - let res = next.get_or_download().await; + let res = next.download().await; (next, res) } .instrument(span), @@ -4089,7 +4057,7 @@ pub(crate) struct DiskUsageEvictionInfo { } pub(crate) struct LocalLayerInfoForDiskUsageEviction { - pub layer: Arc, + pub layer: Layer, pub last_activity_ts: SystemTime, } @@ -4133,7 +4101,16 @@ impl Timeline { let l = guard.get_from_desc(&l); - let Ok(l) = l.guard_against_eviction(false).await else { continue; }; + let l = match l.keep_resident().await { + Ok(Some(l)) => l, + Ok(None) => continue, + Err(e) => { + // these should not happen, but we cannot make them statically impossible right + // now. + tracing::warn!(layer=%l, "failed to keep the layer resident: {e:#}"); + continue; + } + }; let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| { // We only use this fallback if there's an implementation error. 
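The keep_resident()/drop_eviction_guard() calls in the hunk above are backed by the Weak-downgrade slot (ResidentOrWantedEvicted) introduced earlier in this patch: readers hold a strong Arc to the downloaded state, and requesting eviction only downgrades the slot, so the file can actually go away once the last reader drops. A minimal self-contained sketch of that mechanism with toy types (not the pageserver code itself):

use std::sync::{Arc, Weak};

// Stand-in for the patch's ResidentOrWantedEvicted: strong while resident,
// weak once eviction has been requested.
enum ResidentOrWantedEvicted<T> {
    Resident(Arc<T>),
    WantedEvicted(Weak<T>),
}

impl<T> ResidentOrWantedEvicted<T> {
    fn get(&self) -> Option<Arc<T>> {
        match self {
            ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
            ResidentOrWantedEvicted::WantedEvicted(weak) => weak.upgrade(),
        }
    }

    // First eviction request downgrades the strong reference to a weak one.
    fn downgrade(&mut self) {
        if let ResidentOrWantedEvicted::Resident(strong) = self {
            let weak = Arc::downgrade(strong);
            *self = ResidentOrWantedEvicted::WantedEvicted(weak);
        }
    }
}

fn main() {
    let mut slot = ResidentOrWantedEvicted::Resident(Arc::new("layer file"));
    let guard = slot.get().expect("resident");
    slot.downgrade();
    // Still readable while a reader guard is alive...
    assert!(slot.get().is_some());
    drop(guard);
    // ...but upgrades fail once the last strong reference is gone; in the patch,
    // DownloadedLayer::drop then triggers the actual eviction work.
    assert!(slot.get().is_none());
}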
@@ -4143,7 +4120,6 @@ impl Timeline { }); resident_layers.push(LocalLayerInfoForDiskUsageEviction { - // we explicitly don't want to keep this layer downloaded layer: l.drop_eviction_guard(), last_activity_ts, }); @@ -4264,12 +4240,10 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> { #[cfg(test)] mod tests { - use std::sync::Arc; - use utils::{id::TimelineId, lsn::Lsn}; use crate::tenant::{ - harness::TenantHarness, storage_layer::LayerE, timeline::EvictionError, Timeline, + harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline, }; #[tokio::test] @@ -4290,8 +4264,12 @@ mod tests { .expect("just configured this"); let layer = find_some_layer(&timeline).await; - let layer = layer.guard_against_eviction(false).await.unwrap(); - let layer = layer.drop_eviction_guard(); + let layer = layer + .keep_resident() + .await + .expect("no download => no downloading errors") + .expect("should had been resident") + .drop_eviction_guard(); let cancel = tokio_util::sync::CancellationToken::new(); let batch = [layer]; @@ -4317,10 +4295,8 @@ mod tests { let (first, second) = (only_one(first), only_one(second)); - batch[0] - .needs_download_blocking() - .unwrap() - .expect("should now have a reason to download"); + let res = batch[0].keep_resident().await; + assert!(matches!(res, Ok(None)), "{res:?}"); match (first, second) { (Ok(()), Ok(())) => { @@ -4348,7 +4324,7 @@ mod tests { .expect("no cancellation") } - async fn find_some_layer(timeline: &Timeline) -> Arc { + async fn find_some_layer(timeline: &Timeline) -> Layer { let layers = timeline.layers.read().await; let desc = layers .layer_map() diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 377aad1b2eb3..6a20431571e9 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -200,9 +200,19 @@ impl Timeline { for hist_layer in layers.iter_historic_layers() { let hist_layer = guard.get_from_desc(&hist_layer); - // funny: this is the best way to get local layers is to lock them into - // memory for the duration of eviction - let Ok(guard) = hist_layer.guard_against_eviction(false).await else { continue; }; + // guard against eviction while we inspect it; it might be that eviction_task and + // disk_usage_eviction_task both select the same layers to be evicted, and + // seemingly free up double the space. both succeeding is of no consequence. + let guard = match hist_layer.keep_resident().await { + Ok(Some(l)) => l, + Ok(None) => continue, + Err(e) => { + // these should not happen, but we cannot make them statically impossible right + // now. + tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}"); + continue; + } + }; let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| { // We only use this fallback if there's an implementation error. 
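The evict_and_wait path used by these eviction tasks depends on subscribing to the layer's status broadcast before expressing the wish to evict, so the waiter cannot miss the outcome, and on treating a lagged receiver by re-checking current state. A rough self-contained sketch of that pattern with a plain tokio broadcast channel (names below are illustrative, not the pageserver API):

use tokio::sync::broadcast::{self, error::RecvError};

#[derive(Clone, Copy, Debug)]
enum Status {
    Evicted,
    Downloaded,
}

#[tokio::main]
async fn main() {
    let (tx, _initial_rx) = broadcast::channel::<Status>(1);

    // Waiter side: subscribe first, only then request eviction elsewhere.
    let mut rx = tx.subscribe();

    // Eviction/download side: some background task eventually reports the outcome.
    let tx2 = tx.clone();
    tokio::spawn(async move {
        let _ = tx2.send(Status::Evicted);
    });

    match rx.recv().await {
        Ok(Status::Evicted) => println!("layer was evicted"),
        Ok(Status::Downloaded) => println!("a download won the race"),
        Err(RecvError::Closed) => println!("sender dropped"),
        Err(RecvError::Lagged(_)) => {
            // Capacity is 1, so a slow waiter can lag; the real code falls back
            // to inspecting the layer's current residency state in this case.
            println!("lagged; re-check current state");
        }
    }
}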
@@ -263,7 +273,6 @@ impl Timeline { Ok(results) => results, }; assert_eq!(results.len(), candidates.len()); - drop(candidates); for result in results { match result { None => { diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 1a15020cf04c..7ded804ea06f 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -1,5 +1,4 @@ use anyhow::{bail, ensure, Context, Result}; -use pageserver_api::models::{LayerResidenceEventReason, LayerResidenceStatus}; use std::{collections::HashMap, sync::Arc}; use tracing::trace; use utils::{ @@ -12,7 +11,7 @@ use crate::{ tenant::{ layer_map::{BatchedUpdates, LayerMap}, storage_layer::{ - AsLayerDesc, InMemoryLayer, LayerE, PersistentLayerDesc, PersistentLayerKey, + AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey, ResidentLayer, }, }, @@ -21,7 +20,7 @@ use crate::{ /// Provides semantic APIs to manipulate the layer map. pub(crate) struct LayerManager { layer_map: LayerMap, - layer_fmgr: LayerFileManager, + layer_fmgr: LayerFileManager, } /// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after @@ -42,7 +41,7 @@ impl LayerManager { } } - pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc { + pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer { self.layer_fmgr.get_from_desc(desc) } @@ -59,7 +58,7 @@ impl LayerManager { /// 2. next open layer (with disk disk_consistent_lsn LSN) pub(crate) fn initialize_local_layers( &mut self, - on_disk_layers: Vec>, + on_disk_layers: Vec, next_open_layer_at: Lsn, ) { let mut updates = self.layer_map.batch_update(); @@ -157,10 +156,6 @@ impl LayerManager { pub(crate) fn track_new_image_layers(&mut self, image_layers: &[ResidentLayer]) { let mut updates = self.layer_map.batch_update(); for layer in image_layers { - layer.access_stats().record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr); } updates.flush(); @@ -171,7 +166,6 @@ impl LayerManager { &mut self, delta_layer: Option<&ResidentLayer>, frozen_layer_for_check: &Arc, - metrics: &crate::metrics::TimelineMetrics, ) { let inmem = self .layer_map @@ -186,15 +180,6 @@ impl LayerManager { if let Some(l) = delta_layer { let mut updates = self.layer_map.batch_update(); - l.access_stats().record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); - let sz = l.layer_desc().file_size; - metrics.resident_physical_size_gauge.add(sz); - metrics.num_persistent_files_created.inc_by(1); - metrics.persistent_bytes_written.inc_by(sz); - Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr); updates.flush(); } @@ -204,20 +189,11 @@ impl LayerManager { pub(crate) fn finish_compact_l0( &mut self, layer_removal_cs: &Arc>, - compact_from: Vec>, + compact_from: Vec, compact_to: &[ResidentLayer], - metrics: &crate::metrics::TimelineMetrics, ) -> Result<()> { let mut updates = self.layer_map.batch_update(); for l in compact_to { - l.access_stats().record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); - metrics - .resident_physical_size_gauge - .add(l.layer_desc().file_size); - Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr); } for l in compact_from { @@ 
-234,7 +210,7 @@ impl LayerManager { pub(crate) fn finish_gc_timeline( &mut self, layer_removal_cs: &Arc>, - gc_layers: Vec>, + gc_layers: Vec, ) -> Result { let mut updates = self.layer_map.batch_update(); for doomed_layer in gc_layers { @@ -250,9 +226,9 @@ impl LayerManager { /// Helper function to insert a layer into the layer map and file manager. fn insert_historic_layer( - layer: Arc, + layer: Layer, updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, + mapping: &mut LayerFileManager, ) { updates.insert_historic(layer.layer_desc().clone()); mapping.insert(layer); @@ -263,9 +239,9 @@ impl LayerManager { fn delete_historic_layer( // we cannot remove layers otherwise, since gc and compaction will race _layer_removal_cs: &Arc>, - layer: Arc, + layer: Layer, updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, + mapping: &mut LayerFileManager, ) -> anyhow::Result<()> { let desc = layer.layer_desc(); @@ -276,20 +252,20 @@ impl LayerManager { // map index without actually rebuilding the index. updates.remove_historic(desc); mapping.remove(&layer); - layer.garbage_collect(); + layer.garbage_collect_on_drop(); Ok(()) } - pub(crate) fn contains(&self, layer: &Arc) -> bool { + pub(crate) fn contains(&self, layer: &Layer) -> bool { self.layer_fmgr.contains(layer) } } -pub(crate) struct LayerFileManager(HashMap>); +pub(crate) struct LayerFileManager(HashMap); -impl LayerFileManager { - fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc { +impl LayerFileManager { + fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T { // The assumption for the `expect()` is that all code maintains the following invariant: // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor. 
self.0 @@ -299,14 +275,14 @@ impl LayerFileManager { .clone() } - pub(crate) fn insert(&mut self, layer: Arc) { + pub(crate) fn insert(&mut self, layer: T) { let present = self.0.insert(layer.layer_desc().key(), layer.clone()); if present.is_some() && cfg!(debug_assertions) { panic!("overwriting a layer: {:?}", layer.layer_desc()) } } - pub(crate) fn contains(&self, layer: &Arc) -> bool { + pub(crate) fn contains(&self, layer: &T) -> bool { self.0.contains_key(&layer.layer_desc().key()) } @@ -314,7 +290,7 @@ impl LayerFileManager { Self(HashMap::new()) } - pub(crate) fn remove(&mut self, layer: &Arc) { + pub(crate) fn remove(&mut self, layer: &T) { let present = self.0.remove(&layer.layer_desc().key()); if present.is_none() && cfg!(debug_assertions) { panic!( diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 7fd16f75ac71..e5f2a48996e9 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1534,7 +1534,7 @@ def __init__(self, env: NeonEnv, port: PageserverPort, config_override: Optional ".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped ".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs ".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition - ".*compaction_loop.*Compaction failed, retrying in.*Timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs + ".*compaction_loop.*Compaction failed, retrying in.*timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs ".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock() ".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress ".*task iteration took longer than the configured period.*", diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index ae91d8173f7a..832f623d10f1 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -599,7 +599,7 @@ def assert_compacted_and_uploads_queued(): log.info("sending delete request") checkpoint_allowed_to_fail.set() env.pageserver.allowed_errors.append( - ".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping" + ".* ERROR .*Error processing HTTP request: InternalServerError\\(The timeline or pageserver is shutting down" ".* ERROR .*[Cc]ould not flush frozen layer.*" )
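For reference, the file-creation change at the heart of this patch (Layer::finish_creating taking over the rename from the delta/image layer writers) reduces to a write-to-temp-path, fsync, then rename-into-place sequence. A self-contained sketch under made-up paths, not the pageserver code:

use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

fn finish_creating_sketch(temp_path: &Path, final_path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let mut file = File::create(temp_path)?;
    file.write_all(bytes)?;
    // Make the contents durable before the rename publishes the file.
    file.sync_all()?;
    drop(file);
    // Rename overwrites any existing file at final_path, mirroring the note
    // in the original writer code.
    fs::rename(temp_path, final_path)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir();
    finish_creating_sketch(&dir.join("layer.tmp"), &dir.join("layer.final"), b"demo")?;
    Ok(())
}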