Avoid double-indirection of Arc<Vec<u8>> by using Arc<[u8]> directly
adamreichold authored and cberner committed May 11, 2024
1 parent e93051a commit 5a6af86
Showing 4 changed files with 40 additions and 46 deletions.
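For context, the commit title is the whole story: Arc<Vec<u8>> costs two pointer hops (the Arc's allocation holds the Vec header, whose own pointer leads to the bytes), while Arc<[u8]> is a fat pointer that carries the length, so the bytes live directly in the Arc's single allocation. A minimal standalone sketch of the difference, not taken from the diff; the names nested and flat are illustrative only:

use std::sync::Arc;

fn main() {
    // Arc<Vec<u8>>: Arc allocation -> Vec header (ptr, len, capacity) -> byte buffer.
    let nested: Arc<Vec<u8>> = Arc::new(vec![1, 2, 3]);

    // Arc<[u8]>: a fat pointer (address + length) to one allocation holding the
    // reference counts and the bytes. From<Vec<u8>> copies the bytes once into it.
    let flat: Arc<[u8]> = vec![1, 2, 3].into();

    // Both deref to &[u8] at the use sites, so readers of the memory are unaffected.
    assert_eq!(nested.as_slice(), &flat[..]);
}

The trade-offs show up in the cached_file.rs hunks below: buffers freshly read from disk are converted with .into() (one copy into the Arc's allocation), and in-place mutation has to go through Arc::get_mut.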
6 changes: 3 additions & 3 deletions src/tree_store/btree_base.rs
@@ -123,7 +123,7 @@ enum EitherPage {
Immutable(PageImpl),
Mutable(PageMut),
OwnedMemory(Vec<u8>),
- ArcMemory(Arc<Vec<u8>>),
+ ArcMemory(Arc<[u8]>),
}

impl EitherPage {
@@ -132,7 +132,7 @@ impl EitherPage {
EitherPage::Immutable(page) => page.memory(),
EitherPage::Mutable(page) => page.memory(),
EitherPage::OwnedMemory(mem) => mem.as_slice(),
- EitherPage::ArcMemory(mem) => mem.as_slice(),
+ EitherPage::ArcMemory(mem) => mem,
}
}
}
@@ -159,7 +159,7 @@ impl<'a, V: Value + 'static> AccessGuard<'a, V> {
}
}

- pub(crate) fn with_arc_page(page: Arc<Vec<u8>>, range: Range<usize>) -> Self {
+ pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
Self {
page: EitherPage::ArcMemory(page),
offset: range.start,
10 changes: 5 additions & 5 deletions src/tree_store/btree_mutator.rs
@@ -24,7 +24,7 @@ enum DeletionResult {
DeletedLeaf,
// A leaf with fewer entries than desired
PartialLeaf {
- page: Arc<Vec<u8>>,
+ page: Arc<[u8]>,
deleted_pair: usize,
},
// A branch page subtree with fewer children than desired
@@ -316,7 +316,7 @@ impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> {
let existing_value = if found {
let (start, end) = accessor.value_range(position).unwrap();
if self.modify_uncommitted && self.mem.uncommitted(page_number) {
- let arc = page.to_arc_vec();
+ let arc = page.to_arc();
drop(page);
self.mem.free(page_number);
Some(AccessGuard::with_arc_page(arc, start..end))
@@ -350,7 +350,7 @@ impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> {
let existing_value = if found {
let (start, end) = accessor.value_range(position).unwrap();
if self.modify_uncommitted && self.mem.uncommitted(page_number) {
- let arc = page.to_arc_vec();
+ let arc = page.to_arc();
drop(page);
self.mem.free(page_number);
Some(AccessGuard::with_arc_page(arc, start..end))
@@ -546,7 +546,7 @@ impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> {
// Merge when less than 33% full. Splits occur when a page is full and produce two 50%
// full pages, so we use 33% instead of 50% to avoid oscillating
PartialLeaf {
- page: page.to_arc_vec(),
+ page: page.to_arc(),
deleted_pair: position,
}
} else {
@@ -570,7 +570,7 @@ impl<'a, 'b, K: Key, V: Value> MutateHelper<'a, 'b, K, V> {
drop(accessor);
let guard = if uncommitted && self.modify_uncommitted {
let page_number = page.get_page_number();
- let arc = page.to_arc_vec();
+ let arc = page.to_arc();
drop(page);
self.mem.free(page_number);
Some(AccessGuard::with_arc_page(arc, start..end))
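The call sites above lean on one property of Arc<[u8]>: cloning it is just a reference-count bump, so the mutator can free an uncommitted page immediately while the returned guard keeps the page's bytes alive and reads a sub-range of them later. A simplified sketch of that ownership pattern; ValueGuard is a made-up stand-in for AccessGuard, not redb's actual type:

use std::ops::Range;
use std::sync::Arc;

// Owns the page bytes through the Arc, so the slice stays valid even after
// every other handle to the page is gone.
struct ValueGuard {
    page: Arc<[u8]>,
    range: Range<usize>,
}

impl ValueGuard {
    fn value(&self) -> &[u8] {
        &self.page[self.range.clone()]
    }
}

fn main() {
    let page: Arc<[u8]> = vec![0xAA, 0xBB, 0xCC, 0xDD].into();
    // Cloning only increments the reference count; no bytes are copied.
    let guard = ValueGuard { page: page.clone(), range: 1..3 };
    drop(page); // the original handle can go away, mirroring self.mem.free(...)
    assert_eq!(guard.value(), &[0xBBu8, 0xCC]);
}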
4 changes: 2 additions & 2 deletions src/tree_store/page_store/base.rs
@@ -149,14 +149,14 @@ pub(crate) trait Page {
}

pub struct PageImpl {
- pub(super) mem: Arc<Vec<u8>>,
+ pub(super) mem: Arc<[u8]>,
pub(super) page_number: PageNumber,
#[cfg(debug_assertions)]
pub(super) open_pages: Arc<Mutex<HashMap<PageNumber, u64>>>,
}

impl PageImpl {
- pub(crate) fn to_arc_vec(&self) -> Arc<Vec<u8>> {
+ pub(crate) fn to_arc(&self) -> Arc<[u8]> {
self.mem.clone()
}
}
66 changes: 30 additions & 36 deletions src/tree_store/page_store/cached_file.rs
@@ -3,7 +3,6 @@ use crate::tree_store::LEAF;
use crate::{DatabaseError, Result, StorageBackend, StorageError};
use std::collections::BTreeMap;
use std::io;
- use std::mem;
use std::ops::{Index, IndexMut};
use std::slice::SliceIndex;
#[cfg(feature = "cache_metrics")]
@@ -31,48 +30,48 @@ impl CachePriority {
pub(super) struct WritablePage {
buffer: Arc<Mutex<PrioritizedWriteCache>>,
offset: u64,
- data: Vec<u8>,
+ data: Option<Arc<[u8]>>,
priority: CachePriority,
}

impl WritablePage {
pub(super) fn mem(&self) -> &[u8] {
- &self.data
+ self.data.as_ref().unwrap()
}

pub(super) fn mem_mut(&mut self) -> &mut [u8] {
- &mut self.data
+ Arc::get_mut(self.data.as_mut().unwrap()).unwrap()
}
}

impl Drop for WritablePage {
fn drop(&mut self) {
- let data = mem::take(&mut self.data);
+ let data = self.data.take().unwrap();
self.buffer
.lock()
.unwrap()
- .return_value(&self.offset, Arc::new(data), self.priority);
+ .return_value(&self.offset, data, self.priority);
}
}

impl<I: SliceIndex<[u8]>> Index<I> for WritablePage {
type Output = I::Output;

fn index(&self, index: I) -> &Self::Output {
- self.data.index(index)
+ self.mem().index(index)
}
}

impl<I: SliceIndex<[u8]>> IndexMut<I> for WritablePage {
fn index_mut(&mut self, index: I) -> &mut Self::Output {
- self.data.index_mut(index)
+ self.mem_mut().index_mut(index)
}
}

#[derive(Default)]
struct PrioritizedCache {
- cache: BTreeMap<u64, Arc<Vec<u8>>>,
- low_pri_cache: BTreeMap<u64, Arc<Vec<u8>>>,
+ cache: BTreeMap<u64, Arc<[u8]>>,
+ low_pri_cache: BTreeMap<u64, Arc<[u8]>>,
}

impl PrioritizedCache {
@@ -83,12 +82,7 @@ impl PrioritizedCache {
}
}

- fn insert(
- &mut self,
- key: u64,
- value: Arc<Vec<u8>>,
- priority: CachePriority,
- ) -> Option<Arc<Vec<u8>>> {
+ fn insert(&mut self, key: u64, value: Arc<[u8]>, priority: CachePriority) -> Option<Arc<[u8]>> {
if matches!(priority, CachePriority::Low) {
debug_assert!(!self.cache.contains_key(&key));
self.low_pri_cache.insert(key, value)
@@ -98,23 +92,23 @@
}
}

- fn remove(&mut self, key: &u64) -> Option<Arc<Vec<u8>>> {
+ fn remove(&mut self, key: &u64) -> Option<Arc<[u8]>> {
let result = self.cache.remove(key);
if result.is_some() {
return result;
}
self.low_pri_cache.remove(key)
}

- fn get(&self, key: &u64) -> Option<&Arc<Vec<u8>>> {
+ fn get(&self, key: &u64) -> Option<&Arc<[u8]>> {
let result = self.cache.get(key);
if result.is_some() {
return result;
}
self.low_pri_cache.get(key)
}

- fn pop_lowest_priority(&mut self) -> Option<(u64, Arc<Vec<u8>>)> {
+ fn pop_lowest_priority(&mut self) -> Option<(u64, Arc<[u8]>)> {
let result = self.low_pri_cache.pop_first();
if result.is_some() {
return result;
@@ -125,8 +119,8 @@

#[derive(Default)]
struct PrioritizedWriteCache {
- cache: BTreeMap<u64, Option<Arc<Vec<u8>>>>,
- low_pri_cache: BTreeMap<u64, Option<Arc<Vec<u8>>>>,
+ cache: BTreeMap<u64, Option<Arc<[u8]>>>,
+ low_pri_cache: BTreeMap<u64, Option<Arc<[u8]>>>,
}

impl PrioritizedWriteCache {
@@ -137,7 +131,7 @@
}
}

- fn insert(&mut self, key: u64, value: Arc<Vec<u8>>, priority: CachePriority) {
+ fn insert(&mut self, key: u64, value: Arc<[u8]>, priority: CachePriority) {
if matches!(priority, CachePriority::Low) {
assert!(self.low_pri_cache.insert(key, Some(value)).is_none());
debug_assert!(!self.cache.contains_key(&key));
@@ -147,15 +141,15 @@
}
}

- fn get(&self, key: &u64) -> Option<&Arc<Vec<u8>>> {
+ fn get(&self, key: &u64) -> Option<&Arc<[u8]>> {
let result = self.cache.get(key);
if result.is_some() {
return result.map(|x| x.as_ref().unwrap());
}
self.low_pri_cache.get(key).map(|x| x.as_ref().unwrap())
}

- fn remove(&mut self, key: &u64) -> Option<Arc<Vec<u8>>> {
+ fn remove(&mut self, key: &u64) -> Option<Arc<[u8]>> {
if let Some(value) = self.cache.remove(key) {
assert!(value.is_some());
return value;
@@ -167,7 +161,7 @@
None
}

- fn return_value(&mut self, key: &u64, value: Arc<Vec<u8>>, priority: CachePriority) {
+ fn return_value(&mut self, key: &u64, value: Arc<[u8]>, priority: CachePriority) {
if matches!(priority, CachePriority::Low) {
assert!(self
.low_pri_cache
@@ -180,7 +174,7 @@
}
}

- fn take_value(&mut self, key: &u64) -> Option<Arc<Vec<u8>>> {
+ fn take_value(&mut self, key: &u64) -> Option<Arc<[u8]>> {
if let Some(value) = self.cache.get_mut(key) {
let result = value.take().unwrap();
return Some(result);
@@ -192,7 +186,7 @@
None
}

- fn pop_lowest_priority(&mut self) -> Option<(u64, Arc<Vec<u8>>, CachePriority)> {
+ fn pop_lowest_priority(&mut self) -> Option<(u64, Arc<[u8]>, CachePriority)> {
for (k, v) in self.low_pri_cache.range(..) {
if v.is_some() {
let key = *k;
@@ -344,7 +338,7 @@ impl PagedCachedFile {
len: usize,
hint: PageHint,
cache_policy: impl Fn(&[u8]) -> CachePriority,
- ) -> Result<Arc<Vec<u8>>> {
+ ) -> Result<Arc<[u8]>> {
self.check_fsync_failure()?;
debug_assert_eq!(0, offset % self.page_size);
#[cfg(feature = "cache_metrics")]
@@ -371,7 +365,7 @@
}
}

- let buffer = Arc::new(self.read_direct(offset, len)?);
+ let buffer: Arc<[u8]> = self.read_direct(offset, len)?.into();
let cache_size = self.read_cache_bytes.fetch_add(len, Ordering::AcqRel);
let mut write_lock = self.read_cache[cache_slot].write().unwrap();
write_lock.insert(offset, buffer.clone(), cache_policy(&buffer));
@@ -450,14 +444,14 @@ impl PagedCachedFile {
);
self.read_cache_bytes
.fetch_sub(removed.len(), Ordering::AcqRel);
- Some(Arc::try_unwrap(removed).unwrap())
+ Some(removed)
} else {
None
}
};

let data = if let Some(removed) = lock.take_value(&offset) {
- Arc::try_unwrap(removed).unwrap()
+ removed
} else {
let previous = self.write_buffer_bytes.fetch_add(len, Ordering::AcqRel);
if previous + len > self.max_write_buffer_bytes {
@@ -481,19 +475,19 @@
let result = if let Some(data) = existing {
data
} else if overwrite {
- vec![0; len]
+ vec![0; len].into()
} else {
- self.read_direct(offset, len)?
+ self.read_direct(offset, len)?.into()
};
let priority = cache_policy(&result);
- lock.insert(offset, Arc::new(result), priority);
- Arc::try_unwrap(lock.take_value(&offset).unwrap()).unwrap()
+ lock.insert(offset, result, priority);
+ lock.take_value(&offset).unwrap()
};
let priority = cache_policy(&data);
Ok(WritablePage {
buffer: self.write_buffer.clone(),
offset,
- data,
+ data: Some(data),
priority,
})
}
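One subtlety in the WritablePage change above: mem_mut now goes through Arc::get_mut(...).unwrap(), which only succeeds while no other Arc (or Weak) to the same bytes exists. That holds here because take_value removes the Arc from the write cache and leaves a None placeholder behind, so the checked-out WritablePage owns the sole handle until Drop hands it back via return_value. A small standalone illustration of that uniqueness rule (not redb code):

use std::sync::Arc;

fn main() {
    let mut buf: Arc<[u8]> = vec![0u8; 4].into();

    // Sole handle: Arc::get_mut grants in-place mutable access to the bytes.
    Arc::get_mut(&mut buf).unwrap()[0] = 42;

    // A second handle exists: get_mut refuses, returning None instead of aliasing.
    let shared = buf.clone();
    assert!(Arc::get_mut(&mut buf).is_none());
    drop(shared);

    // Unique again, so mutation is allowed once more.
    Arc::get_mut(&mut buf).unwrap()[1] = 7;
    assert_eq!(&buf[..2], &[42u8, 7]);
}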