diff --git a/libs/utils/src/crashsafe.rs b/libs/utils/src/crashsafe.rs index 1c72e9cae9cd..756b19138cc9 100644 --- a/libs/utils/src/crashsafe.rs +++ b/libs/utils/src/crashsafe.rs @@ -1,7 +1,7 @@ use std::{ borrow::Cow, fs::{self, File}, - io, + io::{self, Write}, }; use camino::{Utf8Path, Utf8PathBuf}; @@ -161,6 +161,48 @@ pub async fn durable_rename( Ok(()) } +/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`]. +/// +/// The file is first written to the specified `tmp_path`, and in a second +/// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync +/// and atomic rename guarantee that, if we crash at any point, there will never +/// be a partially written file at `final_path` (but maybe at `tmp_path`). +/// +/// Callers are responsible for serializing calls of this function for a given `final_path`. +/// If they don't, there may be an error due to conflicting `tmp_path`, or there will +/// be no error and the content of `final_path` will be the "winner" caller's `content`. +/// I.e., the atomticity guarantees still hold. +pub fn overwrite( + final_path: &Utf8Path, + tmp_path: &Utf8Path, + content: &[u8], +) -> std::io::Result<()> { + let Some(final_path_parent) = final_path.parent() else { + return Err(std::io::Error::from_raw_os_error( + nix::errno::Errno::EINVAL as i32, + )); + }; + std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?; + let mut file = std::fs::OpenOptions::new() + .write(true) + // Use `create_new` so that, if we race with ourselves or something else, + // we bail out instead of causing damage. + .create_new(true) + .open(tmp_path)?; + file.write_all(content)?; + file.sync_all()?; + drop(file); // don't keep the fd open for longer than we have to + + std::fs::rename(tmp_path, final_path)?; + + let final_parent_dirfd = std::fs::OpenOptions::new() + .read(true) + .open(final_path_parent)?; + + final_parent_dirfd.sync_all()?; + Ok(()) +} + #[cfg(test)] mod tests { diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 9046fe881be5..e0c40ea1b0f6 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -234,7 +234,7 @@ impl DeletionHeader { let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; let header_path = conf.deletion_header_path(); let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX); - VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes) + VirtualFile::crashsafe_overwrite(header_path, temp_path, header_bytes) .await .maybe_fatal_err("save deletion header")?; @@ -325,7 +325,8 @@ impl DeletionList { let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX); let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); - VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes) + + VirtualFile::crashsafe_overwrite(path, temp_path, bytes) .await .maybe_fatal_err("save deletion list") .map_err(Into::into) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 9f1f188bf253..1f3bc1347292 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -28,7 +28,6 @@ use remote_storage::GenericRemoteStorage; use std::fmt; use storage_broker::BrokerClientChannel; use tokio::io::BufReader; -use tokio::runtime::Handle; use tokio::sync::watch; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -2878,17 +2877,10 @@ impl Tenant { let tenant_shard_id = *tenant_shard_id; let config_path = config_path.to_owned(); - tokio::task::spawn_blocking(move || { - Handle::current().block_on(async move { - let conf_content = conf_content.into_bytes(); - VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content) - .await - .with_context(|| { - format!("write tenant {tenant_shard_id} config to {config_path}") - }) - }) - }) - .await??; + let conf_content = conf_content.into_bytes(); + VirtualFile::crashsafe_overwrite(config_path.clone(), temp_path, conf_content) + .await + .with_context(|| format!("write tenant {tenant_shard_id} config to {config_path}"))?; Ok(()) } @@ -2915,17 +2907,12 @@ impl Tenant { let tenant_shard_id = *tenant_shard_id; let target_config_path = target_config_path.to_owned(); - tokio::task::spawn_blocking(move || { - Handle::current().block_on(async move { - let conf_content = conf_content.into_bytes(); - VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content) - .await - .with_context(|| { - format!("write tenant {tenant_shard_id} config to {target_config_path}") - }) - }) - }) - .await??; + let conf_content = conf_content.into_bytes(); + VirtualFile::crashsafe_overwrite(target_config_path.clone(), temp_path, conf_content) + .await + .with_context(|| { + format!("write tenant {tenant_shard_id} config to {target_config_path}") + })?; Ok(()) } diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index dcbe781f908a..233acfd43176 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -279,7 +279,7 @@ pub async fn save_metadata( let path = conf.metadata_path(tenant_shard_id, timeline_id); let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX); let metadata_bytes = data.to_bytes().context("serialize metadata")?; - VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes) + VirtualFile::crashsafe_overwrite(path, temp_path, metadata_bytes) .await .context("write metadata")?; Ok(()) diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index c23416a7f042..c8288acc20eb 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -484,14 +484,9 @@ impl<'a> TenantDownloader<'a> { let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX); let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}"); let heatmap_path_bg = heatmap_path.clone(); - tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current().block_on(async move { - VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await - }) - }) - .await - .expect("Blocking task is never aborted") - .maybe_fatal_err(&context_msg)?; + VirtualFile::crashsafe_overwrite(heatmap_path_bg, temp_path, heatmap_bytes) + .await + .maybe_fatal_err(&context_msg)?; tracing::debug!("Wrote local heatmap to {}", heatmap_path); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 6cff748d42c0..2a8c22430bfc 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -19,14 +19,13 @@ use once_cell::sync::OnceCell; use pageserver_api::shard::TenantShardId; use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; -use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; +use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; -use utils::fs_ext; pub use pageserver_api::models::virtual_file as api; pub(crate) mod io_engine; @@ -404,47 +403,34 @@ impl VirtualFile { Ok(vfile) } - /// Writes a file to the specified `final_path` in a crash safe fasion + /// Async version of [`::utils::crashsafe::overwrite`]. /// - /// The file is first written to the specified tmp_path, and in a second - /// step, the tmp path is renamed to the final path. As renames are - /// atomic, a crash during the write operation will never leave behind a - /// partially written file. - pub async fn crashsafe_overwrite( - final_path: &Utf8Path, - tmp_path: &Utf8Path, + /// # NB: + /// + /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but, + /// it did at an earlier time. + /// And it will use this module's [`io_engine`] in the near future, so, leaving it here. + pub async fn crashsafe_overwrite + Send, Buf: IoBuf + Send>( + final_path: Utf8PathBuf, + tmp_path: Utf8PathBuf, content: B, ) -> std::io::Result<()> { - let Some(final_path_parent) = final_path.parent() else { - return Err(std::io::Error::from_raw_os_error( - nix::errno::Errno::EINVAL as i32, - )); - }; - std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?; - let mut file = Self::open_with_options( - tmp_path, - OpenOptions::new() - .write(true) - // Use `create_new` so that, if we race with ourselves or something else, - // we bail out instead of causing damage. - .create_new(true), - ) - .await?; - let (_content, res) = file.write_all(content).await; - res?; - file.sync_all().await?; - drop(file); // before the rename, that's important! - // renames are atomic - std::fs::rename(tmp_path, final_path)?; - // Only open final path parent dirfd now, so that this operation only - // ever holds one VirtualFile fd at a time. That's important because - // the current `find_victim_slot` impl might pick the same slot for both - // VirtualFile., and it eventually does a blocking write lock instead of - // try_lock. - let final_parent_dirfd = - Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?; - final_parent_dirfd.sync_all().await?; - Ok(()) + // TODO: use tokio_epoll_uring if configured as `io_engine`. + // See https://github.com/neondatabase/neon/issues/6663 + + tokio::task::spawn_blocking(move || { + let slice_storage; + let content_len = content.bytes_init(); + let content = if content.bytes_init() > 0 { + slice_storage = Some(content.slice(0..content_len)); + slice_storage.as_deref().expect("just set it to Some()") + } else { + &[] + }; + utils::crashsafe::overwrite(&final_path, &tmp_path, content) + }) + .await + .expect("blocking task is never aborted") } /// Call File::sync_all() on the underlying File. @@ -1315,7 +1301,7 @@ mod tests { let path = testdir.join("myfile"); let tmp_path = testdir.join("myfile.tmp"); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec()) + VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1324,7 +1310,7 @@ mod tests { assert!(!tmp_path.exists()); drop(file); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec()) + VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1346,7 +1332,7 @@ mod tests { std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap(); assert!(tmp_path.exists()); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec()) + VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) .await .unwrap();