Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove BlockReader::read_blk in favour of BlockCursor #5015

Merged
merged 4 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pageserver/ctl/src/layer_map_analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{fs, path::Path, str};

use pageserver::page_cache::PAGE_SZ;
use pageserver::repository::{Key, KEY_SIZE};
use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
use pageserver::tenant::block_io::FileBlockReader;
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
use pageserver::tenant::storage_layer::range_overlaps;
Expand Down
4 changes: 1 addition & 3 deletions pageserver/ctl/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ pub(crate) enum LayerCmd {
}

async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
use pageserver::tenant::block_io::BlockReader;

let path = path.as_ref();
virtual_file::init(10);
page_cache::init(100);
Expand All @@ -70,7 +68,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
},
)
.await?;
let cursor = BlockCursor::new(&file);
let cursor = BlockCursor::new_fileblockreader_virtual(&file);
for (k, v) in all {
let value = cursor.read_blob(v.pos()).await?;
println!("key:{} value_len:{}", k, value.len());
Expand Down
7 changes: 2 additions & 5 deletions pageserver/src/tenant/blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,11 @@
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::{BlockCursor, BlockReader};
use crate::tenant::block_io::BlockCursor;
use std::cmp::min;
use std::io::{Error, ErrorKind};

impl<R> BlockCursor<R>
where
R: BlockReader,
{
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
Expand Down
97 changes: 65 additions & 32 deletions pageserver/src/tenant/block_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
//! Low-level Block-oriented I/O functions
//!

use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::fs::File;
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;

Expand All @@ -13,32 +17,20 @@ use std::os::unix::fs::FileExt;
/// There are currently two implementations: EphemeralFile, and FileBlockReader
/// below.
pub trait BlockReader {
///
/// Read a block. Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
///
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error>;

///
/// Create a new "cursor" for reading from this reader.
///
/// A cursor caches the last accessed page, allowing for faster
/// access if the same block is accessed repeatedly.
fn block_cursor(&self) -> BlockCursor<&Self>
where
Self: Sized,
{
BlockCursor::new(self)
}
fn block_cursor(&self) -> BlockCursor<'_>;
}

impl<B> BlockReader for &B
where
B: BlockReader,
{
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
(*self).read_blk(blknum)
fn block_cursor(&self) -> BlockCursor<'_> {
(*self).block_cursor()
}
}

Expand Down Expand Up @@ -76,6 +68,30 @@ impl<'a> Deref for BlockLease<'a> {
}
}

pub(crate) enum BlockReaderRef<'a> {
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>),
FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
koivunej marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(test)]
TestDisk(&'a super::disk_btree::tests::TestDisk),
}

impl<'a> BlockReaderRef<'a> {
#[inline(always)]
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
use BlockReaderRef::*;
match self {
FileBlockReaderVirtual(r) => r.read_blk(blknum),
FileBlockReaderFile(r) => r.read_blk(blknum),
EphemeralFile(r) => r.read_blk(blknum),
Adapter(r) => r.read_blk(blknum),
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
}
}
}

///
/// A "cursor" for efficiently reading multiple pages from a BlockReader
///
Expand All @@ -93,21 +109,27 @@ impl<'a> Deref for BlockLease<'a> {
/// // do stuff with 'buf'
/// ```
///
pub struct BlockCursor<R>
where
R: BlockReader,
{
reader: R,
pub struct BlockCursor<'a> {
// The block source this cursor reads from; `read_blk` forwards every request to it.
reader: BlockReaderRef<'a>,
}

impl<R> BlockCursor<R>
where
R: BlockReader,
{
pub fn new(reader: R) -> Self {
impl<'a> BlockCursor<'a> {
pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
BlockCursor { reader }
}
// Needed by cli
pub fn new_fileblockreader_virtual(reader: &'a FileBlockReader<VirtualFile>) -> Self {
BlockCursor {
reader: BlockReaderRef::FileBlockReaderVirtual(reader),
}
}

/// Read block number `blknum` from the underlying reader.
///
/// Returns a "lease" object that can be used to access the contents
/// of the page. (For the page cache, the lease object represents a
/// lock on the buffer.)
///
/// Any `std::io::Error` produced by the underlying reader is returned
/// as-is.
#[inline(always)]
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum)
}
Expand Down Expand Up @@ -139,13 +161,12 @@ where
assert!(buf.len() == PAGE_SZ);
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
}
}

impl<F> BlockReader for FileBlockReader<F>
where
F: FileExt,
{
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
/// Read a block.
///
/// Returns a "lease" object that can be used to
/// access the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
Expand All @@ -170,6 +191,18 @@ where
}
}

