From 62786da9b3b26c221c5851b4e326e49667d3befe Mon Sep 17 00:00:00 2001
From: Alex Chi Z
Date: Mon, 24 Jun 2024 15:25:21 -0400
Subject: [PATCH] refactor to use the btree iterator API

Signed-off-by: Alex Chi Z
---
 pageserver/src/tenant/block_io.rs             |   1 +
 pageserver/src/tenant/disk_btree.rs           |  26 ++--
 .../src/tenant/storage_layer/delta_layer.rs   |  18 +--
 .../src/tenant/storage_layer/image_layer.rs   | 130 ++++++++++--------
 4 files changed, 96 insertions(+), 79 deletions(-)

diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs
index 92928116c1f6..b406d5033243 100644
--- a/pageserver/src/tenant/block_io.rs
+++ b/pageserver/src/tenant/block_io.rs
@@ -160,6 +160,7 @@ impl<'a> BlockCursor<'a> {
 ///
 /// The file is assumed to be immutable. This doesn't provide any functions
 /// for modifying the file, nor for invalidating the cache if it is modified.
+#[derive(Clone)]
 pub struct FileBlockReader<'a> {
     pub file: &'a VirtualFile,
 
diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs
index 119df3e6c408..0ae9c08dcf0c 100644
--- a/pageserver/src/tenant/disk_btree.rs
+++ b/pageserver/src/tenant/disk_btree.rs
@@ -212,6 +212,7 @@ impl<'a, const L: usize> OnDiskNode<'a, L> {
 ///
 /// Public reader object, to search the tree.
 ///
+#[derive(Clone)]
 pub struct DiskBtreeReader<R, const L: usize>
 where
     R: BlockReader,
@@ -259,27 +260,32 @@ where
         Ok(result)
     }
 
-    pub fn iter<'a>(
-        &'a self,
-        start_key: &'a [u8; L],
-        ctx: &'a RequestContext,
-    ) -> DiskBtreeIterator<'a> {
+    pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
+    where
+        R: 'a,
+    {
         DiskBtreeIterator {
-            stream: Box::pin(self.get_stream_from(start_key, ctx)),
+            stream: Box::pin(self.into_stream(start_key, ctx)),
         }
     }
 
     /// Return a stream which yields all key, value pairs from the index
     /// starting from the first key greater or equal to `start_key`.
     ///
-    /// Note that this is a copy of [`Self::visit`].
+    /// Note 1: this is a copy of [`Self::visit`].
     /// TODO: Once the sequential read path is removed this will become
     /// the only index traversal method.
-    pub fn get_stream_from<'a>(
-        &'a self,
+    ///
+    /// Note 2: this function used to take `&self`, but due to the iterator-based API refactor
+    /// it now consumes `self`. Feel free to add the `&self` variant back if necessary.
+    pub fn into_stream<'a>(
+        self,
         start_key: &'a [u8; L],
         ctx: &'a RequestContext,
-    ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
+    ) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a
+    where
+        R: 'a,
+    {
         try_stream! {
             let mut stack = Vec::new();
             stack.push((self.root_blk, None));
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 2b04971ba9d5..ab3ef4980fbe 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -941,7 +941,7 @@ impl DeltaLayerInner {
         );
         let mut result = Vec::new();
         let mut stream =
-            Box::pin(self.stream_index_forwards(&index_reader, &[0; DELTA_KEY_SIZE], ctx));
+            Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let cursor = block_reader.block_cursor();
         let mut buf = Vec::new();
@@ -976,7 +976,7 @@ impl DeltaLayerInner {
         ctx: &RequestContext,
     ) -> anyhow::Result<Vec<VectoredRead>>
     where
-        Reader: BlockReader,
+        Reader: BlockReader + Clone,
     {
         let ctx = RequestContextBuilder::extend(ctx)
             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
             .build();
@@ -986,7 +986,7 @@ impl DeltaLayerInner {
         let mut range_end_handled = false;
 
         let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
-        let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
+        let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
         let mut index_stream = std::pin::pin!(index_stream);
 
         while let Some(index_entry) = index_stream.next().await {
@@ -1241,7 +1241,7 @@ impl DeltaLayerInner {
             block_reader,
         );
 
-        let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
+        let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
         let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
         // put in a sentinel value for getting the end offset for last item, and not having to
         // repeat the whole read part
@@ -1459,17 +1459,17 @@ impl DeltaLayerInner {
 
     fn stream_index_forwards<'a, R>(
         &'a self,
-        reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
+        reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
         start: &'a [u8; DELTA_KEY_SIZE],
         ctx: &'a RequestContext,
     ) -> impl futures::stream::Stream<
         Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
     > + 'a
     where
-        R: BlockReader,
+        R: BlockReader + 'a,
     {
         use futures::stream::TryStreamExt;
-        let stream = reader.get_stream_from(start, ctx);
+        let stream = reader.into_stream(start, ctx);
         stream.map_ok(|(key, value)| {
             let key = DeltaKey::from_slice(&key);
             let (key, lsn) = (key.key(), key.lsn());
@@ -2073,7 +2073,7 @@ mod test {
             source.index_root_blk,
             &source_reader,
         );
-        let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
+        let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
         let source_stream = source_stream.filter(|res| match res {
             Ok((_, lsn, _)) => ready(lsn < &truncated_at),
             _ => ready(true),
@@ -2086,7 +2086,7 @@ mod test {
             truncated.index_root_blk,
             &truncated_reader,
         );
-        let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
+        let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
         let mut truncated_stream = std::pin::pin!(truncated_stream);
 
         let mut scratch_left = Vec::new();
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 4f26d4fcb8d3..66b9a021956b 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -495,7 +495,7 @@ impl ImageLayerInner {
         let tree_reader =
             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
         let mut result = Vec::new();
-        let mut stream = Box::pin(tree_reader.get_stream_from(&[0; KEY_SIZE], ctx));
+        let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let cursor = block_reader.block_cursor();
         while let Some(item) = stream.next().await {
@@ -544,7 +544,7 @@ impl ImageLayerInner {
         let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         range.start.write_to_byte_slice(&mut search_key);
 
-        let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
+        let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
         let mut index_stream = std::pin::pin!(index_stream);
 
         while let Some(index_entry) = index_stream.next().await {
@@ -691,14 +691,15 @@ impl ImageLayerInner {
     }
 
     #[cfg(test)]
-    pub(crate) fn iter<'a, 'ctx>(
-        &'a self,
-        ctx: &'ctx RequestContext,
-    ) -> ImageLayerIterator<'a, 'ctx> {
+    pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
+        let block_reader = FileBlockReader::new(&self.file, self.file_id);
+        let tree_reader =
+            DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
         ImageLayerIterator {
             image_layer: self,
             ctx,
-            next_batch_start_key: Key::MIN,
+            index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
+            last_batch_last_item: None,
             key_values_batch: Vec::new(),
             next_idx_in_batch: 0,
             is_end: false,
@@ -961,11 +962,12 @@ impl Drop for ImageLayerWriter {
 }
 
 #[cfg(test)]
-pub struct ImageLayerIterator<'a, 'ctx> {
+pub struct ImageLayerIterator<'a> {
     image_layer: &'a ImageLayerInner,
-    ctx: &'ctx RequestContext,
-    next_batch_start_key: Key,
+    ctx: &'a RequestContext,
+    index_iter: crate::tenant::disk_btree::DiskBtreeIterator<'a>,
     key_values_batch: Vec<(Key, Lsn, Value)>,
+    last_batch_last_item: Option<(Vec<u8>, u64)>,
     next_idx_in_batch: usize,
     is_end: bool,
     /// Limit of number of keys per batch
@@ -975,66 +977,61 @@
 }
 
 #[cfg(test)]
-impl<'a, 'ctx> ImageLayerIterator<'a, 'ctx> {
+impl<'a> ImageLayerIterator<'a> {
     /// Retrieve a batch of key-value pairs into the iterator buffer.
     async fn next_batch(&mut self) -> anyhow::Result<()> {
         assert!(self.next_idx_in_batch >= self.key_values_batch.len());
         assert!(!self.is_end);
         self.key_values_batch.clear();
         self.next_idx_in_batch = 0;
-
-        let block_reader = FileBlockReader::new(&self.image_layer.file, self.image_layer.file_id);
-        let tree_reader = DiskBtreeReader::new(
-            self.image_layer.index_start_blk,
-            self.image_layer.index_root_blk,
-            &block_reader,
-        );
-        let mut search_key = [0; KEY_SIZE];
-        self.next_batch_start_key
-            .write_to_byte_slice(&mut search_key);
         // We want to have exactly one read syscall (plus several others for index lookup) for each `next_batch` call.
         // Therefore, we enforce `self.max_read_size` by ourselves instead of using the VectoredReadPlanner's capability,
         // to avoid splitting into two I/Os.
         let mut read_planner = VectoredReadPlanner::new_caller_controlled_max_limit();
         let mut cnt = 0;
-        let mut start_pos = None;
+        let mut begin_offset =
+            if let Some((last_key, last_offset)) = self.last_batch_last_item.take() {
+                read_planner.handle(
+                    Key::from_slice(&last_key[..KEY_SIZE]),
+                    self.image_layer.lsn,
+                    last_offset,
+                    BlobFlag::None,
+                );
+                cnt += 1;
+                Some(last_offset)
+            } else {
+                None
+            };
         let mut range_end_handled = false;
-        // TODO: dedup with vectored read?
-        tree_reader
-            .visit(
-                &search_key,
-                VisitDirection::Forwards,
-                |raw_key, offset| {
-                    if start_pos
-                        .map(|start_pos| offset - start_pos >= self.max_read_size)
-                        .unwrap_or(false)
-                        || cnt >= self.batch_size
-                    {
-                        // At this point, the I/O size might have already exceeded `self.max_read_size`. This is fine. We do not expect
-                        // `self.max_read_size` to be a hard limit.
-                        read_planner.handle_range_end(offset);
-                        range_end_handled = true;
-                        return false;
-                    }
-                    start_pos = Some(offset);
-                    read_planner.handle(
-                        Key::from_slice(&raw_key[..KEY_SIZE]),
-                        self.image_layer.lsn,
-                        offset,
-                        BlobFlag::None,
-                    );
-                    cnt += 1;
-                    true
-                },
-                self.ctx,
-            )
-            .await?;
+        while let Some(res) = self.index_iter.next().await {
+            let (raw_key, offset) = res?;
+            if begin_offset.is_none() {
+                begin_offset = Some(offset);
+            }
+            if (offset - begin_offset.unwrap() >= self.max_read_size && cnt >= 1 /* ensure at least one key is in the batch */)
+                || cnt >= self.batch_size
+            {
+                // Either we reached the max_read_size limit, or the batch would exceed batch_size items if we read this key.
+                self.last_batch_last_item = Some((raw_key, offset)); // Handle the current key in the next batch; exclude it from this one.
+                read_planner.handle_range_end(offset); // Use the current offset to finish the read plan.
+                range_end_handled = true;
+                break;
+            }
+            read_planner.handle(
+                Key::from_slice(&raw_key[..KEY_SIZE]),
+                self.image_layer.lsn,
+                offset,
+                BlobFlag::None,
+            );
+            cnt += 1;
+        }
         if cnt == 0 {
             self.is_end = true;
             return Ok(());
         }
         if !range_end_handled {
             let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
+            self.last_batch_last_item = None;
             read_planner.handle_range_end(payload_end);
         }
         let plan = read_planner.finish();
@@ -1052,8 +1049,6 @@
                 next_batch.push((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
             }
         }
-        let (last_key, _, _) = next_batch.last().unwrap();
-        self.next_batch_start_key = last_key.next();
         self.key_values_batch = next_batch;
         Ok(())
     }
@@ -1306,7 +1301,7 @@ mod test {
     }
 
     async fn assert_img_iter_equal(
-        img_iter: &mut ImageLayerIterator<'_, '_>,
+        img_iter: &mut ImageLayerIterator<'_>,
         expect: &[(Key, Bytes)],
         expect_lsn: Lsn,
     ) {
@@ -1352,11 +1347,26 @@ mod test {
             .await
             .unwrap();
         let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
-        for batch_size in [1, 2, 4, 8, 3, 7, 13] {
-            println!("running with batch_size={batch_size}");
-            let mut iter = img_layer.iter(&ctx);
-            iter.batch_size = batch_size;
-            assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
+        for max_read_size in [1, 1024] {
+            for batch_size in [1, 2, 4, 8, 3, 7, 13] {
+                println!("running with batch_size={batch_size} max_read_size={max_read_size}");
+                // Test if the batch size is correctly determined
+                let mut iter = img_layer.iter(&ctx);
+                iter.batch_size = batch_size;
+                iter.max_read_size = max_read_size;
+                iter.next_batch().await.unwrap();
+                if max_read_size == 1 {
+                    // every key should be its own batch because the value is larger than max_read_size
+                    assert_eq!(iter.key_values_batch.len(), 1);
+                } else {
+                    assert_eq!(iter.key_values_batch.len(), batch_size);
+                }
+                // Test if the result is correct
+                let mut iter = img_layer.iter(&ctx);
+                iter.batch_size = batch_size;
+                iter.max_read_size = max_read_size;
+                assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
+            }
         }
     }
 }
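-- 

Usage sketch (reviewer note, placed after the signature delimiter so it is not
part of the patch): the old `visit`-based traversal pushed index entries into a
callback and stopped when the callback returned `false`; the new API lets the
caller pull entries and stop on its own, which is exactly what
`ImageLayerIterator::next_batch` does above. Assuming a
`reader: DiskBtreeReader<R, L>`, a `start_key: [u8; L]`, and a
`ctx: &RequestContext` in scope, and with `still_interested`/`consume` as
hypothetical stand-ins for caller logic, a call site looks roughly like this
(the item type `Result<(Vec<u8>, u64), DiskBtreeError>` comes from
`into_stream` above):

    // `iter` consumes the reader; this is why the patch derives `Clone` for
    // `FileBlockReader` and `DiskBtreeReader`: callers that still need the
    // reader clone it first, as the read paths do with
    // `tree_reader.clone().into_stream(...)`.
    let mut iter = reader.iter(&start_key, ctx);
    while let Some(res) = iter.next().await {
        let (raw_key, offset) = res?; // raw_key: Vec<u8>, offset: u64
        if !still_interested(&raw_key) {
            break; // replaces `return false` from the old `visit` callback
        }
        consume(raw_key, offset);
    }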