From d53c33cad13146d0bb52c3c4a1f09bafc687aa30 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 11 Apr 2023 19:56:59 +0200 Subject: [PATCH 01/16] eviction: regression test + distinguish layer write from map insert --- libs/pageserver_api/src/models.rs | 11 +- pageserver/src/tenant/storage_layer.rs | 31 ++- pageserver/src/tenant/timeline.rs | 22 ++- .../src/tenant/timeline/eviction_task.rs | 9 +- test_runner/fixtures/neon_fixtures.py | 6 + test_runner/regress/test_metric_collection.py | 7 - .../regress/test_threshold_based_eviction.py | 187 ++++++++++++++++++ 7 files changed, 250 insertions(+), 23 deletions(-) create mode 100644 test_runner/regress/test_threshold_based_eviction.py diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a351761f4aee..323bb11b71d2 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -272,6 +272,7 @@ pub enum LayerAccessKind { Iter, KeyIter, Dump, + LayerMapInsertion, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -299,15 +300,19 @@ pub struct LayerResidenceEvent { } /// The reason for recording a given [`ResidenceEvent`]. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] pub enum LayerResidenceEventReason { /// The layer map is being populated, e.g. during timeline load or attach. /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`]. /// We need to record such events because there is no persistent storage for the events. LayerLoad, - /// We just created the layer (e.g., freeze_and_flush or compaction). + /// We just wrote the layer file to disk (e.g., freeze_and_flush or compaction). + /// The layer file is not yet part of the layer map. + /// Such layers are always [`LayerResidenceStatus::Resident`]. + LayerCreateFileWritten, + /// We just inserted the layer file to the layer map. /// Such layers are always [`LayerResidenceStatus::Resident`]. - LayerCreate, + LayerCreateMapInserted, /// We on-demand downloaded or evicted the given layer. 
ResidenceChange, } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 2ee723e7c363..50455403b144 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -168,7 +168,7 @@ impl LayerAccessStats { let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); new.record_residence_event( LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, + LayerResidenceEventReason::LayerCreateFileWritten, ); new } @@ -255,25 +255,40 @@ impl LayerAccessStats { ret } + pub(crate) fn record_layer_map_insert_of_created_layer(&self) { + self.record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreateMapInserted, + ); + } + fn most_recent_access_or_residence_event( &self, - ) -> Either<LayerAccessStatFullDetails, LayerResidenceEvent> { + ) -> anyhow::Result<Either<LayerAccessStatFullDetails, LayerResidenceEvent>> { let locked = self.0.lock().unwrap(); let inner = &locked.for_eviction_policy; - match inner.last_accesses.recent() { + let res = match inner.last_accesses.recent() { Some(a) => Either::Left(*a), None => match inner.last_residence_changes.recent() { - Some(e) => Either::Right(e.clone()), + Some(e) => { + if e.reason == LayerResidenceEventReason::LayerCreateFileWritten { + anyhow::bail!("layer is not part of the layer map, call record_layer_map_insert_of_created_layer first"); + } + Either::Right(e.clone()) + } None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"), } - } + }; + Ok(res) } - pub(crate) fn latest_activity(&self) -> SystemTime { - match self.most_recent_access_or_residence_event() { + /// Fails until `record_layer_map_insert_of_created_layer` was called on this instance + /// or a predecessor from which this instance was `clone_for_residence_change`'d from. + pub(crate) fn latest_activity(&self) -> anyhow::Result<SystemTime> { + Ok(match self.most_recent_access_or_residence_event()? { Either::Left(mra) => mra.when, Either::Right(re) => re.timestamp, - } + }) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 29d8b544cc22..a4466fca1d9f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1446,7 +1446,9 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(Arc::new(layer))?; + let l = Arc::new(layer); + l.access_stats().record_layer_map_insert_of_created_layer(); + updates.insert_historic(l)?; num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file.
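For orientation, the gating idea of patch 01 can be boiled down to a few lines. The sketch below uses simplified stand-in types, not the real pageserver ones; it only illustrates how `latest_activity()` stays fallible until the map-insert event has been recorded:

```rust
use std::time::SystemTime;

#[derive(Clone, Copy)]
enum EventReason {
    LayerCreateFileWritten,
    LayerCreateMapInserted,
}

struct AccessStats {
    last_access: Option<SystemTime>,
    last_residence_change: Option<(SystemTime, EventReason)>,
}

impl AccessStats {
    // Mirrors patch 01: a layer whose most recent residence event is only
    // "file written" is not yet in the layer map, so its creation timestamp
    // must not be treated as activity.
    fn latest_activity(&self) -> Result<SystemTime, &'static str> {
        if let Some(when) = self.last_access {
            return Ok(when);
        }
        match self.last_residence_change {
            Some((_, EventReason::LayerCreateFileWritten)) => {
                Err("layer is not part of the layer map yet")
            }
            Some((when, _)) => Ok(when),
            None => unreachable!("constructors always record a residence change"),
        }
    }

    fn record_layer_map_insert(&mut self) {
        self.last_residence_change =
            Some((SystemTime::now(), EventReason::LayerCreateMapInserted));
    }
}

fn main() {
    let mut stats = AccessStats {
        last_access: None,
        last_residence_change: Some((SystemTime::now(), EventReason::LayerCreateFileWritten)),
    };
    assert!(stats.latest_activity().is_err()); // file written, not yet in the map
    stats.record_layer_map_insert();
    assert!(stats.latest_activity().is_ok());
}
```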
@@ -2687,11 +2689,13 @@ impl Timeline { ])?; // Add it to the layer map + let l = Arc::new(new_delta); + l.access_stats().record_layer_map_insert_of_created_layer(); self.layers .write() .unwrap() .batch_update() - .insert_historic(Arc::new(new_delta))?; + .insert_historic(l)?; // update the timeline's physical size let sz = new_delta_path.metadata()?.len(); @@ -2896,7 +2900,9 @@ impl Timeline { self.metrics .resident_physical_size_gauge .add(metadata.len()); - updates.insert_historic(Arc::new(l))?; + let l = Arc::new(l); + l.access_stats().record_layer_map_insert_of_created_layer(); + updates.insert_historic(l)?; } updates.flush(); drop(layers); @@ -3329,6 +3335,7 @@ impl Timeline { new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); let x: Arc<dyn PersistentLayer> = Arc::new(l); + x.access_stats().record_layer_map_insert_of_created_layer(); updates.insert_historic(x)?; } @@ -4107,7 +4114,14 @@ impl Timeline { continue; } - let last_activity_ts = l.access_stats().latest_activity(); + let last_activity_ts = match l.access_stats().latest_activity() { + Ok(ts) => ts, + Err(error) => { + warn!(%error, layer=%l.filename().file_name(), "latest activity not available, using UNIX epoch as fallback"); + // If we're under disk pressure, we should not hide this layer's existence. + SystemTime::UNIX_EPOCH + } + }; resident_layers.push(LocalLayerInfoForDiskUsageEviction { layer: l, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index cf799a880859..4964315d4e9f 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -184,7 +184,14 @@ impl Timeline { if hist_layer.is_remote_layer() { continue; } - let last_activity_ts = hist_layer.access_stats().latest_activity(); + let last_activity_ts = match hist_layer.access_stats().latest_activity() { + Ok(ts) => ts, + Err(error) => { + warn!(%error, layer=%hist_layer.filename().file_name(), "latest activity not available, likely implementation error"); + stats.errors += 1; + continue; + } + }; let no_activity_for = match now.duration_since(last_activity_ts) { Ok(d) => d, Err(_e) => { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e9f0363843e1..8bbe1f8ad701 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -292,6 +292,12 @@ def port_distributor(worker_base_port: int) -> PortDistributor: return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM) +@pytest.fixture(scope="session") +def httpserver_listen_address(port_distributor: PortDistributor): + port = port_distributor.get_port() + return ("localhost", port) + + @pytest.fixture(scope="function") def default_broker( port_distributor: PortDistributor, diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index ecbce1f8f733..ebf22cf369f2 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -24,13 +24,6 @@ from werkzeug.wrappers.request import Request from werkzeug.wrappers.response import Response - -@pytest.fixture(scope="session") -def httpserver_listen_address(port_distributor: PortDistributor): - port = port_distributor.get_port() - return ("localhost", port) - - # ============================================================================== # Storage metrics tests # ============================================================================== diff
--git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py new file mode 100644 index 000000000000..79dabd8ba2a0 --- /dev/null +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -0,0 +1,187 @@ +import time +from dataclasses import dataclass +from typing import List, Set, Tuple + +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + PgBin, + RemoteStorageKind, + wait_for_last_flush_lsn, + wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn, +) +from fixtures.pageserver.http import LayerMapInfo +from fixtures.types import TimelineId +from pytest_httpserver import HTTPServer + +# NB: basic config change tests are in test_tenant_conf.py + + +def test_threshold_based_eviction( + request, + httpserver: HTTPServer, + httpserver_listen_address, + pg_bin: PgBin, + neon_env_builder: NeonEnvBuilder, +): + neon_env_builder.enable_remote_storage(RemoteStorageKind.LOCAL_FS, f"{request.node.name}") + + # Start with metrics collection enabled, so that the eviction task + # imitates its accesses. We'll use a non-existent endpoint to make it fail. + # The synthetic size calculation will run regardless. + host, port = httpserver_listen_address + neon_env_builder.pageserver_config_override = f""" + metric_collection_interval="1s" + synthetic_size_calculation_interval="2s" + metric_collection_endpoint="http://{host}:{port}/nonexistent" + """ + metrics_refused_log_line = ".*metrics endpoint refused the sent metrics.*/nonexistent.*" + env = neon_env_builder.init_start() + env.pageserver.allowed_errors.append(metrics_refused_log_line) + + tenant_id, timeline_id = env.initial_tenant, env.initial_timeline + assert isinstance(timeline_id, TimelineId) + + ps_http = env.pageserver.http_client() + assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == { + "kind": "NoEviction" + } + + eviction_threshold = 5 + eviction_period = 1 + ps_http.set_tenant_config( + tenant_id, + { + "eviction_policy": { + "kind": "LayerAccessThreshold", + "threshold": f"{eviction_threshold}s", + "period": f"{eviction_period}s", + }, + }, + ) + assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == { + "kind": "LayerAccessThreshold", + "threshold": f"{eviction_threshold}s", + "period": f"{eviction_period}s", + } + + # restart because changing tenant config is not instant + env.pageserver.stop() + env.pageserver.start() + + assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == { + "kind": "LayerAccessThreshold", + "threshold": f"{eviction_threshold}s", + "period": f"{eviction_period}s", + } + + # create a bunch of L1s, only the least of which will need to be resident + compaction_threshold = 3 # create L1 layers quickly + ps_http.patch_tenant_config_client_side( + tenant_id, + inserts={ + # Disable gc and compaction to avoid on-demand downloads from their side. + # The only on-demand downloads should be from the eviction tasks's "imitate access" functions. + "gc_period": "0s", + "compaction_period": "0s", + # low checkpoint_distance so that pgbench creates many layers + "checkpoint_distance": 1024**2, + # Low compaction target size to create many L1's with tight key ranges. + # This is so that the "imitate access" don't download all the layers. + "compaction_target_size": 1 * 1024**2, # all keys into one L1 + # Turn L0's into L1's fast. + "compaction_threshold": compaction_threshold, + # Prevent compaction from collapsing L1 delta layers into image layers. 
We want many layers here. + "image_creation_threshold": 100, + # Much larger so that synthetic size calculation worker, which is part of metric collection, + # computes logical size for initdb_lsn every time, instead of some moving lsn as we insert data. + # This makes the set of downloaded layers predictable, + # thereby allowing the residence statuses to stabilize below. + "gc_horizon": 1024**4, + }, + ) + + # create a bunch of layers + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + pg_bin.run(["pgbench", "-i", "-s", "3", pg.connstr()]) + wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + # wrap up and shut down safekeepers so that no more layers will be created after the final checkpoint + wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn( + tenant_id, timeline_id, env.safekeepers, env.pageserver + ) + for sk in env.safekeepers: + sk.stop() + ps_http.timeline_checkpoint(tenant_id, timeline_id) + + # wait for evictions and assert that they stabilize + @dataclass + class MapInfoProjection: + original_data: LayerMapInfo + remote_layers: Set[str] + local_layers: Set[str] + layer_file_names: Set[str] + + @classmethod + def from_map_info(cls, info: LayerMapInfo): + return cls( + original_data=info, + remote_layers={ + layer.layer_file_name for layer in info.historic_layers if layer.remote + }, + local_layers={ + layer.layer_file_name for layer in info.historic_layers if not layer.remote + }, + layer_file_names={layer.layer_file_name for layer in info.historic_layers}, + ) + + def __eq__(self, other): + return ( + self.remote_layers == other.remote_layers + and self.local_layers == other.local_layers + ) + + def __repr__(self) -> str: + out = ["MapInfoProjection:"] + for layer in sorted( + self.original_data.historic_layers, key=lambda layer: layer.layer_file_name + ): + remote = "R" if layer.remote else "L" + out += [f" {remote} {layer.layer_file_name}"] + return "\n".join(out) + + observation_window = 8 * eviction_threshold + consider_stable_when_no_change_for_seconds = 3 * eviction_threshold + poll_interval = eviction_threshold / 3 + started_waiting_at = time.time() + map_info_changes: List[Tuple[float, MapInfoProjection]] = [] + while time.time() - started_waiting_at < observation_window: + current = ( + time.time(), + MapInfoProjection.from_map_info(ps_http.layer_map_info(tenant_id, timeline_id)), + ) + last = map_info_changes[-1] if map_info_changes else (0, None) + if last[1] is None or current[1] != last[1]: + map_info_changes.append(current) + log.info("change in layer map\n before: %s\n after: %s", last, current) + else: + stable_for = current[0] - last[0] + log.info("residencies stable for %s", stable_for) + if stable_for > consider_stable_when_no_change_for_seconds: + break + time.sleep(poll_interval) + + log.info("len(map_info_changes)=%s", len(map_info_changes)) + + # TODO: can we be more precise here?
E.g., require we're stable _within_ X*threshold, + # instead of what we do here, i.e., stable _for at least_ X*threshold toward the end of the observation window + assert ( + stable_for > consider_stable_when_no_change_for_seconds + ), "layer residencies did not become stable within the observation window" + + pre, post = map_info_changes[0][1], map_info_changes[-1][1] + log.info("pre: %s", pre) + log.info("post: %s", post) + + assert len(post.local_layers) < len(pre.local_layers), "some layers should have been evicted" + + assert env.pageserver.log_contains(metrics_refused_log_line) From ed29e17f0518368fc591ddbd4817fb1c67da1280 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 11 Apr 2023 20:15:19 +0200 Subject: [PATCH 02/16] refactor: simplify MapInfoProjection type & more precise asserts at the end --- .../regress/test_threshold_based_eviction.py | 44 +++++++++---------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index 79dabd8ba2a0..cbecd4d7a77c 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -115,36 +115,32 @@ def test_threshold_based_eviction( # wait for evictions and assert that they stabilize @dataclass - class MapInfoProjection: - original_data: LayerMapInfo + class ByLocalAndRemote: remote_layers: Set[str] local_layers: Set[str] - layer_file_names: Set[str] - @classmethod + class MapInfoProjection: + def __init__(self, info: LayerMapInfo): + self.info = info + + def by_local_and_remote(self) -> ByLocalAndRemote: + return ByLocalAndRemote( remote_layers={ - layer.layer_file_name for layer in info.historic_layers if layer.remote + layer.layer_file_name for layer in self.info.historic_layers if layer.remote }, local_layers={ - layer.layer_file_name for layer in info.historic_layers if not layer.remote + layer.layer_file_name for layer in self.info.historic_layers if not layer.remote }, - layer_file_names={layer.layer_file_name for layer in info.historic_layers}, ) def __eq__(self, other): - return ( - self.remote_layers == other.remote_layers - and self.local_layers == other.local_layers - ) + if not isinstance(other, MapInfoProjection): + return False + return self.by_local_and_remote() == other.by_local_and_remote() def __repr__(self) -> str: out = ["MapInfoProjection:"] - for layer in sorted( - self.original_data.historic_layers, key=lambda layer: layer.layer_file_name - ): + for layer in sorted(self.info.historic_layers, key=lambda layer: layer.layer_file_name): remote = "R" if layer.remote else "L" out += [f" {remote} {layer.layer_file_name}"] return "\n".join(out) @@ -157,7 +153,7 @@ def __repr__(self) -> str: while time.time() - started_waiting_at < observation_window: current = ( time.time(), - MapInfoProjection.from_map_info(ps_http.layer_map_info(tenant_id, timeline_id)), + MapInfoProjection(ps_http.layer_map_info(tenant_id, timeline_id)), ) last = map_info_changes[-1] if map_info_changes else (0, None) if last[1] is None or current[1] != last[1]: map_info_changes.append(current) log.info("change in layer map\n before: %s\n after: %s", last, current) else: stable_for = current[0] - last[0] log.info("residencies stable for %s", stable_for) if stable_for > consider_stable_when_no_change_for_seconds: break time.sleep(poll_interval) @@ -178,10 +174,10 @@ def __repr__(self) -> str: stable_for > consider_stable_when_no_change_for_seconds ), "layer residencies did not become stable within the observation window" - pre, post = map_info_changes[0][1], map_info_changes[-1][1] - log.info("pre: %s", pre) - log.info("post: %s", post) - - assert len(post.local_layers) <
len(pre.local_layers), "some layers should have been evicted" + post = map_info_changes[-1][1].by_local_and_remote() + assert len(post.remote_layers) > 0, "some layers should be evicted once it's stabilized" + assert len(post.local_layers) > 0, "the imitate accesses should keep some layers resident" - assert env.pageserver.log_contains(metrics_refused_log_line) + assert env.pageserver.log_contains( + metrics_refused_log_line + ), "ensure the metrics collection worker ran" From 409b0f359ee42aadcd57b2c3f7437e44a4473252 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 13 Apr 2023 14:31:55 +0200 Subject: [PATCH 03/16] feat: technically the same applies for Timeline::download_remote_layer --- pageserver/src/tenant/timeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a4466fca1d9f..35e3ecc5882a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3842,9 +3842,9 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size); let mut layers = self_clone.layers.write().unwrap(); let mut updates = layers.batch_update(); + let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size); { use crate::tenant::layer_map::Replacement; let l: Arc = remote_layer.clone(); From 30a77e0470703ccd98948d9ea8abd06c7c623cac Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 13 Apr 2023 14:40:07 +0200 Subject: [PATCH 04/16] Revert "feat: technically the same applies for Timeline::download_remote_layer" This reverts commit 3f96e00ed027487f35798dce50b15ded3e6a1baf. --- pageserver/src/tenant/timeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 35e3ecc5882a..a4466fca1d9f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3842,9 +3842,9 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. 
+ let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size); let mut layers = self_clone.layers.write().unwrap(); let mut updates = layers.batch_update(); - let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size); { use crate::tenant::layer_map::Replacement; let l: Arc<dyn PersistentLayer> = remote_layer.clone(); From 767c57f04f72b70123fd95513e360bd576b93f3f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 13 Apr 2023 15:49:45 +0200 Subject: [PATCH 05/16] feat: alternative approach that takes a witness for layer map lock held --- libs/pageserver_api/src/models.rs | 11 +- pageserver/src/tenant/storage_layer.rs | 111 ++++++++++-------- .../src/tenant/storage_layer/delta_layer.rs | 6 +- .../src/tenant/storage_layer/image_layer.rs | 6 +- .../src/tenant/storage_layer/remote_layer.rs | 21 +++- pageserver/src/tenant/timeline.rs | 58 +++++---- .../src/tenant/timeline/eviction_task.rs | 10 +- 7 files changed, 134 insertions(+), 89 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 323bb11b71d2..a351761f4aee 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -272,7 +272,6 @@ pub enum LayerAccessKind { Iter, KeyIter, Dump, - LayerMapInsertion, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -300,19 +299,15 @@ pub struct LayerResidenceEvent { } /// The reason for recording a given [`ResidenceEvent`]. -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum LayerResidenceEventReason { /// The layer map is being populated, e.g. during timeline load or attach. /// This includes [`RemoteLayer`] objects created in [`reconcile_with_remote`]. /// We need to record such events because there is no persistent storage for the events. LayerLoad, - /// We just wrote the layer file to disk (e.g., freeze_and_flush or compaction). - /// The layer file is not yet part of the layer map. - /// Such layers are always [`LayerResidenceStatus::Resident`]. - LayerCreateFileWritten, - /// We just inserted the layer file to the layer map. + /// We just created the layer (e.g., freeze_and_flush or compaction). /// Such layers are always [`LayerResidenceStatus::Resident`]. - LayerCreateMapInserted, + LayerCreate, /// We on-demand downloaded or evicted the given layer. ResidenceChange, } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 50455403b144..03afd60df2b8 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -13,7 +13,6 @@ use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; use anyhow::Result; use bytes::Bytes; -use either::Either; use enum_map::EnumMap; use enumset::EnumSet; use pageserver_api::models::LayerAccessKind; @@ -37,6 +36,8 @@ pub use image_layer::{ImageLayer, ImageLayerWriter}; pub use inmemory_layer::InMemoryLayer; pub use remote_layer::RemoteLayer; +use super::layer_map::BatchedUpdates; + pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool where T: PartialOrd, @@ -158,41 +159,82 @@ impl LayerAccessStatFullDetails { } impl LayerAccessStats { - pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self { - let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); - new.record_residence_event(status, LayerResidenceEventReason::LayerLoad); - new + /// Create an empty stats object.
+ /// + /// The caller is responsible for recording a residence event + /// using [`record_residence_event`] before calling `latest_activity`. + /// If they don't, [`latest_activity`] will return `None`. + pub(crate) fn empty_will_record_residence_event_later() -> Self { + LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())) } - pub(crate) fn for_new_layer_file() -> Self { + /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status. + /// + /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. + pub(crate) fn for_loading_layer<L>( + layer_map_lock_held_witness: &BatchedUpdates<'_, L>, + status: LayerResidenceStatus, + ) -> Self + where + L: ?Sized + Layer, + { let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); new.record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreateFileWritten, + layer_map_lock_held_witness, + status, + LayerResidenceEventReason::LayerLoad, ); new } /// Creates a clone of `self` and records `new_status` in the clone. - /// The `new_status` is not recorded in `self` - pub(crate) fn clone_for_residence_change( + /// + /// The `new_status` is not recorded in `self`. + /// + /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. + pub(crate) fn clone_for_residence_change<L>( &self, + layer_map_lock_held_witness: &BatchedUpdates<'_, L>, new_status: LayerResidenceStatus, - ) -> LayerAccessStats { + ) -> LayerAccessStats + where + L: ?Sized + Layer, + { let clone = { let inner = self.0.lock().unwrap(); inner.clone() }; let new = LayerAccessStats(Mutex::new(clone)); - new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange); + new.record_residence_event( + layer_map_lock_held_witness, + new_status, + LayerResidenceEventReason::ResidenceChange, + ); new } - fn record_residence_event( + /// Record a change in layer residency. + /// + /// Recording the event must happen while holding the layer map lock to + /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs) + /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`. + /// + /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock, + /// the following race could happen: + /// + /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp. + /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map. + /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock. + /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
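The "witness" idea described in the doc comment above can be isolated into a small example. This is a sketch with stand-in types (the real `BatchedUpdates` carries layer map state, not just a guard): holding a `&BatchedUpdates` is compile-time proof that the layer map lock is held while the event timestamp is taken.

```rust
use std::sync::{Mutex, MutexGuard};

struct LayerMap;

// The witness: it can only be constructed by locking the layer map.
struct BatchedUpdates<'a> {
    _guard: MutexGuard<'a, LayerMap>,
}

struct Layers(Mutex<LayerMap>);

impl Layers {
    fn batch_update(&self) -> BatchedUpdates<'_> {
        BatchedUpdates { _guard: self.0.lock().unwrap() }
    }
}

struct LayerAccessStats;

impl LayerAccessStats {
    // Taking `&BatchedUpdates` proves, at compile time, that the caller holds
    // the layer map lock when the residence event is recorded.
    fn record_residence_event(&self, _witness: &BatchedUpdates<'_>) {
        // ... record SystemTime::now() under the lock ...
    }
}

fn main() {
    let layers = Layers(Mutex::new(LayerMap));
    let stats = LayerAccessStats;
    let updates = layers.batch_update(); // lock acquired here
    stats.record_residence_event(&updates); // impossible to call without `updates`
    // lock released when `updates` is dropped
}
```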
+ /// + pub(crate) fn record_residence_event<L>( &self, + _layer_map_lock_held_witness: &BatchedUpdates<'_, L>, status: LayerResidenceStatus, reason: LayerResidenceEventReason, - ) { + ) where + L: ?Sized + Layer, + { let mut locked = self.0.lock().unwrap(); locked.iter_mut().for_each(|inner| { inner @@ -255,40 +297,17 @@ impl LayerAccessStats { ret } - pub(crate) fn record_layer_map_insert_of_created_layer(&self) { - self.record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreateMapInserted, - ); - } - - fn most_recent_access_or_residence_event( - &self, - ) -> anyhow::Result<Either<LayerAccessStatFullDetails, LayerResidenceEvent>> { + /// Get the latest access timestamp, with fallback for latest residence event. + /// + /// To guarantee that this method returns `Some`, make sure to record a residence event first. + /// Use the [`record_residence_event`] method for that. + pub(crate) fn latest_activity(&self) -> Option<SystemTime> { let locked = self.0.lock().unwrap(); let inner = &locked.for_eviction_policy; - let res = match inner.last_accesses.recent() { - Some(a) => Either::Left(*a), - None => match inner.last_residence_changes.recent() { - Some(e) => { - if e.reason == LayerResidenceEventReason::LayerCreateFileWritten { - anyhow::bail!("layer is not part of the layer map, call record_layer_map_insert_of_created_layer first"); - } - Either::Right(e.clone()) - } - None => unreachable!("constructors for LayerAccessStats ensure that there's always a residence change event"), - } - }; - Ok(res) - } - - /// Fails until `record_layer_map_insert_of_created_layer` was called on this instance - /// or a predecessor from which this instance was `clone_for_residence_change`'d from. - pub(crate) fn latest_activity(&self) -> anyhow::Result<SystemTime> { - Ok(match self.most_recent_access_or_residence_event()? { - Either::Left(mra) => mra.when, - Either::Right(re) => re.timestamp, - }) + match inner.last_accesses.recent() { + Some(a) => Some(a.when), + None => inner.last_residence_changes.recent().map(|e| e.timestamp), + } } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 98cbcc5f07d5..ba3ab6dd4c27 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -57,7 +57,7 @@ use utils::{ use super::{ DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter, - LayerKeyIter, LayerResidenceStatus, PathOrConf, + LayerKeyIter, PathOrConf, }; /// @@ -637,7 +637,7 @@ impl DeltaLayer { key_range: summary.key_range, lsn_range: summary.lsn_range, file_size: metadata.len(), - access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), + access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, @@ -808,7 +808,7 @@ impl DeltaLayerWriterInner { key_range: self.key_start..key_end, lsn_range: self.lsn_range.clone(), file_size: metadata.len(), - access_stats: LayerAccessStats::for_new_layer_file(), + access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(DeltaLayerInner { loaded: false, file: None, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index a99b1b491f3d..d298b3e852ef 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -53,7 +53,7 @@ use utils::{ }; use super::filename::{ImageFileName, LayerFileName}; -use
super::{Layer, LayerAccessStatsReset, LayerIter, LayerResidenceStatus, PathOrConf}; +use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf}; /// /// Header stored in the beginning of the file @@ -438,7 +438,7 @@ impl ImageLayer { key_range: summary.key_range, lsn: summary.lsn, file_size: metadata.len(), - access_stats: LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), + access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(ImageLayerInner { file: None, loaded: false, @@ -598,7 +598,7 @@ impl ImageLayerWriterInner { key_range: self.key_range.clone(), lsn: self.lsn, file_size: metadata.len(), - access_stats: LayerAccessStats::for_new_layer_file(), + access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(ImageLayerInner { loaded: false, file: None, diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 2eb7eb0cb68a..2106587ab20b 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -4,6 +4,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::Key; +use crate::tenant::layer_map::BatchedUpdates; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use anyhow::{bail, Result}; @@ -246,11 +247,15 @@ impl RemoteLayer { } /// Create a Layer struct representing this layer, after it has been downloaded. - pub fn create_downloaded_layer( + pub fn create_downloaded_layer<L>( &self, + layer_map_lock_held_witness: &BatchedUpdates<'_, L>, conf: &'static PageServerConf, file_size: u64, - ) -> Arc<dyn PersistentLayer> { + ) -> Arc<dyn PersistentLayer> + where + L: ?Sized + Layer, + { if self.is_delta { let fname = DeltaFileName { key_range: self.key_range.clone(), @@ -262,8 +267,10 @@ impl RemoteLayer { self.tenantid, &fname, file_size, - self.access_stats - .clone_for_residence_change(LayerResidenceStatus::Resident), + self.access_stats.clone_for_residence_change( + layer_map_lock_held_witness, + LayerResidenceStatus::Resident, + ), )) } else { let fname = ImageFileName { @@ -276,8 +283,10 @@ impl RemoteLayer { self.tenantid, &fname, file_size, - self.access_stats - .clone_for_residence_change(LayerResidenceStatus::Resident), + self.access_stats.clone_for_residence_change( + layer_map_lock_held_witness, + LayerResidenceStatus::Resident, + ), )) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a4466fca1d9f..d5b9daf2d3a1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -11,7 +11,8 @@ use itertools::Itertools; use once_cell::sync::OnceCell; use pageserver_api::models::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, - DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState, + DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus, + TimelineState, }; use remote_storage::GenericRemoteStorage; use storage_broker::BrokerClientChannel; @@ -1099,7 +1100,7 @@ impl Timeline { &layer_metadata, local_layer .access_stats() - .clone_for_residence_change(LayerResidenceStatus::Evicted), + .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted), ), LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( self.tenant_id, self.timeline_id, delta_name, &layer_metadata, local_layer
.access_stats() - .clone_for_residence_change(LayerResidenceStatus::Evicted), + .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted), ), }); @@ -1441,13 +1442,12 @@ impl Timeline { self.tenant_id, &imgfilename, file_size, - LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), + LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident), ); trace!("found layer {}", layer.path().display()); total_physical_size += file_size; let l = Arc::new(layer); - l.access_stats().record_layer_map_insert_of_created_layer(); updates.insert_historic(l)?; num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { @@ -1475,7 +1475,7 @@ impl Timeline { self.tenant_id, &deltafilename, file_size, - LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), + LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident), ); trace!("found layer {}", layer.path().display()); @@ -1610,7 +1610,10 @@ impl Timeline { self.timeline_id, imgfilename, &remote_layer_metadata, - LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), + LayerAccessStats::for_loading_layer( + &updates, + LayerResidenceStatus::Evicted, + ), ); let remote_layer = Arc::new(remote_layer); @@ -1639,7 +1642,10 @@ impl Timeline { self.timeline_id, deltafilename, &remote_layer_metadata, - LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), + LayerAccessStats::for_loading_layer( + &updates, + LayerResidenceStatus::Evicted, + ), ); let remote_layer = Arc::new(remote_layer); if let Some(local_layer) = &local_layer { @@ -2690,12 +2696,15 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); - l.access_stats().record_layer_map_insert_of_created_layer(); - self.layers .write() .unwrap() .batch_update() - .insert_historic(l)?; + let mut layers = self.layers.write().unwrap(); + let mut batch_updates = layers.batch_update(); + l.access_stats().record_residence_event( + &batch_updates, + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + batch_updates.insert_historic(l)?; + batch_updates.flush(); // update the timeline's physical size let sz = new_delta_path.metadata()?.len(); @@ -2901,7 +2910,11 @@ impl Timeline { .resident_physical_size_gauge .add(metadata.len()); let l = Arc::new(l); - l.access_stats().record_layer_map_insert_of_created_layer(); + l.access_stats().record_residence_event( + &updates, + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); updates.insert_historic(l)?; } updates.flush(); @@ -3335,7 +3348,11 @@ impl Timeline { new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); let x: Arc<dyn PersistentLayer> = Arc::new(l); - x.access_stats().record_layer_map_insert_of_created_layer(); + x.access_stats().record_residence_event( + &updates, + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); updates.insert_historic(x)?; } @@ -3842,9 +3859,9 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map.
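The `clone_for_residence_change` calls threaded through these hunks follow a copy-on-change discipline: the replacement layer object gets a copy of the stats with one extra event appended, and the original stats are left untouched. A standalone sketch, with hypothetical simplified types, of what that means for the event history:

```rust
use std::time::SystemTime;

#[derive(Clone)]
enum Residence { Resident, Evicted }

#[derive(Clone)]
struct Stats { residence_events: Vec<(SystemTime, Residence)> }

impl Stats {
    // As in the patch: the new status is recorded only in the clone;
    // `self` (the stats of the layer being replaced) is left unmodified.
    fn clone_for_residence_change(&self, new_status: Residence) -> Stats {
        let mut clone = self.clone();
        clone.residence_events.push((SystemTime::now(), new_status));
        clone
    }
}

fn main() {
    let resident = Stats { residence_events: vec![(SystemTime::now(), Residence::Resident)] };
    let evicted = resident.clone_for_residence_change(Residence::Evicted);
    assert_eq!(resident.residence_events.len(), 1); // original unchanged
    assert_eq!(evicted.residence_events.len(), 2); // clone carries the new event
}
```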
- let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size); let mut layers = self_clone.layers.write().unwrap(); let mut updates = layers.batch_update(); + let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { use crate::tenant::layer_map::Replacement; let l: Arc<dyn PersistentLayer> = remote_layer.clone(); @@ -4115,10 +4132,11 @@ impl Timeline { } let last_activity_ts = match l.access_stats().latest_activity() { - Ok(ts) => ts, - Err(error) => { - warn!(%error, layer=%l.filename().file_name(), "latest activity not available, using UNIX epoch as fallback"); + Some(ts) => ts, + None => { // If we're under disk pressure, we should not hide this layer's existence. + // Log a warning and use UNIX_EPOCH as the fallback. + warn!(layer=%l.filename().file_name(), "latest activity not available, did we forget to add a residence event?"); SystemTime::UNIX_EPOCH } }; diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 4964315d4e9f..b4efc92c6681 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -185,9 +185,13 @@ impl Timeline { continue; } let last_activity_ts = match hist_layer.access_stats().latest_activity() { - Ok(ts) => ts, - Err(error) => { - warn!(%error, layer=%hist_layer.filename().file_name(), "latest activity not available, likely implementation error"); + Some(ts) => ts, + None => { + // This is an implementation error. + // Give the layer a free ride and hope someone notices the warning. + // If this gets us under disk pressure, disk-usage-based eviction will save us. + // At that point, someone will definitely notice the warning in the logs. + warn!(layer=%hist_layer.filename().file_name(), "latest activity not available, likely implementation error, ignoring layer"); stats.errors += 1; continue; } From a55987ae17c841d7b6f40c71c50e413475ba781b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 14 Apr 2023 10:09:34 +0200 Subject: [PATCH 06/16] fixup rebase: eviction: env.postgres was renamed to env.endpoints --- test_runner/regress/test_threshold_based_eviction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index cbecd4d7a77c..52d3e558bcfb 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -102,7 +102,7 @@ def test_threshold_based_eviction( ) # create a bunch of layers - with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + with env.endpoints.create_start("main", tenant_id=tenant_id) as pg: pg_bin.run(["pgbench", "-i", "-s", "3", pg.connstr()]) wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) # wrap up and shutdown safekeepers so that no more layers will be created after the final checkpoint From c2f5a4dafb34e7d4ee3e06af9a6c9c8f1c7f78e8 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 14 Apr 2023 10:18:38 +0200 Subject: [PATCH 07/16] fix doc string in "feat: alternative approach that takes a witness for layer map lock held" --- pageserver/src/tenant/storage_layer.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 03afd60df2b8..4351cfd3df4a 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -222,10 +222,10 @@ impl LayerAccessStats { /// If we
instead recorded the residence event with a timestamp from before grabbing the layer map lock, /// the following race could happen: /// - /// Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp. - /// Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map. - /// Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock. - /// Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event. + /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp. + /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map. + /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock. + /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event. /// pub(crate) fn record_residence_event( From f480844ccfc7c07b05177782c0563e97a6b76564 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 14 Apr 2023 14:05:40 +0200 Subject: [PATCH 08/16] feat: rate-limit logging + centralize in the latest_activity method --- libs/utils/src/lib.rs | 2 + pageserver/src/tenant/storage_layer.rs | 45 ++++++++++++++++--- pageserver/src/tenant/timeline.rs | 11 +---- .../src/tenant/timeline/eviction_task.rs | 20 ++++----- 4 files changed, 50 insertions(+), 28 deletions(-) diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index d4176911acda..17a0348f8b22 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -54,6 +54,8 @@ pub mod measured_stream; pub mod serde_percent; pub mod serde_regex; +pub mod rate_limit_closure; + /// use with fail::cfg("$name", "return(2000)") #[macro_export] macro_rules! failpoint_sleep_millis_async { diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 4351cfd3df4a..e3c6855a8aea 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -15,6 +15,7 @@ use anyhow::Result; use bytes::Bytes; use enum_map::EnumMap; use enumset::EnumSet; +use once_cell::sync::Lazy; use pageserver_api::models::LayerAccessKind; use pageserver_api::models::{ HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, @@ -22,8 +23,10 @@ use pageserver_api::models::{ use std::ops::Range; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tracing::warn; use utils::history_buffer::HistoryBufferWithDropCounter; +use utils::rate_limit_closure::RateLimitClosure; use utils::{ id::{TenantId, TimelineId}, @@ -297,16 +300,44 @@ impl LayerAccessStats { ret } - /// Get the latest access timestamp, with fallback for latest residence event. + /// Get the latest access timestamp, falling back to latest residence event, falling back to `fallback`. /// - /// To guarantee that this method returns `Some`, make sure to record a residence event first. - /// Use the [`record_residence_event`] method for that. - pub(crate) fn latest_activity(&self) -> Option<SystemTime> { + /// The `fallback` must be supplied for the case where there has not yet been a call to the + /// [`record_residence_event`] method.
+ /// If `fallback` needs to be used, we log a rate-limited warning in global scope. + pub(crate) fn latest_activity<F>(&self, fallback: F) -> SystemTime + where + F: FnOnce() -> SystemTime, + { let locked = self.0.lock().unwrap(); let inner = &locked.for_eviction_policy; match inner.last_accesses.recent() { - Some(a) => Some(a.when), - None => inner.last_residence_changes.recent().map(|e| e.timestamp), + Some(a) => a.when, + None => match inner.last_residence_changes.recent() { + Some(e) => e.timestamp, + None => { + // TODO: use type system to avoid the need for `fallback`. + // The approach in https://github.com/neondatabase/neon/pull/3775 + // could be used to enforce that a residence event is recorded + // before a layer is added to the layer map. We could also have + // a layer wrapper type that holds the LayerAccessStats, and ensure + // that that type can only be produced by inserting into the layer map. + static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimitClosure)>> = + Lazy::new(|| { + Mutex::new((0, RateLimitClosure::new(Duration::from_secs(10)))) + }); + let mut guard = WARN_RATE_LIMIT.lock().unwrap(); + guard.0 += 1; + let occurences = guard.0; + guard.1.call(move || { + // Spawn a new task to have a new tracing root. It's ok, it only happens every 10 secs. + tokio::spawn(async move { + warn!(occurences, "latest_activity not available, this is an implementation bug, using fallback value"); + }); + }); + fallback() + } + }, } } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d5b9daf2d3a1..b618d64f0a61 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4131,15 +4131,8 @@ impl Timeline { continue; } - let last_activity_ts = match l.access_stats().latest_activity() { - Some(ts) => ts, - None => { - // If we're under disk pressure, we should not hide this layer's existence. - // Log a warning and use UNIX_EPOCH as the fallback. - warn!(layer=%l.filename().file_name(), "latest activity not available, did we forget to add a residence event?"); - SystemTime::UNIX_EPOCH - } - }; + // Use UNIX_EPOCH as a fallback timestamp in case we have a bug and forget to record a residence event for the layer. + let last_activity_ts = l.access_stats().latest_activity(|| SystemTime::UNIX_EPOCH); resident_layers.push(LocalLayerInfoForDiskUsageEviction { layer: l, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index b4efc92c6681..156243ac42d5 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -184,18 +184,14 @@ impl Timeline { if hist_layer.is_remote_layer() { continue; } - let last_activity_ts = match hist_layer.access_stats().latest_activity() { - Some(ts) => ts, - None => { - // This is an implementation error. - // Give the layer a free ride and hope someone notices the warning. - // If this gets us under disk pressure, disk-usage-based eviction will save us. - // At that point, someone will definitely notice the warning in the logs. - warn!(layer=%hist_layer.filename().file_name(), "latest activity not available, likely implementation error, ignoring layer"); - stats.errors += 1; - continue; - } - }; + + let last_activity_ts = hist_layer.access_stats().latest_activity(|| { + // We only use this fallback if there's an implementation error. + // `latest_activity` already does rate-limited warn!() log.
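The fallback-closure shape introduced in this patch is easy to demonstrate in isolation. A sketch, assuming a much-simplified stats type, showing how the two call sites pick different fallbacks:

```rust
use std::time::SystemTime;

struct LayerAccessStats { last: Option<SystemTime> }

impl LayerAccessStats {
    // Same shape as patch 08's API: the caller decides what "no data" means.
    fn latest_activity<F>(&self, fallback: F) -> SystemTime
    where
        F: FnOnce() -> SystemTime,
    {
        self.last.unwrap_or_else(fallback)
    }
}

fn main() {
    let stats = LayerAccessStats { last: None };
    // Disk-usage-based eviction wants the layer to remain eviction-eligible:
    let ts = stats.latest_activity(|| SystemTime::UNIX_EPOCH);
    assert_eq!(ts, SystemTime::UNIX_EPOCH);
    // The threshold-based eviction task instead gives the layer a free ride:
    let _ts = stats.latest_activity(SystemTime::now);
}
```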
+ debug!(layer=%hist_layer.filename().file_name(), "last_activity giving layer a free ride"); + SystemTime::now() + }); + let no_activity_for = match now.duration_since(last_activity_ts) { Ok(d) => d, Err(_e) => { From 63e59930c372af45d2869e5ed99e3df464259286 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 14 Apr 2023 14:08:50 +0200 Subject: [PATCH 09/16] fix: use Mutex::default() Co-authored-by: Joonas Koivunen --- pageserver/src/tenant/storage_layer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index e3c6855a8aea..26a3d225cc4f 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -168,7 +168,7 @@ impl LayerAccessStats { /// using [`record_residence_event`] before calling `latest_activity`. /// If they don't, [`latest_activity`] will return `None`. pub(crate) fn empty_will_record_residence_event_later() -> Self { - LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())) + LayerAccessStats(Mutex::default()) } /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status. From 8de967c1c1902c6c445e6d1c4129b77703fe7a9b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 14 Apr 2023 15:40:13 +0200 Subject: [PATCH 10/16] feat: forgot to commit rate_limit_closure module --- libs/utils/src/rate_limit_closure.rs | 66 ++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 libs/utils/src/rate_limit_closure.rs diff --git a/libs/utils/src/rate_limit_closure.rs b/libs/utils/src/rate_limit_closure.rs new file mode 100644 index 000000000000..2e2915327fb7 --- /dev/null +++ b/libs/utils/src/rate_limit_closure.rs @@ -0,0 +1,66 @@ +//! A helper to rate limit the invocation of a closure. 
+ +use std::time::{Duration, Instant}; + +pub struct RateLimitClosure { + last: Option<Instant>, + interval: Duration, +} + +impl RateLimitClosure { + pub fn new(interval: Duration) -> Self { + Self { + last: None, + interval, + } + } + + pub fn call<F: FnOnce()>(&mut self, f: F) { + let now = Instant::now(); + match self.last { + Some(last) if now - last > self.interval => { + f(); + self.last = Some(now); + } + None => { + f(); + self.last = Some(now); + } + Some(_) => {} + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicUsize; + + #[test] + fn basics() { + use super::RateLimitClosure; + use std::sync::atomic::Ordering::Relaxed; + use std::time::Duration; + + let called = AtomicUsize::new(0); + let mut f = RateLimitClosure::new(Duration::from_millis(100)); + + let cl = || { + called.fetch_add(1, Relaxed); + }; + + f.call(cl); + assert_eq!(called.load(Relaxed), 1); + f.call(cl); + assert_eq!(called.load(Relaxed), 1); + f.call(cl); + assert_eq!(called.load(Relaxed), 1); + std::thread::sleep(Duration::from_millis(100)); + f.call(cl); + assert_eq!(called.load(Relaxed), 2); + f.call(cl); + assert_eq!(called.load(Relaxed), 2); + std::thread::sleep(Duration::from_millis(100)); + f.call(cl); + assert_eq!(called.load(Relaxed), 3); + } +} From 40ef242b1b7e789c97a91cf88f6e5db1cc7126b5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 May 2023 09:27:34 +0200 Subject: [PATCH 11/16] apply Joonas's suggestion --- libs/utils/src/rate_limit_closure.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/libs/utils/src/rate_limit_closure.rs b/libs/utils/src/rate_limit_closure.rs index 2e2915327fb7..91f862a81f36 100644 --- a/libs/utils/src/rate_limit_closure.rs +++ b/libs/utils/src/rate_limit_closure.rs @@ -18,15 +18,13 @@ impl RateLimitClosure { pub fn call<F: FnOnce()>(&mut self, f: F) { let now = Instant::now(); match self.last { - Some(last) if now - last > self.interval => { - f(); - self.last = Some(now); + Some(last) if now - last <= self.interval => { + // ratelimit } - None => { - f(); + _ => { self.last = Some(now); + f(); } - Some(_) => {} } } } From da19e643167eae4dd4387f8e76eeb4c380e49848 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 May 2023 09:30:44 +0200 Subject: [PATCH 12/16] rename rate_limit_closure => rate_limit / RateLimitClosure=>RateLimit --- libs/utils/src/lib.rs | 2 +- .../src/{rate_limit_closure.rs => rate_limit.rs} | 12 +++++++----- pageserver/src/tenant/storage_layer.rs | 8 +++----- 3 files changed, 11 insertions(+), 11 deletions(-) rename libs/utils/src/{rate_limit_closure.rs => rate_limit.rs} (84%) diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 17a0348f8b22..6115a2e59dee 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -54,7 +54,7 @@ pub mod measured_stream; pub mod serde_percent; pub mod serde_regex; -pub mod rate_limit_closure; +pub mod rate_limit; /// use with fail::cfg("$name", "return(2000)") #[macro_export] macro_rules! failpoint_sleep_millis_async { diff --git a/libs/utils/src/rate_limit_closure.rs b/libs/utils/src/rate_limit.rs similarity index 84% rename from libs/utils/src/rate_limit_closure.rs rename to libs/utils/src/rate_limit.rs index 91f862a81f36..557955bb880e 100644 --- a/libs/utils/src/rate_limit_closure.rs +++ b/libs/utils/src/rate_limit.rs @@ -1,13 +1,13 @@ -//! A helper to rate limit the invocation of a closure. +//! A helper to rate limit operations.
use std::time::{Duration, Instant}; -pub struct RateLimitClosure { +pub struct RateLimit { last: Option<Instant>, interval: Duration, } -impl RateLimitClosure { +impl RateLimit { pub fn new(interval: Duration) -> Self { Self { last: None, interval, } } + /// Call `f` if the rate limit allows. + /// Don't call it otherwise. pub fn call<F: FnOnce()>(&mut self, f: F) { let now = Instant::now(); match self.last { @@ -35,12 +37,12 @@ mod tests { #[test] fn basics() { - use super::RateLimitClosure; + use super::RateLimit; use std::sync::atomic::Ordering::Relaxed; use std::time::Duration; let called = AtomicUsize::new(0); - let mut f = RateLimitClosure::new(Duration::from_millis(100)); + let mut f = RateLimit::new(Duration::from_millis(100)); let cl = || { called.fetch_add(1, Relaxed); }; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index e3c6855a8aea..26a3d225cc4f 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -26,7 +26,7 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tracing::warn; use utils::history_buffer::HistoryBufferWithDropCounter; -use utils::rate_limit_closure::RateLimitClosure; +use utils::rate_limit::RateLimit; use utils::{ id::{TenantId, TimelineId}, @@ -322,10 +322,8 @@ impl LayerAccessStats { // before a layer is added to the layer map. We could also have // a layer wrapper type that holds the LayerAccessStats, and ensure // that that type can only be produced by inserting into the layer map. - static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimitClosure)>> = - Lazy::new(|| { - Mutex::new((0, RateLimitClosure::new(Duration::from_secs(10)))) - }); + static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> = + Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10))))); let mut guard = WARN_RATE_LIMIT.lock().unwrap(); guard.0 += 1; let occurences = guard.0; guard.1.call(move || { From 0233cca18202e4c5e9d7108f041aeb3f837dff4f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 May 2023 09:36:28 +0200 Subject: [PATCH 13/16] better way than tokio::spawn for logging at global scope --- pageserver/src/tenant/storage_layer.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 2307e7a98937..9df276dcc2e8 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -328,12 +328,8 @@ impl LayerAccessStats { guard.0 += 1; let occurences = guard.0; guard.1.call(move || { - // Spawn a new task to have a new tracing root. It's ok, it only happens every 10 secs.
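The rate-limited warning pattern above can be reproduced standalone. The sketch below substitutes `println!` for `tracing::warn!` and a plain `Mutex<Option<..>>` for `once_cell::sync::Lazy`, so it runs without extra dependencies; `warn_rate_limited` is a hypothetical helper name, not part of the patch:

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

struct RateLimit { last: Option<Instant>, interval: Duration }

impl RateLimit {
    fn new(interval: Duration) -> Self { Self { last: None, interval } }
    fn call<F: FnOnce()>(&mut self, f: F) {
        let now = Instant::now();
        match self.last {
            Some(last) if now - last <= self.interval => {} // rate-limited: drop the call
            _ => { self.last = Some(now); f(); }
        }
    }
}

// One global counter + limiter pair, as in the patch.
static WARN_RATE_LIMIT: Mutex<Option<(usize, RateLimit)>> = Mutex::new(None);

fn warn_rate_limited(msg: &str) {
    let mut guard = WARN_RATE_LIMIT.lock().unwrap();
    let (count, limit) =
        guard.get_or_insert_with(|| (0, RateLimit::new(Duration::from_secs(10))));
    *count += 1;
    let occurrences = *count;
    // Only the first call per 10-second window actually logs; the count still grows.
    limit.call(|| println!("warning ({occurrences} occurrences total): {msg}"));
}

fn main() {
    for _ in 0..3 {
        warn_rate_limited("latest_activity not available");
    }
}
```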
- tokio::spawn(async move { - warn!(occurences, "latest_activity not available, this is an implementation bug, using fallback value"); - }); - }); - fallback() + warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value"); + }) } }, } From fedaa65a45ab8520312e17ce30a3a7828d34ae73 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 May 2023 09:47:54 +0200 Subject: [PATCH 14/16] latest_activity(): return Option and use unwrap_or{,_else} in the callers --- pageserver/src/tenant/storage_layer.rs | 33 +++++++++---------- pageserver/src/tenant/timeline.rs | 5 ++- .../src/tenant/timeline/eviction_task.rs | 2 +- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 9df276dcc2e8..d30d6c5c6efb 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -300,28 +300,26 @@ impl LayerAccessStats { ret } - /// Get the latest access timestamp, falling back to latest residence event, falling back to `fallback`. + /// Get the latest access timestamp, falling back to latest residence event. /// - /// The `fallback` must be supplied for the case where there has not yet been a call to the - /// [`record_residence_event`] method. - /// If `fallback` needs to be used, we log a rate-limited warning in global scope. - pub(crate) fn latest_activity<F>(&self, fallback: F) -> SystemTime - where - F: FnOnce() -> SystemTime, - { + /// This function can only return `None` if there has not yet been a call to the + /// [`record_residence_event`] method. That would generally be considered an + /// implementation error. This function logs a rate-limited warning in that case. + /// + /// TODO: use type system to avoid the need for `fallback`. + /// The approach in https://github.com/neondatabase/neon/pull/3775 + /// could be used to enforce that a residence event is recorded + /// before a layer is added to the layer map. We could also have + /// a layer wrapper type that holds the LayerAccessStats, and ensure + /// that that type can only be produced by inserting into the layer map. + pub(crate) fn latest_activity(&self) -> Option<SystemTime> { let locked = self.0.lock().unwrap(); let inner = &locked.for_eviction_policy; match inner.last_accesses.recent() { - Some(a) => a.when, + Some(a) => Some(a.when), None => match inner.last_residence_changes.recent() { - Some(e) => e.timestamp, + Some(e) => Some(e.timestamp), None => { - // TODO: use type system to avoid the need for `fallback`. - // The approach in https://github.com/neondatabase/neon/pull/3775 - // could be used to enforce that a residence event is recorded - // before a layer is added to the layer map. We could also have - // a layer wrapper type that holds the LayerAccessStats, and ensure - // that that type can only be produced by inserting into the layer map.
static WARN_RATE_LIMIT: Lazy<Mutex<(usize, RateLimit)>> = Lazy::new(|| Mutex::new((0, RateLimit::new(Duration::from_secs(10))))); let mut guard = WARN_RATE_LIMIT.lock().unwrap(); @@ -329,7 +327,8 @@ impl LayerAccessStats { guard.0 += 1; let occurences = guard.0; guard.1.call(move || { warn!(parent: None, occurences, "latest_activity not available, this is an implementation bug, using fallback value"); - }) + }); + None } }, } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b618d64f0a61..74bfe809dfe1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4132,7 +4132,10 @@ impl Timeline { } // Use UNIX_EPOCH as a fallback timestamp in case we have a bug and forget to record a residence event for the layer. - let last_activity_ts = l.access_stats().latest_activity(|| SystemTime::UNIX_EPOCH); + let last_activity_ts = l + .access_stats() + .latest_activity() + .unwrap_or(SystemTime::UNIX_EPOCH); resident_layers.push(LocalLayerInfoForDiskUsageEviction { layer: l, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 156243ac42d5..15ecdd1d7c2d 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -185,7 +185,7 @@ impl Timeline { continue; } - let last_activity_ts = hist_layer.access_stats().latest_activity(|| { + let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| { // We only use this fallback if there's an implementation error. // `latest_activity` already does rate-limited warn!() log. From b7114494979a1e376a11b70f276387f9c42411de Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 May 2023 09:55:16 +0200 Subject: [PATCH 15/16] use SystemTime::now() instead of SystemTime::UNIX_EPOCH as fallback Following Joonas's earlier suggestion in https://github.com/neondatabase/neon/pull/4005#discussion_r1164148906 --- pageserver/src/tenant/timeline.rs | 8 ++++++-- pageserver/src/tenant/timeline/eviction_task.rs | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 74bfe809dfe1..07134b7a1a44 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4131,11 +4131,15 @@ impl Timeline { continue; } - // Use UNIX_EPOCH as a fallback timestamp in case we have a bug and forget to record a residence event for the layer. let last_activity_ts = l .access_stats() .latest_activity() - .unwrap_or(SystemTime::UNIX_EPOCH); + .unwrap_or_else(|| { + // We only use this fallback if there's an implementation error. + // `latest_activity` already does rate-limited warn!() log. + debug!(layer=%l.filename().file_name(), "last_activity returns None, using SystemTime::now"); + SystemTime::now() + }); resident_layers.push(LocalLayerInfoForDiskUsageEviction { layer: l, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 15ecdd1d7c2d..58321762ea15 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -188,7 +188,7 @@ impl Timeline { let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| { // We only use this fallback if there's an implementation error. // `latest_activity` already does rate-limited warn!() log.
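The `unwrap_or{,_else}` distinction from the patch 14 subject, in a standalone sketch with stand-in types: `unwrap_or` takes the fallback value eagerly, which is fine for the `UNIX_EPOCH` constant, while `unwrap_or_else` defers evaluation, which fits once patch 15 switches the fallback to `SystemTime::now()`:

```rust
use std::time::SystemTime;

struct LayerAccessStats {
    last_access: Option<SystemTime>,
    last_residence_change: Option<SystemTime>,
}

impl LayerAccessStats {
    // Patch 14's shape: access timestamp first, then residence event, else None.
    fn latest_activity(&self) -> Option<SystemTime> {
        self.last_access.or(self.last_residence_change)
    }
}

fn main() {
    let stats = LayerAccessStats { last_access: None, last_residence_change: None };
    // Patch 14 used UNIX_EPOCH here; patch 15 switches to SystemTime::now().
    let eager = stats.latest_activity().unwrap_or(SystemTime::UNIX_EPOCH);
    let deferred = stats.latest_activity().unwrap_or_else(SystemTime::now);
    let _ = (eager, deferred);
}
```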
- debug!(layer=%hist_layer.filename().file_name(), "last_activity giving layer a free ride"); + debug!(layer=%hist_layer.filename().file_name(), "last_activity returns None, using SystemTime::now"); SystemTime::now() }); From 9b41b5f3f8426dd6456f10625bf0383ef34808fb Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 May 2023 11:15:23 +0200 Subject: [PATCH 16/16] fix merge fallout --- test_runner/regress/test_threshold_based_eviction.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/test_runner/regress/test_threshold_based_eviction.py b/test_runner/regress/test_threshold_based_eviction.py index 52d3e558bcfb..c7083d92be89 100644 --- a/test_runner/regress/test_threshold_based_eviction.py +++ b/test_runner/regress/test_threshold_based_eviction.py @@ -7,8 +7,7 @@ NeonEnvBuilder, PgBin, RemoteStorageKind, - wait_for_last_flush_lsn, - wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn, + last_flush_lsn_upload, ) from fixtures.pageserver.http import LayerMapInfo from fixtures.types import TimelineId @@ -104,11 +103,8 @@ def test_threshold_based_eviction( # create a bunch of layers with env.endpoints.create_start("main", tenant_id=tenant_id) as pg: pg_bin.run(["pgbench", "-i", "-s", "3", pg.connstr()]) - wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + last_flush_lsn_upload(env, pg, tenant_id, timeline_id) # wrap up and shutdown safekeepers so that no more layers will be created after the final checkpoint - wait_for_sk_commit_lsn_to_arrive_at_pageserver_last_record_lsn( - tenant_id, timeline_id, env.safekeepers, env.pageserver - ) for sk in env.safekeepers: sk.stop() ps_http.timeline_checkpoint(tenant_id, timeline_id)
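Taken together, the series feeds `last_activity_ts` into the threshold check that the new regression test exercises. A minimal sketch of that final decision, under the assumption that the policy is simply "evict when idle for longer than the threshold":

```rust
use std::time::{Duration, SystemTime};

// Sketch of the eviction_task.rs decision with stand-in arguments.
fn should_evict(now: SystemTime, last_activity: SystemTime, threshold: Duration) -> bool {
    match now.duration_since(last_activity) {
        Ok(no_activity_for) => no_activity_for > threshold,
        // Clock moved backwards, or a concurrent access bumped the timestamp
        // past `now`; treat it as recent activity and keep the layer.
        Err(_) => false,
    }
}

fn main() {
    let now = SystemTime::now();
    let five_min_ago = now - Duration::from_secs(300);
    assert!(should_evict(now, five_min_ago, Duration::from_secs(60)));
    assert!(!should_evict(now, now, Duration::from_secs(60)));
}
```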