pageserver: make BufferedWriter do double-buffering #9693

Merged · 40 commits · Dec 4, 2024

Commits
dd1c45e
eliminate size_tracking_writer
yliang412 Nov 7, 2024
224cbb4
change OwnedAsyncWriter trait to use write_all_at
yliang412 Nov 8, 2024
f0efc90
use Arc around W: OwnedAsyncWriter
yliang412 Nov 8, 2024
26c8b50
implement non-generic flush handle & bg task
yliang412 Nov 9, 2024
4599804
make flush handle & task generic
yliang412 Nov 10, 2024
bdffc35
use background flush for write path; read path broken
yliang412 Nov 10, 2024
e0848c2
make InMemory read aware of mutable & maybe_flushed
yliang412 Nov 11, 2024
e5bb85d
fix clippy
yliang412 Nov 11, 2024
7b34e73
fix tests
yliang412 Nov 11, 2024
b0d7fc7
fix IoBufferMut::extend_from_slice
yliang412 Nov 11, 2024
ce7cd36
add IoBufAligned marker
yliang412 Nov 12, 2024
20e6a0c
use open_with_options_v2 (O_DIRECT) for ephemeral file
yliang412 Nov 12, 2024
d6d8a16
Merge branch 'main' into yuchen/double-buffered-writer
yliang412 Nov 12, 2024
ffd88ed
fix clippy
yliang412 Nov 12, 2024
6844b5f
add comments; make read buffering works with write_buffered (owned ve…
yliang412 Nov 12, 2024
990bc65
review: https://github.com/neondatabase/neon/pull/9693#discussion_r18…
yliang412 Nov 15, 2024
5acc61b
move duplex to utils; make flush behavior controllable in test
yliang412 Nov 18, 2024
9db6b1e
fix clippy
yliang412 Nov 19, 2024
826e239
add comments
yliang412 Nov 19, 2024
78a17a7
improve FullSlice semantics
yliang412 Nov 19, 2024
77801fe
Merge branch 'main' into yuchen/double-buffered-writer
yliang412 Nov 19, 2024
0f63c95
document and reorder flush background task invokation sequence
yliang412 Nov 20, 2024
54d253d
review: change IoMode default back to Buffered
yliang412 Nov 25, 2024
e5bf2be
remove write_buffered; add notes for bypass-aligned-part-of-write
yliang412 Nov 25, 2024
28718bf
review: simplify FlushControl by using ZST for not(test)
yliang412 Nov 25, 2024
76f0e4f
review: remove save_buf_for_read
yliang412 Nov 25, 2024
d4ebd5c
use CheapCloneForRead trait to prevent efficiency bugs
yliang412 Nov 25, 2024
8a37f41
remove resolved todos
yliang412 Nov 25, 2024
4284fcd
fix docs clippy
yliang412 Nov 25, 2024
c3302ad
Merge branch 'main' into yuchen/double-buffered-writer
yliang412 Nov 25, 2024
b54764b
hold timeline open in background task using gate guard (#9825)
yliang412 Nov 27, 2024
b6a2516
Merge branch 'main' into yuchen/double-buffered-writer
yliang412 Nov 27, 2024
9f384a8
review: remove unused impl Buffer for BytesMut
yliang412 Dec 2, 2024
bf9a6d0
review: follow Buffer::extend_from_slice trait definition
yliang412 Dec 2, 2024
fac4269
review: fix CheapCloneForRead for FullSlice
yliang412 Dec 2, 2024
a439d57
review: cleanup comments + expect_err
yliang412 Dec 2, 2024
6a1aa52
review: move FlushHandle::handle_error right after ::flush
yliang412 Dec 2, 2024
21ca0c4
review: set channel buffer size to 1
yliang412 Dec 2, 2024
9d1821a
Merge branch 'main' into yuchen/double-buffered-writer
yliang412 Dec 2, 2024
1da4028
fix clippy
yliang412 Dec 3, 2024
172 changes: 115 additions & 57 deletions pageserver/src/tenant/ephemeral_file.rs
@@ -8,10 +8,8 @@ use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
use bytes::BytesMut;
use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
@@ -20,17 +18,15 @@ use tracing::error;

use std::io;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use utils::id::TimelineId;

pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
buffered_writer: owned_buffers_io::write::BufferedWriter<
BytesMut,
size_tracking_writer::Writer<VirtualFile>,
>,
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: utils::sync::gate::GateGuard,
}
@@ -55,15 +51,17 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}"
)));

let file = VirtualFile::open_with_options(
&filename,
virtual_file::OpenOptions::new()
.read(true)
.write(true)
.create(true),
ctx,
)
.await?;
let file = Arc::new(
VirtualFile::open_with_options_v2(
&filename,
virtual_file::OpenOptions::new()
.read(true)
.write(true)
.create(true),
ctx,
)
.await?,
);

let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore

@@ -73,8 +71,9 @@
page_cache_file_id,
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
size_tracking_writer::Writer::new(file),
BytesMut::with_capacity(TAIL_SZ),
file,
|| IoBufferMut::with_capacity(TAIL_SZ),
ctx,
),
_gate_guard: gate_guard,
})
@@ -85,7 +84,7 @@ impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = self.buffered_writer.as_inner().as_inner().path();
let path = self.buffered_writer.as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
@@ -168,11 +167,12 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
dst: tokio_epoll_uring::Slice<B>,
ctx: &'a RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
let file_size_tracking_writer = self.buffered_writer.as_inner();
let flushed_offset = file_size_tracking_writer.bytes_written();
let flushed_offset = self.buffered_writer.bytes_written();

let buffer = self.buffered_writer.inspect_buffer();
let buffered = &buffer[0..buffer.pending()];
let mutable = self.buffered_writer.inspect_mutable();
let mutable = &mutable[0..mutable.pending()];

let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();

let dst_cap = dst.bytes_total().into_u64();
let end = {
@@ -197,11 +197,37 @@
}
}
}
let written_range = Range(start, std::cmp::min(end, flushed_offset));
let buffered_range = Range(std::cmp::max(start, flushed_offset), end);

// [ written ][ maybe_flushed ][ mutable ]
// |- TAIL_SZ -||- TAIL_SZ -|
// ^
// `flushed_offset`
//
let (written_range, maybe_flushed_range) = {
if maybe_flushed.is_some() {
(
Range(
start,
std::cmp::min(end, flushed_offset.saturating_sub(TAIL_SZ as u64)),
),
Range(
std::cmp::max(start, flushed_offset.saturating_sub(TAIL_SZ as u64)),
std::cmp::min(end, flushed_offset),
),
)
} else {
(
Range(start, std::cmp::min(end, flushed_offset)),
// zero len
Range(flushed_offset, u64::MIN),
)
}
};

let mutable_range = Range(std::cmp::max(start, flushed_offset), end);

let dst = if written_range.len() > 0 {
let file: &VirtualFile = file_size_tracking_writer.as_inner();
let file: &VirtualFile = self.buffered_writer.as_inner();
let bounds = dst.bounds();
let slice = file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
Expand All @@ -211,19 +237,21 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
dst
};

let dst = if buffered_range.len() > 0 {
let offset_in_buffer = buffered_range
let dst = if maybe_flushed_range.len() > 0 {
let offset_in_buffer = maybe_flushed_range
.0
.checked_sub(flushed_offset)
.checked_sub(flushed_offset.saturating_sub(TAIL_SZ as u64))
.unwrap()
.into_usize();
let to_copy =
&buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())];
// Checked previously the buffer is Some.
let maybe_flushed = maybe_flushed.unwrap();
let to_copy = &maybe_flushed
[offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
let bounds = dst.bounds();
let mut view = dst.slice({
let start = written_range.len().into_usize();
let end = start
.checked_add(buffered_range.len().into_usize())
.checked_add(maybe_flushed_range.len().into_usize())
.unwrap();
start..end
});
@@ -234,6 +262,28 @@
dst
};

let dst = if mutable_range.len() > 0 {
let offset_in_buffer = mutable_range
.0
.checked_sub(flushed_offset)
.unwrap()
.into_usize();
let to_copy =
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
let bounds = dst.bounds();
let mut view = dst.slice({
let start =
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
} else {
dst
};

// TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs

Ok((dst, (end - start).into_usize()))
@@ -331,9 +381,9 @@ mod tests {
.await
.unwrap();

let cap = file.buffered_writer.inspect_buffer().capacity();
let cap = file.buffered_writer.inspect_mutable().capacity();

let write_nbytes = cap + cap / 2;
let write_nbytes = cap * 2 + cap / 2;

let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
@@ -359,12 +409,14 @@
assert_eq!(&buf, &content[i..i + 1]);
}

let file_contents =
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
assert_eq!(file_contents, &content[0..cap]);
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
assert!(file_contents == content[0..cap] || file_contents == content[0..cap * 2]);

let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
assert_eq!(maybe_flushed_buffer_contents, &content[cap..cap * 2]);

let buffer_contents = file.buffered_writer.inspect_buffer();
assert_eq!(buffer_contents, &content[cap..write_nbytes]);
let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
}

#[tokio::test]
@@ -378,35 +430,33 @@
.await
.unwrap();

let cap = file.buffered_writer.inspect_buffer().capacity();
// mutable buffer and maybe_flushed buffer each has cap.
let cap = file.buffered_writer.inspect_mutable().capacity();

let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(cap + cap / 2)
.take(cap * 2 + cap / 2)
.collect();

file.write_raw(&content, &ctx).await.unwrap();

// assert the state is as this test expects it to be
assert_eq!(
&file.load_to_io_buf(&ctx).await.unwrap(),
&content[0..cap + cap / 2]
&content[0..cap * 2 + cap / 2]
);
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
assert!(
md.len() == cap.into_u64() || md.len() == 2 * cap.into_u64(),
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
);
let md = file
.buffered_writer
.as_inner()
.as_inner()
.path()
.metadata()
.unwrap();
assert_eq!(
md.len(),
cap.into_u64(),
"buffered writer does one write if we write 1.5x buffer capacity"
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
&content[cap..cap * 2]
);
assert_eq!(
&file.buffered_writer.inspect_buffer()[0..cap / 2],
&content[cap..cap + cap / 2]
&file.buffered_writer.inspect_mutable()[0..cap / 2],
&content[cap * 2..cap * 2 + cap / 2]
);
}

@@ -427,11 +477,11 @@
.await
.unwrap();

let cap = file.buffered_writer.inspect_buffer().capacity();
let cap = file.buffered_writer.inspect_mutable().capacity();

let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(cap + cap / 2)
.take(cap * 2 + cap / 2)
.collect();

file.write_raw(&content, &ctx).await.unwrap();
@@ -461,9 +511,17 @@
test_read(cap - 10, 10).await;
// read across file and buffer
test_read(cap - 10, 20).await;
// stay from start of buffer
// stay from start of maybe flushed buffer
test_read(cap, 10).await;
// completely within buffer
// completely within maybe flushed buffer
test_read(cap + 10, 10).await;
// border onto edge of maybe flushed buffer.
test_read(cap * 2 - 10, 10).await;
// read across maybe flushed and mutable buffer
test_read(cap * 2 - 10, 20).await;
// read across three segments
test_read(cap - 10, cap + 20).await;
// completely within mutable buffer
test_read(cap * 2 + 10, 10).await;
}
}
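The read path in the diff above splits each read request across three regions: bytes already written to disk, bytes in the in-flight `maybe_flushed` buffer, and bytes still in the mutable buffer. The range arithmetic can be sketched as a standalone function (hypothetical names and signature, not the PR's exact types; `tail_sz` is a parameter where the real code uses the `TAIL_SZ` constant):

```rust
// Layout of a double-buffered file, with `flushed_offset` counting bytes
// already submitted to the flush task:
//
//   [ written to disk ][ maybe_flushed ][ mutable ]
//                      ^                ^
//   flushed_offset - tail_sz        flushed_offset
//
// When no buffer is in flight, the middle region collapses to empty and
// "written" extends all the way up to flushed_offset.

/// Split [start, end) into (written, maybe_flushed, mutable) sub-ranges.
/// Ranges are half-open; an empty range may come out inverted, so measure
/// it with saturating subtraction as `len` does below.
fn split_range(
    start: u64,
    end: u64,
    flushed_offset: u64,
    tail_sz: u64,
    have_maybe_flushed: bool,
) -> ((u64, u64), (u64, u64), (u64, u64)) {
    let mf_start = if have_maybe_flushed {
        flushed_offset.saturating_sub(tail_sz)
    } else {
        flushed_offset
    };
    let written = (start, end.min(mf_start));
    let maybe_flushed = (start.max(mf_start), end.min(flushed_offset));
    let mutable = (start.max(flushed_offset), end);
    (written, maybe_flushed, mutable)
}

fn len(r: (u64, u64)) -> u64 {
    r.1.saturating_sub(r.0)
}

fn main() {
    // 200 bytes submitted for flush, maybe_flushed covers 100..200,
    // mutable starts at 200; read the range 50..250.
    let (w, mf, m) = split_range(50, 250, 200, 100, true);
    assert_eq!((w, mf, m), ((50, 100), (100, 200), (200, 250)));
    // The three pieces cover the request exactly, with no gaps or overlap.
    assert_eq!(len(w) + len(mf) + len(m), 250 - 50);
    println!("written={:?} maybe_flushed={:?} mutable={:?}", w, mf, m);
}
```

Each non-empty sub-range is then served from its backing store: a `read_exact_at` on the file for `written`, and memory copies out of the two buffers for the rest, which is exactly the structure of the three `let dst = if … { … }` blocks in the diff.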
39 changes: 22 additions & 17 deletions pageserver/src/tenant/remote_timeline_client/download.rs
@@ -6,6 +6,7 @@
use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;

use anyhow::{anyhow, Context};
@@ -26,9 +27,7 @@ use crate::span::{
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::tenant::Generation;
#[cfg_attr(target_os = "macos", allow(unused_imports))]
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{on_fatal_io_error, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
use utils::crashsafe::path_with_suffix_extension;
@@ -203,13 +202,16 @@
}
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
use bytes::BytesMut;
use crate::virtual_file::owned_buffers_io;
async {
let destination_file = VirtualFile::create(dst_path, ctx)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
let destination_file = Arc::new(
VirtualFile::create(dst_path, ctx)
.await
.with_context(|| {
format!("create a destination file for layer '{dst_path}'")
})
.map_err(DownloadError::Other)?,
);

let mut download = storage
.download(src_path, &DownloadOpts::default(), cancel)
@@ -220,22 +222,25 @@
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
let size_tracking = size_tracking_writer::Writer::new(destination_file);
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
size_tracking,
BytesMut::with_capacity(super::BUFFER_SIZE),
);
let mut buffered =
owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
ctx,
);
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(e),
};
buffered.write_buffered(chunk.slice_len(), ctx).await?;
// TODO(yuchen): might have performance issue when using borrowed version?
// Problem: input is Bytes, does not satisify IO alignment requirement.
buffered.write_buffered_borrowed(&chunk, ctx).await?;
}
let size_tracking = buffered.flush_and_into_inner(ctx).await?;
Ok(size_tracking.into_inner())
let inner = buffered.flush_and_into_inner(ctx).await?;
Ok(inner)
}
.await?;
