pageserver: post-shard-split layer trimming (1/2) (#7572)
## Problem

After a shard split of a large existing tenant, child tenants can end up
with oversized historic layers indefinitely if those layers are prevented
from being GC'd by branchpoints.

This PR is followed by #7531

Related issue: #7504

## Summary of changes

- Add a new compaction phase `compact_shard_ancestors`, which identifies
layers that are no longer needed after a shard split (a simplified sketch
of its per-layer decision logic follows this description).
- Add a Timeline->LayerMap code path, `rewrite_layers`, which is currently
only used to drop layers but will later be used to rewrite them as well,
in #7531.
- Add a new test that compacts after a split and checks that redundant
layers are deleted.

Note that this doesn't have much impact on a tenant's resident size
(since unused layers would end up evicted anyway), but it:
- Makes index_part.json much smaller
- Makes the system easier to reason about: instead of tenants whose story
is "my physical size is 4TiB, but don't worry, I'll never actually
download it", tenants report the real physical size of what they might
download.

Why do we remove these layers in compaction rather than during the
split? Because we have existing split tenants that need cleaning up. We
can add this to the split operation in the future as an optimization.
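
For orientation, the per-layer decision made by `compact_shard_ancestors` (see the compaction.rs diff below) can be modelled as a small pure function. The sketch below is illustrative only: the struct, its field names and the `Decision` enum are stand-ins, not the pageserver's real `Layer`/`ShardedRange` API, and it assumes the intended behaviour of skipping layers that are already mostly local to the shard.

```rust
/// Stand-in for the facts the real code reads from a layer's metadata,
/// its key range and `ShardedRange`; these are illustrative types only.
struct LayerFacts {
    /// Layer was written by a shard with the same ShardCount as ours.
    written_at_current_shard_count: bool,
    /// Layer's metadata generation matches our current generation.
    written_in_current_generation: bool,
    /// Pages in the layer's key range owned by this shard (ShardedRange::page_count()).
    /// u32::MAX is treated as "not exactly known", as in the real code.
    local_page_count: u32,
    /// Total pages in the layer's key range (ShardedRange::raw_size()).
    raw_page_count: u32,
    /// The layer's end LSN is at or above the PITR cutoff.
    in_pitr_window: bool,
    /// Delta layers cannot be rewritten yet; only image layers are candidates.
    is_delta: bool,
}

#[derive(Debug, PartialEq)]
enum Decision {
    Drop,             // contains no keys for this shard: remove from the layer map
    Keep,             // leave the layer alone (at least for this pass)
    RewriteCandidate, // worth re-imaging to shed other shards' keys (follow-up PR)
}

fn decide(l: &LayerFacts) -> Decision {
    if l.written_at_current_shard_count {
        return Decision::Keep; // not an ancestor layer, nothing to trim
    }
    if l.local_page_count == 0 {
        return Decision::Drop; // ancestor layer covering only other shards' keys
    }
    let count_known = l.local_page_count != u32::MAX;
    if count_known && l.local_page_count == l.raw_page_count {
        return Decision::Keep; // already entirely local
    }
    if count_known && l.local_page_count > l.raw_page_count / 2 {
        return Decision::Keep; // rewriting would not even halve it: not worth the I/O
    }
    if l.in_pitr_window {
        return Decision::Keep; // will age out of the PITR window on its own
    }
    if l.is_delta {
        return Decision::Keep; // rewriting delta layers is not implemented yet
    }
    if l.written_in_current_generation && l.written_at_current_shard_count {
        return Decision::Keep; // a rewrite would land on the same remote path
    }
    Decision::RewriteCandidate
}

fn main() {
    let ancestor_without_local_keys = LayerFacts {
        written_at_current_shard_count: false,
        written_in_current_generation: false,
        local_page_count: 0,
        raw_page_count: 4096,
        in_pitr_window: false,
        is_delta: false,
    };
    assert_eq!(decide(&ancestor_without_local_keys), Decision::Drop);
}
```

As in the real code, drops are decided purely from metadata and page counts; the remaining checks decide whether a layer is worth the I/O of a rewrite, which the follow-up PR (#7531) will implement.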
jcsp authored May 7, 2024
1 parent ac7dc82 commit af849a1
Showing 5 changed files with 243 additions and 3 deletions.
2 changes: 1 addition & 1 deletion libs/pageserver_api/src/keyspace.rs
@@ -240,7 +240,7 @@ impl<'a> ShardedRange<'a> {
/// pages that would not actually be stored on this node.
///
/// Don't use this function in code that works with physical entities like layer files.
-fn raw_size(range: &Range<Key>) -> u32 {
+pub fn raw_size(range: &Range<Key>) -> u32 {
if is_contiguous_range(range) {
contiguous_range_len(range)
} else {
18 changes: 18 additions & 0 deletions pageserver/src/tenant/timeline.rs
@@ -4434,6 +4434,24 @@ impl Timeline {
Ok(())
}

async fn rewrite_layers(
self: &Arc<Self>,
replace_layers: Vec<(Layer, ResidentLayer)>,
drop_layers: Vec<Layer>,
) -> anyhow::Result<()> {
let mut guard = self.layers.write().await;

guard.rewrite_layers(&replace_layers, &drop_layers, &self.metrics);

let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();

if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&drop_layers, &upload_layers)?;
}

Ok(())
}

/// Schedules the uploads of the given image layers
fn upload_new_image_layers(
self: &Arc<Self>,
147 changes: 145 additions & 2 deletions pageserver/src/tenant/timeline/compaction.rs
@@ -15,7 +15,8 @@ use anyhow::{anyhow, Context};
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
-use pageserver_api::shard::{ShardIdentity, TenantShardId};
+use pageserver_api::keyspace::ShardedRange;
+use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::id::TimelineId;
@@ -93,7 +94,7 @@ impl Timeline {
// Define partitioning schema if needed

// FIXME: the match should only cover repartitioning, not the next steps
-match self
+let partition_count = match self
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
@@ -146,6 +147,7 @@ impl Timeline {
assert!(sparse_layers.is_empty());

self.upload_new_image_layers(dense_layers)?;
dense_partitioning.parts.len()
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -157,9 +159,150 @@
if !self.cancel.is_cancelled() {
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
}
1
}
};

if self.shard_identity.count >= ShardCount::new(2) {
// Limit the number of layer rewrites to the number of partitions: this means the
// phase's runtime should be comparable to a full round of image layer creations, rather
// than being potentially much longer.
let rewrite_max = partition_count;

self.compact_shard_ancestors(rewrite_max, ctx).await?;
}

Ok(())
}

/// Check for layers that are eligible to be rewritten:
/// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval so that
/// we don't indefinitely retain keys in this shard that aren't needed.
/// - For future use: layers beyond pitr_interval that are in formats we would
/// rather not maintain compatibility with indefinitely.
///
/// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
/// how much work it will try to do in each compaction pass.
async fn compact_shard_ancestors(
self: &Arc<Self>,
rewrite_max: usize,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut drop_layers = Vec::new();
let layers_to_rewrite: Vec<Layer> = Vec::new();

// We will use the PITR cutoff as a condition for rewriting layers.
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.pitr;

let layers = self.layers.read().await;
for layer_desc in layers.layer_map().iter_historic_layers() {
let layer = layers.get_from_desc(&layer_desc);
if layer.metadata().shard.shard_count == self.shard_identity.count {
// This layer does not belong to a historic ancestor, no need to re-image it.
continue;
}

// This layer was created on an ancestor shard: check if it contains any data for this shard.
let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
let layer_local_page_count = sharded_range.page_count();
let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
if layer_local_page_count == 0 {
// This ancestral layer only covers keys that belong to other shards.
// We include the full metadata in the log: if we had some critical bug that caused
// us to incorrectly drop layers, this would simplify manual debugging + reinstating those layers.
info!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split, contains no keys for this shard.",
);

if cfg!(debug_assertions) {
// Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
// wrong. If ShardedRange claims the local page count is zero, then every key in this
// layer should satisfy is_key_disposable().
let range = layer_desc.get_key_range();
let mut key = range.start;
while key < range.end {
debug_assert!(self.shard_identity.is_key_disposable(&key));
key = key.next();
}
}

drop_layers.push(layer);
continue;
} else if layer_local_page_count != u32::MAX
&& layer_local_page_count == layer_raw_page_count
{
debug!(%layer,
"layer is entirely shard local ({} keys), no need to filter it",
layer_local_page_count
);
continue;
}

// Don't bother re-writing a layer unless it will at least halve its size
if layer_local_page_count != u32::MAX
&& layer_local_page_count > layer_raw_page_count / 2
{
debug!(%layer,
"layer is already mostly local ({}/{}), not rewriting",
layer_local_page_count,
layer_raw_page_count
);
continue;
}

// Don't bother re-writing a layer if it is within the PITR window: it will age-out eventually
// without incurring the I/O cost of a rewrite.
if layer_desc.get_lsn_range().end >= pitr_cutoff {
debug!(%layer, "Skipping rewrite of layer still in PITR window ({} >= {})",
layer_desc.get_lsn_range().end, pitr_cutoff);
continue;
}

if layer_desc.is_delta() {
// We do not yet implement rewrite of delta layers
debug!(%layer, "Skipping rewrite of delta layer");
continue;
}

// Only rewrite layers if they would have different remote paths: either they belong to this
// shard but an old generation, or they belonged to another shard. This also implicitly
// guarantees that the layer is persistent in remote storage (as only remote persistent
// layers are carried across shard splits, any local-only layer would be in the current generation)
if layer.metadata().generation == self.generation
&& layer.metadata().shard.shard_count == self.shard_identity.count
{
debug!(%layer, "Skipping rewrite, is not from old generation");
continue;
}

if layers_to_rewrite.len() >= rewrite_max {
tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
layers_to_rewrite.len()
);
continue;
}

// Fall through: all our conditions for doing a rewrite passed.
// TODO: implement rewriting
tracing::debug!(%layer, "Would rewrite layer");
}

// Drop the layers read lock: we will acquire it for write in [`Self::rewrite_layers`]
drop(layers);

// TODO: collect layers to rewrite
let replace_layers = Vec::new();

// Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
self.rewrite_layers(replace_layers, drop_layers).await?;

if let Some(remote_client) = self.remote_client.as_ref() {
// We wait for all uploads to complete before finishing this compaction stage. This is not
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
// load.
remote_client.wait_completion().await?;
}

Ok(())
}
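
A note on pacing: `compact_legacy` sets `rewrite_max` to the partition count, so layer drops are not limited but at most that many rewrites are attempted per compaction pass, and a backlog of ancestor layers is worked off over several passes. A throwaway sketch of that arithmetic (an illustrative helper, not part of the patch; assumes a toolchain with `usize::div_ceil`):

```rust
/// How many compaction passes are needed to rewrite `backlog` ancestor layers
/// when each pass rewrites at most `rewrite_max` of them (drops are not
/// limited by this budget). Illustrative helper only.
fn rewrite_passes_needed(backlog: usize, rewrite_max: usize) -> Option<usize> {
    if rewrite_max == 0 {
        return None; // no budget, no progress
    }
    // Ceiling division: the last pass may rewrite fewer than `rewrite_max` layers.
    Some(backlog.div_ceil(rewrite_max))
}

fn main() {
    // Hypothetical numbers: a backlog of ancestor layers vs. an 8-partition budget.
    assert_eq!(rewrite_passes_needed(1000, 8), Some(125));
    assert_eq!(rewrite_passes_needed(10, 8), Some(2));
    assert_eq!(rewrite_passes_needed(0, 8), Some(0));
}
```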

18 changes: 18 additions & 0 deletions pageserver/src/tenant/timeline/layer_manager.rs
@@ -205,6 +205,24 @@ impl LayerManager {
updates.flush();
}

/// Called by compaction to replace or drop layers in the layer map (currently only drops are implemented).
pub(crate) fn rewrite_layers(
&mut self,
rewrite_layers: &[(Layer, ResidentLayer)],
drop_layers: &[Layer],
_metrics: &TimelineMetrics,
) {
let mut updates = self.layer_map.batch_update();

// TODO: implement rewrites (currently this code path is only used for drops)
assert!(rewrite_layers.is_empty());

for l in drop_layers {
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
}
updates.flush();
}

/// Called when garbage collect has selected the layers to be removed.
pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
let mut updates = self.layer_map.batch_update();
61 changes: 61 additions & 0 deletions test_runner/regress/test_sharding.py
@@ -177,6 +177,67 @@ def test_sharding_split_unsharded(
env.storage_controller.consistency_check()


def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder):
"""
Test that after a split, we clean up parent layer data in the child shards via compaction.
"""
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# a 1 hour PITR interval, so that layers written by this test remain inside the PITR window
"pitr_interval": "3600s",
# disable background compaction and GC. We invoke them manually when we want them to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
}

env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

# Check that we created with an unsharded TenantShardId: this is the default,
# but check it in case we change the default in future
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 0)) is not None

workload = Workload(env, tenant_id, timeline_id, branch_name="main")
workload.init()
workload.write_rows(256)
workload.validate()
workload.stop()

# Split one shard into two
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)

# Check we got the shard IDs we expected
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None

workload.validate()
workload.stop()

env.storage_controller.consistency_check()

# Cleanup part 1: while layers are still in PITR window, we should only drop layers that are fully redundant
for shard in shards:
ps = env.get_tenant_pageserver(shard)

# Invoke compaction: this should drop any layers that don't overlap with the shard's key stripes
detail_before = ps.http_client().timeline_detail(shard, timeline_id)
ps.http_client().timeline_compact(shard, timeline_id)
detail_after = ps.http_client().timeline_detail(shard, timeline_id)

# Physical size should shrink because some layers have been dropped
assert detail_after["current_physical_size"] < detail_before["current_physical_size"]

# Compaction shouldn't make anything unreadable
workload.validate()


def test_sharding_split_smoke(
neon_env_builder: NeonEnvBuilder,
):
Expand Down

1 comment on commit af849a1

@github-actions

2973 tests run: 2830 passed, 3 failed, 140 skipped (full report)


Failures on Postgres 14

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release
  • test_bulk_tenant_create[github-actions-selfhosted-5]: release
  • test_parallel_copy_different_tables[neon-github-actions-selfhosted]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg14-github-actions-selfhosted] or test_bulk_tenant_create[release-pg14-github-actions-selfhosted-5] or test_parallel_copy_different_tables[neon-release-pg14-github-actions-selfhosted]"
Flaky tests (3)

Postgres 15

  • test_partial_evict_tenant[relative_spare]: release

Postgres 14

Code coverage* (full report)

  • functions: 31.4% (6242 of 19887 functions)
  • lines: 47.0% (46747 of 99413 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
af849a1 at 2024-05-07T11:56:54.050Z :recycle:
