# Compaction: sort on slices directly instead of kmerge (#4839)
## Problem

The k-merge in pageserver compaction currently relies on iterators over
the keys and over the values. This approach does not work with async
code: iterators don't support async in general, and the k-merge
implementation we use doesn't support async either. Since we already
load all the keys into memory anyway, we can simply sort the slices
in memory instead.
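
To make the contrast concrete, here is a minimal sketch, not the pageserver code: plain integer tuples stand in for `(Key, Lsn, ...)`, and the `itertools` crate is assumed as a dependency (it backed the old code path). It shows lazily k-merging sorted iterators versus collecting everything and sorting the slice:

```rust
use itertools::Itertools; // assumed dependency, as in the old code path

fn main() {
    // Each inner vec is already sorted, like the keys of one delta layer.
    let per_layer: Vec<Vec<(u64, u64)>> = vec![
        vec![(1, 10), (3, 11)],
        vec![(2, 12), (3, 13)],
    ];

    // Old approach: k-merge the sorted iterators lazily. This ties the
    // whole pipeline to the synchronous Iterator trait.
    let merged: Vec<_> = per_layer
        .iter()
        .map(|v| v.iter().copied())
        .kmerge_by(|a, b| a < b)
        .collect();

    // New approach: load everything first (which can be done by async
    // code), then sort the slice in memory; no iterator plumbing needed.
    let mut all: Vec<_> = per_layer.into_iter().flatten().collect();
    all.sort();

    assert_eq!(merged, all);
}
```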

## Summary of changes

The PR can be read commit-by-commit, but most importantly, it:

* Stops using kmerge in compaction and sorts slices instead.
* Makes `load_keys` and `load_val_refs` async, using `Handle::block_on`
in the compaction code, as we don't want to turn the compaction function,
which is called inside `spawn_blocking`, into an async fn (see the sketch
below).

Builds on top of #4836, part of #4743.
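
For context, a minimal sketch of the `Handle::block_on` bridging pattern the second bullet describes. The `load_keys` body and types here are hypothetical stand-ins, not the pageserver signatures, and tokio with the `rt-multi-thread` and `macros` features is assumed:

```rust
use tokio::runtime::Handle;

// Hypothetical stand-in for the now-async layer accessor.
async fn load_keys(layer: u64) -> Vec<u64> {
    vec![layer, layer + 10]
}

#[tokio::main]
async fn main() {
    let keys = tokio::task::spawn_blocking(|| {
        // Synchronous compaction-style code: no `.await` is available
        // inside this closure, so drive the async fn to completion on
        // the runtime instead. spawn_blocking threads inherit the
        // runtime context, so Handle::current() works here.
        let mut all = Vec::new();
        for layer in 0..3 {
            all.extend(Handle::current().block_on(load_keys(layer)));
        }
        all.sort();
        all
    })
    .await
    .unwrap();
    assert_eq!(keys, vec![0, 1, 2, 10, 11, 12]);
}
```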
arpad-m authored Aug 3, 2023 · 1 parent df49a9b · commit 416c14b
Showing 2 changed files with 40 additions and 44 deletions.
4 changes: 2 additions & 2 deletions — pageserver/src/tenant/storage_layer/delta_layer.rs

```diff
@@ -614,15 +614,15 @@ impl DeltaLayer {
     /// Obtains all keys and value references stored in the layer
     ///
     /// The value can be obtained via the [`ValueRef::load`] function.
-    pub fn load_val_refs(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, ValueRef)>> {
+    pub async fn load_val_refs(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, ValueRef)>> {
         let inner = self
             .load(LayerAccessKind::KeyIter, ctx)
             .context("load delta layer")?;
         DeltaLayerInner::load_val_refs(inner).context("Layer index is corrupted")
     }

     /// Loads all keys stored in the layer. Returns key, lsn and value size.
-    pub fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, u64)>> {
+    pub async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, u64)>> {
         let inner = self
             .load(LayerAccessKind::KeyIter, ctx)
             .context("load delta layer keys")?;
```
80 changes: 38 additions & 42 deletions — pageserver/src/tenant/timeline.rs

```diff
@@ -19,6 +19,7 @@ use pageserver_api::models::{
 use remote_storage::GenericRemoteStorage;
 use serde_with::serde_as;
 use storage_broker::BrokerClientChannel;
+use tokio::runtime::Handle;
 use tokio::sync::{oneshot, watch, TryAcquireError};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
@@ -3518,16 +3519,41 @@ impl Timeline {
         // min-heap (reserve space for one more element added before eviction)
         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
         let mut prev: Option<Key> = None;
-        for (next_key, _next_lsn, _size) in itertools::process_results(
-            deltas_to_compact.iter().map(|l| -> Result<_> {
-                Ok(l.clone()
-                    .downcast_delta_layer()
-                    .expect("delta layer")
-                    .load_keys(ctx)?
-                    .into_iter())
-            }),
-            |iter_iter| iter_iter.kmerge_by(|a, b| a.0 < b.0),
-        )? {
+
+        let mut all_value_refs = Vec::new();
+        for l in deltas_to_compact.iter() {
+            // TODO: replace this with an await once we fully go async
+            all_value_refs.extend(
+                Handle::current().block_on(
+                    l.clone()
+                        .downcast_delta_layer()
+                        .expect("delta layer")
+                        .load_val_refs(ctx),
+                )?,
+            );
+        }
+        // The current stdlib sorting implementation is designed in a way where it is
+        // particularly fast where the slice is made up of sorted sub-ranges.
+        all_value_refs.sort_by_key(|(key, _lsn, _value_ref)| *key);
+
+        let mut all_keys = Vec::new();
+        for l in deltas_to_compact.iter() {
+            // TODO: replace this with an await once we fully go async
+            all_keys.extend(
+                Handle::current().block_on(
+                    l.clone()
+                        .downcast_delta_layer()
+                        .expect("delta layer")
+                        .load_keys(ctx),
+                )?,
+            );
+        }
+        // The current stdlib sorting implementation is designed in a way where it is
+        // particularly fast where the slice is made up of sorted sub-ranges.
+        all_keys.sort_by_key(|(key, _lsn, _size)| *key);
+
+        for (next_key, _next_lsn, _size) in all_keys.iter() {
+            let next_key = *next_key;
             if let Some(prev_key) = prev {
                 // just first fast filter
                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
```
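
Two properties of `slice::sort_by_key` carry the weight in the hunk above: Rust's stable sort detects pre-sorted runs, and the concatenated per-layer slices are exactly such runs (which is what the code comment alludes to); and, reading the diff, sorting by key alone appears to rely on stability — equal-key entries keep their pre-sort order, so per-key LSN ordering is preserved as long as layers are appended in LSN order. A minimal self-contained illustration, with plain `u64` triples standing in for the pageserver's `(Key, Lsn, u64)`:

```rust
fn main() {
    // (key, lsn, size) triples from two delta layers, each already
    // sorted by key; the second layer covers a later LSN range.
    let mut all_keys: Vec<(u64, u64, u64)> = Vec::new();
    all_keys.extend([(1, 100, 8), (3, 100, 8), (5, 100, 8)]);
    all_keys.extend([(2, 200, 8), (3, 200, 8), (4, 200, 8)]);

    // Stable sort over two sorted runs: cheap for the run-detecting
    // stdlib sort, and entries with equal keys keep their order.
    all_keys.sort_by_key(|(key, _lsn, _size)| *key);

    // Equal keys stay in LSN order thanks to stability.
    assert_eq!(all_keys[2], (3, 100, 8));
    assert_eq!(all_keys[3], (3, 200, 8));
}
```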
```diff
@@ -3560,40 +3586,10 @@ impl Timeline {

         // This iterator walks through all key-value pairs from all the layers
         // we're compacting, in key, LSN order.
-        let all_values_iter = itertools::process_results(
-            deltas_to_compact.iter().map(|l| -> Result<_> {
-                Ok(l.clone()
-                    .downcast_delta_layer()
-                    .expect("delta layer")
-                    .load_val_refs(ctx)?
-                    .into_iter())
-            }),
-            |iter_iter| {
-                iter_iter.kmerge_by(|a, b| {
-                    let (a_key, a_lsn, _) = a;
-                    let (b_key, b_lsn, _) = b;
-                    (a_key, a_lsn) < (b_key, b_lsn)
-                })
-            },
-        )?;
+        let all_values_iter = all_value_refs.into_iter();

         // This iterator walks through all keys and is needed to calculate size used by each key
-        let mut all_keys_iter = itertools::process_results(
-            deltas_to_compact.iter().map(|l| -> Result<_> {
-                Ok(l.clone()
-                    .downcast_delta_layer()
-                    .expect("delta layer")
-                    .load_keys(ctx)?
-                    .into_iter())
-            }),
-            |iter_iter| {
-                iter_iter.kmerge_by(|a, b| {
-                    let (a_key, a_lsn, _) = a;
-                    let (b_key, b_lsn, _) = b;
-                    (a_key, a_lsn) < (b_key, b_lsn)
-                })
-            },
-        )?;
+        let mut all_keys_iter = all_keys.into_iter();

         stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now();

```

1 comment on commit 416c14b

@github-actions commented:

1332 tests run: 1274 passed, 0 failed, 58 skipped (full report)