refactor(pageserver): make partitioning an ArcSwap #10377

Merged · 2 commits · Jan 16, 2025
54 changes: 54 additions & 0 deletions libs/utils/src/guard_arc_swap.rs
@@ -0,0 +1,54 @@
//! A wrapper around `ArcSwap` that ensures there is only one writer at a time and writes
//! don't block reads.

use arc_swap::ArcSwap;
use std::sync::Arc;
use tokio::sync::TryLockError;

pub struct GuardArcSwap<T> {
    inner: ArcSwap<T>,
    guard: tokio::sync::Mutex<()>,
}

pub struct Guard<'a, T> {
    _guard: tokio::sync::MutexGuard<'a, ()>,
    inner: &'a ArcSwap<T>,
}

impl<T> GuardArcSwap<T> {
    pub fn new(inner: T) -> Self {
        Self {
            inner: ArcSwap::new(Arc::new(inner)),
            guard: tokio::sync::Mutex::new(()),
        }
    }

    pub fn read(&self) -> Arc<T> {
        self.inner.load_full()
    }

    pub async fn write_guard(&self) -> Guard<'_, T> {
        Guard {
            _guard: self.guard.lock().await,
            inner: &self.inner,
        }
    }

    pub fn try_write_guard(&self) -> Result<Guard<'_, T>, TryLockError> {
        let guard = self.guard.try_lock()?;
        Ok(Guard {
            _guard: guard,
            inner: &self.inner,
        })
    }
}

impl<T> Guard<'_, T> {
    pub fn read(&self) -> Arc<T> {
        self.inner.load_full()
    }

    pub fn write(&mut self, value: T) {
        self.inner.store(Arc::new(value));
    }
}
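For readers of the PR, a minimal usage sketch of the new wrapper (not part of the diff): it assumes the `utils` crate is available as a dependency and a tokio runtime with the `macros` and `rt-multi-thread` features, as in the pageserver workspace.

```rust
// Illustrative only: exercises the GuardArcSwap API added above.
use utils::guard_arc_swap::GuardArcSwap;

#[tokio::main]
async fn main() {
    let partitioning = GuardArcSwap::new(0u64);

    // Reads never block: `read()` is just `ArcSwap::load_full()`.
    assert_eq!(*partitioning.read(), 0);

    // Writers serialize on the inner tokio mutex and hold the guard for the
    // whole read-modify-write cycle.
    {
        let mut guard = partitioning.write_guard().await;
        let current = guard.read();
        guard.write(*current + 1);

        // A second writer fails fast instead of queueing...
        assert!(partitioning.try_write_guard().is_err());
        // ...while readers still see a consistent value (old or new, never torn).
        assert_eq!(*partitioning.read(), 1);
    }

    assert_eq!(*partitioning.read(), 1);
}
```

Compared with the `tokio::sync::Mutex` it replaces below, the read path takes no lock at all, while `Guard::read()` still gives the single writer a consistent view of the value it is about to replace.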
2 changes: 2 additions & 0 deletions libs/utils/src/lib.rs
@@ -98,6 +98,8 @@ pub mod try_rcu;

 pub mod pprof;
 
+pub mod guard_arc_swap;
+
 // Re-export used in macro. Avoids adding git-version as dep in target crates.
 #[doc(hidden)]
 pub use git_version;
26 changes: 13 additions & 13 deletions pageserver/src/tenant/timeline.rs
@@ -49,7 +49,9 @@ use tokio::{
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::{
-    fs_ext, pausable_failpoint,
+    fs_ext,
+    guard_arc_swap::GuardArcSwap,
+    pausable_failpoint,
     postgres_client::PostgresClientProtocol,
     sync::gate::{Gate, GateGuard},
 };
@@ -351,8 +353,8 @@
     // though let's keep them both for better error visibility.
     pub initdb_lsn: Lsn,
 
-    /// When did we last calculate the partitioning? Make it pub to test cases.
-    pub(super) partitioning: tokio::sync::Mutex<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
+    /// The repartitioning result. Allows a single writer and multiple readers.
+    pub(crate) partitioning: GuardArcSwap<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
 
     /// Configuration: how often should the partitioning be recalculated.
     repartition_threshold: u64,
@@ -2335,7 +2337,8 @@ impl Timeline {
                 // initial logical size is 0.
                 LogicalSize::empty_initial()
             },
-            partitioning: tokio::sync::Mutex::new((
+
+            partitioning: GuardArcSwap::new((
                 (KeyPartitioning::new(), KeyPartitioning::new().into_sparse()),
                 Lsn(0),
             )),
@@ -4022,18 +4025,15 @@ impl Timeline {
         flags: EnumSet<CompactFlags>,
         ctx: &RequestContext,
     ) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
-        let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
+        let Ok(mut guard) = self.partitioning.try_write_guard() else {
             // NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
             // The other is the initdb optimization in flush_frozen_layer, used by `bootstrap_timeline`, which runs before `.activate()`
             // and hence before the compaction task starts.
-            // Note that there are a third "caller" that will take the `partitioning` lock. It is `gc_compaction_split_jobs` for
-            // gc-compaction where it uses the repartition data to determine the split jobs. In the future, it might use its own
-            // heuristics, but for now, we should allow concurrent access to it and let the caller retry compaction.
             return Err(CompactionError::Other(anyhow!(
-                "repartition() called concurrently, this is rare and a retry should be fine"
+                "repartition() called concurrently"
             )));
         };
-        let ((dense_partition, sparse_partition), partition_lsn) = &*partitioning_guard;
+        let ((dense_partition, sparse_partition), partition_lsn) = &*guard.read();
         if lsn < *partition_lsn {
             return Err(CompactionError::Other(anyhow!(
                 "repartition() called with LSN going backwards, this should not happen"
@@ -4061,9 +4061,9 @@
         let sparse_partitioning = SparseKeyPartitioning {
             parts: vec![sparse_ks],
         }; // no partitioning for metadata keys for now
-        *partitioning_guard = ((dense_partitioning, sparse_partitioning), lsn);
-
-        Ok((partitioning_guard.0.clone(), partitioning_guard.1))
+        let result = ((dense_partitioning, sparse_partitioning), lsn);
+        guard.write(result.clone());
+        Ok(result)
     }
 
     // Is it time to create a new image layer for the given partition?
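Summarizing the new `repartition()` flow above: take the single-writer guard (failing fast if another writer holds it), read the current value through the guard, validate the LSN, then publish the new partitioning with `write()`. A condensed sketch of that pattern with plain stand-in types (illustrative, not the PR's code):

```rust
use utils::guard_arc_swap::GuardArcSwap;

// Stand-ins: `Vec<u64>` plays the role of the key partitioning, `u64` the LSN.
fn repartition_sketch(
    partitioning: &GuardArcSwap<(Vec<u64>, u64)>,
    new_partitioning: Vec<u64>,
    lsn: u64,
) -> Result<(Vec<u64>, u64), String> {
    // Fail fast instead of queueing behind another writer.
    let Ok(mut guard) = partitioning.try_write_guard() else {
        return Err("repartition() called concurrently".into());
    };

    // Reading through the guard yields the value this writer is about to replace.
    let (_current, current_lsn) = &*guard.read();
    if lsn < *current_lsn {
        return Err("repartition() called with LSN going backwards".into());
    }

    // Publish atomically: concurrent readers see either the old or the new
    // tuple, never a partially updated one.
    let result = (new_partitioning, lsn);
    guard.write(result.clone());
    Ok(result)
}
```

The removed comment about `gc_compaction_split_jobs` is no longer needed: as the compaction.rs change below shows, that path now snapshots the partitioning without taking any lock, so it cannot contend with this writer.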
7 changes: 1 addition & 6 deletions pageserver/src/tenant/timeline/compaction.rs
@@ -2111,12 +2111,7 @@ impl Timeline {
         let mut compact_jobs = Vec::new();
         // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
         // by estimating the amount of files read for a compaction job. We should also partition on LSN.
-        let ((dense_ks, sparse_ks), _) = {
-            let Ok(partition) = self.partitioning.try_lock() else {
-                bail!("failed to acquire partition lock during gc-compaction");
-            };
-            partition.clone()
-        };
+        let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
         // Truncate the key range to be within user specified compaction range.
         fn truncate_to(
             source_start: &Key,
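The one-line replacement above turns a `try_lock` + `bail!` into a lock-free snapshot: `read()` hands back an `Arc`, and `as_ref().clone()` copies the tuple out of it, so gc-compaction job planning owns data that a concurrent repartition cannot change under it. A tiny sketch of that idiom using `arc_swap` directly (types simplified, not from the PR):

```rust
use arc_swap::ArcSwap;
use std::sync::Arc;

fn main() {
    // Stand-in for `partitioning`: an Arc-swapped (keys, lsn) pair.
    let shared = ArcSwap::new(Arc::new((vec![1u64, 2, 3], 10u64)));

    // Snapshot: load the current Arc, then clone the inner tuple out of it.
    let (keys, lsn) = shared.load_full().as_ref().clone();

    // A later store swaps in a new Arc; the snapshot we already own is unaffected.
    shared.store(Arc::new((vec![4, 5], 20)));
    assert_eq!((keys, lsn), (vec![1, 2, 3], 10));
}
```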