Skip to content

Commit

Permalink
Introduce VectorClockSnapshot type (#41)
Browse files Browse the repository at this point in the history
* Add Vector clock snapshot type that holds the state of a vector clock frozen in time. Things that hold vectorclocks they are working with should continue to use VectorClock, things that were storing a vector clock to record the state of a clock for future comaprison should use the snapshot

* accidentally forgot to add the new files 🤦

* address lint

* Add reminder todo comment
  • Loading branch information
jscatena88 authored Jun 27, 2024
1 parent 829aa1f commit 3ca27f1
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 32 deletions.
6 changes: 3 additions & 3 deletions src/codec/meta/actor_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::codec::crypto::{
AccessKey, AsymLockedAccessKey, AsymLockedAccessKeyError, SigningKey, VerifyingKey,
};
use crate::codec::header::AccessMask;
use crate::codec::meta::{UserAgent, VectorClock};
use crate::codec::meta::{UserAgent, VectorClock, VectorClockSnapshot};
use crate::codec::{ParserResult, Stream};

const KEY_PRESENT_BIT: u8 = 0b0000_0001;
Expand Down Expand Up @@ -234,8 +234,8 @@ impl ActorSettings {
self.user_agent.clone()
}

pub fn vector_clock(&self) -> VectorClock {
self.vector_clock.clone()
pub fn vector_clock(&self) -> VectorClockSnapshot {
VectorClockSnapshot::from(&self.vector_clock)
}

pub fn verifying_key(&self) -> VerifyingKey {
Expand Down
20 changes: 8 additions & 12 deletions src/codec/meta/journal_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

use futures::io::AsyncWrite;

use crate::codec::meta::{Cid, VectorClock};
use crate::codec::meta::{Cid, VectorClockSnapshot};
use crate::codec::{ParserResult, Stream};

#[derive(Clone, Debug, PartialEq)]
pub struct JournalCheckpoint {
merkle_root_cid: Cid,
vector: VectorClock,
vector: VectorClockSnapshot,
}

impl JournalCheckpoint {
Expand All @@ -25,16 +25,9 @@ impl JournalCheckpoint {
Ok(written_bytes)
}

pub(crate) fn initialize() -> Self {
JournalCheckpoint {
merkle_root_cid: Cid::IDENTITY,
vector: VectorClock::initialize(),
}
}

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, merkle_root_cid) = Cid::parse(input)?;
let (input, vector) = VectorClock::parse(input)?;
let (input, vector) = VectorClockSnapshot::parse(input)?;

let journal_checkpoint = JournalCheckpoint {
merkle_root_cid,
Expand All @@ -45,7 +38,7 @@ impl JournalCheckpoint {
}

pub const fn size() -> usize {
Cid::size() + VectorClock::size()
Cid::size() + VectorClockSnapshot::size()
}
}

Expand All @@ -61,7 +54,10 @@ mod tests {
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test(async))]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn test_user_agent_roundtrip() {
let checkpoint = JournalCheckpoint::initialize();
let checkpoint = JournalCheckpoint {
merkle_root_cid: Cid::from([0; 32]),
vector: VectorClockSnapshot::from(0),
};

let mut buffer = Vec::with_capacity(JournalCheckpoint::size());
checkpoint
Expand Down
2 changes: 1 addition & 1 deletion src/codec/meta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ pub use journal_checkpoint::JournalCheckpoint;
pub use meta_key::MetaKey;
pub use permanent_id::PermanentId;
pub use user_agent::UserAgent;
pub use vector_clock::VectorClock;
pub use vector_clock::{VectorClock, VectorClockSnapshot};
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use std::cmp::PartialEq;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use futures::{AsyncWrite, AsyncWriteExt};
use winnow::{binary::le_u64, Parser};
use futures::AsyncWrite;

use crate::codec::{ParserResult, Stream};

mod snapshot;
pub use snapshot::Snapshot as VectorClockSnapshot;

/// VectorClocks are used as monotonic clocks for a particular actor or resource within the
/// filesystem and is used for providing strict ordering of events. The internal value is
/// initialized to a random value when a new one is initialized.
Expand All @@ -22,20 +24,16 @@ impl VectorClock {
&self,
writer: &mut W,
) -> std::io::Result<usize> {
let current = self.0.load(Ordering::Relaxed);
let clock_bytes = current.to_le_bytes();

writer.write_all(&clock_bytes).await?;

Ok(clock_bytes.len())
VectorClockSnapshot::from(self).encode(writer).await
}

pub fn initialize() -> Self {
Self::from(0)
// TODO: make this actually initialize to a random value as the docs above indicate
Self::from(VectorClockSnapshot::from(0))
}

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, value) = le_u64.parse_peek(input)?;
let (input, value) = VectorClockSnapshot::parse(input)?;
Ok((input, Self::from(value)))
}

Expand All @@ -44,9 +42,9 @@ impl VectorClock {
}
}

