Skip to content

Commit

Permalink
a little refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <chi@neon.tech>
  • Loading branch information
skyzh committed Dec 8, 2024
1 parent 43c3492 commit 4b01a48
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 85 deletions.
9 changes: 4 additions & 5 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2041,13 +2041,12 @@ async fn timeline_compact_handler(
.map(|r| r.sub_compaction)
.unwrap_or(false);
let options = CompactOptions {
compact_range: compact_request
compact_key_range: compact_request
.as_ref()
.and_then(|r| r.compact_range.clone()),
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
compact_above_lsn: compact_request
.and_then(|r| r.compact_key_range.clone()),
compact_lsn_range: compact_request
.as_ref()
.and_then(|r| r.compact_above_lsn.as_ref().map(|x| x.0)),
.and_then(|r| r.compact_lsn_range.clone()),
flags,
sub_compaction,
};
Expand Down
21 changes: 19 additions & 2 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::compaction::GcCompactJob;
use timeline::compaction::ScheduledCompactionTask;
use timeline::import_pgdata;
use timeline::offload::offload_timeline;
Expand Down Expand Up @@ -3017,7 +3018,9 @@ impl Tenant {
} else if next_scheduled_compaction_task.options.sub_compaction {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = timeline
.gc_compaction_split_jobs(next_scheduled_compaction_task.options)
.gc_compaction_split_jobs(GcCompactJob::from_compact_options(
next_scheduled_compaction_task.options,
))
.await
.map_err(CompactionError::Other)?;
if jobs.is_empty() {
Expand All @@ -3029,7 +3032,21 @@ impl Tenant {
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
tline_pending_tasks.push_back(ScheduledCompactionTask {
options: job,
// TODO: use a single `GcCompactJob` here instead of converting it back
// to `CompactOptions`
options: CompactOptions {
flags: if job.dry_run {
let mut flags: EnumSet<CompactFlags> =
EnumSet::default();
flags |= CompactFlags::DryRun;
flags
} else {
EnumSet::default()
},
sub_compaction: false,
compact_key_range: Some(job.compact_key_range.into()),
compact_lsn_range: Some(job.compact_lsn_range.into()),
},
result_tx: if idx == jobs_len - 1 {
// The last compaction job sends the completion signal
next_scheduled_compaction_task.result_tx.take()
Expand Down
67 changes: 51 additions & 16 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,35 +780,71 @@ pub(crate) enum CompactFlags {
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactRequest {
pub compact_range: Option<CompactRange>,
pub compact_below_lsn: Option<Lsn>,
pub compact_above_lsn: Option<Lsn>,
pub compact_key_range: Option<CompactKeyRange>,
pub compact_lsn_range: Option<CompactLsnRange>,
/// Whether the compaction job should be scheduled.
#[serde(default)]
pub scheduled: bool,
/// Whether the compaction job should be split across key ranges.
#[serde(default)]
pub sub_compaction: bool,
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactLsnRange {
pub start: Lsn,
pub end: Lsn,
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactKeyRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}

impl From<Range<Key>> for CompactRange {
impl From<Range<Lsn>> for CompactLsnRange {
fn from(range: Range<Lsn>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}

impl From<Range<Key>> for CompactKeyRange {
fn from(range: Range<Key>) -> Self {
CompactRange {
Self {
start: range.start,
end: range.end,
}
}
}

impl From<CompactLsnRange> for Range<Lsn> {
fn from(range: CompactLsnRange) -> Self {
range.start..range.end
}
}

impl From<CompactKeyRange> for Range<Key> {
fn from(range: CompactKeyRange) -> Self {
range.start..range.end
}
}

#[derive(Debug, Clone, Default)]
pub(crate) struct CompactOptions {
pub flags: EnumSet<CompactFlags>,
/// If set, the compaction will only compact the key range specified by this option.
/// This option is only used by GC compaction.
pub compact_range: Option<CompactRange>,
/// If set, the compaction will only compact the LSN below this value.
/// This option is only used by GC compaction.
pub compact_below_lsn: Option<Lsn>,
/// If set, the compaction will only compact the LSN above this value.
/// This option is only used by GC compaction.
pub compact_above_lsn: Option<Lsn>,
/// This option is only used by GC compaction. For the full explanation, see [`GcCompactionJob`].
pub compact_key_range: Option<CompactKeyRange>,
/// If set, the compaction will only compact the LSN within this value.
/// This option is only used by GC compaction. For the full explanation, see [`GcCompactionJob`].
pub compact_lsn_range: Option<CompactLsnRange>,
/// Enable sub-compaction (split compaction job across key ranges).
/// This option is only used by GC compaction.
pub sub_compaction: bool,
Expand Down Expand Up @@ -1633,9 +1669,8 @@ impl Timeline {
cancel,
CompactOptions {
flags,
compact_range: None,
compact_below_lsn: None,
compact_above_lsn: None,
compact_key_range: None,
compact_lsn_range: None,
sub_compaction: false,
},
ctx,
Expand Down
Loading

0 comments on commit 4b01a48

Please sign in to comment.