
refactor(virtual_file): take owned buffer in VirtualFile::write_all #6664

Merged
Changes from all commits
Commits
25 commits
9a4880e
WIP
problame Feb 7, 2024
b5a00b0
refactor(disk_btree): make BlockWriter::write_blk infallible
problame Feb 7, 2024
7ba1949
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/btree-b…
problame Feb 7, 2024
91d3e25
finish
problame Feb 7, 2024
6f65648
Revert "refactor(disk_btree): make BlockWriter::write_blk infallible"
problame Feb 7, 2024
85d5fc6
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
edabce6
use right branch
problame Feb 7, 2024
e5d15df
WIP
problame Feb 7, 2024
2a39457
WIP
problame Feb 7, 2024
4659794
WIP
problame Feb 7, 2024
14bdd84
it turns out one wants to take BoundedBuf, not Slice<T>
problame Feb 7, 2024
4fe3b49
make tests pass
problame Feb 7, 2024
a6605a1
pull bunch of changes down
problame Feb 7, 2024
c92e8a7
don't pull that in
problame Feb 7, 2024
b0144e2
update lib
problame Feb 7, 2024
54561a8
fixup
problame Feb 7, 2024
6c47083
we can't use impl IoBuf for Array
problame Feb 7, 2024
238296a
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
207764b
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
e5da261
work around BoundedBuf.slice(0..x) panicking for x == 0
problame Feb 7, 2024
26e51c7
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
6f4d182
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
33f3053
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Feb 7, 2024
33261e4
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
8998178
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 12, 2024
4 changes: 2 additions & 2 deletions pageserver/src/deletion_queue.rs
@@ -234,7 +234,7 @@ impl DeletionHeader {
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
let header_path = conf.deletion_header_path();
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
.await
.maybe_fatal_err("save deletion header")?;

@@ -325,7 +325,7 @@ impl DeletionList {
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);

let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)
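The pattern in this file repeats at every call site below: the serialized `Vec<u8>` from `serde_json::to_vec` is now handed to `crashsafe_overwrite` by value rather than as `&[u8]`, so the callee can own the buffer for the duration of the write. For orientation only, here is a standalone sketch of the crashsafe-overwrite idea with an owned buffer, written against `std::fs` rather than the pageserver's `VirtualFile`; the function name and the directory-sync step are illustrative assumptions, not the PR's implementation.

```rust
use std::fs::{self, File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

// Sketch only: write to a temp file, fsync, then atomically rename over the
// final path, so a crash never leaves a partially written file behind.
fn crashsafe_overwrite_sketch(
    final_path: &Path,
    tmp_path: &Path,
    content: Vec<u8>, // owned buffer, mirroring the new signature
) -> io::Result<()> {
    let mut tmp = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(tmp_path)?;
    tmp.write_all(&content)?;
    tmp.sync_all()?;
    drop(tmp); // close before the rename
    fs::rename(tmp_path, final_path)?;
    if let Some(parent) = final_path.parent() {
        // fsync the parent directory so the rename itself is durable
        File::open(parent)?.sync_all()?;
    }
    Ok(())
}
```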
4 changes: 2 additions & 2 deletions pageserver/src/tenant.rs
@@ -2880,7 +2880,7 @@ impl Tenant {
let config_path = config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.as_bytes();
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
.with_context(|| {
@@ -2917,7 +2917,7 @@ impl Tenant {
let target_config_path = target_config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.as_bytes();
let conf_content = conf_content.into_bytes();
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {
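The `as_bytes()` → `into_bytes()` switch is what makes these call sites compatible with the owned-buffer parameter: `String::as_bytes` only borrows the config string, while `String::into_bytes` consumes it and yields its allocation as a `Vec<u8>`, so ownership can be transferred without copying. A small, self-contained illustration in plain Rust, nothing pageserver-specific:

```rust
fn main() {
    let conf_content = String::from("key = 'value'\n");

    // Borrowed view: fine for a &[u8] API, but it cannot be given away.
    let borrowed: &[u8] = conf_content.as_bytes();
    assert_eq!(borrowed.len(), 14);

    // Owned bytes: consumes the String and reuses its allocation, which is
    // what an API taking the buffer by value wants.
    let owned: Vec<u8> = conf_content.into_bytes();
    assert_eq!(owned.len(), 14);
    // `conf_content` has been moved and can no longer be used here.
}
```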
26 changes: 11 additions & 15 deletions pageserver/src/tenant/blob_io.rs
@@ -131,27 +131,23 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
src_buf: B,
) -> (B::Buf, Result<(), Error>) {
let src_buf_len = src_buf.bytes_init();
let (src_buf, res) = if src_buf_len > 0 {
let src_buf = src_buf.slice(0..src_buf_len);
let res = self.inner.write_all(&src_buf).await;
let src_buf = Slice::into_inner(src_buf);
(src_buf, res)
} else {
let res = self.inner.write_all(&[]).await;
(Slice::into_inner(src_buf.slice_full()), res)
let (src_buf, res) = self.inner.write_all(src_buf).await;
let nbytes = match res {
Ok(nbytes) => nbytes,
Err(e) => return (src_buf, Err(e)),
};
if let Ok(()) = &res {
self.offset += src_buf_len as u64;
}
(src_buf, res)
self.offset += nbytes as u64;
(src_buf, Ok(()))
}

#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self) -> Result<(), Error> {
self.inner.write_all(&self.buf).await?;
self.buf.clear();
let buf = std::mem::take(&mut self.buf);
let (mut buf, res) = self.inner.write_all(buf).await;
res?;
buf.clear();
self.buf = buf;
Ok(())
}

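The `(buffer, result)` return shape used here mirrors the owned-buffer convention of io_uring-style APIs: the callee owns the buffer while the I/O is in flight and hands it back afterwards, which is why `flush_buffer` now moves `self.buf` out with `std::mem::take`, clears the returned buffer, and puts it back for reuse. A minimal sketch of that round-trip with a mock writer; the types and names below are invented for illustration and are not the pageserver API:

```rust
use std::io;

/// Stand-in for a writer whose write_all takes the buffer by value and
/// returns it together with the result (owned-buffer convention).
struct MockWriter {
    written: Vec<u8>,
}

impl MockWriter {
    fn write_all(&mut self, buf: Vec<u8>) -> (Vec<u8>, io::Result<usize>) {
        let n = buf.len();
        self.written.extend_from_slice(&buf);
        (buf, Ok(n)) // hand the buffer back so the caller can reuse it
    }
}

struct BufferedWriter {
    inner: MockWriter,
    buf: Vec<u8>,
}

impl BufferedWriter {
    /// Same shape as the new flush_buffer: take the buffer out, write it,
    /// clear it, and put it back for the next fill.
    fn flush_buffer(&mut self) -> io::Result<()> {
        let buf = std::mem::take(&mut self.buf);
        let (mut buf, res) = self.inner.write_all(buf);
        res?;
        buf.clear();
        self.buf = buf;
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut w = BufferedWriter {
        inner: MockWriter { written: Vec::new() },
        buf: b"hello".to_vec(),
    };
    w.flush_buffer()?;
    assert_eq!(w.inner.written, b"hello".to_vec());
    assert!(w.buf.is_empty()); // allocation retained, contents cleared
    Ok(())
}
```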
2 changes: 1 addition & 1 deletion pageserver/src/tenant/metadata.rs
@@ -279,7 +279,7 @@ pub async fn save_metadata(
let path = conf.metadata_path(tenant_shard_id, timeline_id);
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
VirtualFile::crashsafe_overwrite(&path, &temp_path, &metadata_bytes)
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
.await
.context("write metadata")?;
Ok(())
2 changes: 1 addition & 1 deletion pageserver/src/tenant/secondary/downloader.rs
@@ -486,7 +486,7 @@ impl<'a> TenantDownloader<'a> {
let heatmap_path_bg = heatmap_path.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
})
})
.await
30 changes: 10 additions & 20 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -461,7 +461,8 @@ impl DeltaLayerWriterInner {
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
for buf in block_buf.blocks {
file.write_all(buf.as_ref()).await?;
let (_buf, res) = file.write_all(buf).await;
res?;
}
assert!(self.lsn_range.start < self.lsn_range.end);
// Fill in the summary on blk 0
@@ -476,17 +477,12 @@
index_root_blk,
};

let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
if buf.spilled() {
// This is bad as we only have one free block for the summary
warn!(
"Used more than one page size for summary buffer: {}",
buf.len()
);
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
let (_buf, res) = file.write_all(buf).await;
res?;

let metadata = file
.metadata()
@@ -679,18 +675,12 @@ impl DeltaLayer {

let new_summary = rewrite(actual_summary);

let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in DeltaLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
let (_buf, res) = file.write_all(buf).await;
res?;
Ok(())
}
}
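Two things happen in this file: the summary buffer changes from `SmallVec<[u8; PAGE_SZ]>` to a plain `Vec<u8>` (the `TODO` notes that `SmallVec` is awkward to combine with `Slice<T>` on the owned-buffer path), and the old `spilled()` warning/error for oversized summaries disappears with it. If that guard is still desired, a plain length check against the page size is the moral equivalent; the following is a hypothetical sketch, not part of this PR, and it assumes `PAGE_SZ` is the pageserver's 8 KiB block size.

```rust
// Hypothetical replacement for the dropped `spilled()` check; not in the PR.
const PAGE_SZ: usize = 8192; // assumed block size

fn check_summary_fits(buf: &[u8]) -> anyhow::Result<()> {
    // Only one block is reserved for the summary on disk, so a serialized
    // summary larger than a single page cannot be written back in place.
    if buf.len() > PAGE_SZ {
        anyhow::bail!("summary buffer exceeds one page: {} bytes", buf.len());
    }
    Ok(())
}
```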
30 changes: 10 additions & 20 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -341,18 +341,12 @@ impl ImageLayer {

let new_summary = rewrite(actual_summary);

let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
let (_buf, res) = file.write_all(buf).await;
res?;
Ok(())
}
}
@@ -555,7 +549,8 @@ impl ImageLayerWriterInner {
.await?;
let (index_root_blk, block_buf) = self.tree.finish()?;
for buf in block_buf.blocks {
file.write_all(buf.as_ref()).await?;
let (_buf, res) = file.write_all(buf).await;
res?;
}

// Fill in the summary on blk 0
@@ -570,17 +565,12 @@
index_root_blk,
};

let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
if buf.spilled() {
// This is bad as we only have one free block for the summary
warn!(
"Used more than one page size for summary buffer: {}",
buf.len()
);
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
let (_buf, res) = file.write_all(buf).await;
res?;

let metadata = file
.metadata()
66 changes: 44 additions & 22 deletions pageserver/src/virtual_file.rs
@@ -19,7 +19,7 @@ use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::IoBufMut;
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};

use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
@@ -410,10 +410,10 @@ impl VirtualFile {
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite(
pub async fn crashsafe_overwrite<B: BoundedBuf>(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
content: B,
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
@@ -430,7 +430,8 @@
.create_new(true),
)
.await?;
file.write_all(content).await?;
let (_content, res) = file.write_all(content).await;
res?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
@@ -601,23 +602,36 @@ impl VirtualFile {
Ok(())
}

pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), Error> {
/// Writes `buf.slice(0..buf.bytes_init())`.
/// Returns the `IoBuf` underlying the `BoundedBuf` `buf`.
/// I.e., the returned value's `bytes_init()` may differ from the `bytes_init()` of the `buf` that was passed in.
/// That is brittle and easy to misuse, so we also return the number of bytes written in the `Ok()` variant.
pub async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> (B::Buf, Result<usize, Error>) {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(0));
}
let mut buf = buf.slice(0..nbytes);
while !buf.is_empty() {
match self.write(buf).await {
// TODO: push `Slice` further down
match self.write(&buf).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
return (
Slice::into_inner(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
}
Ok(n) => {
buf = &buf[n..];
buf = buf.slice(n..);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
Err(e) => return (Slice::into_inner(buf), Err(e)),
}
}
Ok(())
(Slice::into_inner(buf), Ok(nbytes))
}

async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
@@ -676,7 +690,6 @@
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{
use tokio_epoll_uring::BoundedBuf;
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
while buf.bytes_total() != 0 {
let res;
@@ -1063,10 +1076,19 @@
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
MaybeVirtualFile::File(file) => file.write_all(buf),
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all(buf).await;
res.map(|_| ())
}
MaybeVirtualFile::File(file) => {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return Ok(());
}
file.write_all(&buf.slice(0..buf_len))
}
}
}

@@ -1141,7 +1163,7 @@ mod tests {
.to_owned(),
)
.await?;
file_a.write_all(b"foobar").await?;
file_a.write_all(b"foobar".to_vec()).await?;

// cannot read from a file opened in write-only mode
let _ = file_a.read_string().await.unwrap_err();
@@ -1150,7 +1172,7 @@
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;

// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar").await.unwrap_err();
let _ = file_a.write_all(b"bar".to_vec()).await.unwrap_err();

// Try simple read
assert_eq!("foobar", file_a.read_string().await?);
@@ -1293,7 +1315,7 @@
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");

VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1302,7 +1324,7 @@
assert!(!tmp_path.exists());
drop(file);

VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar")
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1324,7 +1346,7 @@
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());

VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();

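Finally, the core of the PR: `VirtualFile::write_all` now takes any `BoundedBuf`, writes its initialized prefix, and always returns the underlying buffer together with the result (the written length in the `Ok` case). For readers who want the control flow without the `tokio-epoll-uring` types, here is a standalone sketch of the same loop over `std::io::Write`; the `write_all_owned` helper and its names are invented for illustration and are not the pageserver implementation.

```rust
use std::io::{Error, ErrorKind, Write};

// Owned-buffer write_all: return the buffer alongside the result, and report
// the number of bytes written on success, mirroring the new signature.
fn write_all_owned<W: Write>(dst: &mut W, buf: Vec<u8>) -> (Vec<u8>, Result<usize, Error>) {
    let nbytes = buf.len();
    if nbytes == 0 {
        return (buf, Ok(0));
    }
    let mut written = 0;
    while written < nbytes {
        match dst.write(&buf[written..]) {
            Ok(0) => {
                return (
                    buf,
                    Err(Error::new(ErrorKind::WriteZero, "failed to write whole buffer")),
                );
            }
            Ok(n) => written += n,
            // Retry on EINTR, as the diff above does.
            Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
            Err(e) => return (buf, Err(e)),
        }
    }
    (buf, Ok(nbytes))
}

fn main() -> Result<(), Error> {
    let mut sink = Vec::new();
    let payload = b"foobar".to_vec();
    let (payload, res) = write_all_owned(&mut sink, payload);
    assert_eq!(res?, 6);
    assert_eq!(sink, payload); // buffer handed back, usable again
    Ok(())
}
```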