From 7a63685cde1a514c75c2a3799e3f3d0e8d1e8a99 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 18 Aug 2023 19:31:03 +0200 Subject: [PATCH] simplify page-caching of EphemeralFile (#4994) (This PR is the successor of https://github.com/neondatabase/neon/pull/4984 ) ## Summary The current way in which `EphemeralFile` uses `PageCache` complicates the Pageserver code base to a degree that isn't worth it. This PR refactors how we cache `EphemeralFile` contents, by exploiting the append-only nature of `EphemeralFile`. The result is that `PageCache` only holds `ImmutableFilePage` and `MaterializedPage`. These types of pages are read-only and evictable without write-back. This allows us to remove the writeback code from `PageCache`, also eliminating an entire failure mode. Further, many great open-source libraries exist to solve the problem of a read-only cache, much better than our `page_cache.rs` (e.g., better replacement policy, less global locking). With this PR, we can now explore using them. ## Problem & Analysis Before this PR, `PageCache` had three types of pages: * `ImmutableFilePage`: caches Delta / Image layer file contents * `MaterializedPage`: caches results of Timeline::get (page materialization) * `EphemeralPage`: caches `EphemeralFile` contents `EphemeralPage` is quite different from `ImmutableFilePage` and `MaterializedPage`: * Immutable and materialized pages are for the acceleration of (future) reads of the same data using `PAGE_CACHE_SIZE * PAGE_SIZE` bytes of DRAM. * Ephemeral pages are a write-back cache of `EphemeralFile` contents, i.e., if there is pressure in the page cache, we spill `EphemeralFile` contents to disk. 
`EphemeralFile` is only used by `InMemoryLayer`, for the following purposes: * **write**: when filling up the `InMemoryLayer`, via `impl BlobWriter for EphemeralFile` * **read**: when doing **page reconstruction** for a page@lsn that isn't written to disk * **read**: when writing L0 layer files, we re-read the `InMemoryLayer` and put the contents into the L0 delta writer (**`create_delta_layer`**). This happens every 10min or when InMemoryLayer reaches 256MB in size. The access patterns of the `InMemoryLayer` use case are as follows: * **write**: via `BlobWriter`, strictly append-only * **read for page reconstruction**: via `BlobReader`, random * **read for `create_delta_layer`**: via `BlobReader`, dependent on data, but generally random. Why? * in classical LSM terms, this function is what writes the memory-resident `C0` tree into the disk-resident `C1` tree * in our system, though, the values of InMemoryLayer are stored in an EphemeralFile, and hence they are not guaranteed to be memory-resident * the function reads `Value`s in `Key, LSN` order, which is `!=` insert order What do these `EphemeralFile`-level access patterns mean for the page cache? * **write**: * the common case is that `Value` is a WAL record, and if it isn't a full-page-image WAL record, then it's smaller than `PAGE_SIZE` * So, the `EphemeralPage` pages act as a buffer for these `< PAGE_CACHE` sized writes. * If there's no page cache eviction between subsequent `InMemoryLayer::put_value` calls, the `EphemeralPage` is still resident, so the page cache avoids doing a `write` system call. * In practice, a busy page server will have page cache evictions because we only configure 64MB of page cache size. * **reads for page reconstruction**: read acceleration, just as for the other page types. * **reads for `create_delta_layer`**: * The `Value` reads happen through a `BlockCursor`, which optimizes the case of repeated reads from the same page. 
* So, the best case is that subsequent values are located on the same page; hence `BlockCursor`'s buffer is maximally effective. * The worst case is that each `Value` is on a different page; hence the `BlockCursor`'s 1-page-sized buffer is ineffective. * The best case translates into `256MB/PAGE_SIZE` page cache accesses, one per page. * the worst case translates into `#Values` page cache accesses * again, the page cache accesses must be assumed to be random because the `Value`s aren't accessed in insertion order but `Key, LSN` order. ## Summary of changes Preliminaries for this PR were: - #5003 - #5004 - #5005 - uncommitted microbenchmark in #5011 Based on the observations outlined above, this PR makes the following changes: * Rip out `EphemeralPage` from `page_cache.rs` * Move the `block_io::FileId` to `page_cache::FileId` * Add a `PAGE_SIZE`d buffer to the `EphemeralFile` struct. It's called `mutable_tail`. * Change `write_blob` to use `mutable_tail` for the write buffering instead of a page cache page. * if `mutable_tail` is full, it writes it out to disk, zeroes it out, and re-uses it. * There is explicitly no double-buffering, so that memory allocation per `EphemeralFile` instance is fixed. * Change `read_blob` to return different `BlockLease` variants depending on `blknum` * for the `blknum` that corresponds to the `mutable_tail`, return a ref to it * Rust borrowing rules prevent `write_blob` calls while refs are outstanding. * for all non-tail blocks, return a page-cached `ImmutablePage` * It is safe to page-cache these as ImmutablePage because EphemeralFile is append-only. ## Performance How do the changes above affect performance? My claim is: not significantly. * **write path**: * before this PR, the `EphemeralFile::write_blob` didn't issue its own `write` system calls. * If there were enough free pages, it didn't issue *any* `write` system calls. 
* If it had to evict other `EphemeralPage`s to get a page for its writes (`get_buf_for_write`), the page cache code would implicitly issue the writeback of victim pages as needed. * With this PR, `EphemeralFile::write_blob` *always* issues *all* of its *own* `write` system calls. * Also, the writes are explicit instead of implicit through page cache write back, which will help #4743 * The perf impact of always doing the writes is the CPU overhead and syscall latency. * Before this PR, we might have never issued them if there were enough free pages. * We don't issue `fsync` and can expect the writes to only hit the kernel page cache. * There is also an advantage in issuing the writes directly: the perf impact is paid by the tenant that caused the writes, instead of whatever tenant evicts the `EphemeralPage`. * **reads for page reconstruction**: no impact. * The `write_blob` function pre-warms the page cache when it writes the `mutable_tail` to disk. * So, the behavior is the same as with the EphemeralPages before this PR. * **reads for `create_delta_layer`**: no impact. * Same argument as for page reconstruction. * Note for the future: * going through the page cache likely causes read amplification here. Why? * Due to the `Key,Lsn`-ordered access pattern, we don't read all the values in the page before moving to the next page. In the worst case, we might read the same page multiple times to read different `Values` from it. * So, it might be better to bypass the page cache here. 
* Idea drafts: * bypass PS page cache + prefetch pipeline + iovec-based IO * bypass PS page cache + use `copy_file_range` to copy from ephemeral file into the L0 delta file, without going through user space --- pageserver/src/page_cache.rs | 205 ++++---------- pageserver/src/tenant.rs | 3 - pageserver/src/tenant/block_io.rs | 32 +-- pageserver/src/tenant/ephemeral_file.rs | 264 +++++++----------- .../tenant/storage_layer/inmemory_layer.rs | 4 +- 5 files changed, 175 insertions(+), 333 deletions(-) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 8306ce46360f..e1e696ddad54 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -10,6 +10,42 @@ //! PostgreSQL buffer size, and a Slot struct for each buffer to contain //! information about what's stored in the buffer. //! +//! # Types Of Pages +//! +//! [`PageCache`] only supports immutable pages. +//! Hence there is no need to worry about coherency. +//! +//! Two types of pages are supported: +//! +//! * **Materialized pages**, filled & used by page reconstruction +//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`]. +//! +//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only. +//! It uses the page cache only for the blocks that are already fully written and immutable. +//! +//! # Filling The Page Cache +//! +//! Page cache maps from a cache key to a buffer slot. +//! The cache key uniquely identifies the piece of data that is being cached. +//! +//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`]. +//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access. +//! +//! The cache key for **immutable file** pages is [`FileId`] and a block number. +//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following: +//! 
* Have a mechanism to deterministically associate the on-disk file with a [`FileId`]. +//! * Get a [`FileId`] using [`next_file_id`]. +//! * Use the mechanism to associate the on-disk file with the returned [`FileId`]. +//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`]. +//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains +//! a read guard for the page. Just use it. +//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains +//! a write guard for the page. Fill the page with the contents of the on-disk file. +//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid. +//! Then try again to [`PageCache::read_immutable_buf`]. +//! Unless there's high cache pressure, the page should now be cached. +//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.) +//! //! # Locking //! //! There are two levels of locking involved: There's one lock for the "mapping" @@ -40,20 +76,18 @@ use std::{ collections::{hash_map::Entry, HashMap}, convert::TryInto, sync::{ - atomic::{AtomicU8, AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError, }, }; use anyhow::Context; use once_cell::sync::OnceCell; -use tracing::error; use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, }; -use crate::tenant::{block_io, ephemeral_file, writeback_ephemeral_file}; use crate::{metrics::PageCacheSizeMetrics, repository::Key}; static PAGE_CACHE: OnceCell = OnceCell::new(); @@ -87,6 +121,17 @@ pub fn get() -> &'static PageCache { pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize; const MAX_USAGE_COUNT: u8 = 5; +/// See module-level comment. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct FileId(u64); + +static NEXT_ID: AtomicU64 = AtomicU64::new(1); + +/// See module-level comment. 
+pub fn next_file_id() -> FileId { + FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed)) +} + /// /// CacheKey uniquely identifies a "thing" to cache in the page cache. /// @@ -97,12 +142,8 @@ enum CacheKey { hash_key: MaterializedPageHashKey, lsn: Lsn, }, - EphemeralPage { - file_id: ephemeral_file::FileId, - blkno: u32, - }, ImmutableFilePage { - file_id: block_io::FileId, + file_id: FileId, blkno: u32, }, } @@ -128,7 +169,6 @@ struct Slot { struct SlotInner { key: Option, buf: &'static mut [u8; PAGE_SZ], - dirty: bool, } impl Slot { @@ -177,9 +217,7 @@ pub struct PageCache { /// can have a separate mapping map, next to this field. materialized_page_map: RwLock>>, - ephemeral_page_map: RwLock>, - - immutable_page_map: RwLock>, + immutable_page_map: RwLock>, /// The actual buffers with their metadata. slots: Box<[Slot]>, @@ -258,14 +296,6 @@ impl PageWriteGuard<'_> { ); self.valid = true; } - pub fn mark_dirty(&mut self) { - // only ephemeral pages can be dirty ATM. - assert!(matches!( - self.inner.key, - Some(CacheKey::EphemeralPage { .. }) - )); - self.inner.dirty = true; - } } impl Drop for PageWriteGuard<'_> { @@ -280,7 +310,6 @@ impl Drop for PageWriteGuard<'_> { let self_key = self.inner.key.as_ref().unwrap(); PAGE_CACHE.get().unwrap().remove_mapping(self_key); self.inner.key = None; - self.inner.dirty = false; } } } @@ -388,62 +417,16 @@ impl PageCache { Ok(()) } - // Section 1.2: Public interface functions for working with Ephemeral pages. 
- - pub fn read_ephemeral_buf( - &self, - file_id: ephemeral_file::FileId, - blkno: u32, - ) -> anyhow::Result { - let mut cache_key = CacheKey::EphemeralPage { file_id, blkno }; - - self.lock_for_read(&mut cache_key) - } - - pub fn write_ephemeral_buf( - &self, - file_id: ephemeral_file::FileId, - blkno: u32, - ) -> anyhow::Result { - let cache_key = CacheKey::EphemeralPage { file_id, blkno }; - - self.lock_for_write(&cache_key) - } - - /// Immediately drop all buffers belonging to given file, without writeback - pub fn drop_buffers_for_ephemeral(&self, drop_file_id: ephemeral_file::FileId) { - for slot_idx in 0..self.slots.len() { - let slot = &self.slots[slot_idx]; - - let mut inner = slot.inner.write().unwrap(); - if let Some(key) = &inner.key { - match key { - CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => { - // remove mapping for old buffer - self.remove_mapping(key); - inner.key = None; - inner.dirty = false; - } - _ => {} - } - } - } - } - - // Section 1.3: Public interface functions for working with immutable file pages. + // Section 1.2: Public interface functions for working with immutable file pages. 
- pub fn read_immutable_buf( - &self, - file_id: block_io::FileId, - blkno: u32, - ) -> anyhow::Result { + pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result { let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno }; self.lock_for_read(&mut cache_key) } - /// Immediately drop all buffers belonging to given file, without writeback - pub fn drop_buffers_for_immutable(&self, drop_file_id: block_io::FileId) { + /// Immediately drop all buffers belonging to given file + pub fn drop_buffers_for_immutable(&self, drop_file_id: FileId) { for slot_idx in 0..self.slots.len() { let slot = &self.slots[slot_idx]; @@ -456,7 +439,6 @@ impl PageCache { // remove mapping for old buffer self.remove_mapping(key); inner.key = None; - inner.dirty = false; } _ => {} } @@ -534,10 +516,6 @@ impl PageCache { CacheKey::MaterializedPage { .. } => { unreachable!("Materialized pages use lookup_materialized_page") } - CacheKey::EphemeralPage { .. } => ( - &crate::metrics::PAGE_CACHE.read_accesses_ephemeral, - &crate::metrics::PAGE_CACHE.read_hits_ephemeral, - ), CacheKey::ImmutableFilePage { .. } => ( &crate::metrics::PAGE_CACHE.read_accesses_immutable, &crate::metrics::PAGE_CACHE.read_hits_immutable, @@ -578,7 +556,6 @@ impl PageCache { // Make the slot ready let slot = &self.slots[slot_idx]; inner.key = Some(cache_key.clone()); - inner.dirty = false; slot.usage_count.store(1, Ordering::Relaxed); return Ok(ReadBufResult::NotFound(PageWriteGuard { @@ -640,7 +617,6 @@ impl PageCache { // Make the slot ready let slot = &self.slots[slot_idx]; inner.key = Some(cache_key.clone()); - inner.dirty = false; slot.usage_count.store(1, Ordering::Relaxed); return Ok(WriteBufResult::NotFound(PageWriteGuard { @@ -679,10 +655,6 @@ impl PageCache { *lsn = version.lsn; Some(version.slot_idx) } - CacheKey::EphemeralPage { file_id, blkno } => { - let map = self.ephemeral_page_map.read().unwrap(); - Some(*map.get(&(*file_id, *blkno))?) 
- } CacheKey::ImmutableFilePage { file_id, blkno } => { let map = self.immutable_page_map.read().unwrap(); Some(*map.get(&(*file_id, *blkno))?) @@ -706,10 +678,6 @@ impl PageCache { None } } - CacheKey::EphemeralPage { file_id, blkno } => { - let map = self.ephemeral_page_map.read().unwrap(); - Some(*map.get(&(*file_id, *blkno))?) - } CacheKey::ImmutableFilePage { file_id, blkno } => { let map = self.immutable_page_map.read().unwrap(); Some(*map.get(&(*file_id, *blkno))?) @@ -743,12 +711,6 @@ impl PageCache { panic!("could not find old key in mapping") } } - CacheKey::EphemeralPage { file_id, blkno } => { - let mut map = self.ephemeral_page_map.write().unwrap(); - map.remove(&(*file_id, *blkno)) - .expect("could not find old key in mapping"); - self.size_metrics.current_bytes_ephemeral.sub_page_sz(1); - } CacheKey::ImmutableFilePage { file_id, blkno } => { let mut map = self.immutable_page_map.write().unwrap(); map.remove(&(*file_id, *blkno)) @@ -788,17 +750,7 @@ impl PageCache { } } } - CacheKey::EphemeralPage { file_id, blkno } => { - let mut map = self.ephemeral_page_map.write().unwrap(); - match map.entry((*file_id, *blkno)) { - Entry::Occupied(entry) => Some(*entry.get()), - Entry::Vacant(entry) => { - entry.insert(slot_idx); - self.size_metrics.current_bytes_ephemeral.add_page_sz(1); - None - } - } - } + CacheKey::ImmutableFilePage { file_id, blkno } => { let mut map = self.immutable_page_map.write().unwrap(); match map.entry((*file_id, *blkno)) { @@ -849,25 +801,8 @@ impl PageCache { } }; if let Some(old_key) = &inner.key { - if inner.dirty { - if let Err(err) = Self::writeback(old_key, inner.buf) { - // Writing the page to disk failed. - // - // FIXME: What to do here, when? We could propagate the error to the - // caller, but victim buffer is generally unrelated to the original - // call. It can even belong to a different tenant. Currently, we - // report the error to the log and continue the clock sweep to find - // a different victim. 
But if the problem persists, the page cache - // could fill up with dirty pages that we cannot evict, and we will - // loop retrying the writebacks indefinitely. - error!("writeback of buffer {:?} failed: {}", old_key, err); - continue; - } - } - // remove mapping for old buffer self.remove_mapping(old_key); - inner.dirty = false; inner.key = None; } return Ok((slot_idx, inner)); @@ -875,28 +810,6 @@ impl PageCache { } } - fn writeback(cache_key: &CacheKey, buf: &[u8]) -> Result<(), std::io::Error> { - match cache_key { - CacheKey::MaterializedPage { - hash_key: _, - lsn: _, - } => Err(std::io::Error::new( - std::io::ErrorKind::Other, - "unexpected dirty materialized page", - )), - CacheKey::EphemeralPage { file_id, blkno } => { - writeback_ephemeral_file(*file_id, *blkno, buf) - } - CacheKey::ImmutableFilePage { - file_id: _, - blkno: _, - } => Err(std::io::Error::new( - std::io::ErrorKind::Other, - "unexpected dirty immutable page", - )), - } - } - /// Initialize a new page cache /// /// This should be called only once at page server startup. 
@@ -907,7 +820,6 @@ impl PageCache { let size_metrics = &crate::metrics::PAGE_CACHE_SIZE; size_metrics.max_bytes.set_page_sz(num_pages); - size_metrics.current_bytes_ephemeral.set_page_sz(0); size_metrics.current_bytes_immutable.set_page_sz(0); size_metrics.current_bytes_materialized_page.set_page_sz(0); @@ -917,11 +829,7 @@ impl PageCache { let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap(); Slot { - inner: RwLock::new(SlotInner { - key: None, - buf, - dirty: false, - }), + inner: RwLock::new(SlotInner { key: None, buf }), usage_count: AtomicU8::new(0), } }) @@ -929,7 +837,6 @@ impl PageCache { Self { materialized_page_map: Default::default(), - ephemeral_page_map: Default::default(), immutable_page_map: Default::default(), slots, next_evict_slot: AtomicUsize::new(0), diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index cedb381ccc44..309020391f8e 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -136,9 +136,6 @@ pub use timeline::{ LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline, }; -// re-export this function so that page_cache.rs can use it. 
-pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file; - // re-export for use in remote_timeline_client.rs pub use crate::tenant::metadata::save_metadata; diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 3cc4e61a950b..503e5bd4e670 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -6,7 +6,6 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use bytes::Bytes; use std::ops::{Deref, DerefMut}; use std::os::unix::fs::FileExt; -use std::sync::atomic::AtomicU64; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -43,37 +42,34 @@ where } } -/// A block accessible for reading -/// -/// During builds with `#[cfg(test)]`, this is a proper enum -/// with two variants to support testing code. During normal -/// builds, it just has one variant and is thus a cheap newtype -/// wrapper of [`PageReadGuard`] -pub enum BlockLease { +/// Reference to an in-memory copy of an immutable on-disk block. 
+pub enum BlockLease<'a> { PageReadGuard(PageReadGuard<'static>), + EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), #[cfg(test)] Rc(std::rc::Rc<[u8; PAGE_SZ]>), } -impl From> for BlockLease { - fn from(value: PageReadGuard<'static>) -> Self { +impl From> for BlockLease<'static> { + fn from(value: PageReadGuard<'static>) -> BlockLease<'static> { BlockLease::PageReadGuard(value) } } #[cfg(test)] -impl From> for BlockLease { +impl<'a> From> for BlockLease<'a> { fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self { BlockLease::Rc(value) } } -impl Deref for BlockLease { +impl<'a> Deref for BlockLease<'a> { type Target = [u8; PAGE_SZ]; fn deref(&self) -> &Self::Target { match self { BlockLease::PageReadGuard(v) => v.deref(), + BlockLease::EphemeralFileMutableTail(v) => v, #[cfg(test)] BlockLease::Rc(v) => v.deref(), } @@ -116,13 +112,6 @@ where self.reader.read_blk(blknum) } } -static NEXT_ID: AtomicU64 = AtomicU64::new(1); -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct FileId(u64); - -fn next_file_id() -> FileId { - FileId(NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)) -} /// An adapter for reading a (virtual) file using the page cache. /// @@ -132,7 +121,7 @@ pub struct FileBlockReader { pub file: F, /// Unique ID of this file, used as key in the page cache. - file_id: FileId, + file_id: page_cache::FileId, } impl FileBlockReader @@ -140,7 +129,7 @@ where F: FileExt, { pub fn new(file: F) -> Self { - let file_id = next_file_id(); + let file_id = page_cache::next_file_id(); FileBlockReader { file_id, file } } @@ -157,7 +146,6 @@ where F: FileExt, { fn read_blk(&self, blknum: u32) -> Result { - // Look up the right page let cache = page_cache::get(); loop { match cache diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 237c17d8528d..5de9c24d907b 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -2,54 +2,31 @@ //! 
used to keep in-memory layers spilled on disk. use crate::config::PageServerConf; -use crate::page_cache::{self, ReadBufResult, WriteBufResult, PAGE_SZ}; +use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockLease, BlockReader}; use crate::virtual_file::VirtualFile; -use once_cell::sync::Lazy; use std::cmp::min; -use std::collections::HashMap; use std::fs::OpenOptions; use std::io::{self, ErrorKind}; use std::ops::DerefMut; use std::os::unix::prelude::FileExt; use std::path::PathBuf; -use std::sync::{Arc, RwLock}; +use std::sync::atomic::AtomicU64; use tracing::*; use utils::id::{TenantId, TimelineId}; -/// -/// This is the global cache of file descriptors (File objects). -/// -static EPHEMERAL_FILES: Lazy> = Lazy::new(|| { - RwLock::new(EphemeralFiles { - next_file_id: FileId(1), - files: HashMap::new(), - }) -}); - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct FileId(u64); - -impl std::fmt::Display for FileId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -pub struct EphemeralFiles { - next_file_id: FileId, - - files: HashMap>, -} - pub struct EphemeralFile { - file_id: FileId, + page_cache_file_id: page_cache::FileId, + _tenant_id: TenantId, _timeline_id: TimelineId, - file: Arc, - - pub size: u64, + file: VirtualFile, + size: u64, + /// An ephemeral file is append-only. + /// We keep the last page, which can still be modified, in [`Self::mutable_tail`]. + /// The other pages, which can no longer be modified, are accessed through the page cache. 
+ mutable_tail: [u8; PAGE_SZ], } impl EphemeralFile { @@ -58,74 +35,31 @@ impl EphemeralFile { tenant_id: TenantId, timeline_id: TimelineId, ) -> Result { - let mut l = EPHEMERAL_FILES.write().unwrap(); - let file_id = l.next_file_id; - l.next_file_id = FileId(l.next_file_id.0 + 1); + static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1); + let filename_disambiguator = + NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let filename = conf .timeline_path(&tenant_id, &timeline_id) - .join(PathBuf::from(format!("ephemeral-{}", file_id))); + .join(PathBuf::from(format!("ephemeral-{filename_disambiguator}"))); let file = VirtualFile::open_with_options( &filename, OpenOptions::new().read(true).write(true).create(true), )?; - let file_rc = Arc::new(file); - l.files.insert(file_id, file_rc.clone()); Ok(EphemeralFile { - file_id, + page_cache_file_id: page_cache::next_file_id(), _tenant_id: tenant_id, _timeline_id: timeline_id, - file: file_rc, + file, size: 0, + mutable_tail: [0u8; PAGE_SZ], }) } - fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), io::Error> { - let mut off = 0; - while off < PAGE_SZ { - let n = self - .file - .read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?; - - if n == 0 { - // Reached EOF. Fill the rest of the buffer with zeros. - const ZERO_BUF: [u8; PAGE_SZ] = [0u8; PAGE_SZ]; - - buf[off..].copy_from_slice(&ZERO_BUF[off..]); - break; - } - - off += n; - } - Ok(()) - } - - fn get_buf_for_write( - &self, - blkno: u32, - ) -> Result, io::Error> { - // Look up the right page - let cache = page_cache::get(); - let mut write_guard = match cache - .write_ephemeral_buf(self.file_id, blkno) - .map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))? 
- { - WriteBufResult::Found(guard) => guard, - WriteBufResult::NotFound(mut guard) => { - // Read the page from disk into the buffer - // TODO: if we're overwriting the whole page, no need to read it in first - self.fill_buffer(guard.deref_mut(), blkno)?; - guard.mark_valid(); - - // And then fall through to modify it. - guard - } - }; - write_guard.mark_dirty(); - - Ok(write_guard) + pub(crate) fn size(&self) -> u64 { + self.size } } @@ -146,49 +80,74 @@ impl BlobWriter for EphemeralFile { blknum: u32, /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write. off: usize, - /// Used by [`push_bytes`] to memoize the page cache write guard across calls to it. - memo_page_guard: MemoizedPageWriteGuard, - } - struct MemoizedPageWriteGuard { - guard: page_cache::PageWriteGuard<'static>, - /// The block number of the page in `guard`. - blknum: u32, } impl<'a> Writer<'a> { fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result> { - let blknum = (ephemeral_file.size / PAGE_SZ as u64) as u32; Ok(Writer { - blknum, + blknum: (ephemeral_file.size / PAGE_SZ as u64) as u32, off: (ephemeral_file.size % PAGE_SZ as u64) as usize, - memo_page_guard: MemoizedPageWriteGuard { - guard: ephemeral_file.get_buf_for_write(blknum)?, - blknum, - }, ephemeral_file, }) } #[inline(always)] fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> { - // `src_remaining` is the remaining bytes to be written let mut src_remaining = src; while !src_remaining.is_empty() { - let page = if self.memo_page_guard.blknum == self.blknum { - &mut self.memo_page_guard.guard - } else { - self.memo_page_guard.guard = - self.ephemeral_file.get_buf_for_write(self.blknum)?; - self.memo_page_guard.blknum = self.blknum; - &mut self.memo_page_guard.guard - }; - let dst_remaining = &mut page[self.off..]; + let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..]; let n = min(dst_remaining.len(), src_remaining.len()); 
dst_remaining[..n].copy_from_slice(&src_remaining[..n]); self.off += n; src_remaining = &src_remaining[n..]; if self.off == PAGE_SZ { - // This block is done, move to next one. - self.blknum += 1; - self.off = 0; + match self.ephemeral_file.file.write_all_at( + &self.ephemeral_file.mutable_tail, + self.blknum as u64 * PAGE_SZ as u64, + ) { + Ok(_) => { + // Pre-warm the page cache with what we just wrote. + // This isn't necessary for coherency/correctness, but it's how we've always done it. + let cache = page_cache::get(); + match cache.read_immutable_buf( + self.ephemeral_file.page_cache_file_id, + self.blknum, + ) { + Ok(page_cache::ReadBufResult::Found(_guard)) => { + // This function takes &mut self, so, it shouldn't be possible to reach this point. + unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum); + } + Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => { + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + buf.copy_from_slice(&self.ephemeral_file.mutable_tail); + write_guard.mark_valid(); + // pre-warm successful + } + Err(e) => { + error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}"); + // fail gracefully, it's not the end of the world if we can't pre-warm the cache here + } + } + // Zero the buffer for re-use. + // Zeroing is critical for correcntess because the write_blob code below + // and similarly read_blk expect zeroed pages. + self.ephemeral_file.mutable_tail.fill(0); + // This block is done, move to next one. 
+ self.blknum += 1; + self.off = 0; + } + Err(e) => { + return Err(std::io::Error::new( + ErrorKind::Other, + // order error before path because path is long and error is short + format!( + "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}", + self.blknum, + e, + self.ephemeral_file.file.path.display(), + ), + )); + } + } } } Ok(()) @@ -227,10 +186,7 @@ impl Drop for EphemeralFile { fn drop(&mut self) { // drop all pages from page cache let cache = page_cache::get(); - cache.drop_buffers_for_ephemeral(self.file_id); - - // remove entry from the hash map - EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id); + cache.drop_buffers_for_immutable(self.page_cache_file_id); // unlink the file let res = std::fs::remove_file(&self.file.path); @@ -250,54 +206,48 @@ impl Drop for EphemeralFile { } } -pub fn writeback(file_id: FileId, blkno: u32, buf: &[u8]) -> Result<(), io::Error> { - if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) { - match file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64) { - Ok(_) => Ok(()), - Err(e) => Err(io::Error::new( - ErrorKind::Other, - format!( - "failed to write back to ephemeral file at {} error: {}", - file.path.display(), - e - ), - )), - } - } else { - Err(io::Error::new( - ErrorKind::Other, - "could not write back page, not found in ephemeral files hash", - )) - } -} - impl BlockReader for EphemeralFile { fn read_blk(&self, blknum: u32) -> Result { - // Look up the right page - let cache = page_cache::get(); - loop { - match cache - .read_ephemeral_buf(self.file_id, blknum) - .map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))? 
- { - ReadBufResult::Found(guard) => return Ok(guard.into()), - ReadBufResult::NotFound(mut write_guard) => { - // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum)?; - write_guard.mark_valid(); - - // Swap for read lock - continue; - } - }; + let flushed_blknums = 0..self.size / PAGE_SZ as u64; + if flushed_blknums.contains(&(blknum as u64)) { + let cache = page_cache::get(); + loop { + match cache + .read_immutable_buf(self.page_cache_file_id, blknum) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + // order path before error because error is anyhow::Error => might have many contexts + format!( + "ephemeral file: read immutable page #{}: {}: {:#}", + blknum, + self.file.path.display(), + e, + ), + ) + })? { + page_cache::ReadBufResult::Found(guard) => { + return Ok(BlockLease::PageReadGuard(guard)) + } + page_cache::ReadBufResult::NotFound(mut write_guard) => { + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + self.file + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?; + write_guard.mark_valid(); + + // Swap for read lock + continue; + } + }; + } + } else { + debug_assert_eq!(blknum as u64, self.size / PAGE_SZ as u64); + Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail)) } } } -fn to_io_error(e: anyhow::Error, context: &str) -> io::Error { - io::Error::new(ErrorKind::Other, format!("{context}: {e:#}")) -} - #[cfg(test)] mod tests { use super::*; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index aa9d0884e000..d3ec78887d5f 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -230,11 +230,11 @@ impl std::fmt::Display for InMemoryLayer { impl InMemoryLayer { /// - /// Get layer size on the disk + /// Get layer size. 
/// pub async fn size(&self) -> Result { let inner = self.inner.read().await; - Ok(inner.file.size) + Ok(inner.file.size()) } ///