/// Cursor support for block readers backed by a `std::fs::File`.
impl BlockReader for FileBlockReader<File> {
    /// Builds a cursor that reads through `self` via the `FileBlockReaderFile` variant.
    fn block_cursor(&self) -> BlockCursor<'_> {
        let source = BlockReaderRef::FileBlockReaderFile(self);
        BlockCursor::new(source)
    }
}

/// Cursor support for block readers backed by a `VirtualFile`.
impl BlockReader for FileBlockReader<VirtualFile> {
    /// Builds a cursor that reads through `self` via the `FileBlockReaderVirtual` variant.
    fn block_cursor(&self) -> BlockCursor<'_> {
        let source = BlockReaderRef::FileBlockReaderVirtual(self);
        BlockCursor::new(source)
    }
}

///
/// Trait for block-oriented output
///
Expand Down
22 changes: 14 additions & 8 deletions pageserver/src/tenant/disk_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,10 @@ where
{
let mut stack = Vec::new();
stack.push((self.root_blk, None));
let block_cursor = self.reader.block_cursor();
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = self.reader.read_blk(self.start_blk + node_blknum)?;
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum)?;

let node = OnDiskNode::deparse(node_buf.as_ref())?;
let prefix_len = node.prefix_len as usize;
Expand Down Expand Up @@ -353,8 +354,10 @@ where

stack.push((self.root_blk, String::new(), 0, 0, 0));

let block_cursor = self.reader.block_cursor();

while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
let blk = self.reader.read_blk(self.start_blk + blknum)?;
let blk = block_cursor.read_blk(self.start_blk + blknum)?;
let buf: &[u8] = blk.as_ref();
let node = OnDiskNode::<L>::deparse(buf)?;

Expand Down Expand Up @@ -683,29 +686,32 @@ impl<const L: usize> BuildNode<L> {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use crate::tenant::block_io::BlockLease;
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
use rand::Rng;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone, Default)]
struct TestDisk {
pub(crate) struct TestDisk {
blocks: Vec<Bytes>,
}
impl TestDisk {
fn new() -> Self {
Self::default()
}
}
impl BlockReader for TestDisk {
fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
let mut buf = [0u8; PAGE_SZ];
buf.copy_from_slice(&self.blocks[blknum as usize]);
Ok(std::rc::Rc::new(buf).into())
}
}
/// Lets the in-memory `TestDisk` be used wherever a `BlockReader` is expected.
impl BlockReader for TestDisk {
    /// Builds a cursor that reads through `self` via the `TestDisk` variant.
    fn block_cursor(&self) -> BlockCursor<'_> {
        let source = BlockReaderRef::TestDisk(self);
        BlockCursor::new(source)
    }
}
impl BlockWriter for &mut TestDisk {
fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
let blknum = self.blocks.len();
Expand Down
86 changes: 45 additions & 41 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::config::PageServerConf;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockLease, BlockReader};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::fs::OpenOptions;
Expand Down Expand Up @@ -61,6 +61,46 @@ impl EphemeralFile {
self.len
}

/// Read block `blknum` of this ephemeral file and return a lease on its contents.
///
/// Block numbers below `self.len / PAGE_SZ` are served through the page cache:
/// on a hit the cached page guard is returned; on a miss the page is filled
/// from `self.file`, marked valid, and the lookup is retried. Any other block
/// number yields a lease on `self.mutable_tail`.
///
/// # Errors
///
/// A failed page-cache lookup is converted into an `io::Error` of kind
/// `Other` whose message includes the block number and the file path.
/// Errors from reading the file on a cache miss are propagated unchanged.
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
// Range of block numbers that lie entirely below the current length.
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
// Retry until the page cache hands back a readable page.
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
// Cache hit: hand the read guard to the caller as the lease.
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
// Cache miss: fill the buffer behind the write guard from the file.
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
write_guard.mark_valid();

// Swap for read lock
continue;
}
};
}
} else {
// Debug builds check that the only block requested outside the range is
// the one immediately after it.
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
}
}

pub(crate) async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
struct Writer<'a> {
ephemeral_file: &'a mut EphemeralFile,
Expand Down Expand Up @@ -204,51 +244,15 @@ impl Drop for EphemeralFile {
}

impl BlockReader for EphemeralFile {
fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
write_guard.mark_valid();

// Swap for read lock
continue;
}
};
}
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
}
fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::tenant::block_io::BlockCursor;
use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
use rand::{thread_rng, RngCore};
use std::fs;
use std::str::FromStr;
Expand Down Expand Up @@ -304,7 +308,7 @@ mod tests {
blobs.push((pos, data));
}

let cursor = BlockCursor::new(&file);
let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
for (pos, expected) in blobs {
let actual = cursor.read_blob(pos).await?;
assert_eq!(actual, expected);
Expand Down
Loading