
refactor to use the btree iterator API
Signed-off-by: Alex Chi Z <chi@neon.tech>
skyzh committed Jun 24, 2024
1 parent 8558e84 commit 62786da
Showing 4 changed files with 96 additions and 79 deletions.
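At its core, the commit moves `DiskBtreeReader` traversal from a borrowing API (`get_stream_from(&self, ...)`) to a consuming one (`into_stream(self, ...)` / `iter(self, ...)`), which is why `#[derive(Clone)]` appears on the readers below and why call sites now clone before iterating. A minimal, self-contained sketch of that ownership pattern; the names are illustrative stand-ins, not the pageserver API:

#[derive(Clone)]
struct Reader {
    data: Vec<(Vec<u8>, u64)>,
}

impl Reader {
    // Consuming entry point, analogous in shape to `into_stream(self, ..)`.
    fn into_entries_from(self, start_key: Vec<u8>) -> impl Iterator<Item = (Vec<u8>, u64)> {
        self.data.into_iter().filter(move |(k, _)| *k >= start_key)
    }
}

fn main() {
    let reader = Reader {
        data: vec![(b"a".to_vec(), 1), (b"b".to_vec(), 2)],
    };
    // First traversal consumes a clone; the original reader stays usable.
    for (key, offset) in reader.clone().into_entries_from(b"a".to_vec()) {
        println!("{:?} at {}", key, offset);
    }
    // Second traversal consumes the reader itself.
    assert_eq!(reader.into_entries_from(b"b".to_vec()).count(), 1);
}

Cloning is acceptable because these readers only wrap a shared file reference, as the block_io.rs change below suggests.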
1 change: 1 addition & 0 deletions pageserver/src/tenant/block_io.rs
@@ -160,6 +160,7 @@ impl<'a> BlockCursor<'a> {
 ///
 /// The file is assumed to be immutable. This doesn't provide any functions
 /// for modifying the file, nor for invalidating the cache if it is modified.
+#[derive(Clone)]
 pub struct FileBlockReader<'a> {
     pub file: &'a VirtualFile,
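Deriving `Clone` here is cheap: `FileBlockReader` holds a shared `&VirtualFile` plus an id, so a clone copies a reference rather than duplicating any file state. A small sketch under that assumption (`VirtualFileStub` is a stand-in for the real `VirtualFile`):

struct VirtualFileStub {
    path: String,
}

#[derive(Clone)]
struct FileBlockReaderSketch<'a> {
    // A shared borrow: the derived `Clone` just copies the reference.
    file: &'a VirtualFileStub,
    file_id: u64,
}

fn main() {
    let f = VirtualFileStub { path: "layer-file".into() };
    let r1 = FileBlockReaderSketch { file: &f, file_id: 1 };
    let r2 = r1.clone();
    // Both readers borrow the same underlying file.
    assert!(std::ptr::eq(r1.file, r2.file));
    println!("{} (file_id {})", r2.file.path, r2.file_id);
}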
26 changes: 16 additions & 10 deletions pageserver/src/tenant/disk_btree.rs
@@ -212,6 +212,7 @@ impl<'a, const L: usize> OnDiskNode<'a, L> {
 ///
 /// Public reader object, to search the tree.
 ///
+#[derive(Clone)]
 pub struct DiskBtreeReader<R, const L: usize>
 where
     R: BlockReader,
@@ -259,27 +260,32 @@ where
         Ok(result)
     }
 
-    pub fn iter<'a>(
-        &'a self,
-        start_key: &'a [u8; L],
-        ctx: &'a RequestContext,
-    ) -> DiskBtreeIterator<'a> {
+    pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
+    where
+        R: 'a,
+    {
         DiskBtreeIterator {
-            stream: Box::pin(self.get_stream_from(start_key, ctx)),
+            stream: Box::pin(self.into_stream(start_key, ctx)),
         }
     }
 
     /// Return a stream which yields all key, value pairs from the index
     /// starting from the first key greater or equal to `start_key`.
    ///
-    /// Note that this is a copy of [`Self::visit`].
+    /// Note 1: that this is a copy of [`Self::visit`].
     /// TODO: Once the sequential read path is removed this will become
     /// the only index traversal method.
-    pub fn get_stream_from<'a>(
-        &'a self,
+    ///
+    /// Note 2: this function used to take `&self` but due to the iterator-based API refactor,
+    /// it now consumes `self`. Feel free to add the `&self` variant back if it's necessary.
+    pub fn into_stream<'a>(
+        self,
         start_key: &'a [u8; L],
         ctx: &'a RequestContext,
-    ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
+    ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
+    where
+        R: 'a,
+    {
        try_stream! {
            let mut stack = Vec::new();
            stack.push((self.root_blk, None));
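The `DiskBtreeIterator` built by `iter` above is just a pinned, boxed stream with an async `next`. A rough, self-contained sketch of that wrapper shape, using the `futures` crate and a plain in-memory stream in place of the real `into_stream`; all names are illustrative:

use std::pin::Pin;

use futures::stream::{self, Stream, StreamExt};

type Entry = (Vec<u8>, u64);

// Assumed shape of `DiskBtreeIterator`: it owns a pinned, boxed stream of
// key/offset results and exposes an async `next`.
struct BtreeIteratorSketch<'a> {
    stream: Pin<Box<dyn Stream<Item = Result<Entry, std::io::Error>> + 'a>>,
}

impl<'a> BtreeIteratorSketch<'a> {
    async fn next(&mut self) -> Option<Result<Entry, std::io::Error>> {
        self.stream.next().await
    }
}

fn make_iter<'a>(entries: Vec<Entry>) -> BtreeIteratorSketch<'a> {
    // Stand-in for `Box::pin(reader.into_stream(start_key, ctx))`.
    BtreeIteratorSketch {
        stream: Box::pin(stream::iter(entries.into_iter().map(Ok))),
    }
}

fn main() {
    futures::executor::block_on(async {
        let mut it = make_iter(vec![(b"k1".to_vec(), 10), (b"k2".to_vec(), 20)]);
        while let Some(entry) = it.next().await {
            let (key, offset) = entry.unwrap();
            println!("{:?} at offset {}", key, offset);
        }
    });
}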
18 changes: 9 additions & 9 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -941,7 +941,7 @@ impl DeltaLayerInner {
         );
         let mut result = Vec::new();
         let mut stream =
-            Box::pin(self.stream_index_forwards(&index_reader, &[0; DELTA_KEY_SIZE], ctx));
+            Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let cursor = block_reader.block_cursor();
         let mut buf = Vec::new();
@@ -976,7 +976,7 @@ impl DeltaLayerInner {
         ctx: &RequestContext,
     ) -> anyhow::Result<Vec<VectoredRead>>
     where
-        Reader: BlockReader,
+        Reader: BlockReader + Clone,
     {
         let ctx = RequestContextBuilder::extend(ctx)
             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
@@ -986,7 +986,7 @@ impl DeltaLayerInner {
         let mut range_end_handled = false;
 
         let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
-        let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
+        let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
         let mut index_stream = std::pin::pin!(index_stream);
 
         while let Some(index_entry) = index_stream.next().await {
@@ -1241,7 +1241,7 @@ impl DeltaLayerInner {
             block_reader,
         );
 
-        let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
+        let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
         let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
         // put in a sentinel value for getting the end offset for last item, and not having to
         // repeat the whole read part
@@ -1459,17 +1459,17 @@ impl DeltaLayerInner {
 
     fn stream_index_forwards<'a, R>(
         &'a self,
-        reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
+        reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
         start: &'a [u8; DELTA_KEY_SIZE],
         ctx: &'a RequestContext,
     ) -> impl futures::stream::Stream<
         Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
     > + 'a
     where
-        R: BlockReader,
+        R: BlockReader + 'a,
    {
        use futures::stream::TryStreamExt;
-        let stream = reader.get_stream_from(start, ctx);
+        let stream = reader.into_stream(start, ctx);
         stream.map_ok(|(key, value)| {
             let key = DeltaKey::from_slice(&key);
             let (key, lsn) = (key.key(), key.lsn());
@@ -2073,7 +2073,7 @@ mod test {
             source.index_root_blk,
             &source_reader,
         );
-        let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
+        let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
         let source_stream = source_stream.filter(|res| match res {
             Ok((_, lsn, _)) => ready(lsn < &truncated_at),
             _ => ready(true),
@@ -2086,7 +2086,7 @@ mod test {
             truncated.index_root_blk,
             &truncated_reader,
         );
-        let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
+        let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
         let mut truncated_stream = std::pin::pin!(truncated_stream);
 
         let mut scratch_left = Vec::new();
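The new `Reader: BlockReader + Clone` bound is a direct consequence of the consuming API: `plan_reads` receives the reader once, but `into_stream` takes it by value, hence the `index_reader.clone().into_stream(...)` call above. A minimal sketch of that pattern with stand-in types:

// Stand-in for a reader that wraps cheap-to-copy state (`&VirtualFile` + id).
#[derive(Clone)]
struct ReaderSketch {
    file_id: u32,
}

impl ReaderSketch {
    // Consuming API, like `DiskBtreeReader::into_stream(self, ..)`.
    fn into_stream(self) -> impl Iterator<Item = u32> {
        std::iter::once(self.file_id)
    }
}

// Mirrors the shape of `plan_reads`: cloning lets the function traverse
// without giving up its only reader.
fn plan_reads(reader: ReaderSketch) -> usize {
    let planned = reader.clone().into_stream().count(); // first traversal
    let checked = reader.into_stream().count(); // reader finally consumed
    planned + checked
}

fn main() {
    assert_eq!(plan_reads(ReaderSketch { file_id: 7 }), 2);
    println!("planned two traversals from one reader");
}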
130 changes: 70 additions & 60 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -495,7 +495,7 @@ impl ImageLayerInner {
         let tree_reader =
             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
         let mut result = Vec::new();
-        let mut stream = Box::pin(tree_reader.get_stream_from(&[0; KEY_SIZE], ctx));
+        let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let cursor = block_reader.block_cursor();
         while let Some(item) = stream.next().await {
@@ -544,7 +544,7 @@ impl ImageLayerInner {
         let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         range.start.write_to_byte_slice(&mut search_key);
 
-        let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
+        let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
         let mut index_stream = std::pin::pin!(index_stream);
 
         while let Some(index_entry) = index_stream.next().await {
@@ -691,14 +691,15 @@ impl ImageLayerInner {
     }
 
     #[cfg(test)]
-    pub(crate) fn iter<'a, 'ctx>(
-        &'a self,
-        ctx: &'ctx RequestContext,
-    ) -> ImageLayerIterator<'a, 'ctx> {
+    pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
+        let block_reader = FileBlockReader::new(&self.file, self.file_id);
+        let tree_reader =
+            DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
         ImageLayerIterator {
             image_layer: self,
             ctx,
-            next_batch_start_key: Key::MIN,
+            index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
+            last_batch_last_item: None,
             key_values_batch: Vec::new(),
             next_idx_in_batch: 0,
             is_end: false,
@@ -961,11 +962,12 @@ impl Drop for ImageLayerWriter {
 }
 
 #[cfg(test)]
-pub struct ImageLayerIterator<'a, 'ctx> {
+pub struct ImageLayerIterator<'a> {
     image_layer: &'a ImageLayerInner,
-    ctx: &'ctx RequestContext,
-    next_batch_start_key: Key,
+    ctx: &'a RequestContext,
+    index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>,
     key_values_batch: Vec<(Key, Lsn, Value)>,
+    last_batch_last_item: Option<(Vec<u8>, u64)>,
     next_idx_in_batch: usize,
     is_end: bool,
     /// Limit of number of keys per batch
@@ -975,66 +977,61 @@ pub struct ImageLayerIterator<'a, 'ctx> {
 }
 
 #[cfg(test)]
-impl<'a, 'ctx> ImageLayerIterator<'a, 'ctx> {
+impl<'a> ImageLayerIterator<'a> {
     /// Retrieve a batch of key-value pairs into the iterator buffer.
     async fn next_batch(&mut self) -> anyhow::Result<()> {
         assert!(self.next_idx_in_batch >= self.key_values_batch.len());
         assert!(!self.is_end);
         self.key_values_batch.clear();
         self.next_idx_in_batch = 0;
 
-        let block_reader = FileBlockReader::new(&self.image_layer.file, self.image_layer.file_id);
-        let tree_reader = DiskBtreeReader::new(
-            self.image_layer.index_start_blk,
-            self.image_layer.index_root_blk,
-            &block_reader,
-        );
-        let mut search_key = [0; KEY_SIZE];
-        self.next_batch_start_key
-            .write_to_byte_slice(&mut search_key);
         // We want to have exactly one read syscall (plus several others for index lookup) for each `next_batch` call.
         // Therefore, we enforce `self.max_read_size` by ourselves instead of using the VectoredReadPlanner's capability,
         // to avoid splitting into two I/Os.
         let mut read_planner = VectoredReadPlanner::new_caller_controlled_max_limit();
         let mut cnt = 0;
-        let mut start_pos = None;
+        let mut begin_offset =
+            if let Some((last_key, last_offset)) = self.last_batch_last_item.take() {
+                read_planner.handle(
+                    Key::from_slice(&last_key[..KEY_SIZE]),
+                    self.image_layer.lsn,
+                    last_offset,
+                    BlobFlag::None,
+                );
+                cnt += 1;
+                Some(last_offset)
+            } else {
+                None
+            };
         let mut range_end_handled = false;
-        // TODO: dedup with vectored read?
-        tree_reader
-            .visit(
-                &search_key,
-                VisitDirection::Forwards,
-                |raw_key, offset| {
-                    if start_pos
-                        .map(|start_pos| offset - start_pos >= self.max_read_size)
-                        .unwrap_or(false)
-                        || cnt >= self.batch_size
-                    {
-                        // At this point, the I/O size might have already exceeded `self.max_read_size`. This is fine. We do not expect
-                        // `self.max_read_size` to be a hard limit.
-                        read_planner.handle_range_end(offset);
-                        range_end_handled = true;
-                        return false;
-                    }
-                    start_pos = Some(offset);
-                    read_planner.handle(
-                        Key::from_slice(&raw_key[..KEY_SIZE]),
-                        self.image_layer.lsn,
-                        offset,
-                        BlobFlag::None,
-                    );
-                    cnt += 1;
-                    true
-                },
-                self.ctx,
-            )
-            .await?;
+        while let Some(res) = self.index_iter.next().await {
+            let (raw_key, offset) = res?;
+            if begin_offset.is_none() {
+                begin_offset = Some(offset);
+            }
+            if (offset - begin_offset.unwrap() >= self.max_read_size && cnt >= 1/* ensure at least one key is in the batch */)
+                || cnt >= self.batch_size
+            {
+                // We either reach the limit of max_read_size, or the batch will have more than batch_size items if we read this key.
+                self.last_batch_last_item = Some((raw_key, offset)); // Handle the current key in the next batch, exclude it from this batch.
+                read_planner.handle_range_end(offset); // Use the current offset to finish the read plan.
+                range_end_handled = true;
+                break;
+            }
+            read_planner.handle(
+                Key::from_slice(&raw_key[..KEY_SIZE]),
+                self.image_layer.lsn,
+                offset,
+                BlobFlag::None,
+            );
+            cnt += 1;
+        }
         if cnt == 0 {
             self.is_end = true;
             return Ok(());
         }
         if !range_end_handled {
             let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
+            self.last_batch_last_item = None;
             read_planner.handle_range_end(payload_end);
         }
         let plan = read_planner.finish();
@@ -1052,8 +1049,6 @@ impl<'a, 'ctx> ImageLayerIterator<'a, 'ctx> {
                 next_batch.push((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
             }
         }
-        let (last_key, _, _) = next_batch.last().unwrap();
-        self.next_batch_start_key = last_key.next();
         self.key_values_batch = next_batch;
         Ok(())
     }
@@ -1306,7 +1301,7 @@ mod test {
     }
 
     async fn assert_img_iter_equal(
-        img_iter: &mut ImageLayerIterator<'_, '_>,
+        img_iter: &mut ImageLayerIterator<'_>,
         expect: &[(Key, Bytes)],
         expect_lsn: Lsn,
     ) {
@@ -1352,11 +1347,26 @@ mod test {
             .await
             .unwrap();
         let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
-        for batch_size in [1, 2, 4, 8, 3, 7, 13] {
-            println!("running with batch_size={batch_size}");
-            let mut iter = img_layer.iter(&ctx);
-            iter.batch_size = batch_size;
-            assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
+        for max_read_size in [1, 1024] {
+            for batch_size in [1, 2, 4, 8, 3, 7, 13] {
+                println!("running with batch_size={batch_size} max_read_size={max_read_size}");
+                // Test if the batch size is correctly determined
+                let mut iter = img_layer.iter(&ctx);
+                iter.batch_size = batch_size;
+                iter.max_read_size = max_read_size;
+                iter.next_batch().await.unwrap();
+                if max_read_size == 1 {
+                    // every key should be a batch b/c the value is larger than max_read_size
+                    assert_eq!(iter.key_values_batch.len(), 1);
+                } else {
+                    assert_eq!(iter.key_values_batch.len(), batch_size);
+                }
+                // Test if the result is correct
+                let mut iter = img_layer.iter(&ctx);
+                iter.batch_size = batch_size;
+                iter.max_read_size = max_read_size;
+                assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
+            }
         }
     }
 }
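The reworked `next_batch` pulls from `index_iter` instead of driving a `visit` callback, and carries the first entry that overflows a batch into the next one via `last_batch_last_item`. A simplified, synchronous sketch of just this batch-splitting rule (a hypothetical helper, not the pageserver code):

// Split (key, offset) entries into batches: take entries until either
// `batch_size` entries are collected or the byte span reaches `max_read_size`;
// the entry that overflows is carried over as the first entry of the next batch.
fn split_batches(
    entries: &[(String, u64)], // (key, file offset), offsets ascending
    batch_size: usize,
    max_read_size: u64,
) -> Vec<Vec<(String, u64)>> {
    let mut batches = Vec::new();
    let mut carried: Option<(String, u64)> = None;
    let mut it = entries.iter().cloned();
    loop {
        let mut batch = Vec::new();
        let mut begin_offset = None;
        if let Some(item) = carried.take() {
            begin_offset = Some(item.1);
            batch.push(item);
        }
        while let Some((key, offset)) = it.next() {
            if begin_offset.is_none() {
                begin_offset = Some(offset);
            }
            // Same condition as the commit: overflow by size (keeping at least
            // one entry) or by count pushes the entry to the next batch.
            if (offset - begin_offset.unwrap() >= max_read_size && !batch.is_empty())
                || batch.len() >= batch_size
            {
                carried = Some((key, offset));
                break;
            }
            batch.push((key, offset));
        }
        if batch.is_empty() {
            return batches;
        }
        batches.push(batch);
    }
}

fn main() {
    let entries: Vec<(String, u64)> = (0..10u64).map(|i| (format!("key{i}"), i * 100)).collect();
    for batch in split_batches(&entries, 4, 250) {
        println!("{batch:?}");
    }
}

With `max_read_size = 1` every subsequent entry overflows immediately, so each batch holds exactly one key, which is the case the new test asserts.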