impl From<u64> for VectorClock {
fn from(val: u64) -> Self {
Self(Arc::new(AtomicU64::new(val)))
impl From<VectorClockSnapshot> for VectorClock {
fn from(val: VectorClockSnapshot) -> Self {
Self(Arc::new(AtomicU64::new(val.0)))
}
}

Expand Down
126 changes: 126 additions & 0 deletions src/codec/meta/vector_clock/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
//! A snapshot of a vector clock at specific value
//! These are what get stored to record the state of a vector
//! during specific operations
use super::VectorClock;
use crate::codec::{ParserResult, Stream};

use futures::{AsyncWrite, AsyncWriteExt};
use std::sync::atomic::Ordering;
use winnow::{binary::le_u64, Parser};

const WRAP_THRESHOLD: u64 = 2 ^ 18;

/// A snapshot of a [`VectorClock`] at a specific value
///
/// These are what get stored to record the state of a vector
/// during specific operations
///
/// # Wrapping Behavior
/// These must functionally monotonically increase, but if we overflow
/// the underlying value we will wrap around. This is handled by
/// comparing the values with a threshold to determine if the
/// wrapped value is greater than the non-wrapped value.
/// The threshold is 2^18, or 262,144.
/// [`PartialOrd`] and [`Ord`] are implemented to handle this.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct Snapshot(pub(super) u64);

impl Snapshot {
pub async fn encode<W: AsyncWrite + Unpin + Send>(
&self,
writer: &mut W,
) -> std::io::Result<usize> {
let clock_bytes = self.0.to_le_bytes();

writer.write_all(&clock_bytes).await?;

Ok(clock_bytes.len())
}

pub fn parse(input: Stream) -> ParserResult<Self> {
let (input, value) = le_u64.parse_peek(input)?;
Ok((input, Self(value)))
}

pub const fn size() -> usize {
8
}
}

impl From<&VectorClock> for Snapshot {
fn from(value: &VectorClock) -> Self {
Self(value.0.load(Ordering::Relaxed))
}
}

impl From<u64> for Snapshot {
fn from(value: u64) -> Self {
Self(value)
}
}

impl PartialOrd for Snapshot {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for Snapshot {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
if self.0 < WRAP_THRESHOLD || other.0 < WRAP_THRESHOLD {
self.0
.wrapping_add(WRAP_THRESHOLD)
.cmp(&other.0.wrapping_add(WRAP_THRESHOLD))
} else {
self.0.cmp(&other.0)
}
}
}

#[cfg(test)]
mod test {
use super::*;

/// Test PartialOrd implementation
#[test]
fn test_partial_ord() {
// Basic ordering
assert!(Snapshot(1) < Snapshot(2));
assert!(Snapshot(2) > Snapshot(1));
assert!(Snapshot(1) == Snapshot(1));

// Wrapping
assert!(Snapshot(u64::MAX) < Snapshot(0));
assert!(Snapshot(0) > Snapshot(u64::MAX));
assert!(Snapshot(u64::MAX) < Snapshot(WRAP_THRESHOLD - 1));
assert!(Snapshot(u64::MAX) > Snapshot(WRAP_THRESHOLD));
}

/// Test Ord implementation
#[test]
fn test_ord() {
// Basic ordering
assert_eq!(Snapshot(1).cmp(&Snapshot(2)), std::cmp::Ordering::Less);
assert_eq!(Snapshot(2).cmp(&Snapshot(1)), std::cmp::Ordering::Greater);
assert_eq!(Snapshot(1).cmp(&Snapshot(1)), std::cmp::Ordering::Equal);

// Wrapping
assert_eq!(
Snapshot(u64::MAX).cmp(&Snapshot(0)),
std::cmp::Ordering::Less
);
assert_eq!(
Snapshot(0).cmp(&Snapshot(u64::MAX)),
std::cmp::Ordering::Greater
);
assert_eq!(
Snapshot(u64::MAX).cmp(&Snapshot(WRAP_THRESHOLD - 1)),
std::cmp::Ordering::Less
);
assert_eq!(
Snapshot(u64::MAX).cmp(&Snapshot(WRAP_THRESHOLD)),
std::cmp::Ordering::Greater
);
}
}
6 changes: 3 additions & 3 deletions src/filesystem/drive/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ impl InnerDrive {
self.root_pid
}

pub(crate) fn vector_clock(&self) -> VectorClock {
self.vector_clock.clone()
pub fn vector_clock(&self) -> VectorClockSnapshot {
VectorClockSnapshot::from(&self.vector_clock)
}
}

Expand Down Expand Up @@ -563,7 +563,7 @@ pub(crate) mod test {
let (remaining, parsed) = InnerDrive::parse(
Partial::new(encoded.as_slice()),
access.to_owned(),
vector_clock,
vector_clock.into(),
)
.unwrap();
assert!(remaining.is_empty());
Expand Down

0 comments on commit 3ca27f1

Please sign in to comment.