From 869c349c8da902d4a6cd112f2a44a8371c463aae Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 12 Feb 2024 15:21:56 +0000 Subject: [PATCH 1/5] refactor(crashsafe + VirtualFile): sync impl of crashsafe_overwrite --- libs/utils/src/crashsafe.rs | 44 +++++++++++++++++++++++++++++++++- pageserver/src/virtual_file.rs | 7 +----- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/libs/utils/src/crashsafe.rs b/libs/utils/src/crashsafe.rs index 1c72e9cae9cd..021f050110e5 100644 --- a/libs/utils/src/crashsafe.rs +++ b/libs/utils/src/crashsafe.rs @@ -1,7 +1,7 @@ use std::{ borrow::Cow, fs::{self, File}, - io, + io::{self, Write}, }; use camino::{Utf8Path, Utf8PathBuf}; @@ -161,6 +161,48 @@ pub async fn durable_rename( Ok(()) } +/// Writes a file to the specified `final_path` in a crash safe fasion +/// +/// The file is first written to the specified tmp_path, and in a second +/// step, the tmp path is renamed to the final path. As renames are +/// atomic, a crash during the write operation will never leave behind a +/// partially written file. +/// +/// NB: an async variant of this code exists in Pageserver's VirtualFile. +pub fn overwrite( + final_path: &Utf8Path, + tmp_path: &Utf8Path, + content: &[u8], +) -> std::io::Result<()> { + let Some(final_path_parent) = final_path.parent() else { + return Err(std::io::Error::from_raw_os_error( + nix::errno::Errno::EINVAL as i32, + )); + }; + std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?; + let mut file = std::fs::OpenOptions::new() + .write(true) + // Use `create_new` so that, if we race with ourselves or something else, + // we bail out instead of causing damage. + .create_new(true) + .open(tmp_path)?; + file.write_all(content)?; + file.sync_all()?; + drop(file); // before the rename, that's important! + // renames are atomic + std::fs::rename(tmp_path, final_path)?; + // Only open final path parent dirfd now, so that this operation only + // ever holds one VirtualFile fd at a time. That's important because + // the current `find_victim_slot` impl might pick the same slot for both + // VirtualFile., and it eventually does a blocking write lock instead of + // try_lock. + let final_parent_dirfd = std::fs::OpenOptions::new() + .read(true) + .open(final_path_parent)?; + final_parent_dirfd.sync_all()?; + Ok(()) +} + #[cfg(test)] mod tests { diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 059a6596d313..5c51b80f1fef 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -404,12 +404,7 @@ impl VirtualFile { Ok(vfile) } - /// Writes a file to the specified `final_path` in a crash safe fasion - /// - /// The file is first written to the specified tmp_path, and in a second - /// step, the tmp path is renamed to the final path. As renames are - /// atomic, a crash during the write operation will never leave behind a - /// partially written file. + /// Async & [`VirtualFile`]-enabled version of [`::utils::crashsafe::overwrite`]. pub async fn crashsafe_overwrite( final_path: &Utf8Path, tmp_path: &Utf8Path, From afd0d18a0c763179f821bef0929f2005ab669348 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 12 Feb 2024 15:51:31 +0000 Subject: [PATCH 2/5] convert callers --- pageserver/src/deletion_queue.rs | 7 +- pageserver/src/tenant.rs | 33 +++------ pageserver/src/tenant/metadata.rs | 3 +- pageserver/src/tenant/secondary/downloader.rs | 11 +-- pageserver/src/virtual_file.rs | 68 ++++++++----------- 5 files changed, 48 insertions(+), 74 deletions(-) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index da1da9331a2b..00c67e6b4ed3 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -234,7 +234,8 @@ impl DeletionHeader { let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; let header_path = conf.deletion_header_path(); let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX); - VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes) + // REVIEW: this now uses spawn_blocking instead of blocking the executor thread + VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes) .await .maybe_fatal_err("save deletion header")?; @@ -325,7 +326,9 @@ impl DeletionList { let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX); let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); - VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes) + + // REVIEW: this will now use spawn_blocking instead of blocking the executor thread + VirtualFile::crashsafe_overwrite(path, temp_path, bytes) .await .maybe_fatal_err("save deletion list") .map_err(Into::into) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d946c571188c..1f3bc1347292 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -28,7 +28,6 @@ use remote_storage::GenericRemoteStorage; use std::fmt; use storage_broker::BrokerClientChannel; use tokio::io::BufReader; -use tokio::runtime::Handle; use tokio::sync::watch; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -2878,17 +2877,10 @@ impl Tenant { let tenant_shard_id = *tenant_shard_id; let config_path = config_path.to_owned(); - tokio::task::spawn_blocking(move || { - Handle::current().block_on(async move { - let conf_content = conf_content.as_bytes(); - VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content) - .await - .with_context(|| { - format!("write tenant {tenant_shard_id} config to {config_path}") - }) - }) - }) - .await??; + let conf_content = conf_content.into_bytes(); + VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content) + .await + .with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?; Ok(()) } @@ -2915,17 +2907,12 @@ impl Tenant { let tenant_shard_id = *tenant_shard_id; let target_config_path = target_config_path.to_owned(); - tokio::task::spawn_blocking(move || { - Handle::current().block_on(async move { - let conf_content = conf_content.as_bytes(); - VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content) - .await - .with_context(|| { - format!("write tenant {tenant_shard_id} config to {target_config_path}") - }) - }) - }) - .await??; + let conf_content = conf_content.into_bytes(); + VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content) + .await + .with_context(|| { + format!("write tenant {tenant_shard_id} config to {target_config_path}") + })?; Ok(()) } diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 6fb86c65e27f..a9d4a28aa26e 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -279,7 +279,8 @@ pub async fn save_metadata( let path = conf.metadata_path(tenant_shard_id, timeline_id); let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX); let metadata_bytes = data.to_bytes().context("serialize metadata")?; - VirtualFile::crashsafe_overwrite(&path, &temp_path, &metadata_bytes) + // REVIEW: this will now use spawn_blocking instead of blocking the executor thread + VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes) .await .context("write metadata")?; Ok(()) diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 0666e104f8f2..c8288acc20eb 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -484,14 +484,9 @@ impl<'a> TenantDownloader<'a> { let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX); let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}"); let heatmap_path_bg = heatmap_path.clone(); - tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current().block_on(async move { - VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await - }) - }) - .await - .expect("Blocking task is never aborted") - .maybe_fatal_err(&context_msg)?; + VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes) + .await + .maybe_fatal_err(&context_msg)?; tracing::debug!("Wrote local heatmap to {}", heatmap_path); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 5c51b80f1fef..14bfef45eb06 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -19,14 +19,13 @@ use once_cell::sync::OnceCell; use pageserver_api::shard::TenantShardId; use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; -use tokio_epoll_uring::IoBufMut; +use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; -use utils::fs_ext; pub use pageserver_api::models::virtual_file as api; pub(crate) mod io_engine; @@ -404,41 +403,31 @@ impl VirtualFile { Ok(vfile) } - /// Async & [`VirtualFile`]-enabled version of [`::utils::crashsafe::overwrite`]. - pub async fn crashsafe_overwrite( - final_path: &Utf8Path, - tmp_path: &Utf8Path, - content: &[u8], + /// Async version of [`::utils::crashsafe::overwrite`]. + /// + /// # NB: + /// + /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but, + /// it did at an earlier time. + /// And it will use this module's [`io_engine`] in the near future, so, leaving it here. + pub async fn crashsafe_overwrite + Send, Buf: IoBuf + Send>( + final_path: Utf8PathBuf, + tmp_path: Utf8PathBuf, + content: B, ) -> std::io::Result<()> { - let Some(final_path_parent) = final_path.parent() else { - return Err(std::io::Error::from_raw_os_error( - nix::errno::Errno::EINVAL as i32, - )); - }; - std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?; - let mut file = Self::open_with_options( - tmp_path, - OpenOptions::new() - .write(true) - // Use `create_new` so that, if we race with ourselves or something else, - // we bail out instead of causing damage. - .create_new(true), - ) - .await?; - file.write_all(content).await?; - file.sync_all().await?; - drop(file); // before the rename, that's important! - // renames are atomic - std::fs::rename(tmp_path, final_path)?; - // Only open final path parent dirfd now, so that this operation only - // ever holds one VirtualFile fd at a time. That's important because - // the current `find_victim_slot` impl might pick the same slot for both - // VirtualFile., and it eventually does a blocking write lock instead of - // try_lock. - let final_parent_dirfd = - Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?; - final_parent_dirfd.sync_all().await?; - Ok(()) + tokio::task::spawn_blocking(move || { + let slice_storage; + let content_len = content.bytes_init(); + let content = if content.bytes_init() > 0 { + slice_storage = Some(content.slice(0..content_len)); + slice_storage.as_deref().expect("just set it to Some()") + } else { + &[] + }; + utils::crashsafe::overwrite(&final_path, &tmp_path, content) + }) + .await + .expect("blocking task is never aborted") } /// Call File::sync_all() on the underlying File. @@ -671,7 +660,6 @@ where F: FnMut(tokio_epoll_uring::Slice, u64) -> Fut, Fut: std::future::Future, std::io::Result)>, { - use tokio_epoll_uring::BoundedBuf; let mut buf: tokio_epoll_uring::Slice = buf.slice_full(); // includes all the uninitialized memory while buf.bytes_total() != 0 { let res; @@ -1288,7 +1276,7 @@ mod tests { let path = testdir.join("myfile"); let tmp_path = testdir.join("myfile.tmp"); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo") + VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1297,7 +1285,7 @@ mod tests { assert!(!tmp_path.exists()); drop(file); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar") + VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1319,7 +1307,7 @@ mod tests { std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap(); assert!(tmp_path.exists()); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo") + VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) .await .unwrap(); From f2d173595116b7a0d093967da47612d0500e06d5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 12 Feb 2024 15:58:54 +0000 Subject: [PATCH 3/5] add TODO --- pageserver/src/virtual_file.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 14bfef45eb06..437d690c3297 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -415,6 +415,9 @@ impl VirtualFile { tmp_path: Utf8PathBuf, content: B, ) -> std::io::Result<()> { + // TODO: use tokio_epoll_uring if configured as `io_engine`. + // See https://github.com/neondatabase/neon/issues/6663 + tokio::task::spawn_blocking(move || { let slice_storage; let content_len = content.bytes_init(); From a4056a07cc49e4db4adeb9a50105820dcbc74495 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Feb 2024 10:22:31 +0000 Subject: [PATCH 4/5] improve commentary, remove VirtualFile mentions; https://github.com/neondatabase/neon/pull/6731#discussion_r1489019058 --- libs/utils/src/crashsafe.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/libs/utils/src/crashsafe.rs b/libs/utils/src/crashsafe.rs index 021f050110e5..756b19138cc9 100644 --- a/libs/utils/src/crashsafe.rs +++ b/libs/utils/src/crashsafe.rs @@ -161,14 +161,17 @@ pub async fn durable_rename( Ok(()) } -/// Writes a file to the specified `final_path` in a crash safe fasion +/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`]. /// -/// The file is first written to the specified tmp_path, and in a second -/// step, the tmp path is renamed to the final path. As renames are -/// atomic, a crash during the write operation will never leave behind a -/// partially written file. +/// The file is first written to the specified `tmp_path`, and in a second +/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync +/// and atomic rename guarantee that, if we crash at any point, there will never +/// be a partially written file at `final_path` (but maybe at `tmp_path`). /// -/// NB: an async variant of this code exists in Pageserver's VirtualFile. +/// Callers are responsible for serializing calls of this function for a given `final_path`. +/// If they don't, there may be an error due to conflicting `tmp_path`, or there will +/// be no error and the content of `final_path` will be the "winner" caller's `content`. +/// I.e., the atomticity guarantees still hold. pub fn overwrite( final_path: &Utf8Path, tmp_path: &Utf8Path, @@ -188,17 +191,14 @@ pub fn overwrite( .open(tmp_path)?; file.write_all(content)?; file.sync_all()?; - drop(file); // before the rename, that's important! - // renames are atomic + drop(file); // don't keep the fd open for longer than we have to + std::fs::rename(tmp_path, final_path)?; - // Only open final path parent dirfd now, so that this operation only - // ever holds one VirtualFile fd at a time. That's important because - // the current `find_victim_slot` impl might pick the same slot for both - // VirtualFile., and it eventually does a blocking write lock instead of - // try_lock. + let final_parent_dirfd = std::fs::OpenOptions::new() .read(true) .open(final_path_parent)?; + final_parent_dirfd.sync_all()?; Ok(()) } From d0f2ab303cb96a3f02149ad338a8db4109eff54c Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Feb 2024 11:49:05 +0000 Subject: [PATCH 5/5] remove REVIEW comments --- pageserver/src/deletion_queue.rs | 2 -- pageserver/src/tenant/metadata.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 00c67e6b4ed3..e0c40ea1b0f6 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -234,7 +234,6 @@ impl DeletionHeader { let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; let header_path = conf.deletion_header_path(); let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX); - // REVIEW: this now uses spawn_blocking instead of blocking the executor thread VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes) .await .maybe_fatal_err("save deletion header")?; @@ -327,7 +326,6 @@ impl DeletionList { let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); - // REVIEW: this will now use spawn_blocking instead of blocking the executor thread VirtualFile::crashsafe_overwrite(path, temp_path, bytes) .await .maybe_fatal_err("save deletion list") diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index a9d4a28aa26e..233acfd43176 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -279,7 +279,6 @@ pub async fn save_metadata( let path = conf.metadata_path(tenant_shard_id, timeline_id); let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX); let metadata_bytes = data.to_bytes().context("serialize metadata")?; - // REVIEW: this will now use spawn_blocking instead of blocking the executor thread VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes) .await .context("write metadata")?;