fix(pageserver): ensure upload happens after delete #9844

Merged: 6 commits, Nov 22, 2024
7 changes: 6 additions & 1 deletion libs/remote_storage/src/local_fs.rs
@@ -360,7 +360,12 @@ impl RemoteStorage for LocalFs {
let mut objects = Vec::with_capacity(keys.len());
for key in keys {
let path = key.with_base(&self.storage_root);
let metadata = file_metadata(&path).await?;
let metadata = file_metadata(&path).await;
if let Err(DownloadError::NotFound) = metadata {
// Race: if the file is deleted between listing and metadata check, ignore it.
continue;
}
let metadata = metadata?;
if metadata.is_dir() {
continue;
}
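
The hunk above tolerates a benign race in the local filesystem backend: a file can disappear between the listing and the metadata call, and that should not fail the whole listing. A minimal standalone sketch of the same pattern, using a hypothetical helper built on plain tokio::fs rather than the crate's file_metadata wrapper:

use std::io::ErrorKind;
use std::path::Path;

/// Return the file size, or None if the path is a directory or the file
/// vanished between listing and this stat call.
async fn size_if_present(path: &Path) -> std::io::Result<Option<u64>> {
    match tokio::fs::metadata(path).await {
        Ok(meta) if meta.is_dir() => Ok(None),
        Ok(meta) => Ok(Some(meta.len())),
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
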
2 changes: 2 additions & 0 deletions pageserver/src/deletion_queue/deleter.rs
@@ -15,6 +15,7 @@ use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;
use utils::pausable_failpoint;

use crate::metrics;

@@ -90,6 +91,7 @@ impl Deleter {
/// Block until everything in accumulator has been executed
async fn flush(&mut self) -> Result<(), DeletionQueueError> {
while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
pausable_failpoint!("deletion-queue-before-execute-pause");
match self.remote_delete().await {
Ok(()) => {
// Note: we assume that the remote storage layer returns Ok(()) if some
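
The new pausable failpoint lets a test freeze the deleter right before it executes a batch. One hypothetical way a regression test might drive it, assuming the test fixtures' configure_failpoints helper:

# Sketch: hold the deletion executor at the new failpoint, run the racing
# work, then turn the failpoint off so the pending flush can complete.
ps_http = env.pageserver.http_client()
ps_http.configure_failpoints(("deletion-queue-before-execute-pause", "pause"))
# ... schedule uploads that must be ordered against the queued deletion ...
ps_http.configure_failpoints(("deletion-queue-before-execute-pause", "off"))
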
97 changes: 85 additions & 12 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -199,7 +199,7 @@ use utils::backoff::{
use utils::pausable_failpoint;
use utils::shard::ShardNumber;

use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::Duration;
@@ -223,7 +223,7 @@ use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::download::download_retry;
use crate::tenant::storage_layer::AsLayerDesc;
use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
use crate::tenant::upload_queue::{Delete, OpType, UploadQueueStoppedDeletable};
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{
config::PageServerConf,
@@ -1090,7 +1090,7 @@ impl RemoteTimelineClient {
"scheduled layer file upload {layer}",
);

let op = UploadOp::UploadLayer(layer, metadata);
let op = UploadOp::UploadLayer(layer, metadata, None);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
}
@@ -1805,7 +1805,7 @@ impl RemoteTimelineClient {
// have finished.
upload_queue.inprogress_tasks.is_empty()
}
UploadOp::Delete(_) => {
UploadOp::Delete(..) => {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}
@@ -1833,19 +1833,32 @@
}

// We can launch this task. Remove it from the queue first.
let next_op = upload_queue.queued_operations.pop_front().unwrap();
let mut next_op = upload_queue.queued_operations.pop_front().unwrap();

debug!("starting op: {}", next_op);

// Update the counters
match next_op {
UploadOp::UploadLayer(_, _) => {
// Update the counters and prepare
match &mut next_op {
UploadOp::UploadLayer(layer, meta, mode) => {
if upload_queue
.recently_deleted
.remove(&(layer.layer_desc().layer_name().clone(), meta.generation))
{
*mode = Some(OpType::FlushDeletion);
} else {
*mode = Some(OpType::MayReorder)
}
upload_queue.num_inprogress_layer_uploads += 1;
}
UploadOp::UploadMetadata { .. } => {
upload_queue.num_inprogress_metadata_uploads += 1;
}
UploadOp::Delete(_) => {
UploadOp::Delete(Delete { layers }) => {
for (name, meta) in layers {
upload_queue
.recently_deleted
.insert((name.clone(), meta.generation));
}
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
@@ -1921,7 +1934,66 @@
}

let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
if let Some(OpType::FlushDeletion) = mode {
if self.config.read().unwrap().block_deletions {
// Of course, this is not efficient... but usually the queue should be empty.
let mut queue_locked = self.upload_queue.lock().unwrap();
let mut detected = false;
if let Ok(queue) = queue_locked.initialized_mut() {
for list in queue.blocked_deletions.iter_mut() {
list.layers.retain(|(name, meta)| {
if name == &layer.layer_desc().layer_name()
&& meta.generation == layer_metadata.generation
{
detected = true;
// remove the layer from deletion queue
false
} else {
// keep the layer
true
}
});
}
}
if detected {
info!(
"cancelled blocked deletion of layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
}
} else {
// TODO: we do not guarantee that the upload task starts after the deletion task, so race conditions
// where the layer still gets deleted remain possible. But that only happens if a layer is recreated
// immediately after it is deleted, which is not possible in the current system.
info!(
"waiting for deletion queue flush to complete before uploading layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
{
// We are going to flush, we can clean up the recently deleted list.
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
queue.recently_deleted.clear();
}
}
if let Err(e) = self.deletion_queue_client.flush_execute().await {
warn!(
"failed to flush the deletion queue before uploading layer {} at gen {:?}, still proceeding to upload: {e:#} ",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
} else {
info!(
"done flushing deletion queue before uploading layer {} at gen {:?}",
layer.layer_desc().layer_name(),
layer_metadata.generation
);
}
}
}
let local_path = layer.local_path();

// We should only be uploading layers created by this `Tenant`'s lifetime, so
@@ -2085,7 +2157,7 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);

let lsn_update = match task.op {
UploadOp::UploadLayer(_, _) => {
UploadOp::UploadLayer(_, _, _) => {
upload_queue.num_inprogress_layer_uploads -= 1;
None
}
@@ -2162,7 +2234,7 @@
)> {
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
let res = match op {
UploadOp::UploadLayer(_, m) => (
UploadOp::UploadLayer(_, m, _) => (
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size),
@@ -2259,6 +2331,7 @@ impl RemoteTimelineClient {
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
recently_deleted: HashSet::new(),
};

let upload_queue = std::mem::replace(
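
Taken together, the hunks above order re-uploads after deletions of the same layer: every launched Delete records its (layer name, generation) pairs in recently_deleted, and a later upload of the same pair is tagged OpType::FlushDeletion so the deletion queue is flushed before the PUT is issued. A minimal, self-contained sketch of that bookkeeping, with toy string/int keys standing in for the real LayerName and Generation types:

use std::collections::HashSet;

/// Toy stand-in for (LayerName, Generation).
type LayerKey = (String, u32);

#[derive(Debug, PartialEq, Eq)]
enum OpType {
    MayReorder,
    FlushDeletion,
}

#[derive(Default)]
struct OrderingState {
    recently_deleted: HashSet<LayerKey>,
}

impl OrderingState {
    /// Called when a Delete op is launched: remember what it will remove.
    fn note_deletion(&mut self, layers: &[LayerKey]) {
        self.recently_deleted.extend(layers.iter().cloned());
    }

    /// Called when an UploadLayer op is launched: if the same key was
    /// recently deleted, the upload must flush the deletion queue first.
    fn classify_upload(&mut self, key: &LayerKey) -> OpType {
        if self.recently_deleted.remove(key) {
            OpType::FlushDeletion
        } else {
            OpType::MayReorder
        }
    }
}

fn main() {
    let mut state = OrderingState::default();
    state.note_deletion(&[("layer-A".to_string(), 3)]);
    assert_eq!(
        state.classify_upload(&("layer-A".to_string(), 3)),
        OpType::FlushDeletion
    );
    assert_eq!(
        state.classify_upload(&("layer-B".to_string(), 3)),
        OpType::MayReorder
    );
    println!("ordering sketch ok");
}

As in the diff, the real code also clears recently_deleted once a flush has been requested, since that flush covers every deletion queued so far.
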
1 change: 1 addition & 0 deletions pageserver/src/tenant/timeline.rs
@@ -2647,6 +2647,7 @@ impl Timeline {
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
// TODO: this is basically a no-op now, should we remove it?
self.remote_client.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
23 changes: 17 additions & 6 deletions pageserver/src/tenant/upload_queue.rs
@@ -3,6 +3,7 @@ use super::storage_layer::ResidentLayer;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;

@@ -14,7 +15,6 @@ use utils::lsn::AtomicLsn;
use std::sync::atomic::AtomicU32;
use utils::lsn::Lsn;

#[cfg(feature = "testing")]
use utils::generation::Generation;

// clippy warns that Uninitialized is much smaller than Initialized, which wastes
@@ -38,6 +38,12 @@ impl UploadQueue {
}
}

#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub(crate) enum OpType {
MayReorder,
FlushDeletion,
}

/// This keeps track of queued and in-progress tasks.
pub(crate) struct UploadQueueInitialized {
/// Counter to assign task IDs
@@ -88,6 +94,9 @@ pub(crate) struct UploadQueueInitialized {
#[cfg(feature = "testing")]
pub(crate) dangling_files: HashMap<LayerName, Generation>,

/// Ensure we order file operations correctly.
pub(crate) recently_deleted: HashSet<(LayerName, Generation)>,

/// Deletions that are blocked by the tenant configuration
pub(crate) blocked_deletions: Vec<Delete>,

@@ -183,6 +192,7 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
recently_deleted: HashSet::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
@@ -224,6 +234,7 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
recently_deleted: HashSet::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
@@ -282,8 +293,8 @@ pub(crate) struct Delete {

#[derive(Debug)]
pub(crate) enum UploadOp {
/// Upload a layer file
UploadLayer(ResidentLayer, LayerFileMetadata),
/// Upload a layer file. The last field indicates the last operation on this file.
UploadLayer(ResidentLayer, LayerFileMetadata, Option<OpType>),

/// Upload a index_part.json file
UploadMetadata {
@@ -305,11 +316,11 @@ pub(crate) enum UploadOp {
impl std::fmt::Display for UploadOp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
UploadOp::UploadLayer(layer, metadata) => {
UploadOp::UploadLayer(layer, metadata, mode) => {
write!(
f,
"UploadLayer({}, size={:?}, gen={:?})",
layer, metadata.file_size, metadata.generation
"UploadLayer({}, size={:?}, gen={:?}, mode={:?})",
layer, metadata.file_size, metadata.generation, mode
)
}
UploadOp::UploadMetadata { uploaded, .. } => {
22 changes: 18 additions & 4 deletions test_runner/fixtures/neon_fixtures.py
@@ -4947,6 +4947,7 @@ def last_flush_lsn_upload(
timeline_id: TimelineId,
pageserver_id: int | None = None,
auth_token: str | None = None,
wait_until_uploaded: bool = True,
) -> Lsn:
"""
Wait for pageserver to catch to the latest flush LSN of given endpoint,
@@ -4960,7 +4961,9 @@
for tenant_shard_id, pageserver in shards:
ps_http = pageserver.http_client(auth_token=auth_token)
wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_checkpoint(
tenant_shard_id, timeline_id, wait_until_uploaded=wait_until_uploaded
)
return last_flush_lsn


@@ -4985,6 +4988,7 @@ def generate_uploads_and_deletions(
timeline_id: TimelineId | None = None,
data: str | None = None,
pageserver: NeonPageserver,
wait_until_uploaded: bool = True,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
@@ -5007,7 +5011,12 @@
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
env,
endpoint,
tenant_id,
timeline_id,
pageserver_id=pageserver.id,
wait_until_uploaded=wait_until_uploaded,
)

def churn(data):
@@ -5030,7 +5039,12 @@ def churn(data):
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
env,
endpoint,
tenant_id,
timeline_id,
pageserver_id=pageserver.id,
wait_until_uploaded=wait_until_uploaded,
)

# Compaction should generate some GC-elegible layers
@@ -5046,4 +5060,4 @@ def churn(data):
# background ingest, no more uploads pending, and therefore no non-determinism
# in subsequent actions like pageserver restarts.
flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=wait_until_uploaded)
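
The new wait_until_uploaded knob lets tests stop short of waiting for remote uploads, so they can interleave their own deletions and uploads deterministically. A hypothetical call site (env and the default tenant/timeline come from the surrounding test harness):

# Sketch: generate churn but do not wait for uploads to reach remote storage,
# leaving the upload queue populated for the scenario under test.
generate_uploads_and_deletions(
    env,
    pageserver=env.pageserver,
    wait_until_uploaded=False,
)
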
6 changes: 4 additions & 2 deletions test_runner/fixtures/pageserver/http.py
@@ -343,7 +343,7 @@ def tenant_list_locations(self):
assert isinstance(res_json["tenant_shards"], list)
return res_json

def tenant_get_location(self, tenant_id: TenantShardId):
def tenant_get_location(self, tenant_id: TenantId | TenantShardId):
res = self.get(
f"http://localhost:{self.port}/v1/location_config/{tenant_id}",
)
@@ -794,7 +794,9 @@ def timeline_checkpoint(
if compact is not None:
query["compact"] = "true" if compact else "false"

log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
log.info(
f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}"
)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
params=query,