Skip to content

Commit

Permalink
pageserver: make BufferedWriter do double-buffering (#9693)
Browse files Browse the repository at this point in the history
Closes #9387.

## Problem

`BufferedWriter` cannot proceed while the owned buffer is flushing to
disk. We want to implement double buffering so that the flush can happen
in the background. See #9387.

## Summary of changes

- Maintain two owned buffers in `BufferedWriter`.
- The writer is in charge of copying the data into owned, aligned
buffer, once full, submit it to the flush task.
- The flush background task is in charge of flushing the owned buffer to
disk, and returned the buffer to the writer for reuse.
- The writer and the flush background task communicate through a
bi-directional channel.

For in-memory layer, we also need to be able to read from the buffered
writer in `get_values_reconstruct_data`. To handle this case, we did the
following
- Use replace `VirtualFile::write_all` with `VirtualFile::write_all_at`,
and use `Arc` to share it between writer and background task.
- leverage `IoBufferMut::freeze` to get a cheaply clonable `IoBuffer`,
one clone will be submitted to the channel, the other clone will be
saved within the writer to serve reads. When we want to reuse the
buffer, we can invoke `IoBuffer::into_mut`, which gives us back the
mutable aligned buffer.
- InMemoryLayer reads is now aware of the maybe_flushed part of the
buffer.

**Caveat**

- We removed the owned version of write, because this interface does not
work well with buffer alignment. The result is that without direct IO
enabled,
[`download_object`](https://github.com/neondatabase/neon/blob/a439d57050dafd603d24e001215213eb5246a029/pageserver/src/tenant/remote_timeline_client/download.rs#L243)
does one more memcpy than before this PR due to the switch to use
`_borrowed` version of the write.
- "Bypass aligned part of write" could be implemented later to avoid
large amount of memcpy.

**Testing**
- use an oneshot channel based control mechanism to make flush behavior
deterministic in test.
- test reading from `EphemeralFile` when the last submitted buffer is
not flushed, in-progress, and done flushing to disk.


## Performance


We see performance improvement for small values, and regression on big
values, likely due to being CPU bound + disk write latency.


[Results](https://www.notion.so/neondatabase/Benchmarking-New-BufferedWriter-11-20-2024-143f189e0047805ba99acda89f984d51?pvs=4)


## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
  • Loading branch information
2 people authored and awarus committed Dec 5, 2024
1 parent 088eb72 commit 7b0e3db
Show file tree
Hide file tree
Showing 21 changed files with 846 additions and 389 deletions.
1 change: 1 addition & 0 deletions libs/utils/src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod heavier_once_cell;

pub mod duplex;
pub mod gate;

pub mod spsc_fold;
1 change: 1 addition & 0 deletions libs/utils/src/sync/duplex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod mpsc;
36 changes: 36 additions & 0 deletions libs/utils/src/sync/duplex/mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use tokio::sync::mpsc;

/// A bi-directional channel.
pub struct Duplex<S, R> {
pub tx: mpsc::Sender<S>,
pub rx: mpsc::Receiver<R>,
}

/// Creates a bi-directional channel.
///
/// The channel will buffer up to the provided number of messages. Once the buffer is full,
/// attempts to send new messages will wait until a message is received from the channel.
/// The provided buffer capacity must be at least 1.
pub fn channel<A: Send, B: Send>(buffer: usize) -> (Duplex<A, B>, Duplex<B, A>) {
let (tx_a, rx_a) = mpsc::channel::<A>(buffer);
let (tx_b, rx_b) = mpsc::channel::<B>(buffer);

(Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
}

impl<S: Send, R: Send> Duplex<S, R> {
/// Sends a value, waiting until there is capacity.
///
/// A successful send occurs when it is determined that the other end of the channel has not hung up already.
pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError<S>> {
self.tx.send(x).await
}

/// Receives the next value for this receiver.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer.
pub async fn recv(&mut self) -> Option<R> {
self.rx.recv().await
}
}
4 changes: 1 addition & 3 deletions pageserver/benches/bench_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@ async fn ingest(
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

let gate = utils::sync::gate::Gate::default();
let entered = gate.enter().unwrap();

let layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;
let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &gate, &ctx).await?;

let data = Value::Image(Bytes::from(vec![0u8; put_size]));
let data_ser_size = data.serialized_size().unwrap() as usize;
Expand Down
Loading

0 comments on commit 7b0e3db

Please sign in to comment.