Skip to content

Commit

Permalink
Force alignment for all chunk buffers (#225)
Browse files Browse the repository at this point in the history
* Force alignment on all chunk buffers

* Update deps

* x64 build

* x64 build

* Explicity chunk cloning
  • Loading branch information
arkpar authored Oct 6, 2023
1 parent 4ac2aca commit ec68693
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 83 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ loom = { version = "0.5.1", optional = true }
siphasher = "0.3.10"

[dev-dependencies]
env_logger = "0.10.0"
env_logger = { version = "0.10.0", default-features = false, features = ["auto-color", "humantime"] }
fdlimit = "0.2.1"
rand = { version = "0.8.2", features = ["small_rng"] }
tempfile = "3.2"
Expand Down
2 changes: 1 addition & 1 deletion admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2018"

[dependencies]
clap = { version = "4", features = ["derive"] }
env_logger = "0.10.0"
env_logger = { version = "0.10.0", default-features = false, features = ["auto-color", "humantime"] }
fdlimit = "0.2.1"
log = "0.4.8"
parity-db = { path = ".." }
Expand Down
2 changes: 1 addition & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Borrow<[u8]> for RcValue {

impl Borrow<Vec<u8>> for RcValue {
fn borrow(&self) -> &Vec<u8> {
self.value().borrow()
self.value()
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn madvise_random(_map: &mut memmap2::MmapMut) {}
fn mmap(file: &std::fs::File, len: usize) -> Result<memmap2::MmapMut> {
#[cfg(not(test))]
const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; // 1 Gb
// Use different value for tests to work around docker limits on the test machine.
// Use a different value for tests to work around docker limits on the test machine.
#[cfg(test)]
const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; // 64 Mb

Expand Down Expand Up @@ -92,7 +92,7 @@ impl TableFile {
} else {
capacity = len / entry_size as u64;
}
let mut map = mmap(&file, len as usize)?;
let map = mmap(&file, len as usize)?;
Some((map, file))
} else {
None
Expand Down
119 changes: 45 additions & 74 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ const META_SIZE: usize = 16 * 1024; // Contains header and column stats
const ENTRY_BITS: u8 = 64;
pub const ENTRY_BYTES: usize = ENTRY_BITS as usize / 8;

const EMPTY_CHUNK: Chunk = [0u8; CHUNK_LEN];
const EMPTY_CHUNK: Chunk = Chunk([0u8; CHUNK_LEN]);
const EMPTY_ENTRIES: [Entry; CHUNK_ENTRIES] = [Entry::empty(); CHUNK_ENTRIES];

pub type Chunk = [u8; CHUNK_LEN];
#[repr(C, align(8))]
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct Chunk(pub [u8; CHUNK_LEN]);

#[allow(clippy::assertions_on_constants)]
const _: () = assert!(META_SIZE >= HEADER_SIZE + stats::TOTAL_SIZE);
Expand Down Expand Up @@ -78,7 +81,7 @@ impl Entry {
self.0
}

fn empty() -> Self {
const fn empty() -> Self {
Entry(0)
}

Expand Down Expand Up @@ -246,9 +249,9 @@ impl IndexTable {
Ok(())
}

fn chunk_at(index: u64, map: &memmap2::MmapMut) -> Result<&[u8; CHUNK_LEN]> {
fn chunk_at(index: u64, map: &memmap2::MmapMut) -> Result<&Chunk> {
let offset = META_SIZE + index as usize * CHUNK_LEN;
let ptr = unsafe { &*(map[offset..offset + CHUNK_LEN].as_ptr() as *const [u8; CHUNK_LEN]) };
let ptr = unsafe { &*(map[offset..offset + CHUNK_LEN].as_ptr() as *const Chunk) };
Ok(try_io!(Ok(ptr)))
}

Expand All @@ -260,33 +263,18 @@ impl IndexTable {
Ok(try_io!(Ok(ptr)))
}
#[cfg(target_arch = "x86_64")]
fn find_entry(
&self,
key_prefix: u64,
sub_index: usize,
chunk: &[u8; CHUNK_LEN],
) -> (Entry, usize) {
fn find_entry(&self, key_prefix: u64, sub_index: usize, chunk: &Chunk) -> (Entry, usize) {
self.find_entry_sse2(key_prefix, sub_index, chunk)
}

#[cfg(not(target_arch = "x86_64"))]
fn find_entry(
&self,
key_prefix: u64,
sub_index: usize,
chunk: &[u8; CHUNK_LEN],
) -> (Entry, usize) {
fn find_entry(&self, key_prefix: u64, sub_index: usize, chunk: &Chunk) -> (Entry, usize) {
self.find_entry_base(key_prefix, sub_index, chunk)
}

#[cfg(target_arch = "x86_64")]
fn find_entry_sse2(
&self,
key_prefix: u64,
sub_index: usize,
chunk: &[u8; CHUNK_LEN],
) -> (Entry, usize) {
assert!(chunk.len() >= CHUNK_ENTRIES * 8); // Bound checking (not done by SIMD instructions)
fn find_entry_sse2(&self, key_prefix: u64, sub_index: usize, chunk: &Chunk) -> (Entry, usize) {
assert!(chunk.0.len() >= CHUNK_ENTRIES * 8); // Bound checking (not done by SIMD instructions)
const _: () = assert!(
CHUNK_ENTRIES % 4 == 0,
"We assume here we got buffer with a number of elements that is a multiple of 4"
Expand All @@ -308,11 +296,11 @@ impl IndexTable {
// Then we remove the address by shifting such that the partial key is in the low
// part
let first_two = _mm_shuffle_epi32::<0b11011000>(_mm_srl_epi64(
_mm_loadu_si128(chunk[i * 8..].as_ptr() as *const __m128i),
_mm_loadu_si128(chunk.0[i * 8..].as_ptr() as *const __m128i),
shift_mask,
));
let last_two = _mm_shuffle_epi32::<0b11011000>(_mm_srl_epi64(
_mm_loadu_si128(chunk[(i + 2) * 8..].as_ptr() as *const __m128i),
_mm_loadu_si128(chunk.0[(i + 2) * 8..].as_ptr() as *const __m128i),
shift_mask,
));
// We set into current the input low parts
Expand All @@ -329,12 +317,7 @@ impl IndexTable {
(Entry::empty(), 0)
}

fn find_entry_base(
&self,
key_prefix: u64,
sub_index: usize,
chunk: &[u8; CHUNK_LEN],
) -> (Entry, usize) {
fn find_entry_base(&self, key_prefix: u64, sub_index: usize, chunk: &Chunk) -> (Entry, usize) {
let partial_key = Entry::extract_key(key_prefix, self.id.index_bits());
for i in sub_index..CHUNK_ENTRIES {
let entry = Self::read_entry(chunk, i);
Expand Down Expand Up @@ -378,18 +361,16 @@ impl IndexTable {
}

pub fn entries(&self, chunk_index: u64, log: &impl LogQuery) -> Result<[Entry; CHUNK_ENTRIES]> {
let mut chunk = [0; CHUNK_LEN];
if let Some(entry) =
log.with_index(self.id, chunk_index, |chunk| Self::transmute_chunk(*chunk))
log.with_index(self.id, chunk_index, |chunk| *Self::transmute_chunk(chunk))
{
return Ok(entry)
}
if let Some(map) = &*self.map.read() {
let source = Self::chunk_at(chunk_index, map)?;
chunk.copy_from_slice(source);
return Ok(Self::transmute_chunk(chunk))
let chunk = Self::chunk_at(chunk_index, map)?;
return Ok(*Self::transmute_chunk(chunk))
}
Ok(Self::transmute_chunk(EMPTY_CHUNK))
Ok(EMPTY_ENTRIES)
}

pub fn sorted_entries(&self) -> Result<Vec<Entry>> {
Expand All @@ -415,24 +396,18 @@ impl IndexTable {
}

#[inline(always)]
fn transmute_chunk(chunk: [u8; CHUNK_LEN]) -> [Entry; CHUNK_ENTRIES] {
let mut result: [Entry; CHUNK_ENTRIES] = unsafe { std::mem::transmute(chunk) };
if !cfg!(target_endian = "little") {
for item in result.iter_mut() {
*item = Entry::from_u64(u64::from_le(item.0));
}
}
result
fn transmute_chunk(chunk: &Chunk) -> &[Entry; CHUNK_ENTRIES] {
unsafe { std::mem::transmute(chunk) }
}

#[inline(always)]
fn write_entry(entry: &Entry, at: usize, chunk: &mut [u8; CHUNK_LEN]) {
chunk[at * 8..at * 8 + 8].copy_from_slice(&entry.as_u64().to_le_bytes());
fn write_entry(entry: &Entry, at: usize, chunk: &mut Chunk) {
chunk.0[at * 8..at * 8 + 8].copy_from_slice(&entry.as_u64().to_le_bytes());
}

#[inline(always)]
fn read_entry(chunk: &[u8; CHUNK_LEN], at: usize) -> Entry {
Entry::from_u64(u64::from_le_bytes(chunk[at * 8..at * 8 + 8].try_into().unwrap()))
fn read_entry(chunk: &Chunk, at: usize) -> Entry {
Entry::from_u64(u64::from_le_bytes(chunk.0[at * 8..at * 8 + 8].try_into().unwrap()))
}

#[inline(always)]
Expand All @@ -444,7 +419,7 @@ impl IndexTable {
&self,
key_prefix: u64,
address: Address,
source: &[u8],
mut chunk: Chunk,
sub_index: Option<usize>,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
Expand All @@ -454,8 +429,6 @@ impl IndexTable {
log::warn!(target: "parity-db", "{}: Address space overflow at {}: {}", self.id, chunk_index, address);
return Ok(PlanOutcome::NeedReindex)
}
let mut chunk = [0; CHUNK_LEN];
chunk.copy_from_slice(source);
let partial_key = Entry::extract_key(key_prefix, self.id.index_bits());
let new_entry = Entry::new(address, partial_key, self.id.index_bits());
if let Some(i) = sub_index {
Expand All @@ -466,15 +439,15 @@ impl IndexTable {
);
Self::write_entry(&new_entry, i, &mut chunk);
log::trace!(target: "parity-db", "{}: Replaced at {}.{}: {}", self.id, chunk_index, i, new_entry.address(self.id.index_bits()));
log.insert_index(self.id, chunk_index, i as u8, &chunk);
log.insert_index(self.id, chunk_index, i as u8, chunk);
return Ok(PlanOutcome::Written)
}
for i in 0..CHUNK_ENTRIES {
let entry = Self::read_entry(&chunk, i);
if entry.is_empty() {
Self::write_entry(&new_entry, i, &mut chunk);
log::trace!(target: "parity-db", "{}: Inserted at {}.{}: {}", self.id, chunk_index, i, new_entry.address(self.id.index_bits()));
log.insert_index(self.id, chunk_index, i as u8, &chunk);
log.insert_index(self.id, chunk_index, i as u8, chunk);
return Ok(PlanOutcome::Written)
}
}
Expand All @@ -493,28 +466,26 @@ impl IndexTable {
let key_prefix = TableKey::index_from_partial(key);
let chunk_index = self.chunk_index(key_prefix);

if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| *chunk) {
return self.plan_insert_chunk(key_prefix, address, &chunk, sub_index, log)
if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| chunk.clone()) {
return self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
}

if let Some(map) = &*self.map.read() {
let chunk = Self::chunk_at(chunk_index, map)?;
let chunk = Self::chunk_at(chunk_index, map)?.clone();
return self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
}

let chunk = &EMPTY_CHUNK;
let chunk = EMPTY_CHUNK.clone();
self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
}

fn plan_remove_chunk(
&self,
key_prefix: u64,
source: &[u8],
mut chunk: Chunk,
sub_index: usize,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
let mut chunk = [0; CHUNK_LEN];
chunk.copy_from_slice(source);
let chunk_index = self.chunk_index(key_prefix);
let partial_key = Entry::extract_key(key_prefix, self.id.index_bits());

Expand All @@ -523,7 +494,7 @@ impl IndexTable {
if !entry.is_empty() && entry.partial_key(self.id.index_bits()) == partial_key {
let new_entry = Entry::empty();
Self::write_entry(&new_entry, i, &mut chunk);
log.insert_index(self.id, chunk_index, i as u8, &chunk);
log.insert_index(self.id, chunk_index, i as u8, chunk);
log::trace!(target: "parity-db", "{}: Removed at {}.{}", self.id, chunk_index, i);
return Ok(PlanOutcome::Written)
}
Expand All @@ -541,12 +512,12 @@ impl IndexTable {

let chunk_index = self.chunk_index(key_prefix);

if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| *chunk) {
return self.plan_remove_chunk(key_prefix, &chunk, sub_index, log)
if let Some(chunk) = log.with_index(self.id, chunk_index, |chunk| chunk.clone()) {
return self.plan_remove_chunk(key_prefix, chunk, sub_index, log)
}

if let Some(map) = &*self.map.read() {
let chunk = Self::chunk_at(chunk_index, map)?;
let chunk = Self::chunk_at(chunk_index, map)?.clone();
return self.plan_remove_chunk(key_prefix, chunk, sub_index, log)
}

Expand Down Expand Up @@ -649,7 +620,7 @@ mod test {

#[test]
fn test_entries() {
let mut chunk = IndexTable::transmute_chunk(EMPTY_CHUNK);
let mut chunk = IndexTable::transmute_chunk(&EMPTY_CHUNK).clone();
let mut chunk2 = EMPTY_CHUNK;
for (i, chunk) in chunk.iter_mut().enumerate().take(CHUNK_ENTRIES) {
use std::{
Expand All @@ -664,7 +635,7 @@ mod test {
*chunk = entry;
}

assert!(IndexTable::transmute_chunk(chunk2) == chunk);
assert!(IndexTable::transmute_chunk(&chunk2) == &chunk);
}

#[test]
Expand All @@ -679,9 +650,9 @@ mod test {

let data_address = Address::from_u64((1 << index_bits) - 1);

let mut chunk = [0; CHUNK_ENTRIES * 8];
let mut chunk = Chunk([0; CHUNK_ENTRIES * 8]);
for (i, partial_key) in partial_keys.iter().enumerate() {
chunk[i * 8..(i + 1) * 8].copy_from_slice(
chunk.0[i * 8..(i + 1) * 8].copy_from_slice(
&Entry::new(data_address, *partial_key, index_bits).as_u64().to_le_bytes(),
);
}
Expand All @@ -705,7 +676,7 @@ mod test {
fn test_find_any_entry() {
let table =
IndexTable { id: TableId(18), map: RwLock::new(None), path: Default::default() };
let mut chunk = [0u8; CHUNK_LEN];
let mut chunk = Chunk([0u8; CHUNK_LEN]);
let mut entries = [Entry::empty(); CHUNK_ENTRIES];
let mut keys = [0u64; CHUNK_ENTRIES];
let mut rng = rand::prelude::SmallRng::from_seed(Default::default());
Expand Down Expand Up @@ -742,7 +713,7 @@ mod test {
fn test_find_entry_same_value() {
let table =
IndexTable { id: TableId(18), map: RwLock::new(None), path: Default::default() };
let mut chunk = [0u8; CHUNK_LEN];
let mut chunk = Chunk([0u8; CHUNK_LEN]);
let key = 0x4242424242424242;
let partial_key = Entry::extract_key(key, 18);
let entry = Entry::new(Address::new(0, 0), partial_key, 18);
Expand All @@ -765,7 +736,7 @@ mod test {
fn test_find_entry_zero_pk() {
let table =
IndexTable { id: TableId(16), map: RwLock::new(None), path: Default::default() };
let mut chunk = [0u8; CHUNK_LEN];
let mut chunk = Chunk([0u8; CHUNK_LEN]);
let zero_key = 0x0000000000000000;
let entry = Entry::new(Address::new(1, 1), zero_key, 16);

Expand All @@ -790,7 +761,7 @@ mod test {
) {
let table =
IndexTable { id: TableId(18), map: RwLock::new(None), path: Default::default() };
let mut chunk = [0u8; CHUNK_LEN];
let mut chunk = Chunk([0u8; CHUNK_LEN]);
let mut keys = [0u64; CHUNK_ENTRIES];
let mut rng = rand::prelude::SmallRng::from_seed(Default::default());
for i in 0..CHUNK_ENTRIES {
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ pub type Key = [u8; KEY_SIZE];

#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
compile_error!("parity-db only supports x86_64 and aarch64");

#[cfg(not(target_endian = "little"))]
compile_error!("parity-db only supports little-endian platforms");
8 changes: 4 additions & 4 deletions src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl LogChange {
while mask != 0 {
let i = mask.trailing_zeros();
mask &= !(1 << i);
write(&chunk[i as usize * ENTRY_BYTES..(i as usize + 1) * ENTRY_BYTES])?;
write(&chunk.0[i as usize * ENTRY_BYTES..(i as usize + 1) * ENTRY_BYTES])?;
}
}
}
Expand Down Expand Up @@ -405,13 +405,13 @@ impl<'a> LogWriter<'a> {
self.log.record_id
}

pub fn insert_index(&mut self, table: IndexTableId, index: u64, sub: u8, data: &IndexChunk) {
pub fn insert_index(&mut self, table: IndexTableId, index: u64, sub: u8, data: IndexChunk) {
match self.log.local_index.entry(table).or_default().map.entry(index) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
*entry.get_mut() = (self.log.record_id, entry.get().1 | (1 << sub), *data);
*entry.get_mut() = (self.log.record_id, entry.get().1 | (1 << sub), data);
},
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert((self.log.record_id, 1 << sub, *data));
entry.insert((self.log.record_id, 1 << sub, data));
},
}
}
Expand Down

0 comments on commit ec68693

Please sign in to comment.