From 0ab7bbd74745aab36473e635f419fc024e36e037 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 21 Nov 2024 15:53:28 -0500 Subject: [PATCH 1/6] fix(pageserver): ensure upload happens after delete Signed-off-by: Alex Chi Z --- libs/remote_storage/src/local_fs.rs | 7 +- pageserver/src/deletion_queue/deleter.rs | 2 + pageserver/src/deletion_queue/validator.rs | 9 ++ .../src/tenant/remote_timeline_client.rs | 84 ++++++++++++++++--- pageserver/src/tenant/timeline.rs | 1 + pageserver/src/tenant/upload_queue.rs | 24 ++++-- test_runner/fixtures/neon_fixtures.py | 22 ++++- test_runner/fixtures/pageserver/http.py | 4 +- .../regress/test_layers_from_future.py | 62 ++++++++++---- .../regress/test_pageserver_generations.py | 6 +- 10 files changed, 180 insertions(+), 41 deletions(-) diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 553153826e94..ee2fc9d6e2dd 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -360,7 +360,12 @@ impl RemoteStorage for LocalFs { let mut objects = Vec::with_capacity(keys.len()); for key in keys { let path = key.with_base(&self.storage_root); - let metadata = file_metadata(&path).await?; + let metadata = file_metadata(&path).await; + if let Err(DownloadError::NotFound) = metadata { + // Race: if the file is deleted between listing and metadata check, ignore it. + continue; + } + let metadata = metadata?; if metadata.is_dir() { continue; } diff --git a/pageserver/src/deletion_queue/deleter.rs b/pageserver/src/deletion_queue/deleter.rs index 1f04bc0410f5..3d02387c98d8 100644 --- a/pageserver/src/deletion_queue/deleter.rs +++ b/pageserver/src/deletion_queue/deleter.rs @@ -15,6 +15,7 @@ use tokio_util::sync::CancellationToken; use tracing::info; use tracing::warn; use utils::backoff; +use utils::pausable_failpoint; use crate::metrics; @@ -90,6 +91,7 @@ impl Deleter { /// Block until everything in accumulator has been executed async fn flush(&mut self) -> Result<(), DeletionQueueError> { while !self.accumulator.is_empty() && !self.cancel.is_cancelled() { + pausable_failpoint!("deletion-queue-before-execute-pause"); match self.remote_delete().await { Ok(()) => { // Note: we assume that the remote storage layer returns Ok(()) if some diff --git a/pageserver/src/deletion_queue/validator.rs b/pageserver/src/deletion_queue/validator.rs index 1d55581ebd80..ebdcc38122c7 100644 --- a/pageserver/src/deletion_queue/validator.rs +++ b/pageserver/src/deletion_queue/validator.rs @@ -304,6 +304,15 @@ where async fn flush(&mut self) -> Result<(), DeletionQueueError> { tracing::debug!("Flushing with {} pending lists", self.pending_lists.len()); + // Fast path to skip validation if we do not have anything to flush. 
+ if self.pending_lists.is_empty() + && self.validated_lists.is_empty() + && self.pending_key_count == 0 + { + // Fast path: nothing to do + return Ok(()); + } + // Issue any required generation validation calls to the control plane self.validate().await?; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 377bc23542b1..245680471278 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -223,7 +223,7 @@ use crate::task_mgr::shutdown_token; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::download::download_retry; use crate::tenant::storage_layer::AsLayerDesc; -use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable}; +use crate::tenant::upload_queue::{Delete, OpType, UploadQueueStoppedDeletable}; use crate::tenant::TIMELINES_SEGMENT_NAME; use crate::{ config::PageServerConf, @@ -1090,7 +1090,7 @@ impl RemoteTimelineClient { "scheduled layer file upload {layer}", ); - let op = UploadOp::UploadLayer(layer, metadata); + let op = UploadOp::UploadLayer(layer, metadata, None); self.metric_begin(&op); upload_queue.queued_operations.push_back(op); } @@ -1805,7 +1805,7 @@ impl RemoteTimelineClient { // have finished. upload_queue.inprogress_tasks.is_empty() } - UploadOp::Delete(_) => { + UploadOp::Delete(..) => { // Wait for preceding uploads to finish. Concurrent deletions are OK, though. upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len() } @@ -1833,19 +1833,28 @@ impl RemoteTimelineClient { } // We can launch this task. Remove it from the queue first. - let next_op = upload_queue.queued_operations.pop_front().unwrap(); + let mut next_op = upload_queue.queued_operations.pop_front().unwrap(); debug!("starting op: {}", next_op); - // Update the counters - match next_op { - UploadOp::UploadLayer(_, _) => { + // Update the counters and prepare + match &mut next_op { + UploadOp::UploadLayer(layer, meta, last_op) => { + *last_op = upload_queue.last_operation.insert( + (layer.layer_desc().layer_name().clone(), meta.generation), + OpType::Upload, + ); upload_queue.num_inprogress_layer_uploads += 1; } UploadOp::UploadMetadata { .. } => { upload_queue.num_inprogress_metadata_uploads += 1; } - UploadOp::Delete(_) => { + UploadOp::Delete(Delete { layers }) => { + for (name, meta) in layers { + upload_queue + .last_operation + .insert((name.clone(), meta.generation), OpType::Delete); + } upload_queue.num_inprogress_deletions += 1; } UploadOp::Barrier(sender) => { @@ -1921,7 +1930,59 @@ impl RemoteTimelineClient { } let upload_result: anyhow::Result<()> = match &task.op { - UploadOp::UploadLayer(ref layer, ref layer_metadata) => { + UploadOp::UploadLayer(ref layer, ref layer_metadata, last_op) => { + if let Some(OpType::Delete) = last_op { + if self.config.read().unwrap().block_deletions { + // Of course, this is not efficient... but usually the queue should be empty. 
+ let mut queue_locked = self.upload_queue.lock().unwrap(); + let mut detected = false; + if let Ok(queue) = queue_locked.initialized_mut() { + for list in queue.blocked_deletions.iter_mut() { + list.layers.retain(|(name, meta)| { + if name == &layer.layer_desc().layer_name() + && meta.generation == layer_metadata.generation + { + detected = true; + // remove the layer from deletion queue + false + } else { + // keep the layer + true + } + }); + } + } + if detected { + info!( + "upload_queue_reordering: cancelled blocked deletion of layer {} at gen {:?}", + layer.layer_desc().layer_name(), + layer_metadata.generation + ); + } + } else { + // TODO: we did not guarantee that upload task starts after deletion task, so there could be possibly race conditions + // that we still get the layer deleted. But this only happens if someone creates a layer immediately after it's deleted, + // which is not possible in the current system. + info!( + "upload_queue_reordering: waiting for deletion queue flush to complete before uploading layer {} at gen {:?}", + layer.layer_desc().layer_name(), + layer_metadata.generation + ); + if let Err(e) = self.deletion_queue_client.flush_execute().await { + warn!( + "upload_queue_reordering: failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ", + layer.layer_desc().layer_name(), + layer_metadata.generation + ); + } else { + info!( + "upload_queue_reordering: done flushing deletion queue before uploading layer {} at gen {:?}", + layer.layer_desc().layer_name(), + layer_metadata.generation + ); + } + } + } let local_path = layer.local_path(); // We should only be uploading layers created by this `Tenant`'s lifetime, so @@ -2085,7 +2146,7 @@ impl RemoteTimelineClient { upload_queue.inprogress_tasks.remove(&task.task_id); let lsn_update = match task.op { - UploadOp::UploadLayer(_, _) => { + UploadOp::UploadLayer(_, _, _) => { upload_queue.num_inprogress_layer_uploads -= 1; None } @@ -2162,7 +2223,7 @@ impl RemoteTimelineClient { )> { use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize; let res = match op { - UploadOp::UploadLayer(_, m) => ( + UploadOp::UploadLayer(_, m, _) => ( RemoteOpFileKind::Layer, RemoteOpKind::Upload, RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size), @@ -2259,6 +2320,7 @@ impl RemoteTimelineClient { blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), + last_operation: HashMap::new(), }; let upload_queue = std::mem::replace( diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 95864af4d0af..89bdb512f771 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2647,6 +2647,7 @@ impl Timeline { // // NB: generation numbers naturally protect against this because they disambiguate // (1) and (4) + // TODO: this is basically a no-op now, should we remove it? self.remote_client.schedule_barrier()?; // Tenant::create_timeline will wait for these uploads to happen before returning, or // on retry. diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index f14bf2f8c381..ff059e765873 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -38,6 +38,12 @@ impl UploadQueue { } } +#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum OpType { + Upload, + Delete, +} + /// This keeps track of queued and in-progress tasks. 
pub(crate) struct UploadQueueInitialized { /// Counter to assign task IDs @@ -88,6 +94,12 @@ pub(crate) struct UploadQueueInitialized { #[cfg(feature = "testing")] pub(crate) dangling_files: HashMap, + /// Ensure we order file operations correctly. This hashmap will grow infinitely... But it's fine, + /// assume the user uploaded 100k files, then we have 100k entries in this hashmap. Assume each + /// entry is 100B, then we have 10MB of memory used. This is fine. In theory, we can flush this + /// hashmap once deletion queue is flushed, but it requires complex locking and dealing with races. + pub(crate) last_operation: HashMap<(LayerName, Generation), OpType>, + /// Deletions that are blocked by the tenant configuration pub(crate) blocked_deletions: Vec, @@ -183,6 +195,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + last_operation: HashMap::new(), blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), @@ -224,6 +237,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + last_operation: HashMap::new(), blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), @@ -282,8 +296,8 @@ pub(crate) struct Delete { #[derive(Debug)] pub(crate) enum UploadOp { - /// Upload a layer file - UploadLayer(ResidentLayer, LayerFileMetadata), + /// Upload a layer file. The last field indicates the last operation for thie file. + UploadLayer(ResidentLayer, LayerFileMetadata, Option), /// Upload a index_part.json file UploadMetadata { @@ -305,11 +319,11 @@ pub(crate) enum UploadOp { impl std::fmt::Display for UploadOp { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { - UploadOp::UploadLayer(layer, metadata) => { + UploadOp::UploadLayer(layer, metadata, last_op) => { write!( f, - "UploadLayer({}, size={:?}, gen={:?})", - layer, metadata.file_size, metadata.generation + "UploadLayer({}, size={:?}, gen={:?}, last_op={:?})", + layer, metadata.file_size, metadata.generation, last_op ) } UploadOp::UploadMetadata { uploaded, .. 
} => { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 195b788c7e41..9e778e6476f6 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4947,6 +4947,7 @@ def last_flush_lsn_upload( timeline_id: TimelineId, pageserver_id: int | None = None, auth_token: str | None = None, + wait_until_uploaded: bool = True, ) -> Lsn: """ Wait for pageserver to catch to the latest flush LSN of given endpoint, @@ -4960,7 +4961,9 @@ def last_flush_lsn_upload( for tenant_shard_id, pageserver in shards: ps_http = pageserver.http_client(auth_token=auth_token) wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn) - ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True) + ps_http.timeline_checkpoint( + tenant_shard_id, timeline_id, wait_until_uploaded=wait_until_uploaded + ) return last_flush_lsn @@ -4985,6 +4988,7 @@ def generate_uploads_and_deletions( timeline_id: TimelineId | None = None, data: str | None = None, pageserver: NeonPageserver, + wait_until_uploaded: bool = True, ): """ Using the environment's default tenant + timeline, generate a load pattern @@ -5007,7 +5011,12 @@ def generate_uploads_and_deletions( if init: endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") last_flush_lsn_upload( - env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id + env, + endpoint, + tenant_id, + timeline_id, + pageserver_id=pageserver.id, + wait_until_uploaded=wait_until_uploaded, ) def churn(data): @@ -5030,7 +5039,12 @@ def churn(data): # in a state where there are "future layers" in remote storage that will generate deletions # after a restart. last_flush_lsn_upload( - env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id + env, + endpoint, + tenant_id, + timeline_id, + pageserver_id=pageserver.id, + wait_until_uploaded=wait_until_uploaded, ) # Compaction should generate some GC-elegible layers @@ -5046,4 +5060,4 @@ def churn(data): # background ingest, no more uploads pending, and therefore no non-determinism # in subsequent actions like pageserver restarts. 
flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id) - ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True) + ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=wait_until_uploaded) diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 98330ba3506d..feba43f7afd2 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -343,7 +343,7 @@ def tenant_list_locations(self): assert isinstance(res_json["tenant_shards"], list) return res_json - def tenant_get_location(self, tenant_id: TenantShardId): + def tenant_get_location(self, tenant_id: TenantId | TenantShardId): res = self.get( f"http://localhost:{self.port}/v1/location_config/{tenant_id}", ) @@ -794,7 +794,7 @@ def timeline_checkpoint( if compact is not None: query["compact"] = "true" if compact else "false" - log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}") + log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}") res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", params=query, diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index 309e0f301525..6d98c648f722 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -2,6 +2,7 @@ import time +import pytest from fixtures.common_types import Lsn from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver @@ -19,7 +20,11 @@ from fixtures.utils import query_scalar, wait_until -def test_issue_5878(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize( + "attach_mode", + ["default_generation", "same_generation"], +) +def test_issue_5878(neon_env_builder: NeonEnvBuilder, attach_mode: str): """ Regression test for issue https://github.com/neondatabase/neon/issues/5878 . 
@@ -168,11 +173,32 @@ def get_generation_number(): tenant_conf = ps_http.tenant_config(tenant_id) generation_before_detach = get_generation_number() env.pageserver.tenant_detach(tenant_id) - failpoint_name = "before-delete-layer-pausable" + failpoint_deletion_queue = "deletion-queue-before-execute-pause" + + ps_http.configure_failpoints((failpoint_deletion_queue, "pause")) + + if attach_mode == "default_generation": + env.pageserver.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides) + elif attach_mode == "same_generation": + # Attach with the same generation number -- this is possible with timeline offload and detach ancestor + env.pageserver.tenant_attach( + tenant_id, + tenant_conf.tenant_specific_overrides, + generation=generation_before_detach, + # We want to avoid the generation bump and don't want to talk with the storcon + override_storage_controller_generation=False, + ) + else: + raise AssertionError(f"Unknown attach_mode: {attach_mode}") - ps_http.configure_failpoints((failpoint_name, "pause")) - env.pageserver.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides) - generation_after_reattach = get_generation_number() + # Get it from pageserver API instead of storcon API b/c we might not have attached using the storcon + # API if attach_mode == "same_generation" + tenant_location = env.pageserver.http_client().tenant_get_location(tenant_id) + generation_after_reattach = tenant_location["generation"] + + if attach_mode == "same_generation": + # The generation number should be the same as before the detach + assert generation_before_detach == generation_after_reattach wait_until_tenant_active(ps_http, tenant_id) # Ensure the IndexPart upload that unlinks the layer file finishes, i.e., doesn't clog the queue. @@ -182,15 +208,8 @@ def future_layer_is_gone_from_index_part(): wait_until(10, 0.5, future_layer_is_gone_from_index_part) - # NB: the layer file is unlinked index part now, but, because we made the delete - # operation stuck, the layer file itself is still in the remote_storage - wait_until( - 10, - 0.5, - lambda: env.pageserver.assert_log_contains( - f".*{tenant_id}.*at failpoint.*{failpoint_name}" - ), - ) + # We already make deletion stuck here, but we don't necessarily hit the failpoint + # because deletions are batched. future_layer_path = env.pageserver_remote_storage.remote_layer_path( tenant_id, timeline_id, future_layer.to_str(), generation=generation_before_detach ) @@ -224,11 +243,13 @@ def future_layer_is_gone_from_index_part(): break time.sleep(1) - # Window has passed, unstuck the delete, let upload queue drain. + # Window has passed, unstuck the delete, let deletion queue drain; the upload queue should + # have drained because we put these layer deletion operations into the deletion queue and + # have consumed the operation from the upload queue. log.info("unstuck the DELETE") - ps_http.configure_failpoints(("before-delete-layer-pausable", "off")) - + ps_http.configure_failpoints((failpoint_deletion_queue, "off")) wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) + env.pageserver.http_client().deletion_queue_flush(True) # Examine the resulting S3 state. log.info("integrity-check the remote storage") @@ -247,3 +268,10 @@ def future_layer_is_gone_from_index_part(): final_stat = future_layer_path.stat() log.info(f"future layer path: {future_layer_path}") assert final_stat.st_mtime != pre_stat.st_mtime + + # Ensure no weird errors in the end... 
+ wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) + + if attach_mode == "same_generation": + # we should have detected a race upload and deferred it + env.pageserver.assert_log_contains("upload_queue_reordering") diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index d5bbfbc7fc84..6ba5753420c7 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -459,7 +459,11 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): env.pageserver.start() # The pageserver should provide service to clients - generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver) + # Because it is in emergency mode, it will not attempt to validate deletions required by the initial barrier, and therefore + # other files cannot be uploaded b/c it's waiting for the initial barrier to be validated. + generate_uploads_and_deletions( + env, init=False, pageserver=env.pageserver, wait_until_uploaded=False + ) # The pageserver should neither validate nor execute any deletions, it should have # loaded the DeletionLists from before though From 5682c7aea3814a60e96edac490783c1023147f77 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 21 Nov 2024 17:26:37 -0500 Subject: [PATCH 2/6] fix fmt Signed-off-by: Alex Chi Z --- test_runner/fixtures/pageserver/http.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index feba43f7afd2..56386fdd373f 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -794,7 +794,9 @@ def timeline_checkpoint( if compact is not None: query["compact"] = "true" if compact else "false" - log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}") + log.info( + f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}" + ) res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", params=query, From 9e7fa9802452c7ab5f511ae4ecc8bf764abf62cc Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 21 Nov 2024 17:31:04 -0500 Subject: [PATCH 3/6] revert fast path validation Signed-off-by: Alex Chi Z --- pageserver/src/deletion_queue/validator.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pageserver/src/deletion_queue/validator.rs b/pageserver/src/deletion_queue/validator.rs index ebdcc38122c7..1d55581ebd80 100644 --- a/pageserver/src/deletion_queue/validator.rs +++ b/pageserver/src/deletion_queue/validator.rs @@ -304,15 +304,6 @@ where async fn flush(&mut self) -> Result<(), DeletionQueueError> { tracing::debug!("Flushing with {} pending lists", self.pending_lists.len()); - // Fast path to skip validation if we do not have anything to flush. 
- if self.pending_lists.is_empty() - && self.validated_lists.is_empty() - && self.pending_key_count == 0 - { - // Fast path: nothing to do - return Ok(()); - } - // Issue any required generation validation calls to the control plane self.validate().await?; From a3971c3e58a7abffc6f0d315a22c20238e84ab1e Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 22 Nov 2024 05:29:23 -0500 Subject: [PATCH 4/6] rename + bound recently_deleted hashset Signed-off-by: Alex Chi Z --- .../src/tenant/remote_timeline_client.rs | 24 +++++++++++-------- pageserver/src/tenant/upload_queue.rs | 23 ++++++++---------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 245680471278..c15f3111ab93 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -199,7 +199,7 @@ use utils::backoff::{ use utils::pausable_failpoint; use utils::shard::ShardNumber; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex, OnceLock}; use std::time::Duration; @@ -1839,11 +1839,15 @@ impl RemoteTimelineClient { // Update the counters and prepare match &mut next_op { - UploadOp::UploadLayer(layer, meta, last_op) => { - *last_op = upload_queue.last_operation.insert( - (layer.layer_desc().layer_name().clone(), meta.generation), - OpType::Upload, - ); + UploadOp::UploadLayer(layer, meta, mode) => { + if upload_queue + .recently_deleted + .remove(&(layer.layer_desc().layer_name().clone(), meta.generation)) + { + *mode = Some(OpType::FlushDeletion); + } else { + *mode = Some(OpType::MayReorder) + } upload_queue.num_inprogress_layer_uploads += 1; } UploadOp::UploadMetadata { .. } => { @@ -1852,8 +1856,8 @@ impl RemoteTimelineClient { UploadOp::Delete(Delete { layers }) => { for (name, meta) in layers { upload_queue - .last_operation - .insert((name.clone(), meta.generation), OpType::Delete); + .recently_deleted + .insert((name.clone(), meta.generation)); } upload_queue.num_inprogress_deletions += 1; } @@ -1931,7 +1935,7 @@ impl RemoteTimelineClient { let upload_result: anyhow::Result<()> = match &task.op { UploadOp::UploadLayer(ref layer, ref layer_metadata, last_op) => { - if let Some(OpType::Delete) = last_op { + if let Some(OpType::FlushDeletion) = last_op { if self.config.read().unwrap().block_deletions { // Of course, this is not efficient... but usually the queue should be empty. 
let mut queue_locked = self.upload_queue.lock().unwrap(); @@ -2320,7 +2324,7 @@ impl RemoteTimelineClient { blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), - last_operation: HashMap::new(), + recently_deleted: HashSet::new(), }; let upload_queue = std::mem::replace( diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index ff059e765873..ef3aa759f303 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -3,6 +3,7 @@ use super::storage_layer::ResidentLayer; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::remote_timeline_client::index::IndexPart; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; +use std::collections::HashSet; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; @@ -14,7 +15,6 @@ use utils::lsn::AtomicLsn; use std::sync::atomic::AtomicU32; use utils::lsn::Lsn; -#[cfg(feature = "testing")] use utils::generation::Generation; // clippy warns that Uninitialized is much smaller than Initialized, which wastes @@ -40,8 +40,8 @@ impl UploadQueue { #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)] pub(crate) enum OpType { - Upload, - Delete, + MayReorder, + FlushDeletion, } /// This keeps track of queued and in-progress tasks. @@ -94,11 +94,8 @@ pub(crate) struct UploadQueueInitialized { #[cfg(feature = "testing")] pub(crate) dangling_files: HashMap, - /// Ensure we order file operations correctly. This hashmap will grow infinitely... But it's fine, - /// assume the user uploaded 100k files, then we have 100k entries in this hashmap. Assume each - /// entry is 100B, then we have 10MB of memory used. This is fine. In theory, we can flush this - /// hashmap once deletion queue is flushed, but it requires complex locking and dealing with races. - pub(crate) last_operation: HashMap<(LayerName, Generation), OpType>, + /// Ensure we order file operations correctly. + pub(crate) recently_deleted: HashSet<(LayerName, Generation)>, /// Deletions that are blocked by the tenant configuration pub(crate) blocked_deletions: Vec, @@ -195,7 +192,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), - last_operation: HashMap::new(), + recently_deleted: HashSet::new(), blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), @@ -237,7 +234,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), - last_operation: HashMap::new(), + recently_deleted: HashSet::new(), blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), @@ -319,11 +316,11 @@ pub(crate) enum UploadOp { impl std::fmt::Display for UploadOp { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { - UploadOp::UploadLayer(layer, metadata, last_op) => { + UploadOp::UploadLayer(layer, metadata, mode) => { write!( f, - "UploadLayer({}, size={:?}, gen={:?}, last_op={:?})", - layer, metadata.file_size, metadata.generation, last_op + "UploadLayer({}, size={:?}, gen={:?}, mode={:?})", + layer, metadata.file_size, metadata.generation, mode ) } UploadOp::UploadMetadata { uploaded, .. 
} => { From 78c02f32459e4cfbeaebd0f00568889941119a0d Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 22 Nov 2024 05:34:20 -0500 Subject: [PATCH 5/6] clean the list if flush the queue Signed-off-by: Alex Chi Z --- pageserver/src/tenant/remote_timeline_client.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index c15f3111ab93..e4c009f78b0f 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1934,8 +1934,8 @@ impl RemoteTimelineClient { } let upload_result: anyhow::Result<()> = match &task.op { - UploadOp::UploadLayer(ref layer, ref layer_metadata, last_op) => { - if let Some(OpType::FlushDeletion) = last_op { + UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => { + if let Some(OpType::FlushDeletion) = mode { if self.config.read().unwrap().block_deletions { // Of course, this is not efficient... but usually the queue should be empty. let mut queue_locked = self.upload_queue.lock().unwrap(); @@ -1972,6 +1972,13 @@ impl RemoteTimelineClient { layer.layer_desc().layer_name(), layer_metadata.generation ); + { + // We are going to flush, we can clean up the recently deleted list. + let mut queue_locked = self.upload_queue.lock().unwrap(); + if let Ok(queue) = queue_locked.initialized_mut() { + queue.recently_deleted.clear(); + } + } if let Err(e) = self.deletion_queue_client.flush_execute().await { warn!( "upload_queue_reordering: failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ", From 2a7d00296edc3c7adea82227080c8bbeadb7688e Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 22 Nov 2024 12:21:37 -0500 Subject: [PATCH 6/6] drop log prefix Signed-off-by: Alex Chi Z --- pageserver/src/tenant/remote_timeline_client.rs | 8 ++++---- test_runner/regress/test_layers_from_future.py | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index e4c009f78b0f..4c8828221416 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1958,7 +1958,7 @@ impl RemoteTimelineClient { } if detected { info!( - "upload_queue_reordering: cancelled blocked deletion of layer {} at gen {:?}", + "cancelled blocked deletion of layer {} at gen {:?}", layer.layer_desc().layer_name(), layer_metadata.generation ); @@ -1968,7 +1968,7 @@ impl RemoteTimelineClient { // that we still get the layer deleted. But this only happens if someone creates a layer immediately after it's deleted, // which is not possible in the current system. 
info!( - "upload_queue_reordering: waiting for deletion queue flush to complete before uploading layer {} at gen {:?}", + "waiting for deletion queue flush to complete before uploading layer {} at gen {:?}", layer.layer_desc().layer_name(), layer_metadata.generation ); @@ -1981,13 +1981,13 @@ impl RemoteTimelineClient { } if let Err(e) = self.deletion_queue_client.flush_execute().await { warn!( - "upload_queue_reordering: failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ", + "failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ", layer.layer_desc().layer_name(), layer_metadata.generation ); } else { info!( - "upload_queue_reordering: done flushing deletion queue before uploading layer {} at gen {:?}", + "done flushing deletion queue before uploading layer {} at gen {:?}", layer.layer_desc().layer_name(), layer_metadata.generation ); diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index 6d98c648f722..761ec7568f4b 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -274,4 +274,6 @@ def future_layer_is_gone_from_index_part(): if attach_mode == "same_generation": # we should have detected a race upload and deferred it - env.pageserver.assert_log_contains("upload_queue_reordering") + env.pageserver.assert_log_contains( + "waiting for deletion queue flush to complete before uploading layer" + )
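
Note (not part of the patches above): a minimal, self-contained sketch of the ordering rule this series introduces, written against simplified stand-in types rather than the real pageserver structs. `LayerName`/`Generation` are modeled as `String`/`u32`, and `UploadQueueModel` is a toy container; only the idea is taken from the patches: remember which (layer name, generation) pairs a launched Delete op covered, and classify a later UploadLayer of the same key as `FlushDeletion` so the deletion queue is drained before the upload, while everything else stays `MayReorder`.

use std::collections::HashSet;

// Illustrative stand-ins for the pageserver's LayerName and Generation types.
type LayerName = String;
type Generation = u32;

#[derive(Debug, PartialEq, Eq)]
enum OpType {
    MayReorder,    // no recent delete for this key; the upload may proceed normally
    FlushDeletion, // a delete of the same (name, generation) was scheduled earlier
}

#[derive(Default)]
struct UploadQueueModel {
    // Simplified analogue of the `recently_deleted` set introduced in patch 4.
    recently_deleted: HashSet<(LayerName, Generation)>,
}

impl UploadQueueModel {
    // When a Delete op is launched, remember every (name, generation) it covers.
    fn note_delete(&mut self, layers: &[(LayerName, Generation)]) {
        for (name, generation) in layers {
            self.recently_deleted.insert((name.clone(), *generation));
        }
    }

    // When an UploadLayer op is launched, decide whether it must wait for the
    // deletion queue to drain before the actual upload is performed.
    fn classify_upload(&mut self, name: &LayerName, generation: Generation) -> OpType {
        if self.recently_deleted.remove(&(name.clone(), generation)) {
            OpType::FlushDeletion
        } else {
            OpType::MayReorder
        }
    }
}

fn main() {
    let mut queue = UploadQueueModel::default();
    queue.note_delete(&[("layer-A".to_string(), 7)]);

    // Same name and generation as the earlier delete: flush deletions first,
    // otherwise the delete could execute after the upload and remove the new file.
    assert_eq!(
        queue.classify_upload(&"layer-A".to_string(), 7),
        OpType::FlushDeletion
    );

    // Different generation: no conflict with the pending delete.
    assert_eq!(
        queue.classify_upload(&"layer-A".to_string(), 8),
        OpType::MayReorder
    );

    println!("ordering checks passed");
}

The real implementation additionally handles the `block_deletions` configuration (cancelling the still-blocked deletion instead of flushing) and clears the set once a flush is issued; those details are omitted here for brevity.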