Compaction: unify key and value reference vecs #4888

Merged · 14 commits · Aug 16, 2023
99 changes: 43 additions & 56 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -549,25 +549,10 @@ impl DeltaLayer {
&self.layer_name(),
)
}

/// Obtains all keys and value references stored in the layer
/// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
///
/// The value can be obtained via the [`ValueRef::load`] function.
pub async fn load_val_refs(
&self,
ctx: &RequestContext,
) -> Result<Vec<(Key, Lsn, ValueRef<Arc<DeltaLayerInner>>)>> {
let inner = self
.load(LayerAccessKind::Iter, ctx)
.await
.context("load delta layer")?;
DeltaLayerInner::load_val_refs(inner)
.await
.context("Layer index is corrupted")
}

/// Loads all keys stored in the layer. Returns key, lsn and value size.
pub async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, u64)>> {
pub async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry>> {
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.await
@@ -711,6 +696,17 @@ impl DeltaLayerWriterInner {
.metadata()
.context("get file metadata to determine size")?;

// 5GB limit for objects without multipart upload (which we don't want to use)
// Make it a little bit below to account for differing GB units
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path.display(),
metadata.len()
);

// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.file here. The first read will have to re-open it.
@@ -955,15 +951,17 @@ impl DeltaLayerInner {
}
}

pub(super) async fn load_val_refs<T: AsRef<DeltaLayerInner> + Clone>(
pub(super) async fn load_keys<T: AsRef<DeltaLayerInner> + Clone>(
this: &T,
) -> Result<Vec<(Key, Lsn, ValueRef<T>)>> {
) -> Result<Vec<DeltaEntry>> {
let dl = this.as_ref();
let file = &dl.file;

let tree_reader =
DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);

let mut all_offsets = Vec::<(Key, Lsn, ValueRef<T>)>::new();
let mut all_keys: Vec<DeltaEntry> = Vec::new();

tree_reader
.visit(
&[0u8; DELTA_KEY_SIZE],
@@ -972,56 +970,45 @@ impl DeltaLayerInner {
let delta_key = DeltaKey::from_slice(key);
let val_ref = ValueRef {
blob_ref: BlobRef(value),
reader: BlockCursor::new(Adapter(this.clone())),
reader: BlockCursor::new(Adapter(dl)),
};
all_offsets.push((delta_key.key(), delta_key.lsn(), val_ref));
true
},
)
.await?;

Ok(all_offsets)
}

pub(super) async fn load_keys(&self) -> Result<Vec<(Key, Lsn, u64)>> {
let file = &self.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
file,
);

let mut all_keys: Vec<(Key, Lsn, u64)> = Vec::new();
tree_reader
.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|key, value| {
let delta_key = DeltaKey::from_slice(key);
let pos = BlobRef(value).pos();
if let Some(last) = all_keys.last_mut() {
if last.0 == delta_key.key() {
return true;
} else {
// subtract offset of new key BLOB and first blob of this key
// to get total size of values associated with this key
let first_pos = last.2;
last.2 = pos - first_pos;
}
// subtract offset of the current and last entries to get the size
// of the value associated with this (key, lsn) tuple
let first_pos = last.size;
last.size = pos - first_pos;
}
all_keys.push((delta_key.key(), delta_key.lsn(), pos));
let entry = DeltaEntry {
key: delta_key.key(),
lsn: delta_key.lsn(),
size: pos,
val: val_ref,
};
all_keys.push(entry);
true
},
)
.await?;
if let Some(last) = all_keys.last_mut() {
// Last key occupies all space till end of layer
last.2 = std::fs::metadata(&file.file.path)?.len() - last.2;
// Last key occupies all space till end of value storage,
// which corresponds to beginning of the index
last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
}
Ok(all_keys)
}
}

/// A set of data associated with a delta layer key and its value
pub struct DeltaEntry<'a> {
pub key: Key,
pub lsn: Lsn,
/// Size of the stored value
pub size: u64,
/// Reference to the on-disk value
pub val: ValueRef<&'a DeltaLayerInner>,
}

/// Reference to an on-disk value
pub struct ValueRef<T: AsRef<DeltaLayerInner>> {
blob_ref: BlobRef,
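For context, a minimal usage sketch of the unified API (not part of this PR's diff): a single `load_keys` call now returns `DeltaEntry` records carrying the key, LSN, value size and a lazy `ValueRef`, replacing the former `load_val_refs`/`load_keys` pair. The helper name `total_value_bytes` is hypothetical, and the crate-internal types (`DeltaLayer`, `RequestContext`) are assumed to be in scope.

```rust
// Hypothetical helper, sketched against the API added above.
async fn total_value_bytes(layer: &DeltaLayer, ctx: &RequestContext) -> anyhow::Result<u64> {
    // One pass over the layer index yields key, lsn, size and value reference.
    let entries = layer.load_keys(ctx).await?;
    let mut total = 0;
    for entry in &entries {
        // `size` was derived from neighbouring blob offsets in the index.
        total += entry.size;
        // The value bytes are read from disk only when the reference is loaded.
        let _value = entry.val.load().await?;
    }
    Ok(total)
}
```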
81 changes: 51 additions & 30 deletions pageserver/src/tenant/timeline.rs
@@ -39,6 +39,7 @@ use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, RemoteLayer,
@@ -3312,10 +3313,10 @@ struct CompactLevel0Phase1StatsBuilder {
timeline_id: Option<TimelineId>,
read_lock_acquisition_micros: DurationRecorder,
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
read_lock_held_key_sort_micros: DurationRecorder,
read_lock_held_prerequisites_micros: DurationRecorder,
read_lock_held_compute_holes_micros: DurationRecorder,
read_lock_drop_micros: DurationRecorder,
prepare_iterators_micros: DurationRecorder,
write_layer_files_micros: DurationRecorder,
level0_deltas_count: Option<usize>,
new_deltas_count: Option<usize>,
@@ -3332,10 +3333,10 @@ struct CompactLevel0Phase1Stats {
timeline_id: TimelineId,
read_lock_acquisition_micros: RecordedDuration,
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
read_lock_held_key_sort_micros: RecordedDuration,
read_lock_held_prerequisites_micros: RecordedDuration,
read_lock_held_compute_holes_micros: RecordedDuration,
read_lock_drop_micros: RecordedDuration,
prepare_iterators_micros: RecordedDuration,
write_layer_files_micros: RecordedDuration,
level0_deltas_count: usize,
new_deltas_count: usize,
@@ -3362,6 +3363,10 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
.read_lock_held_spawn_blocking_startup_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
read_lock_held_key_sort_micros: value
.read_lock_held_key_sort_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
read_lock_held_prerequisites_micros: value
.read_lock_held_prerequisites_micros
.into_recorded()
@@ -3374,10 +3379,6 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
.read_lock_drop_micros
.into_recorded()
.ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
prepare_iterators_micros: value
.prepare_iterators_micros
.into_recorded()
.ok_or_else(|| anyhow!("prepare_iterators_micros not set"))?,
write_layer_files_micros: value
.write_layer_files_micros
.into_recorded()
@@ -3547,28 +3548,24 @@ impl Timeline {
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;

let mut all_value_refs = Vec::new();
let mut all_keys = Vec::new();

for l in deltas_to_compact.iter() {
let downcast_deltas: Vec<_> = deltas_to_compact
.iter()
.map(|l| l.clone().downcast_delta_layer().expect("delta layer"))
.collect();
for dl in downcast_deltas.iter() {
// TODO: replace this with an await once we fully go async
let delta = l.clone().downcast_delta_layer().expect("delta layer");
Handle::current().block_on(async {
all_value_refs.extend(delta.load_val_refs(ctx).await?);
all_keys.extend(delta.load_keys(ctx).await?);
anyhow::Ok(())
})?;
all_keys.extend(Handle::current().block_on(DeltaLayer::load_keys(dl, ctx))?);
}

// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_value_refs.sort_by_key(|(key, lsn, _value_ref)| (*key, *lsn));
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));

// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|(key, lsn, _size)| (*key, *lsn));
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();

for (next_key, _next_lsn, _size) in all_keys.iter() {
for DeltaEntry { key: next_key, .. } in all_keys.iter() {
let next_key = *next_key;
if let Some(prev_key) = prev {
// just first fast filter
@@ -3592,8 +3589,7 @@ impl Timeline {
}
prev = Some(next_key.next());
}
stats.read_lock_held_compute_holes_micros =
stats.read_lock_held_prerequisites_micros.till_now();
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
drop_rlock(guard);
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
let mut holes = heap.into_vec();
@@ -3602,12 +3598,26 @@ impl Timeline {

// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = all_value_refs.into_iter();
let all_values_iter = all_keys.iter();

// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = all_keys.into_iter();

stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now();
let mut all_keys_iter = all_keys
.iter()
.map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
.coalesce(|mut prev, cur| {
// Coalesce keys that belong to the same key pair.
// This ensures that compaction doesn't put them
// into different layer files.
// Still limit this by the target file size,
// so that we keep the size of the files in
// check.
if prev.0 == cur.0 && prev.2 < target_file_size {
prev.2 += cur.2;
Ok(prev)
} else {
Err((prev, cur))
}
});

// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
@@ -3662,8 +3672,11 @@ impl Timeline {

// TODO remove this block_on wrapper once we fully go async
Handle::current().block_on(async {
for (key, lsn, value_ref) in all_values_iter {
let value = value_ref.load().await?;
for &DeltaEntry {
key, lsn, ref val, ..
} in all_values_iter
{
let value = val.load().await?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
@@ -3764,6 +3777,16 @@ impl Timeline {

// Sync layers
if !new_layers.is_empty() {
// Print a warning if the created layer is larger than double the target size
let warn_limit = target_file_size * 2;
for layer in new_layers.iter() {
if layer.desc.file_size > warn_limit {
warn!(
%layer,
"created delta file of size {} larger than double of target of {target_file_size}", layer.desc.file_size
);
}
}
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();

// Fsync all the layer files and directory using multiple threads to
@@ -3776,12 +3799,10 @@ impl Timeline {
layer_paths.pop().unwrap();
}

stats.write_layer_files_micros = stats.prepare_iterators_micros.till_now();
stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
stats.new_deltas_count = Some(new_layers.len());
stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum());

drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed

match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
.and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
{
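The `coalesce` call in the hunk above groups adjacent entries of the same key and accumulates their sizes, so that one key's values are not split across output layer files while the running size check keeps groups near the target file size. A self-contained sketch of that grouping, assuming the `itertools` crate and plain integer stand-ins for `Key`/`Lsn`:

```rust
use itertools::Itertools;

fn main() {
    let target_file_size: u64 = 100;
    // (key, lsn, size) triples, already sorted by (key, lsn).
    let entries: Vec<(u64, u64, u64)> = vec![(1, 10, 40), (1, 20, 30), (1, 30, 50), (2, 10, 25)];

    let coalesced: Vec<_> = entries
        .into_iter()
        .coalesce(|mut prev, cur| {
            // Merge adjacent entries of the same key, but stop accumulating
            // once the running size has reached the target file size.
            if prev.0 == cur.0 && prev.2 < target_file_size {
                prev.2 += cur.2;
                Ok(prev)
            } else {
                Err((prev, cur))
            }
        })
        .collect();

    // Key 1 collapses into a single entry of size 120: the size check looks at
    // the running total *before* each merge, so a group may overshoot the target.
    // Key 2 starts a new group because the key changes.
    println!("{coalesced:?}"); // [(1, 10, 120), (2, 10, 25)]
}
```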
8 changes: 4 additions & 4 deletions test_runner/regress/test_remote_storage.py
@@ -269,7 +269,7 @@ def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data):
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 10000) g
FROM generate_series(1, 20000) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
@@ -370,7 +370,7 @@ def churn_while_failpoints_active(result):
log.info("restarting postgres to validate")
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 10000
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 20000


@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
@@ -418,7 +418,7 @@ def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data):
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 10000) g
FROM generate_series(1, 20000) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
@@ -509,7 +509,7 @@ def churn(data_pass1, data_pass2):
log.info("restarting postgres to validate")
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 10000
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 20000

# ensure that we updated the calls_started download metric
fetch_calls_started()