Skip to content

Commit

Permalink
reddit split version / data tests
Browse files Browse the repository at this point in the history
  • Loading branch information
louisponet committed Jul 16, 2024
1 parent 355c5b9 commit 82b7cae
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 71 deletions.
54 changes: 32 additions & 22 deletions content/posts/icc_1_seqlock/code/benches/seqlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,28 @@ use core_affinity::CoreId;
use criterion::{criterion_group, criterion_main, Bencher, BenchmarkId, Criterion, SamplingMode};
use rand::Rng;

/// Fixed-size byte payload used as the unit of data pushed through the
/// seqlock in the benchmarks. `N` is the payload size in bytes.
#[derive(Debug, Clone, Copy)]
struct Message<const N: usize> {
    data: [u8; N],
}

impl<const N: usize> Default for Message<N> {
    /// A message whose entire payload is zero bytes.
    fn default() -> Self {
        Message { data: [u8::MIN; N] }
    }
}

fn write_bench<const N_BYTES: usize>(b: &mut Bencher, n_contenders: usize) {
std::thread::scope(|s| {
let lock = Arc::new(code::SeqLock::default());
let lock = Arc::new(code::Seqlock::default());
for i in 0..n_contenders {
let lock2 = lock.clone();
s.spawn(move || {
core_affinity::set_for_current(CoreId { id: 2 * i + 3 });
let mut m = code::Message::<N_BYTES>::default();
let mut m =Message::<N_BYTES>::default();
loop {
lock2.read(&mut m);
if m[0] == 1 && m[1] == 2 {
if m.data[0] == 1 && m.data[1] == 2 {
break;
}
}
Expand All @@ -27,17 +38,17 @@ fn write_bench<const N_BYTES: usize>(b: &mut Bencher, n_contenders: usize) {
std::thread::sleep(Duration::from_millis(1));
s.spawn(move || {
core_affinity::set_for_current(CoreId { id: 1 });
let mut m = code::Message::<N_BYTES>::default();
let mut c = 0usize;
let mut m = Message::<N_BYTES>::default();
let mut c = 0u8;
b.iter(|| {
c = c.wrapping_add(1);
m[0] = c;
m[1] = c;
m.data[0] = c;
m.data[1] = c;
lock.write(&m)
});
m[0] = 1;
m.data[0] = 1;
for i in 1..N_BYTES {
m[i] = 2;
m.data[i] = 2;
}
lock.write(&m);
});
Expand Down Expand Up @@ -95,7 +106,7 @@ fn read_bench<const N_BYTES: usize>(b: &mut Bencher, n_contenders: usize) {
std::thread::scope(|s| {
let clock = quanta::Clock::new();
clock.now();
let lock = Arc::new(code::SeqLock::default());
let lock = Arc::new(code::Seqlock::default());
let done = Arc::new(sync::atomic::AtomicBool::new(false));
let done1 = done.clone();
let lock1 = lock.clone();
Expand All @@ -104,10 +115,10 @@ fn read_bench<const N_BYTES: usize>(b: &mut Bencher, n_contenders: usize) {
let lock2 = lock.clone();
s.spawn(move || {
core_affinity::set_for_current(CoreId { id: 2 * i + 3 });
let mut m = code::Message::<N_BYTES>::default();
let mut m = Message::<N_BYTES>::default();
loop {
lock2.read(&mut m);
if m[0] == 1 && m[1] == 2 {
if m.data[0] == 1 && m.data[1] == 2 {
break;
}
}
Expand All @@ -116,15 +127,15 @@ fn read_bench<const N_BYTES: usize>(b: &mut Bencher, n_contenders: usize) {
let out = s.spawn(move || {
core_affinity::set_for_current(CoreId { id: 3 });
let lck = lock2.as_ref();
let mut m = code::Message::<N_BYTES>::default();
let mut m = Message::<N_BYTES>::default();
let mut avg_lat = 0;
let mut last = 0;
for i in 0..iters {
for _ in 0..iters {
loop {
let curt = rdtscp();
let c = lck.read(&mut m);
lck.read(&mut m);
let delta = rdtscp() - curt;
if m.data[0] != last && c == 2{
if m.data[0] != last {
last = m.data[0];
avg_lat += delta;
break;
Expand All @@ -136,9 +147,8 @@ fn read_bench<const N_BYTES: usize>(b: &mut Bencher, n_contenders: usize) {
});
s.spawn(move || {
core_affinity::set_for_current(CoreId { id: 1 });
let mut m = code::Message::<N_BYTES>::default();
let mut c = 0usize;
let mut rng = rand::thread_rng();
let mut m = Message::<N_BYTES>::default();
let mut c = 0u8;
let lck = lock1.as_ref();
loop {
lck.write(&m);
Expand All @@ -150,8 +160,8 @@ fn read_bench<const N_BYTES: usize>(b: &mut Bencher, n_contenders: usize) {
}
while rdtscp() - last_write < 330 {}
}
m[0] = 1;
m[1] = 2;
m.data[0] = 1;
m.data[1] = 2;
lck.write(&m);
});
out.join().unwrap()
Expand Down Expand Up @@ -228,7 +238,7 @@ fn latency_bench<const N_BYTES: usize>(b: &mut Bencher, n_contenders: usize) {
let clock = quanta::Clock::new();
clock.now();

let lock = Arc::new(code::SeqLock::default());
let lock = Arc::new(code::Seqlock::default());
let done = Arc::new(sync::atomic::AtomicBool::new(false));
let done1 = done.clone();
let lock1 = lock.clone();
Expand Down
121 changes: 79 additions & 42 deletions content/posts/icc_1_seqlock/code/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,48 @@
use std::arch::x86_64::_mm_pause;
use std::slice::SliceIndex;
use std::cell::UnsafeCell;
use std::sync::atomic::{compiler_fence, fence, AtomicUsize, Ordering};
use std::{
arch::x86_64::_mm_pause,
cell::UnsafeCell,
slice::SliceIndex,
sync::atomic::{compiler_fence, fence, AtomicUsize, Ordering},
};
/// Empty function marked `#[cold]`. Calling it from one arm of a branch
/// tells the optimizer that arm is unlikely to be taken, which steers
/// code layout — this is the standard `likely`/`unlikely` hinting trick
/// on stable Rust (used by the helpers defined next to it).
#[inline]
#[cold]
fn cold() {}

#[inline]
fn likely(b: bool) -> bool {
if !b { cold() }
if !b {
cold()
}
b
}

#[inline]
fn unlikely(b: bool) -> bool {
if b { cold() }
if b {
cold()
}
b
}

#[derive(Default)]
#[repr(align(64))]
pub struct SeqLock<T> {
pub struct Seqlock<T> {
version: AtomicUsize,
data: UnsafeCell<T>,
_pad: [u8; 56],
data: UnsafeCell<T>,
}
impl<T: Default> Default for Seqlock<T> {
fn default() -> Self {
Self { version: Default::default(), _pad: [0; 56], data: Default::default() }
}
}
unsafe impl<T: Send> Send for SeqLock<T> {}
unsafe impl<T: Sync> Sync for SeqLock<T> {}
unsafe impl<T: Send> Send for Seqlock<T> {}
unsafe impl<T: Sync> Sync for Seqlock<T> {}

impl<T: Copy> SeqLock<T> {
impl<T: Copy> Seqlock<T> {
pub fn new(data: T) -> Self {
Self {version: AtomicUsize::new(0), data: UnsafeCell::new(data)}
Self { version: Default::default(), _pad: [0; 56], data: UnsafeCell::new(data) }
}

#[inline(never)]
pub fn read(&self, result: &mut T) {
loop {
Expand All @@ -45,6 +57,32 @@ impl<T: Copy> SeqLock<T> {
}
}

/// "Pessimistic" seqlock read: first spins until the version counter is
/// even (no writer currently in flight), then copies the data and
/// re-checks the version. Loops until a consistent snapshot is obtained.
/// Contrast with `read`, which copies optimistically without the initial
/// odd-version check.
#[inline(never)]
pub fn pessimistic_read(&self, result: &mut T) {
loop {
// Odd version => a write is in progress; retry before touching data.
let v1 = self.version.load(Ordering::Acquire);
if v1 & 1 == 1 {
continue;
}

// compiler_fence orders only this thread's compiler reordering, not
// CPU-visible ordering across threads.
// NOTE(review): the copy below is a plain non-atomic read that can
// race with a concurrent writer; correctness relies on the version
// re-check discarding torn snapshots — confirm this is the intended
// model for the post.
compiler_fence(Ordering::AcqRel);
*result = unsafe { *self.data.get() };
compiler_fence(Ordering::AcqRel);
// Version unchanged across the copy => the snapshot is consistent.
let v2 = self.version.load(Ordering::Acquire);
if v1 == v2 {
return;
}
}
}

/// Original writer kept for comparison: bumps the version to odd with a
/// Relaxed store, writes the data, then bumps it back to even.
/// NOTE(review): both version stores use `Ordering::Relaxed`, so nothing
/// here forbids the data store from being reordered relative to the
/// version stores — presumably this is retained deliberately as the
/// "old"/naive baseline benchmarked against `write`; do not use as-is.
#[inline(never)]
pub fn write_old(&self, val: &T) {
// wrapping_add keeps the counter well-defined on overflow.
let v = self.version.load(Ordering::Relaxed).wrapping_add(1);
self.version.store(v, Ordering::Relaxed);
unsafe { *self.data.get() = *val };
self.version.store(v.wrapping_add(1), Ordering::Relaxed);
}

#[inline(never)]
pub fn write(&self, val: &T) {
let v = self.version.fetch_add(1, Ordering::Release);
Expand All @@ -57,37 +95,39 @@ impl<T: Copy> SeqLock<T> {

#[cfg(test)]
mod tests {
use std::{
sync::atomic::AtomicBool,
time::{Duration, Instant},
};

use super::*;
use std::{sync::atomic::AtomicBool, time::{Duration, Instant}};

fn read_test<const N: usize>()
{
let lock = SeqLock::new([0usize; N]);
fn read_test<const N: usize>() {
let lock = Seqlock::new([0usize; N]);
let done = AtomicBool::new(false);
std::thread::scope(|s| {
s.spawn(|| {
let mut msg = [0usize; N];
while !done.load(Ordering::Relaxed) {
lock.read(&mut msg);
let first = msg[0];
for i in msg {
assert_eq!(first, i); // data consistency is verified here
}
}
assert_ne!(msg[0], 0)
});
let mut msg = [0usize; N];
while !done.load(Ordering::Relaxed) {
lock.read(&mut msg);
let first = msg[0];
for i in msg {
assert_eq!(first, i); // data consistency is verified here
}
}
assert_ne!(msg[0], 0)
});
s.spawn(|| {
let curt = Instant::now();
let mut count = 0;
let mut msg = [0usize; N];
while curt.elapsed() < Duration::from_secs(1) {
msg.fill(count);
lock.write(&msg);
count = count.wrapping_add(1);
}
done.store(true, Ordering::Relaxed);
});

let curt = Instant::now();
let mut count = 0;
let mut msg = [0usize; N];
while curt.elapsed() < Duration::from_secs(1) {
msg.fill(count);
lock.write(&msg);
count = count.wrapping_add(1);
}
done.store(true, Ordering::Relaxed);
});
});
}

Expand All @@ -109,9 +149,6 @@ mod tests {
}
#[test]
fn read_large() {
read_test::<{2usize.pow(16)}>()
read_test::<{ 2usize.pow(16) }>()
}
}



Loading

0 comments on commit 82b7cae

Please sign in to comment.