refactor: revert the pre-serialization and parallel buffer (foyer-rs#717)

* refactor: use entry uncompressed serialized size for selection

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refactor: refactor entry serde

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refactor: revert the parallelism batching design

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refactor: remove spawn on write

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* chore: pass ffmt test

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: fix build on nightly with nightly feature

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* chore: fix

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

---------

Signed-off-by: MrCroxx <mrcroxx@outlook.com>
MrCroxx authored Sep 24, 2024
1 parent 553a9cb commit 60839ee
Showing 14 changed files with 286 additions and 288 deletions.
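
In short, this commit reverts the earlier design in which entries were serialized up front (in parallel) and handed to the storage engines as ready-made `IoBytes` buffers. Engine selection is now driven by the entry's estimated uncompressed serialized size, and serialization happens directly into the batch write buffer. Below is a minimal sketch of the new size-based selection, with simplified types standing in for the crate's generic, `CacheEntry`-aware `Selector` trait (illustration only, not the actual API surface):

```rust
/// Where an entry is routed (mirrors `Selection` in foyer-storage).
enum Selection {
    /// Small-object engine.
    Left,
    /// Large-object engine.
    Right,
}

/// Simplified stand-in for the `Selector` trait: after this commit it receives
/// an estimated size instead of a pre-serialized buffer.
trait Selector {
    fn select(&self, estimated_size: usize) -> Selection;
}

/// Route entries smaller than `threshold` bytes to the small engine and the
/// rest to the large engine, matching the `SizeSelector` logic in the diff below.
struct SizeSelector {
    threshold: usize,
}

impl Selector for SizeSelector {
    fn select(&self, estimated_size: usize) -> Selection {
        if estimated_size < self.threshold {
            Selection::Left
        } else {
            Selection::Right
        }
    }
}
```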
8 changes: 4 additions & 4 deletions examples/Cargo.toml
@@ -19,10 +19,10 @@ fastrace = { workspace = true }
fastrace-jaeger = { workspace = true, optional = true }
fastrace-opentelemetry = { workspace = true, optional = true }
foyer = { version = "*", path = "../foyer" }
opentelemetry = { version = "0.24", optional = true }
opentelemetry-otlp = { version = "0.17", optional = true }
opentelemetry-semantic-conventions = { version = "0.16", optional = true }
opentelemetry_sdk = { version = "0.24", features = [
opentelemetry = { version = "0.25", optional = true }
opentelemetry-otlp = { version = "0.25", optional = true }
opentelemetry-semantic-conventions = { version = "0.25", optional = true }
opentelemetry_sdk = { version = "0.25", features = [
"rt-tokio",
"trace",
], optional = true }
2 changes: 1 addition & 1 deletion foyer-bench/Cargo.toml
@@ -29,7 +29,7 @@ serde = { workspace = true }
serde_bytes = "0.11.15"
tokio = { workspace = true }
tracing = "0.1"
tracing-opentelemetry = { version = "0.25", optional = true }
tracing-opentelemetry = { version = "0.26", optional = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
zipf = "7"

1 change: 1 addition & 0 deletions foyer-storage/Cargo.toml
@@ -24,6 +24,7 @@ bytes = "1"
clap = { workspace = true }
either = "1"
fastrace = { workspace = true }
flume = "0.11"
foyer-common = { version = "0.9.3", path = "../foyer-common" }
foyer-memory = { version = "0.7.3", path = "../foyer-memory" }
fs4 = "0.9.1"
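
`flume` enters as a new dependency of `foyer-storage` here; how it is wired into the rewritten batching path is not visible in this excerpt, so the snippet below is only a generic sketch of its MPMC channel API, not foyer's actual usage:

```rust
// Illustration only: flume 0.11's basic unbounded multi-producer,
// multi-consumer channel. The API calls are flume's public interface;
// the values sent are arbitrary.
fn main() {
    let (tx, rx) = flume::unbounded::<u64>();
    tx.send(42).unwrap();
    assert_eq!(rx.recv().unwrap(), 42);
}
```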
21 changes: 12 additions & 9 deletions foyer-storage/src/engine.rs
@@ -28,13 +28,12 @@ use futures::Future;
use crate::{
error::Result,
large::generic::{GenericLargeStorage, GenericLargeStorageConfig},
serde::KvInfo,
small::generic::{GenericSmallStorage, GenericSmallStorageConfig},
storage::{
either::{Either, EitherConfig, Selection, Selector},
noop::Noop,
},
DeviceStats, IoBytes, Storage,
DeviceStats, Storage,
};

pub struct SizeSelector<K, V, S>
@@ -84,8 +83,12 @@ where
type Value = V;
type BuildHasher = S;

fn select(&self, _entry: &CacheEntry<Self::Key, Self::Value, Self::BuildHasher>, buffer: &IoBytes) -> Selection {
if buffer.len() < self.threshold {
fn select(
&self,
_entry: &CacheEntry<Self::Key, Self::Value, Self::BuildHasher>,
estimated_size: usize,
) -> Selection {
if estimated_size < self.threshold {
Selection::Left
} else {
Selection::Right
@@ -239,12 +242,12 @@
}
}

fn enqueue(&self, entry: CacheEntry<Self::Key, Self::Value, Self::BuildHasher>, buffer: IoBytes, info: KvInfo) {
fn enqueue(&self, entry: CacheEntry<Self::Key, Self::Value, Self::BuildHasher>, estimated_size: usize) {
match self {
Engine::Noop(storage) => storage.enqueue(entry, buffer, info),
Engine::Large(storage) => storage.enqueue(entry, buffer, info),
Engine::Small(storage) => storage.enqueue(entry, buffer, info),
Engine::Combined(storage) => storage.enqueue(entry, buffer, info),
Engine::Noop(storage) => storage.enqueue(entry, estimated_size),
Engine::Large(storage) => storage.enqueue(entry, estimated_size),
Engine::Small(storage) => storage.enqueue(entry, estimated_size),
Engine::Combined(storage) => storage.enqueue(entry, estimated_size),
}
}

154 changes: 77 additions & 77 deletions foyer-storage/src/large/batch.rs
@@ -12,19 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
fmt::Debug,
mem::ManuallyDrop,
ops::{Deref, DerefMut, Range},
time::Instant,
};
use std::{fmt::Debug, ops::Range, sync::Arc, time::Instant};

use foyer_common::{
bits,
code::{HashBuilder, StorageKey, StorageValue},
metrics::Metrics,
range::RangeBoundsExt,
strict_assert_eq,
wait_group::{WaitGroup, WaitGroupFuture, WaitGroupGuard},
};
use foyer_memory::CacheEntry;
use itertools::Itertools;
@@ -39,38 +34,12 @@ use super::{
use crate::{
device::{bytes::IoBytes, MonitoredDevice, RegionId},
io_buffer_pool::IoBufferPool,
large::indexer::HashedEntryAddress,
large::{indexer::HashedEntryAddress, serde::EntryHeader},
region::{GetCleanRegionHandle, RegionManager},
Dev, DevExt, IoBuffer,
serde::{Checksummer, EntrySerializer},
Compression, Dev, DevExt, IoBuffer,
};

pub struct Allocation {
_guard: WaitGroupGuard,
slice: ManuallyDrop<Box<[u8]>>,
}

impl Deref for Allocation {
type Target = [u8];

fn deref(&self) -> &Self::Target {
self.slice.as_ref()
}
}

impl DerefMut for Allocation {
fn deref_mut(&mut self) -> &mut Self::Target {
self.slice.as_mut()
}
}

impl Allocation {
unsafe fn new(buffer: &mut [u8], guard: WaitGroupGuard) -> Self {
let fake = Vec::from_raw_parts(buffer.as_mut_ptr(), buffer.len(), buffer.len());
let slice = ManuallyDrop::new(fake.into_boxed_slice());
Self { _guard: guard, slice }
}
}

pub struct BatchMut<K, V, S>
where
K: StorageKey,
@@ -83,14 +52,14 @@ where
tombstones: Vec<TombstoneInfo>,
waiters: Vec<oneshot::Sender<()>>,
init: Option<Instant>,
wait: WaitGroup,

/// Cache write buffer between rotation to reduce page fault.
buffer_pool: IoBufferPool,

region_manager: RegionManager,
device: MonitoredDevice,
indexer: Indexer,
metrics: Arc<Metrics>,
}

impl<K, V, S> Debug for BatchMut<K, V, S>
@@ -116,7 +85,13 @@ where
V: StorageValue,
S: HashBuilder + Debug,
{
pub fn new(capacity: usize, region_manager: RegionManager, device: MonitoredDevice, indexer: Indexer) -> Self {
pub fn new(
capacity: usize,
region_manager: RegionManager,
device: MonitoredDevice,
indexer: Indexer,
metrics: Arc<Metrics>,
) -> Self {
let capacity = bits::align_up(device.align(), capacity);
let mut batch = Self {
buffer: IoBuffer::new(capacity),
@@ -125,62 +100,100 @@ where
tombstones: vec![],
waiters: vec![],
init: None,
wait: WaitGroup::default(),
buffer_pool: IoBufferPool::new(capacity, 1),
region_manager,
device,
indexer,
metrics,
};
batch.append_group();
batch
}

pub fn entry(&mut self, size: usize, entry: CacheEntry<K, V, S>, sequence: Sequence) -> Option<Allocation> {
pub fn entry(&mut self, entry: CacheEntry<K, V, S>, compression: &Compression, sequence: Sequence) -> bool {
tracing::trace!("[batch]: append entry with sequence: {sequence}");

let aligned = bits::align_up(self.device.align(), size);
self.may_init();

if entry.is_outdated() || self.len + aligned > self.buffer.len() {
return None;
if entry.is_outdated() {
return false;
}

let allocation = self.allocate(aligned);
let pos = self.len;

let info = match EntrySerializer::serialize(
entry.key(),
entry.value(),
compression,
&mut self.buffer[pos + EntryHeader::serialized_len()..],
&self.metrics,
) {
Ok(info) => info,
Err(e) => {
tracing::warn!("[batch]: serialize entry error: {e}");
return false;
}
};

let header = EntryHeader {
key_len: info.key_len as _,
value_len: info.value_len as _,
hash: entry.hash(),
sequence,
checksum: Checksummer::checksum(
&self.buffer[pos + EntryHeader::serialized_len()
..pos + EntryHeader::serialized_len() + info.key_len + info.value_len],
),
compression: *compression,
};
header.write(&mut self.buffer[pos..pos + EntryHeader::serialized_len()]);

let aligned = bits::align_up(self.device.align(), header.entry_len());
self.advance(aligned);

let group = self.groups.last_mut().unwrap();
group.indices.push(HashedEntryAddress {
hash: entry.hash(),
address: EntryAddress {
region: RegionId::MAX,
offset: group.region.offset as u32 + group.region.len as u32,
len: size as _,
len: header.entry_len() as _,
sequence,
},
});
group.entries.push(entry);
group.region.len += aligned;
group.range.end += aligned;

Some(allocation)
true
}

pub fn tombstone(&mut self, tombstone: Tombstone, stats: Option<InvalidStats>) {
tracing::trace!("[batch]: append tombstone");

self.may_init();

self.tombstones.push(TombstoneInfo { tombstone, stats });
}

pub fn reinsertion(&mut self, reinsertion: &Reinsertion) -> Option<Allocation> {
pub fn reinsertion(&mut self, reinsertion: &Reinsertion) -> bool {
tracing::trace!("[batch]: submit reinsertion");

self.may_init();

let aligned = bits::align_up(self.device.align(), reinsertion.buffer.len());

// Skip if the entry is no longer in the indexer.
// Skip if the batch buffer size exceeds the threshold.
if self.indexer.get(reinsertion.hash).is_none() || self.len + aligned > self.buffer.len() {
return None;
return false;
}

let allocation = self.allocate(aligned);
let pos = self.len;

self.buffer[pos..pos + reinsertion.buffer.len()].copy_from_slice(&reinsertion.buffer);

self.advance(aligned);

let group = self.groups.last_mut().unwrap();
// Reserve buffer space for entry.
@@ -196,22 +209,17 @@ where
group.region.len += aligned;
group.range.end += aligned;

Some(allocation)
true
}

/// Register a waiter to be notified after the batch is finished.
pub fn wait(&mut self) -> oneshot::Receiver<()> {
pub fn wait(&mut self, tx: oneshot::Sender<()>) {
tracing::trace!("[batch]: register waiter");
self.may_init();
let (tx, rx) = oneshot::channel();
self.waiters.push(tx);
rx
}

// Note: Make sure `rotate` is called after all buffer from the last batch are dropped.
//
// Otherwise, the page fault caused by the buffer pool will hurt the performance.
pub fn rotate(&mut self) -> Option<(Batch<K, V, S>, WaitGroupFuture)> {
pub fn rotate(&mut self) -> Option<Batch<K, V, S>> {
if self.is_empty() {
return None;
}
Expand All @@ -222,8 +230,6 @@ where
let buffer = IoBytes::from(buffer);
self.buffer_pool.release(buffer.clone());

let wait = std::mem::take(&mut self.wait);

let init = self.init.take();

let tombstones = std::mem::take(&mut self.tombstones);
@@ -269,20 +275,16 @@
None => self.append_group(),
}

Some((
Batch {
groups,
tombstones,
waiters,
init,
},
wait.wait(),
))
Some(Batch {
groups,
tombstones,
waiters,
init,
})
}

fn allocate(&mut self, len: usize) -> Allocation {
fn advance(&mut self, len: usize) {
assert!(bits::is_aligned(self.device.align(), len));
self.may_init();
assert!(bits::is_aligned(self.device.align(), self.len));

// Rotate group if the current one is full.
@@ -292,24 +294,22 @@
self.append_group();
}

// Reserve buffer space for entry.
let start = self.len;
let end = start + len;
self.len = end;

unsafe { Allocation::new(&mut self.buffer[start..end], self.wait.acquire()) }
self.len += len;
}

fn is_empty(&self) -> bool {
#[inline]
pub fn is_empty(&self) -> bool {
self.tombstones.is_empty() && self.groups.iter().all(|group| group.range.is_empty()) && self.waiters.is_empty()
}

#[inline]
fn may_init(&mut self) {
if self.init.is_none() {
self.init = Some(Instant::now());
}
}

#[inline]
fn append_group(&mut self) {
self.groups.push(GroupMut {
region: RegionHandle {
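
The batch code above repeatedly rounds record sizes up to the device alignment via `bits::align_up` before advancing the write cursor. A small self-contained illustration of that rounding (a local reimplementation assuming the usual power-of-two alignment semantics, not the `foyer_common` implementation itself):

```rust
/// Round `len` up to the next multiple of `align`, where `align` is a power of
/// two; this mirrors what `bits::align_up(device.align(), ...)` is used for
/// when reserving space for an entry record in the batch buffer.
fn align_up(align: usize, len: usize) -> usize {
    debug_assert!(align.is_power_of_two());
    (len + align - 1) & !(align - 1)
}

fn main() {
    // With a 4096-byte device alignment, a 5000-byte record (header plus
    // serialized key and value) reserves 8192 bytes in the batch.
    assert_eq!(align_up(4096, 5000), 8192);
    // An exact multiple stays unchanged.
    assert_eq!(align_up(4096, 8192), 8192);
}
```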
