diff --git a/lib/benches/bench_lib.rs b/lib/benches/bench_lib.rs
index 56a5225f9..1f0fc1271 100644
--- a/lib/benches/bench_lib.rs
+++ b/lib/benches/bench_lib.rs
@@ -8,11 +8,9 @@ use tempfile::TempDir;
 use tokio::runtime::Runtime;
 use utils::Actor;
 
-criterion_group!(default, write_file, read_file, remove_file, sync);
+criterion_group!(default, write_file, read_file, sync);
 criterion_main!(default);
 
-const FILE_SIZES_M: &[u64] = &[1, 8];
-
 fn write_file(c: &mut Criterion) {
     let runtime = Runtime::new().unwrap();
 
@@ -21,7 +19,7 @@ fn write_file(c: &mut Criterion) {
 
     let buffer_size = 4096;
 
-    for &m in FILE_SIZES_M {
+    for m in [1, 8, 16] {
         let file_size = m * 1024 * 1024;
 
         group.throughput(Throughput::Bytes(file_size));
@@ -63,7 +61,7 @@ fn read_file(c: &mut Criterion) {
 
     let buffer_size = 4096;
 
-    for &m in FILE_SIZES_M {
+    for m in [1, 8, 16] {
         let file_size = m * 1024 * 1024;
 
         group.throughput(Throughput::Bytes(file_size));
@@ -108,63 +106,13 @@ fn read_file(c: &mut Criterion) {
     group.finish();
 }
 
-fn remove_file(c: &mut Criterion) {
-    let runtime = Runtime::new().unwrap();
-
-    let mut group = c.benchmark_group("lib/remove_file");
-    group.sample_size(10);
-
-    for &m in FILE_SIZES_M {
-        let file_size = m * 1024 * 1024;
-
-        group.throughput(Throughput::Bytes(file_size));
-        group.bench_function(BenchmarkId::from_parameter(format!("{m} MiB")), |b| {
-            let file_name = Utf8Path::new("file.dat");
-
-            b.iter_batched_ref(
-                || {
-                    let mut rng = StdRng::from_entropy();
-                    let base_dir = TempDir::new_in(env!("CARGO_TARGET_TMPDIR")).unwrap();
-
-                    let repo = runtime.block_on(async {
-                        let repo = utils::create_repo(
-                            &mut rng,
-                            &base_dir.path().join("repo.db"),
-                            0,
-                            StateMonitor::make_root(),
-                        )
-                        .await;
-                        utils::write_file(
-                            &mut rng,
-                            &repo,
-                            file_name,
-                            file_size as usize,
-                            4096,
-                            false,
-                        )
-                        .await;
-                        repo
-                    });
-
-                    (base_dir, repo)
-                },
-                |(_base_dir, repo)| {
-                    runtime.block_on(async { repo.remove_entry(file_name).await.unwrap() });
-                },
-                BatchSize::LargeInput,
-            );
-        });
-    }
-    group.finish();
-}
-
 fn sync(c: &mut Criterion) {
     let runtime = Runtime::new().unwrap();
 
     let mut group = c.benchmark_group("lib/sync");
     group.sample_size(10);
 
-    for &m in FILE_SIZES_M {
+    for m in [1, 8] {
         let file_size = m * 1024 * 1024;
 
         group.throughput(Throughput::Bytes(file_size));
diff --git a/lib/src/blob/block_ids.rs b/lib/src/blob/block_ids.rs
index 64407f3c0..af4e3562e 100644
--- a/lib/src/blob/block_ids.rs
+++ b/lib/src/blob/block_ids.rs
@@ -3,8 +3,7 @@ use crate::{
     blob::BlobId,
     branch::Branch,
     error::{Error, Result},
-    locator::Locator,
-    protocol::{BlockId, RootNode},
+    protocol::{BlockId, Locator, RootNode},
     store,
 };
 
diff --git a/lib/src/blob/buffer.rs b/lib/src/blob/buffer.rs
deleted file mode 100644
index 27d350609..000000000
--- a/lib/src/blob/buffer.rs
+++ /dev/null
@@ -1,78 +0,0 @@
-use crate::protocol::BLOCK_SIZE;
-use std::{
-    convert::TryInto,
-    ops::{Deref, DerefMut},
-};
-use zeroize::Zeroize;
-
-// Buffer for keeping loaded block content and also for in-place encryption and decryption.
-#[derive(Clone)]
-pub(super) struct Buffer(Box<[u8]>);
-
-impl Buffer {
-    pub fn new() -> Self {
-        Self::default()
-    }
-
-    // Read data from `offset` of the buffer into a fixed-length array.
-    //
-    // # Panics
-    //
-    // Panics if the remaining length after `offset` is less than `N`.
-    pub fn read_array<const N: usize>(&self, offset: usize) -> [u8; N] {
-        self[offset..offset + N].try_into().unwrap()
-    }
-
-    // Read data from `offset` of the buffer into a `u64`.
-    //
-    // # Panics
-    //
-    // Panics if the remaining length is less than `size_of::<u64>()`
-    pub fn read_u64(&self, offset: usize) -> u64 {
-        u64::from_le_bytes(self.read_array(offset))
-    }
-
-    // Read data from offset into `dst`.
-    pub fn read(&self, offset: usize, dst: &mut [u8]) {
-        dst.copy_from_slice(&self.0[offset..offset + dst.len()]);
-    }
-
-    // Write a `u64` at `offset` into the buffer.
-    pub fn write_u64(&mut self, offset: usize, value: u64) {
-        let bytes = value.to_le_bytes();
-        self.write(offset, &bytes[..]);
-    }
-
-    // Writes data from `dst` into the buffer.
-    pub fn write(&mut self, offset: usize, src: &[u8]) {
-        self.0[offset..offset + src.len()].copy_from_slice(src);
-    }
-}
-
-impl Default for Buffer {
-    fn default() -> Self {
-        Self(vec![0; BLOCK_SIZE].into_boxed_slice())
-    }
-}
-
-// Scramble the buffer on drop to prevent leaving decrypted data in memory past the buffer
-// lifetime.
-impl Drop for Buffer {
-    fn drop(&mut self) {
-        self.0.zeroize()
-    }
-}
-
-impl Deref for Buffer {
-    type Target = [u8];
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl DerefMut for Buffer {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.0
-    }
-}
diff --git a/lib/src/blob/mod.rs b/lib/src/blob/mod.rs
index 8af8c873f..4916744b5 100644
--- a/lib/src/blob/mod.rs
+++ b/lib/src/blob/mod.rs
@@ -1,7 +1,6 @@
 pub(crate) mod lock;
 
 mod block_ids;
-mod buffer;
 mod id;
 mod position;
 
@@ -14,19 +13,16 @@ use self::position::Position;
 use crate::{
     branch::Branch,
     collections::{hash_map::Entry, HashMap},
-    crypto::{
-        cipher::{self, Nonce, SecretKey},
-        sign::{self, PublicKey},
-    },
+    crypto::cipher::{self, Nonce, SecretKey},
     error::{Error, Result},
-    locator::Locator,
-    protocol::{BlockId, BlockNonce, RootNode, SingleBlockPresence, BLOCK_SIZE},
-    store::{self, ReadTransaction, WriteTransaction},
+    protocol::{
+        Block, BlockContent, BlockId, BlockNonce, Locator, RootNode, SingleBlockPresence,
+        BLOCK_SIZE,
+    },
+    store::{self, Changeset, ReadTransaction},
 };
-use buffer::Buffer;
 use std::{io::SeekFrom, iter, mem};
 use thiserror::Error;
-use tracing::{field, instrument, Instrument, Span};
 
 /// Size of the blob header in bytes.
 // Using u64 instead of usize because HEADER_SIZE must be the same irrespective of whether we're on
@@ -35,8 +31,7 @@
 pub const HEADER_SIZE: usize = mem::size_of::<u64>();
 
 // Max number of blocks in the cache. Increasing this number decreases the number of flushes needed
 // during writes but increases the complexity of the individual flushes.
-// TODO: Find optimal value for this.
-const CACHE_CAPACITY: usize = 64;
+const CACHE_CAPACITY: usize = 2048; // 64 MiB
 
 #[derive(Debug, Error)]
 pub(crate) enum ReadWriteError {
@@ -296,7 +291,12 @@ impl Blob {
         Ok(write_len)
     }
 
-    pub async fn write_all(&mut self, tx: &mut WriteTransaction, buffer: &[u8]) -> Result<()> {
+    pub async fn write_all(
+        &mut self,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
+        buffer: &[u8],
+    ) -> Result<()> {
         let mut offset = 0;
 
         loop {
@@ -309,7 +309,7 @@
                     self.warmup(tx).await?;
                 }
                 Err(ReadWriteError::CacheFull) => {
-                    self.flush(tx).await?;
+                    self.flush(tx, changeset).await?;
                 }
             }
         }
@@ -366,9 +366,13 @@
     /// Flushes this blob, ensuring that all intermediately buffered contents get written to the
    /// store.
-    pub(crate) async fn flush(&mut self, tx: &mut WriteTransaction) -> Result<()> {
-        self.write_len(tx).await?;
-        self.write_blocks(tx).await?;
+    pub(crate) async fn flush(
+        &mut self,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
+    ) -> Result<()> {
+        self.write_len(tx, changeset).await?;
+        self.write_blocks(changeset);
 
         Ok(())
     }
@@ -393,7 +397,11 @@
     }
 
     // Write length, if changed
-    async fn write_len(&mut self, tx: &mut WriteTransaction) -> Result<()> {
+    async fn write_len(
+        &mut self,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
+    ) -> Result<()> {
         if self.len_modified == self.len_original {
             return Ok(());
         }
@@ -407,15 +415,7 @@
             let (_, mut content) =
                 read_block(tx, &root_node, &locator, self.branch.keys().read()).await?;
             content.write_u64(0, self.len_modified);
-            write_block(
-                tx,
-                self.branch.id(),
-                &locator,
-                content,
-                self.branch.keys().read(),
-                self.branch.keys().write().ok_or(Error::PermissionDenied)?,
-            )
-            .await?;
+            write_block(changeset, &locator, content, self.branch.keys().read());
         }
 
         self.len_original = self.len_modified;
@@ -423,7 +423,7 @@
         Ok(())
     }
 
-    async fn write_blocks(&mut self, tx: &mut WriteTransaction) -> Result<()> {
+    fn write_blocks(&mut self, changeset: &mut Changeset) {
         // Poor man's `drain_filter`.
         let cache = mem::take(&mut self.cache);
         let (dirty, clean): (HashMap<_, _>, _) =
@@ -433,17 +433,12 @@
         for (number, block) in dirty {
             let locator = Locator::head(self.id).nth(number);
             write_block(
-                tx,
-                self.branch.id(),
+                changeset,
                 &locator,
                 block.content,
                 self.branch.keys().read(),
-                self.branch.keys().write().ok_or(Error::PermissionDenied)?,
-            )
-            .await?;
+            );
         }
-
-        Ok(())
     }
 }
 
@@ -463,7 +458,7 @@ impl Clone for Blob {
 
 #[derive(Default)]
 struct CachedBlock {
-    content: Buffer,
+    content: BlockContent,
     dirty: bool,
 }
 
@@ -477,8 +472,8 @@ impl CachedBlock {
     }
 }
 
-impl From<Buffer> for CachedBlock {
-    fn from(content: Buffer) -> Self {
+impl From<BlockContent> for CachedBlock {
+    fn from(content: BlockContent) -> Self {
         Self {
             content,
             dirty: false,
@@ -516,6 +511,7 @@ pub(crate) async fn fork(blob_id: BlobId, src_branch: &Branch, dst_branch: &Bran
     let locators = Locator::head(blob_id).sequence().take(end as usize);
 
     for locator in locators {
         let mut tx = src_branch.store().begin_write().await?;
+        let mut changeset = Changeset::new();
 
         let encoded_locator = locator.encode(read_key);
 
@@ -534,25 +530,18 @@ pub(crate) async fn fork(blob_id: BlobId, src_branch: &Branch, dst_branch: &Bran
             SingleBlockPresence::Missing
         };
 
-        // It can happen that the current and dst branches are different, but the blob has
-        // already been forked by some other task in the meantime. In that case this
-        // `insert` is a no-op. We still proceed normally to maintain idempotency.
-        tx.link_block(
-            dst_branch.id(),
-            &encoded_locator,
-            &block_id,
-            block_presence,
-            write_keys,
-        )
-        .instrument(tracing::info_span!(
-            "fork_block",
+        changeset.link_block(encoded_locator, block_id, block_presence);
+        changeset
+            .apply(&mut tx, dst_branch.id(), write_keys)
+            .await?;
+        tx.commit().await?;
+
+        tracing::trace!(
             num = locator.number(),
-            id = ?block_id,
+            block_id = ?block_id,
             ?block_presence,
-        ))
-        .await?;
-
-        tx.commit().await?;
+            "fork block",
+        );
     }
 
     Ok(())
@@ -595,51 +584,41 @@ async fn read_block(
     root_node: &RootNode,
     locator: &Locator,
     read_key: &cipher::SecretKey,
-) -> Result<(BlockId, Buffer)> {
+) -> Result<(BlockId, BlockContent)> {
     let id = tx
         .find_block_at(root_node, &locator.encode(read_key))
         .await?;
 
-    let mut buffer = Buffer::new();
-    let nonce = tx.read_block(&id, &mut buffer).await?;
+    let mut content = BlockContent::new();
+    let nonce = tx.read_block(&id, &mut content).await?;
 
-    decrypt_block(read_key, &nonce, &mut buffer);
+    decrypt_block(read_key, &nonce, &mut content);
 
-    Ok((id, buffer))
+    Ok((id, content))
 }
 
-#[instrument(skip(tx, buffer, read_key, write_keys), fields(id))]
-async fn write_block(
-    tx: &mut WriteTransaction,
-    branch_id: &PublicKey,
+fn write_block(
+    changeset: &mut Changeset,
     locator: &Locator,
-    mut buffer: Buffer,
+    mut content: BlockContent,
     read_key: &cipher::SecretKey,
-    write_keys: &sign::Keypair,
-) -> Result<BlockId> {
+) -> BlockId {
     let nonce = rand::random();
-    encrypt_block(read_key, &nonce, &mut buffer);
-    let id = BlockId::from_content(&buffer);
-
-    Span::current().record("id", field::debug(&id));
-
-    let inserted = tx
-        .link_block(
-            branch_id,
-            &locator.encode(read_key),
-            &id,
-            SingleBlockPresence::Present,
-            write_keys,
-        )
-        .await?;
+    encrypt_block(read_key, &nonce, &mut content);
+
+    let block = Block::new(content, nonce);
+    let block_id = block.id;
 
-    // We shouldn't be inserting a block to a branch twice. If we do, the assumption is that we
-    // hit one in 2^sizeof(BlockId) chance that we randomly generated the same BlockId twice.
-    assert!(inserted);
+    changeset.link_block(
+        locator.encode(read_key),
+        block.id,
+        SingleBlockPresence::Present,
+    );
+    changeset.write_block(block);
 
-    tx.write_block(&id, &buffer, &nonce).await?;
+    tracing::trace!(?locator, ?block_id, "write block");
 
-    Ok(id)
+    block_id
 }
 
 fn decrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
diff --git a/lib/src/blob/tests.rs b/lib/src/blob/tests.rs
index ad934756c..edf0c9006 100644
--- a/lib/src/blob/tests.rs
+++ b/lib/src/blob/tests.rs
@@ -18,10 +18,17 @@ use test_strategy::proptest;
 #[tokio::test(flavor = "multi_thread")]
 async fn empty_blob() {
     let (_, _base_dir, store, [branch]) = setup(0).await;
+
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
 
     let mut blob = Blob::create(branch.clone(), BlobId::ROOT);
-    blob.flush(&mut tx).await.unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     // Re-open the blob and read its contents.
    let mut blob = Blob::open(&mut tx, branch, BlobId::ROOT).await.unwrap();
@@ -54,8 +61,8 @@ async fn write_and_read_case(
     rng_seed: u64,
 ) {
     let (mut rng, _base_dir, store, [branch]) = setup(rng_seed).await;
-
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
 
     let block_id = if is_root { BlobId::ROOT } else { rng.gen() };
 
@@ -65,10 +72,16 @@
     let orig_content: Vec<u8> = rng.sample_iter(Standard).take(blob_len).collect();
 
     for chunk in orig_content.chunks(write_len) {
-        blob.write_all(&mut tx, chunk).await.unwrap();
+        blob.write_all(&mut tx, &mut changeset, chunk)
+            .await
+            .unwrap();
     }
 
-    blob.flush(&mut tx).await.unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     // Re-open the blob and read from it in chunks of `read_len` bytes
     let mut blob = Blob::open(&mut tx, branch.clone(), block_id).await.unwrap();
@@ -101,14 +114,22 @@ fn len(
     test_utils::run(async {
         let (rng, _base_dir, store, [branch]) = setup(rng_seed).await;
         let mut tx = store.begin_write().await.unwrap();
+        let mut changeset = Changeset::new();
 
         let content: Vec<u8> = rng.sample_iter(Standard).take(content_len).collect();
 
         let mut blob = Blob::create(branch.clone(), BlobId::ROOT);
-        blob.write_all(&mut tx, &content[..]).await.unwrap();
+        blob.write_all(&mut tx, &mut changeset, &content[..])
+            .await
+            .unwrap();
 
         assert_eq!(blob.len(), content_len as u64);
 
-        blob.flush(&mut tx).await.unwrap();
+        blob.flush(&mut tx, &mut changeset).await.unwrap();
+        changeset
+            .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+            .await
+            .unwrap();
+
         assert_eq!(blob.len(), content_len as u64);
 
         let blob = Blob::open(&mut tx, branch, BlobId::ROOT).await.unwrap();
@@ -149,13 +170,21 @@ fn seek_from_end(
 
 async fn seek_from(content_len: usize, seek_from: SeekFrom, expected_pos: usize, rng_seed: u64) {
     let (rng, _base_dir, store, [branch]) = setup(rng_seed).await;
+
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
 
     let content: Vec<u8> = rng.sample_iter(Standard).take(content_len).collect();
 
     let mut blob = Blob::create(branch.clone(), BlobId::ROOT);
-    blob.write_all(&mut tx, &content[..]).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &content[..])
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     blob.seek(seek_from);
 
@@ -176,12 +205,19 @@ fn seek_from_current(
     test_utils::run(async {
         let (rng, _base_dir, store, [branch]) = setup(rng_seed).await;
         let mut tx = store.begin_write().await.unwrap();
+        let mut changeset = Changeset::new();
 
         let content: Vec<u8> = rng.sample_iter(Standard).take(content_len).collect();
 
         let mut blob = Blob::create(branch.clone(), BlobId::ROOT);
-        blob.write_all(&mut tx, &content[..]).await.unwrap();
-        blob.flush(&mut tx).await.unwrap();
+        blob.write_all(&mut tx, &mut changeset, &content[..])
+            .await
+            .unwrap();
+        blob.flush(&mut tx, &mut changeset).await.unwrap();
+        changeset
+            .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+            .await
+            .unwrap();
 
         blob.seek(SeekFrom::Start(0));
 
@@ -203,13 +239,21 @@ fn seek_from_current(
 #[tokio::test(flavor = "multi_thread")]
 async fn seek_after_end() {
     let (_, _base_dir, store, [branch]) = setup(0).await;
+
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
 
     let content = b"content";
 
     let mut blob = Blob::create(branch.clone(), BlobId::ROOT);
-    blob.write_all(&mut tx, &content[..]).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &content[..])
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     let mut read_buffer = [0];
@@ -227,12 +271,19 @@ async fn seek_before_start() {
     let (_, _base_dir, store, [branch]) = setup(0).await;
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
 
     let content = b"content";
 
     let mut blob = Blob::create(branch.clone(), BlobId::ROOT);
-    blob.write_all(&mut tx, &content[..]).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &content[..])
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     let mut read_buffer = vec![0; content.len()];
@@ -258,13 +309,25 @@ async fn truncate_to_empty() {
         .take(2 * BLOCK_SIZE)
         .collect();
 
+    let mut changeset = Changeset::new();
     let mut blob = Blob::create(branch.clone(), id);
-    blob.write_all(&mut tx, &content).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &content)
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     assert_eq!(blob.len(), content.len() as u64);
 
+    let mut changeset = Changeset::new();
     blob.truncate(0).unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     assert_eq!(blob.len(), 0);
 
     let mut buffer = [0; 1];
@@ -287,15 +350,27 @@ async fn truncate_to_shorter() {
         .take(3 * BLOCK_SIZE)
         .collect();
 
+    let mut changeset = Changeset::new();
     let mut blob = Blob::create(branch.clone(), id);
-    blob.write_all(&mut tx, &content).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &content)
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     assert_eq!(blob.len(), content.len() as u64);
 
     let new_len = BLOCK_SIZE / 2;
 
+    let mut changeset = Changeset::new();
     blob.truncate(new_len as u64).unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     assert_eq!(blob.len(), new_len as u64);
 
     let mut buffer = vec![0; content.len()];
@@ -311,6 +386,7 @@ async fn truncate_marks_as_dirty() {
     let (mut rng, _base_dir, store, [branch]) = setup(0).await;
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
 
     let id = rng.gen();
@@ -320,8 +396,10 @@
         .collect();
 
     let mut blob = Blob::create(branch.clone(), id);
-    blob.write_all(&mut tx, &content).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &content)
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
 
     blob.truncate(0).unwrap();
     assert!(blob.is_dirty());
@@ -333,6 +411,7 @@ async fn truncate_marks_as_dirty() {
 #[tokio::test(flavor = "multi_thread")]
 async fn modify_blob() {
     let (mut rng, _base_dir, store, [branch]) = setup(0).await;
+
     let mut tx = store.begin_write().await.unwrap();
 
     let id = rng.gen();
@@ -340,9 +419,16 @@
     let locator1 = locator0.next();
 
     let content = vec![0; 2 * BLOCK_SIZE];
+    let mut changeset = Changeset::new();
     let mut blob = Blob::create(branch.clone(), id);
-    blob.write_all(&mut tx, &content).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &content)
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     let locator0 = locator0.encode(branch.keys().read());
     let locator1 = locator1.encode(branch.keys().read());
@@ -354,9 +440,16 @@
     };
 
     let buffer = vec![1; 3 * BLOCK_SIZE / 2];
+    let mut changeset = Changeset::new();
     blob.seek(SeekFrom::Start(0));
-    blob.write_all(&mut tx, &buffer).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &buffer)
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     let new_block_id0 = tx.find_block(branch.id(), &locator0).await.unwrap();
     let new_block_id1 = tx.find_block(branch.id(), &locator1).await.unwrap();
@@ -375,18 +468,32 @@
 #[tokio::test(flavor = "multi_thread")]
 async fn append() {
     let (mut rng, _base_dir, store, [branch]) = setup(0).await;
+
     let mut tx = store.begin_write().await.unwrap();
 
     let id = rng.gen();
 
+    let mut changeset = Changeset::new();
     let mut blob = Blob::create(branch.clone(), id);
-    blob.write_all(&mut tx, b"foo").await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, b"foo")
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
+    let mut changeset = Changeset::new();
     let mut blob = Blob::open(&mut tx, branch.clone(), id).await.unwrap();
     blob.seek(SeekFrom::End(0));
-    blob.write_all(&mut tx, b"bar").await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, b"bar")
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     let mut blob = Blob::open(&mut tx, branch, id).await.unwrap();
 
@@ -401,12 +508,19 @@
 async fn write_reopen_and_read() {
     let (mut rng, _base_dir, store, [branch]) = setup(0).await;
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
 
     let id = rng.gen();
 
     let mut blob = Blob::create(branch.clone(), id);
-    blob.write_all(&mut tx, b"foo").await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, b"foo")
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     let mut blob = Blob::open(&mut tx, branch, id).await.unwrap();
 
@@ -452,14 +566,22 @@ async fn fork_and_write_case(
     let src_content: Vec<u8> = (&mut rng).sample_iter(Standard).take(src_len).collect();
 
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
     let mut blob = Blob::create(src_branch.clone(), src_id);
-    blob.write_all(&mut tx, &src_content[..]).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &src_content[..])
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, src_branch.id(), src_branch.keys().write().unwrap())
+        .await
+        .unwrap();
     tx.commit().await.unwrap();
 
     fork(src_id, &src_branch, &dst_branch).await.unwrap();
 
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
     let mut blob = Blob::open(&mut tx, dst_branch.clone(), src_id)
         .await
         .unwrap();
@@ -467,8 +589,14 @@
     let write_content: Vec<u8> = rng.sample_iter(Standard).take(write_len).collect();
 
     blob.seek(SeekFrom::Start(seek_pos as u64));
-    blob.write_all(&mut tx, &write_content[..]).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &write_content[..])
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, dst_branch.id(), dst_branch.keys().write().unwrap())
+        .await
+        .unwrap();
 
     // Re-open the orig and verify the content is unchanged
     let mut orig = Blob::open(&mut tx, src_branch, src_id).await.unwrap();
@@ -504,9 +632,16 @@ async fn fork_is_idempotent() {
     let content: Vec<u8> = (&mut rng).sample_iter(Standard).take(512 * 1024).collect();
 
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
     let mut blob = Blob::create(src_branch.clone(), id);
-    blob.write_all(&mut tx, &content[..]).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    blob.write_all(&mut tx, &mut changeset, &content[..])
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, src_branch.id(), src_branch.keys().write().unwrap())
+        .await
+        .unwrap();
     tx.commit().await.unwrap();
 
     for i in 0..2 {
@@ -524,13 +659,18 @@ async fn fork_then_remove_src_branch() {
     let id_1 = rng.gen();
 
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
 
     let mut blob_0 = Blob::create(src_branch.clone(), id_0);
-    blob_0.flush(&mut tx).await.unwrap();
+    blob_0.flush(&mut tx, &mut changeset).await.unwrap();
 
     let mut blob_1 = Blob::create(src_branch.clone(), id_1);
-    blob_1.flush(&mut tx).await.unwrap();
+    blob_1.flush(&mut tx, &mut changeset).await.unwrap();
 
+    changeset
+        .apply(&mut tx, src_branch.id(), src_branch.keys().write().unwrap())
+        .await
+        .unwrap();
     tx.commit().await.unwrap();
 
     fork(id_0, &src_branch, &dst_branch).await.unwrap();
@@ -567,8 +707,15 @@ async fn block_ids_test() {
         .take(BLOCK_SIZE * 3 - HEADER_SIZE)
         .collect();
 
     let mut tx = store.begin_write().await.unwrap();
-    blob.write_all(&mut tx, &content).await.unwrap();
-    blob.flush(&mut tx).await.unwrap();
+    let mut changeset = Changeset::new();
+    blob.write_all(&mut tx, &mut changeset, &content)
+        .await
+        .unwrap();
+    blob.flush(&mut tx, &mut changeset).await.unwrap();
+    changeset
+        .apply(&mut tx, branch.id(), branch.keys().write().unwrap())
+        .await
+        .unwrap();
     tx.commit().await.unwrap();
 
     let mut block_ids = BlockIds::open(branch, blob_id).await.unwrap();
diff --git a/lib/src/block_tracker.rs b/lib/src/block_tracker.rs
index 78fedf2c1..e499009bd 100644
--- a/lib/src/block_tracker.rs
+++ b/lib/src/block_tracker.rs
@@ -354,11 +354,7 @@ type ClientId = usize;
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{
-        collections::HashSet,
-        protocol::{BlockData, BLOCK_SIZE},
-        test_utils,
-    };
+    use crate::{collections::HashSet, protocol::Block, test_utils};
     use futures_util::future;
     use rand::{distributions::Standard, rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
     use std::{pin::pin, time::Duration};
@@ -375,12 +371,12 @@
         assert!(client.acceptor().try_accept().is_none());
 
         // Offered but not required blocks are not returned
-        let block0 = make_block();
+        let block0: Block = rand::random();
         client.offer(block0.id, OfferState::Approved);
         assert!(client.acceptor().try_accept().is_none());
 
         // Required but not offered blocks are not returned
-        let block1 = make_block();
+        let block1: Block = rand::random();
         tracker.require(block1.id);
         assert!(client.acceptor().try_accept().is_none());
 
@@ -403,7 +399,7 @@
     async fn simple_async() {
         let tracker = BlockTracker::new();
 
-        let block = make_block();
+        let block: Block = rand::random();
 
         let client = tracker.client();
         let mut acceptor = client.acceptor();
@@ -444,7 +440,7 @@
         let client0 = tracker.client();
         let client1 = tracker.client();
 
-        let block = make_block();
+        let block: Block = rand::random();
 
         tracker.require(block.id);
         client0.offer(block.id, OfferState::Approved);
@@ -477,7 +473,7 @@
         let client0 = tracker.client();
         let client1 = tracker.client();
 
-        let block = make_block();
+        let block: Block = rand::random();
 
         client0.offer(block.id, OfferState::Approved);
         client1.offer(block.id, OfferState::Approved);
@@ -503,7 +499,7 @@
         let client0 = tracker.client();
         let client1 = tracker.client();
 
-        let block = make_block();
+        let block: Block = rand::random();
 
         client0.offer(block.id, OfferState::Approved);
         client1.offer(block.id, OfferState::Approved);
@@ -537,7 +533,7 @@
         let client0 = tracker.client();
         let client1 = tracker.client();
 
-        let block = make_block();
+        let block: Block = rand::random();
 
         client0.offer(block.id, OfferState::Approved);
         client1.offer(block.id, OfferState::Approved);
@@ -561,7 +557,7 @@
         let tracker = BlockTracker::new();
         let client = tracker.client();
 
-        let block = make_block();
+        let block: Block = rand::random();
 
         tracker.require(block.id);
         client.offer(block.id, OfferState::Pending);
@@ -585,7 +581,7 @@
         let tracker = BlockTracker::new();
         let clients: Vec<_> = (0..num_clients).map(|_| tracker.client()).collect();
 
-        let block = make_block();
+        let block: Block = rand::random();
 
         tracker.require(block.id);
 
@@ -674,11 +670,4 @@
             assert!(block_promise.contains(block_id));
         }
     }
-
-    fn make_block() -> BlockData {
-        let mut content = vec![0; BLOCK_SIZE].into_boxed_slice();
-        rand::thread_rng().fill(&mut content[..]);
-
-        BlockData::from(content)
-    }
 }
diff --git a/lib/src/branch/mod.rs b/lib/src/branch/mod.rs
index fca20d2fb..41e8d2bd4 100644
--- a/lib/src/branch/mod.rs
+++ b/lib/src/branch/mod.rs
@@ -7,9 +7,8 @@ use crate::{
     error::{Error, Result},
     event::{EventScope, EventSender, Payload},
     file::{File, FileProgressCache},
-    locator::Locator,
     path,
-    protocol::BlockId,
+    protocol::{BlockId, Locator},
     store::{self, Store},
     version_vector::VersionVector,
 };
diff --git a/lib/src/deadlock/expect_short_lifetime.rs b/lib/src/deadlock/expect_short_lifetime.rs
index 4373f4bff..392271d2d 100644
--- a/lib/src/deadlock/expect_short_lifetime.rs
+++ b/lib/src/deadlock/expect_short_lifetime.rs
@@ -12,6 +12,7 @@ use std::{
 /// than expected.
 pub(crate) struct ExpectShortLifetime {
     id: Id,
+    start: Instant,
 }
 
 impl ExpectShortLifetime {
@@ -24,13 +25,16 @@ impl ExpectShortLifetime {
         let context = Context::new(location);
         let id = schedule(max_lifetime, context);
 
-        Self { id }
+        Self {
+            id,
+            start: Instant::now(),
+        }
     }
 }
 
 impl Drop for ExpectShortLifetime {
     fn drop(&mut self) {
-        cancel(self.id);
+        cancel(self.id, self.start);
     }
 }
 
@@ -73,11 +77,12 @@ fn schedule(duration: Duration, context: Context) -> Id {
     TIMER.schedule(deadline, context)
 }
 
-fn cancel(id: Id) {
+fn cancel(id: Id, start: Instant) {
     if TIMER.cancel(id).is_none() {
-        println!(
-            "🐢🐢🐢 Previously reported task (id: {}) eventually completed 🐢🐢🐢",
-            id
+        tracing::warn!(
+            "🐢🐢🐢 Previously reported task (id: {}) eventually completed in {:?} 🐢🐢🐢",
+            id,
+            start.elapsed(),
         );
     }
 }
@@ -86,11 +91,10 @@ fn watching_thread() {
     loop {
         let (id, context) = TIMER.wait();
 
-        // Using `println!` and not `tracing::*` to avoid circular dependencies because on
-        // Android tracing uses `StateMonitor` which uses these mutexes.
-        println!(
+        tracing::warn!(
             "🐢🐢🐢 Task taking too long (id: {}) 🐢🐢🐢\n{}\n",
-            id, context
+            id,
+            context
         );
     }
 }
diff --git a/lib/src/directory/content.rs b/lib/src/directory/content.rs
index 280d6f321..8b194abec 100644
--- a/lib/src/directory/content.rs
+++ b/lib/src/directory/content.rs
@@ -6,7 +6,6 @@ use crate::{
     blob::BlobId,
     branch::Branch,
     error::{Error, Result},
-    protocol::VersionVectorOp,
     version_vector::VersionVector,
 };
 use serde::Deserialize;
@@ -114,14 +113,18 @@ impl Content {
     }
 
     /// Updates the version vector of entry at `name`.
-    pub fn bump(&mut self, branch: &Branch, name: &str, op: VersionVectorOp<'_>) -> Result<()> {
-        op.apply(
-            branch.id(),
-            self.entries
-                .get_mut(name)
-                .ok_or(Error::EntryNotFound)?
-                .version_vector_mut(),
-        );
+    pub fn bump(&mut self, branch: &Branch, name: &str, merge: &VersionVector) -> Result<()> {
+        let vv = self
+            .entries
+            .get_mut(name)
+            .ok_or(Error::EntryNotFound)?
+            .version_vector_mut();
+
+        if merge.is_empty() {
+            vv.increment(*branch.id());
+        } else {
+            vv.merge(merge);
+        }
 
         Ok(())
     }
diff --git a/lib/src/directory/entry.rs b/lib/src/directory/entry.rs
index e6e908c28..754335d43 100644
--- a/lib/src/directory/entry.rs
+++ b/lib/src/directory/entry.rs
@@ -10,7 +10,7 @@ use crate::{
     crypto::sign::PublicKey,
     error::{Error, Result},
     file::File,
-    locator::Locator,
+    protocol::Locator,
     store::ReadTransaction,
     version_vector::VersionVector,
     versioned::{BranchItem, Versioned},
diff --git a/lib/src/directory/mod.rs b/lib/src/directory/mod.rs
index c002a0caf..b4a86954c 100644
--- a/lib/src/directory/mod.rs
+++ b/lib/src/directory/mod.rs
@@ -26,9 +26,8 @@ use crate::{
     debug::DebugPrinter,
     error::{Error, Result},
     file::File,
-    locator::Locator,
-    protocol::{RootNode, VersionVectorOp},
-    store::{self, ReadTransaction, WriteTransaction},
+    protocol::{Locator, RootNode},
+    store::{self, Changeset, ReadTransaction, WriteTransaction},
     version_vector::VersionVector,
 };
 use async_recursion::async_recursion;
@@ -63,6 +62,7 @@ impl Directory {
         let lock = branch.locker().read(*locator.blob_id()).await;
         let mut tx = branch.store().begin_write().await?;
+        let mut changeset = Changeset::new();
 
         let dir = match Self::open_in(
             Some(lock),
@@ -83,9 +83,10 @@
             dir
         } else {
             let mut dir = Self::create(branch, locator, None);
-            dir.save(&mut tx, &Content::empty()).await?;
-            dir.bump(&mut tx, VersionVectorOp::IncrementLocal).await?;
-            dir.commit(tx).await?;
+            dir.save(&mut tx, &mut changeset, &Content::empty()).await?;
+            dir.bump(&mut tx, &mut changeset, &VersionVector::new())
+                .await?;
+            dir.commit(tx, changeset).await?;
             dir
         };
 
@@ -116,6 +117,8 @@
     /// Creates a new file inside this directory.
     pub async fn create_file(&mut self, name: String) -> Result<File> {
         let mut tx = self.branch().store().begin_write().await?;
+        let mut changeset = Changeset::new();
+
         self.refresh_in(&mut tx).await?;
 
         let blob_id = rand::random();
@@ -130,10 +133,11 @@
         let mut content = self.content.clone();
         let _old_lock = content.insert(self.branch(), name, data, None)?;
 
-        file.save(&mut tx).await?;
-        self.save(&mut tx, &content).await?;
-        self.bump(&mut tx, VersionVectorOp::IncrementLocal).await?;
-        self.commit(tx).await?;
+        file.save(&mut tx, &mut changeset).await?;
+        self.save(&mut tx, &mut changeset, &content).await?;
+        self.bump(&mut tx, &mut changeset, &VersionVector::new())
+            .await?;
+        self.commit(tx, changeset).await?;
         self.finalize(content);
 
         Ok(file)
@@ -141,22 +145,29 @@
     /// Creates a new subdirectory of this directory.
     pub async fn create_directory(&mut self, name: String) -> Result<Directory> {
-        self.create_directory_with_version_vector_op(name, VersionVectorOp::IncrementLocal)
+        self.create_directory_with_version_vector(name, &VersionVector::new())
             .await
     }
 
-    async fn create_directory_with_version_vector_op(
+    async fn create_directory_with_version_vector(
         &mut self,
         name: String,
-        op: VersionVectorOp<'_>,
+        merge: &VersionVector,
     ) -> Result<Directory> {
         let mut tx = self.branch().store().begin_write().await?;
+        let mut changeset = Changeset::new();
+
         self.refresh_in(&mut tx).await?;
 
         let blob_id = rand::random();
         let mut version_vector = self.content.initial_version_vector(&name);
-        op.apply(self.branch().id(), &mut version_vector);
+
+        if merge.is_empty() {
+            version_vector.increment(*self.branch().id())
+        } else {
+            version_vector.merge(merge)
+        }
 
         let data = EntryData::directory(blob_id, version_vector);
         let parent = self.create_parent_context(name.clone());
@@ -166,10 +177,10 @@
         let mut content = self.content.clone();
         let _old_lock = content.insert(self.branch(), name, data, None)?;
 
-        dir.save(&mut tx, &Content::empty()).await?;
-        self.save(&mut tx, &content).await?;
-        self.bump(&mut tx, op).await?;
-        self.commit(tx).await?;
+        dir.save(&mut tx, &mut changeset, &Content::empty()).await?;
+        self.save(&mut tx, &mut changeset, &content).await?;
+        self.bump(&mut tx, &mut changeset, merge).await?;
+        self.commit(tx, changeset).await?;
         self.finalize(content);
 
         Ok(dir)
@@ -183,38 +194,32 @@
         name: &str,
         merge_vv: &VersionVector,
     ) -> Result<Directory> {
-        if let Some(dir) = self
-            .try_open_directory_and_merge_version_vector(name, merge_vv)
-            .await?
-        {
+        if let Some(mut dir) = self.try_open_directory(name).await? {
+            dir.merge_version_vector(merge_vv).await?;
             Ok(dir)
         } else {
             match self
-                .create_directory_with_version_vector_op(
-                    name.to_owned(),
-                    VersionVectorOp::Merge(merge_vv),
-                )
+                .create_directory_with_version_vector(name.to_owned(), merge_vv)
                 .await
             {
                 Ok(dir) => Ok(dir),
                 Err(Error::EntryExists) => {
                     // The entry has been created in the meantime by some other task. Try to open
                     // it again (this time it must exist).
-                    self.try_open_directory_and_merge_version_vector(name, merge_vv)
-                        .await
-                        .transpose()
-                        .expect("entry must exist")
+                    let mut dir = self
+                        .try_open_directory(name)
+                        .await?
+                        .expect("entry must exist");
+                    dir.merge_version_vector(merge_vv).await?;
+
+                    Ok(dir)
                 }
                 Err(error) => Err(error),
             }
         }
     }
 
-    async fn try_open_directory_and_merge_version_vector(
-        &self,
-        name: &str,
-        merge_vv: &VersionVector,
-    ) -> Result<Option<Directory>> {
+    async fn try_open_directory(&self, name: &str) -> Result<Option<Directory>> {
         let entry = match self.lookup(name) {
             Ok(EntryRef::Directory(entry)) => entry,
             Ok(EntryRef::Tombstone(_)) | Err(Error::EntryNotFound) => return Ok(None),
             Err(error) => return Err(error),
         };
 
-        let mut dir = entry.open(DirectoryFallback::Disabled).await?;
-        dir.merge_version_vector(merge_vv).await?;
+        let dir = entry.open(DirectoryFallback::Disabled).await?;
 
         Ok(Some(dir))
     }
@@ -251,9 +255,10 @@
         tombstone: EntryTombstoneData,
     ) -> Result<()> {
         let mut tx = self.branch().store().begin_write().await?;
+        let mut changeset = Changeset::new();
 
         let (content, _old_lock) = match self
-            .begin_remove_entry(&mut tx, name, branch_id, tombstone)
+            .begin_remove_entry(&mut tx, &mut changeset, name, branch_id, tombstone)
             .await
         {
             Ok(content) => content,
@@ -263,7 +268,7 @@
             Err(error) => return Err(error),
         };
 
-        self.commit(tx).await?;
+        self.commit(tx, changeset).await?;
         self.finalize(content);
 
         Ok(())
@@ -301,21 +306,37 @@
 
         let mut tx = self.branch().store().begin_write().await?;
 
+        let mut changeset = Changeset::new();
         let (dst_content, _old_dst_lock) = dst_dir
-            .begin_insert_entry(&mut tx, dst_name.to_owned(), dst_data)
+            .begin_insert_entry(&mut tx, &mut changeset, dst_name.to_owned(), dst_data)
+            .await?;
+
+        // TODO: Handle the case when `self` == `dst_dir` separately (call `refresh` and `save`
+        // only once) to avoid having to apply the changeset here.
+        changeset
+            .apply(
+                &mut tx,
+                self.branch().id(),
+                self.branch()
+                    .keys()
+                    .write()
+                    .ok_or(Error::PermissionDenied)?,
+            )
             .await?;
 
         let branch_id = *self.branch().id();
+        let mut changeset = Changeset::new();
         let (src_content, _old_src_lock) = self
             .begin_remove_entry(
                 &mut tx,
+                &mut changeset,
                 src_name,
                 &branch_id,
                 EntryTombstoneData::moved(src_vv),
             )
             .await?;
 
-        self.commit(tx).await?;
+        self.commit(tx, changeset).await?;
         self.finalize(src_content);
         dst_dir.finalize(dst_content);
 
@@ -357,8 +378,10 @@
     #[instrument(skip(self), err(Debug))]
     pub(crate) async fn merge_version_vector(&mut self, vv: &VersionVector) -> Result<()> {
         let mut tx = self.branch().store().begin_write().await?;
-        self.bump(&mut tx, VersionVectorOp::Merge(vv)).await?;
-        self.commit(tx).await
+        let mut changeset = Changeset::new();
+
+        self.bump(&mut tx, &mut changeset, vv).await?;
+        self.commit(tx, changeset).await
     }
 
     pub async fn parent(&self) -> Result<Option<Directory>> {
@@ -523,7 +546,8 @@
     async fn begin_remove_entry(
         &mut self,
-        tx: &mut WriteTransaction,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
         name: &str,
         branch_id: &PublicKey,
         mut tombstone: EntryTombstoneData,
@@ -569,12 +593,14 @@
             Err(e) => return Err(e),
         };
 
-        self.begin_insert_entry(tx, name.to_owned(), new_data).await
+        self.begin_insert_entry(tx, changeset, name.to_owned(), new_data)
+            .await
     }
 
     async fn begin_insert_entry(
         &mut self,
-        tx: &mut WriteTransaction,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
         name: String,
         data: EntryData,
     ) -> Result<(Content, Option)> {
@@ -582,8 +608,8 @@
         let mut content = self.content.clone();
         let old_lock = content.insert(self.branch(), name, data, None)?;
 
-        self.save(tx, &content).await?;
-        self.bump(tx, VersionVectorOp::IncrementLocal).await?;
+        self.save(tx, changeset, &content).await?;
+        self.bump(tx, changeset, &VersionVector::new()).await?;
 
         Ok((content, old_lock))
     }
@@ -606,42 +632,57 @@
         }
     }
 
-    async fn save(&mut self, tx: &mut WriteTransaction, content: &Content) -> Result<()> {
+    async fn save(
+        &mut self,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
+        content: &Content,
+    ) -> Result<()> {
         // Save the directory content into the store
         let buffer = content.serialize();
         self.blob.truncate(0)?;
-        self.blob.write_all(tx, &buffer).await?;
-        self.blob.flush(tx).await?;
+        self.blob.write_all(tx, changeset, &buffer).await?;
+        self.blob.flush(tx, changeset).await?;
 
         Ok(())
     }
 
     /// Atomically commits the transaction and sends notification event.
-    async fn commit(&mut self, tx: WriteTransaction) -> Result<()> {
+    async fn commit(&mut self, mut tx: WriteTransaction, changeset: Changeset) -> Result<()> {
+        changeset
+            .apply(
+                &mut tx,
+                self.branch().id(),
+                self.branch()
+                    .keys()
+                    .write()
+                    .ok_or(Error::PermissionDenied)?,
+            )
+            .await?;
+
         let event_tx = self.branch().notify();
         tx.commit_and_then(move || event_tx.send()).await?;
+
         Ok(())
     }
 
     /// Updates the version vectors of this directory and all its ancestors.
+    /// If `merge` is non-empty, the resulting vv is computed by merging the original vv with it,
+    /// otherwise by incrementing the local version.
     #[async_recursion]
-    async fn bump<'a: 'async_recursion>(
+    async fn bump(
         &mut self,
-        tx: &mut WriteTransaction,
-        op: VersionVectorOp<'a>,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
+        merge: &VersionVector,
     ) -> Result<()> {
         // Update the version vector of this directory and all its ancestors
         if let Some(parent) = self.parent.as_mut() {
-            parent.bump(tx, self.blob.branch().clone(), op).await
+            parent
+                .bump(tx, changeset, self.blob.branch().clone(), merge)
+                .await
         } else {
-            let write_keys = self
-                .branch()
-                .keys()
-                .write()
-                .ok_or(Error::PermissionDenied)?;
-
-            tx.bump(self.branch().id(), op, write_keys).await?;
-
+            changeset.bump(merge);
             Ok(())
         }
     }
diff --git a/lib/src/directory/parent_context.rs b/lib/src/directory/parent_context.rs
index 8e39b7598..014c7f378 100644
--- a/lib/src/directory/parent_context.rs
+++ b/lib/src/directory/parent_context.rs
@@ -8,9 +8,8 @@ use crate::{
     branch::Branch,
     directory::{content::EntryExists, Directory},
     error::Result,
-    locator::Locator,
-    protocol::VersionVectorOp,
-    store::{ReadTransaction, WriteTransaction},
+    protocol::Locator,
+    store::{Changeset, ReadTransaction},
     version_vector::VersionVector,
 };
 use tracing::{field, instrument, Span};
@@ -47,15 +46,16 @@ impl ParentContext {
     /// This updates the version vector of this entry and all its ancestors.
     pub async fn bump(
         &self,
-        tx: &mut WriteTransaction,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
         branch: Branch,
-        op: VersionVectorOp<'_>,
+        merge: &VersionVector,
     ) -> Result<()> {
         let mut directory = self.open_in(tx, branch).await?;
         let mut content = directory.content.clone();
-        content.bump(directory.branch(), &self.entry_name, op)?;
-        directory.save(tx, &content).await?;
-        directory.bump(tx, op).await?;
+        content.bump(directory.branch(), &self.entry_name, merge)?;
+        directory.save(tx, changeset, &content).await?;
+        directory.bump(tx, changeset, merge).await?;
 
         Ok(())
     }
@@ -150,6 +150,8 @@
         // cases the newly forked blob will be unlocked and eventually garbage-collected. This
        // wastes work but is otherwise harmless. The fork can be retried at any time.
let mut tx = directory.branch().store().begin_write().await?; + let mut changeset = Changeset::new(); + directory.refresh_in(&mut tx).await?; let src_vv = src_entry_data.version_vector().clone(); @@ -162,11 +164,9 @@ impl ParentContext { old_blob_id.map(|_| lock), ) { Ok(_lock) => { - directory.save(&mut tx, &content).await?; - directory - .bump(&mut tx, VersionVectorOp::Merge(&src_vv)) - .await?; - directory.commit(tx).await?; + directory.save(&mut tx, &mut changeset, &content).await?; + directory.bump(&mut tx, &mut changeset, &src_vv).await?; + directory.commit(tx, changeset).await?; directory.finalize(content); tracing::trace!("fork complete"); Ok(new_context) diff --git a/lib/src/directory/tests.rs b/lib/src/directory/tests.rs index 8c84242ca..b29bc8002 100644 --- a/lib/src/directory/tests.rs +++ b/lib/src/directory/tests.rs @@ -120,6 +120,8 @@ async fn remove_file() { // Rename a file without moving it to another directory. #[tokio::test(flavor = "multi_thread")] async fn rename_file() { + crate::test_utils::init_log(); + let (_base_dir, branch) = setup().await; let src_name = "zebra.txt"; @@ -156,7 +158,7 @@ async fn rename_file() { .await .unwrap(); - // Reopen again and check the file entry was removed. + // Reopen again and check the file entry was renamed. let parent_dir = branch .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled) .await diff --git a/lib/src/file/mod.rs b/lib/src/file/mod.rs index f892aabf8..5093b1118 100644 --- a/lib/src/file/mod.rs +++ b/lib/src/file/mod.rs @@ -7,9 +7,8 @@ use crate::{ branch::Branch, directory::{Directory, ParentContext}, error::{Error, Result}, - locator::Locator, - protocol::{VersionVectorOp, BLOCK_SIZE}, - store::WriteTransaction, + protocol::{Locator, BLOCK_SIZE}, + store::{Changeset, ReadTransaction}, version_vector::VersionVector, }; use std::{fmt, future::Future, io::SeekFrom}; @@ -208,12 +207,26 @@ impl File { } let mut tx = self.branch().store().begin_write().await?; - self.blob.flush(&mut tx).await?; + let mut changeset = Changeset::new(); + + self.blob.flush(&mut tx, &mut changeset).await?; self.parent .bump( &mut tx, + &mut changeset, self.branch().clone(), - VersionVectorOp::IncrementLocal, + &VersionVector::new(), + ) + .await?; + + changeset + .apply( + &mut tx, + self.branch().id(), + self.branch() + .keys() + .write() + .ok_or(Error::PermissionDenied)?, ) .await?; @@ -225,8 +238,12 @@ impl File { /// Saves any pending modifications but does not update the version vectors. For internal use /// only. 
-    pub(crate) async fn save(&mut self, tx: &mut WriteTransaction) -> Result<()> {
-        self.blob.flush(tx).await?;
+    pub(crate) async fn save(
+        &mut self,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
+    ) -> Result<()> {
+        self.blob.flush(tx, changeset).await?;
         Ok(())
     }
 
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index 3bc2fd52d..330c14a95 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -31,7 +31,6 @@ mod future;
 mod iterator;
 mod joint_directory;
 mod joint_entry;
-mod locator;
 mod progress;
 mod protocol;
 mod repository;
diff --git a/lib/src/network/client.rs b/lib/src/network/client.rs
index d4fcfdf69..9defdc0c3 100644
--- a/lib/src/network/client.rs
+++ b/lib/src/network/client.rs
@@ -8,10 +8,7 @@ use crate::{
     block_tracker::{BlockPromise, BlockTrackerClient, OfferState},
     crypto::{sign::PublicKey, CacheHash, Hashable},
     error::{Error, Result},
-    protocol::{
-        BlockData, BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence,
-        UntrustedProof,
-    },
+    protocol::{Block, BlockId, InnerNodeMap, LeafNodeSet, MultiBlockPresence, UntrustedProof},
     repository::{BlockRequestMode, RepositoryMonitor, Vault},
     store::{self, ReceiveFilter},
 };
@@ -182,8 +179,7 @@ impl Client {
                     .await
             }
             PendingResponse::Block {
-                data,
-                nonce,
+                block,
                 block_promise,
                 permit: _permit,
                 debug,
             } => {
                 self.vault
                     .monitor
                     .handle_block_metric
-                    .measure_ok(self.handle_block(data, nonce, block_promise, debug))
+                    .measure_ok(self.handle_block(block, block_promise, debug))
                     .await
             }
             PendingResponse::BlockNotFound {
@@ -350,15 +346,14 @@ impl Client {
         Ok(())
     }
 
-    #[instrument(skip_all, fields(id = ?data.id), err(Debug))]
+    #[instrument(skip_all, fields(id = ?block.id), err(Debug))]
     async fn handle_block(
         &self,
-        data: BlockData,
-        nonce: BlockNonce,
+        block: Block,
         block_promise: Option<BlockPromise>,
         _debug: DebugReceivedResponse,
     ) -> Result<()> {
-        match self.vault.receive_block(&data, &nonce, block_promise).await {
+        match self.vault.receive_block(&block, block_promise).await {
             // Ignore `BlockNotReferenced` errors as they only mean that the block is no longer
             // needed.
             Ok(()) | Err(Error::Store(store::Error::BlockNotReferenced)) => Ok(()),
diff --git a/lib/src/network/message.rs b/lib/src/network/message.rs
index baaf63c5d..30ffb73e3 100644
--- a/lib/src/network/message.rs
+++ b/lib/src/network/message.rs
@@ -8,7 +8,8 @@ use crate::{
     crypto::{sign::PublicKey, Hash, Hashable},
     format::Hex,
     protocol::{
-        BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence, UntrustedProof,
+        BlockContent, BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence,
+        UntrustedProof,
     },
     repository::RepositoryId,
 };
@@ -54,7 +55,7 @@ pub(crate) enum Response {
     ChildNodesError(Hash, ResponseDisambiguator, DebugResponsePayload),
     /// Send a requested block.
     Block {
-        content: Box<[u8]>,
+        content: BlockContent,
         nonce: BlockNonce,
         debug: DebugResponsePayload,
     },
diff --git a/lib/src/network/pending.rs b/lib/src/network/pending.rs
index 143ec51d9..a07c97d77 100644
--- a/lib/src/network/pending.rs
+++ b/lib/src/network/pending.rs
@@ -7,10 +7,7 @@ use crate::{
     collections::{hash_map::Entry, HashMap},
     crypto::{sign::PublicKey, CacheHash, Hash, Hashable},
     deadlock::BlockingMutex,
-    protocol::{
-        BlockData, BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence,
-        UntrustedProof,
-    },
+    protocol::{Block, BlockId, InnerNodeMap, LeafNodeSet, MultiBlockPresence, UntrustedProof},
     repository::RepositoryMonitor,
     sync::uninitialized_watch,
 };
@@ -71,8 +68,7 @@ pub(super) enum PendingResponse {
         debug: DebugReceivedResponse,
     },
     Block {
-        data: BlockData,
-        nonce: BlockNonce,
+        block: Block,
        // This will be `None` if the request timed out but we still received the response
        // afterwards.
        block_promise: Option<BlockPromise>,
@@ -177,10 +173,9 @@
                     debug,
                 }
             }
-            processed_response::Success::Block { data, nonce, debug } => {
+            processed_response::Success::Block { block, debug } => {
                 PendingResponse::Block {
-                    data,
-                    nonce,
+                    block,
                     permit,
                     debug,
                     block_promise: request_data.block_promise,
                 }
             }
@@ -337,8 +332,7 @@ mod processed_response {
             DebugReceivedResponse,
         ),
         Block {
-            data: BlockData,
-            nonce: BlockNonce,
+            block: Block,
             debug: DebugReceivedResponse,
         },
     }
@@ -368,7 +362,7 @@ impl ProcessedResponse {
             Self::Success(processed_response::Success::LeafNodes(nodes, disambiguator, _)) => {
                 Key::ChildNodes(nodes.hash(), *disambiguator)
             }
-            Self::Success(processed_response::Success::Block { data, .. }) => Key::Block(data.id),
+            Self::Success(processed_response::Success::Block { block, .. }) => Key::Block(block.id),
             Self::Failure(processed_response::Failure::RootNode(branch_id, _)) => {
                 Key::RootNode(*branch_id)
             }
@@ -411,8 +405,7 @@ impl From<Response> for ProcessedResponse {
                 nonce,
                 debug,
             } => Self::Success(processed_response::Success::Block {
-                data: content.into(),
-                nonce,
+                block: Block::new(content, nonce),
                 debug: debug.received(),
             }),
             Response::RootNodeError(branch_id, debug) => Self::Failure(
diff --git a/lib/src/network/server.rs b/lib/src/network/server.rs
index 38f0ffa3a..e39c5b181 100644
--- a/lib/src/network/server.rs
+++ b/lib/src/network/server.rs
@@ -6,7 +6,7 @@ use crate::{
     crypto::{sign::PublicKey, Hash},
     error::{Error, Result},
     event::Payload,
-    protocol::{BlockId, RootNode, BLOCK_SIZE},
+    protocol::{BlockContent, BlockId, RootNode},
     repository::Vault,
     store,
 };
@@ -196,7 +196,7 @@ impl<'a> Responder<'a> {
     #[instrument(skip(self, debug), err(Debug))]
     async fn handle_block(&self, id: BlockId, debug: DebugRequestPayload) -> Result<()> {
         let debug = debug.begin_reply();
-        let mut content = vec![0; BLOCK_SIZE].into_boxed_slice();
+        let mut content = BlockContent::new();
         let result = self
             .vault
             .store()
diff --git a/lib/src/network/tests.rs b/lib/src/network/tests.rs
index bb3a5b650..49f169e4c 100644
--- a/lib/src/network/tests.rs
+++ b/lib/src/network/tests.rs
@@ -12,10 +12,11 @@ use crate::{
     metrics::Metrics,
     protocol::{
         test_utils::{receive_blocks, receive_nodes, Snapshot},
-        BlockId, RootNode, SingleBlockPresence, VersionVectorOp, BLOCK_SIZE,
+        Block, BlockId, RootNode, SingleBlockPresence,
     },
     repository::{BlockRequestMode, RepositoryId, RepositoryMonitor, Vault},
     state_monitor::StateMonitor,
+    store::Changeset,
     test_utils,
     version_vector::VersionVector,
 };
@@ -142,10 +143,7 @@ async fn transfer_blocks_between_two_replicas_case(block_count: usize, rng_seed:
         a_block_tracker.offer(*id, OfferState::Approved);
         let promise = a_block_tracker.acceptor().try_accept().unwrap();
 
-        a_vault
-            .receive_block(&block.data, &block.nonce, Some(promise))
-            .await
-            .unwrap();
+        a_vault.receive_block(block, Some(promise)).await.unwrap();
         tracing::info!(?id, "write block");
 
         // Then wait until replica B receives and writes it too.
@@ -374,7 +372,6 @@ async fn create_repository(
     let repository_id = RepositoryId::from(write_keys.public);
     let event_tx = EventSender::new(1);
-
     let state = Vault::new(
         repository_id,
         event_tx,
@@ -485,16 +482,12 @@ async fn create_changeset(
     write_keys: &Keypair,
     size: usize,
 ) {
+    assert!(size > 0);
+
     for _ in 0..size {
         create_block(rng, vault, writer_id, write_keys).await;
     }
 
-    let mut tx = vault.store().begin_write().await.unwrap();
-    tx.bump(writer_id, VersionVectorOp::IncrementLocal, write_keys)
-        .await
-        .unwrap();
-    tx.commit().await.unwrap();
-
     vault.event_tx.send(Payload::BranchChanged(*writer_id));
 }
 
@@ -505,24 +498,17 @@ async fn create_block(
     write_keys: &Keypair,
 ) {
     let encoded_locator = rng.gen();
-
-    let mut content = vec![0; BLOCK_SIZE];
-    rng.fill(&mut content[..]);
-
-    let block_id = BlockId::from_content(&content);
-    let nonce = rng.gen();
+    let block: Block = rng.gen();
 
     let mut tx = vault.store().begin_write().await.unwrap();
-    tx.link_block(
-        branch_id,
-        &encoded_locator,
-        &block_id,
-        SingleBlockPresence::Present,
-        write_keys,
-    )
-    .await
-    .unwrap();
-    tx.write_block(&block_id, &content, &nonce).await.unwrap();
+    let mut changeset = Changeset::new();
+
+    changeset.link_block(encoded_locator, block.id, SingleBlockPresence::Present);
+    changeset.write_block(block);
+    changeset
+        .apply(&mut tx, branch_id, write_keys)
+        .await
+        .unwrap();
+
     tx.commit().await.unwrap();
 }
diff --git a/lib/src/protocol/block.rs b/lib/src/protocol/block.rs
index d95cbfdc2..06764c41e 100644
--- a/lib/src/protocol/block.rs
+++ b/lib/src/protocol/block.rs
@@ -1,7 +1,12 @@
 use crate::crypto::{Digest, Hash, Hashable};
 use rand::{distributions::Standard, prelude::Distribution, Rng};
 use serde::{Deserialize, Serialize};
-use std::{array::TryFromSliceError, fmt};
+use std::{
+    array::TryFromSliceError,
+    fmt,
+    ops::{Deref, DerefMut},
+};
+use zeroize::Zeroize;
 
 /// Block size in bytes.
 pub const BLOCK_SIZE: usize = 32 * 1024;
@@ -21,8 +26,8 @@ pub struct BlockId(Hash);
 impl BlockId {
     pub(crate) const SIZE: usize = Hash::SIZE;
 
-    pub(crate) fn from_content(content: &[u8]) -> Self {
-        Self(content.hash())
+    pub(crate) fn from_content(content: &BlockContent) -> Self {
+        Self(content.0[..].hash())
     }
 }
 
@@ -64,23 +69,101 @@ derive_sqlx_traits_for_byte_array_wrapper!(BlockId);
 derive_rand_for_wrapper!(BlockId);
 
 #[derive(Clone)]
-pub(crate) struct BlockData {
-    pub content: Box<[u8]>,
+pub(crate) struct Block {
     pub id: BlockId,
+    pub content: BlockContent,
+    pub nonce: BlockNonce,
 }
 
-impl From<Box<[u8]>> for BlockData {
-    fn from(content: Box<[u8]>) -> Self {
+impl Block {
+    pub fn new(content: BlockContent, nonce: BlockNonce) -> Self {
         let id = BlockId::from_content(&content);
-        Self { content, id }
+        Self { id, content, nonce }
     }
 }
 
-impl Distribution<BlockData> for Standard {
-    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> BlockData {
+impl Distribution<Block> for Standard {
+    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Block {
+        Block::new(rng.gen(), rng.gen())
+    }
+}
+
+#[derive(Clone, Serialize, Deserialize)]
+pub(crate) struct BlockContent(Box<[u8]>);
+
+impl BlockContent {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    // Read data from `offset` of the buffer into a fixed-length array.
+    //
+    // # Panics
+    //
+    // Panics if the remaining length after `offset` is less than `N`.
+    pub fn read_array<const N: usize>(&self, offset: usize) -> [u8; N] {
+        self[offset..offset + N].try_into().unwrap()
+    }
+
+    // Read data from `offset` of the buffer into a `u64`.
+    //
+    // # Panics
+    //
+    // Panics if the remaining length is less than `size_of::<u64>()`
+    pub fn read_u64(&self, offset: usize) -> u64 {
+        u64::from_le_bytes(self.read_array(offset))
+    }
+
+    // Read data from offset into `dst`.
+    pub fn read(&self, offset: usize, dst: &mut [u8]) {
+        dst.copy_from_slice(&self.0[offset..offset + dst.len()]);
+    }
+
+    // Write a `u64` at `offset` into the buffer.
+    pub fn write_u64(&mut self, offset: usize, value: u64) {
+        let bytes = value.to_le_bytes();
+        self.write(offset, &bytes[..]);
+    }
+
+    // Writes data from `src` into the buffer.
+    pub fn write(&mut self, offset: usize, src: &[u8]) {
+        self.0[offset..offset + src.len()].copy_from_slice(src);
+    }
+}
+
+impl Default for BlockContent {
+    fn default() -> Self {
+        Self(vec![0; BLOCK_SIZE].into_boxed_slice())
+    }
+}
+
+// Scramble the buffer on drop to prevent leaving decrypted data in memory past the buffer
+// lifetime.
+impl Drop for BlockContent {
+    fn drop(&mut self) {
+        self.0.zeroize()
+    }
+}
+
+impl Deref for BlockContent {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl DerefMut for BlockContent {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+impl Distribution<BlockContent> for Standard {
+    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> BlockContent {
         let mut content = vec![0; BLOCK_SIZE].into_boxed_slice();
         rng.fill(&mut content[..]);
-        BlockData::from(content)
+        BlockContent(content)
     }
 }
diff --git a/lib/src/protocol/leaf_node.rs b/lib/src/protocol/leaf_node.rs
index ff938a21e..24cfd3cd0 100644
--- a/lib/src/protocol/leaf_node.rs
+++ b/lib/src/protocol/leaf_node.rs
@@ -67,37 +67,30 @@ impl LeafNodeSet {
         self.0.iter()
     }
 
-    /// Inserts a new node or updates it if already exists.
-    pub fn modify(
+    /// Inserts a new node or replaces an existing one with the same locator.
+    pub fn insert(
         &mut self,
-        locator: &Hash,
-        block_id: &BlockId,
+        locator: Hash,
+        block_id: BlockId,
         block_presence: SingleBlockPresence,
-    ) -> LeafNodeModifyStatus {
-        match self.lookup(locator) {
+    ) {
+        match self.lookup(&locator) {
             Ok(index) => {
-                let node = &mut self.0[index];
-
-                if &node.block_id == block_id {
-                    LeafNodeModifyStatus::Unchanged
-                } else {
-                    let old_block_id = node.block_id;
-                    node.block_id = *block_id;
-                    node.block_presence = block_presence;
-
-                    LeafNodeModifyStatus::Updated(old_block_id)
-                }
+                self.0[index] = LeafNode {
+                    locator,
+                    block_id,
+                    block_presence,
+                };
             }
             Err(index) => {
                 self.0.insert(
                     index,
                     LeafNode {
-                        locator: *locator,
-                        block_id: *block_id,
+                        locator,
+                        block_id,
                         block_presence,
                     },
                 );
-                LeafNodeModifyStatus::Inserted
             }
         }
     }
@@ -107,6 +100,16 @@ impl LeafNodeSet {
         Some(self.0.remove(index))
     }
 
+    pub fn remove_if(&mut self, locator: &Hash, block_id: &BlockId) -> Option<LeafNode> {
+        let index = self.lookup(locator).ok()?;
+
+        if self.0[index].block_id == *block_id {
+            Some(self.0.remove(index))
+        } else {
+            None
+        }
+    }
+
     /// Returns the same nodes but with the `block_presence` set to `Missing`.
     /// Equivalent to `self.into_iter().map(LeafNode::into_missing()).collect()` but without
     /// involving reallocation.
@@ -165,11 +168,5 @@ impl Hashable for LeafNodeSet {
     }
 }
 
-pub(crate) enum LeafNodeModifyStatus {
-    Updated(BlockId),
-    Inserted,
-    Unchanged,
-}
-
 // Cached hash of an empty LeafNodeSet.
 pub(crate) static EMPTY_LEAF_HASH: Lazy<Hash> = Lazy::new(|| LeafNodeSet::default().hash());
diff --git a/lib/src/locator.rs b/lib/src/protocol/locator.rs
similarity index 100%
rename from lib/src/locator.rs
rename to lib/src/protocol/locator.rs
diff --git a/lib/src/protocol/mod.rs b/lib/src/protocol/mod.rs
index 7cbf4af7d..91f13fd73 100644
--- a/lib/src/protocol/mod.rs
+++ b/lib/src/protocol/mod.rs
@@ -4,10 +4,10 @@ mod block;
 mod inner_node;
 mod leaf_node;
+mod locator;
 mod proof;
 mod root_node;
 mod summary;
-mod version_vector_op;
 
 #[cfg(test)]
 pub(crate) mod test_utils;
@@ -15,13 +15,13 @@ pub(crate) mod test_utils;
 pub use self::block::BLOCK_SIZE;
 
 pub(crate) use self::{
-    block::{BlockData, BlockId, BlockNonce, BLOCK_RECORD_SIZE},
+    block::{Block, BlockContent, BlockId, BlockNonce, BLOCK_RECORD_SIZE},
     inner_node::{get_bucket, InnerNode, InnerNodeMap, EMPTY_INNER_HASH, INNER_LAYER_COUNT},
-    leaf_node::{LeafNode, LeafNodeModifyStatus, LeafNodeSet, EMPTY_LEAF_HASH},
+    leaf_node::{LeafNode, LeafNodeSet, EMPTY_LEAF_HASH},
+    locator::Locator,
     proof::{Proof, ProofError, UntrustedProof},
     root_node::RootNode,
     summary::{MultiBlockPresence, NodeState, SingleBlockPresence, Summary},
-    version_vector_op::VersionVectorOp,
 };
 
 #[cfg(test)]
diff --git a/lib/src/protocol/root_node.rs b/lib/src/protocol/root_node.rs
index a5c8e1735..4a8c60b24 100644
--- a/lib/src/protocol/root_node.rs
+++ b/lib/src/protocol/root_node.rs
@@ -1,14 +1,12 @@
-use super::{MultiBlockPresence, NodeState, Proof, Summary, EMPTY_INNER_HASH};
+use super::{Proof, Summary};
 use crate::{
-    crypto::sign::{Keypair, PublicKey},
+    crypto::sign::PublicKey,
     version_vector::VersionVector,
     versioned::{BranchItem, Versioned},
 };
 
 pub(crate) type SnapshotId = u32;
 
-const EMPTY_SNAPSHOT_ID: SnapshotId = 0;
-
 #[derive(Clone, Eq, PartialEq, Debug)]
 pub(crate) struct RootNode {
     pub snapshot_id: SnapshotId,
@@ -16,27 +14,6 @@ pub(crate) struct RootNode {
     pub summary: Summary,
 }
 
-impl RootNode {
-    /// Creates a root node with no children without storing it in the database.
-    pub fn empty(writer_id: PublicKey, write_keys: &Keypair) -> Self {
-        let proof = Proof::new(
-            writer_id,
-            VersionVector::new(),
-            *EMPTY_INNER_HASH,
-            write_keys,
-        );
-
-        Self {
-            snapshot_id: EMPTY_SNAPSHOT_ID,
-            proof,
-            summary: Summary {
-                state: NodeState::Approved,
-                block_presence: MultiBlockPresence::Full,
-            },
-        }
-    }
-}
-
 impl Versioned for RootNode {
     fn version_vector(&self) -> &VersionVector {
         &self.proof.version_vector
diff --git a/lib/src/protocol/summary.rs b/lib/src/protocol/summary.rs
index 7aa51ac6d..c2530bb44 100644
--- a/lib/src/protocol/summary.rs
+++ b/lib/src/protocol/summary.rs
@@ -107,10 +107,6 @@ impl NodeState {
         matches!(self, Self::Approved)
     }
 
-    pub fn is_incomplete(self) -> bool {
-        matches!(self, Self::Incomplete)
-    }
-
     pub fn update(&mut self, other: Self) {
         *self = match (*self, other) {
             (Self::Incomplete, _) | (_, Self::Incomplete) => Self::Incomplete,
@@ -161,13 +157,6 @@ pub(crate) enum SingleBlockPresence {
 }
 
 impl SingleBlockPresence {
-    pub fn is_present(self) -> bool {
-        match self {
-            Self::Missing | Self::Expired => false,
-            Self::Present => true,
-        }
-    }
-
     pub fn is_missing(self) -> bool {
         match self {
             Self::Missing => true,
diff --git a/lib/src/protocol/test_utils.rs b/lib/src/protocol/test_utils.rs
index 890ec76bd..6b4aa0526 100644
--- a/lib/src/protocol/test_utils.rs
+++ b/lib/src/protocol/test_utils.rs
@@ -7,17 +7,14 @@ use crate::{
         Hash, Hashable,
     },
     protocol::{
-        get_bucket, BlockData, BlockId, BlockNonce, InnerNode, InnerNodeMap, LeafNode, LeafNodeSet,
+        get_bucket, Block, BlockId, InnerNode, InnerNodeMap, LeafNode, LeafNodeSet,
         INNER_LAYER_COUNT,
     },
     repository::Vault,
     store::ReceiveFilter,
     version_vector::VersionVector,
 };
-use rand::{
-    distributions::{Distribution, Standard},
-    Rng,
-};
+use rand::{distributions::Standard, Rng};
 use std::mem;
 
 // In-memory snapshot for testing purposes.
@@ -41,14 +38,14 @@ impl Snapshot {
         let mut leaves = HashMap::default();
 
         for (locator, block) in locators_and_blocks {
-            let id = block.data.id;
+            let id = block.id;
             blocks.insert(id, block);
 
             let node = LeafNode::present(locator, id);
             leaves
                 .entry(BucketPath::new(&node.locator, INNER_LAYER_COUNT - 1))
                 .or_insert_with(LeafNodeSet::default)
-                .modify(&node.locator, &node.block_id, SingleBlockPresence::Present);
+                .insert(node.locator, node.block_id, SingleBlockPresence::Present);
         }
 
         let mut inners: [HashMap<_, InnerNodeMap>; INNER_LAYER_COUNT] = Default::default();
@@ -133,27 +130,6 @@ impl<'a> InnerLayer<'a> {
     }
 }
 
-#[derive(Clone)]
-pub(crate) struct Block {
-    pub data: BlockData,
-    pub nonce: BlockNonce,
-}
-
-impl Block {
-    pub fn id(&self) -> &BlockId {
-        &self.data.id
-    }
-}
-
-impl Distribution<Block> for Standard {
-    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Block {
-        Block {
-            data: rng.gen(),
-            nonce: rng.gen(),
-        }
-    }
-}
-
 // Receive all nodes in `snapshot` into `index`.
pub(crate) async fn receive_nodes( vault: &Vault, @@ -191,13 +167,11 @@ pub(crate) async fn receive_blocks(repo: &Vault, snapshot: &Snapshot) { let acceptor = client.acceptor(); for block in snapshot.blocks().values() { - repo.block_tracker.require(*block.id()); - client.offer(*block.id(), OfferState::Approved); + repo.block_tracker.require(block.id); + client.offer(block.id, OfferState::Approved); let promise = acceptor.try_accept().unwrap(); - repo.receive_block(&block.data, &block.nonce, Some(promise)) - .await - .unwrap(); + repo.receive_block(block, Some(promise)).await.unwrap(); } } diff --git a/lib/src/protocol/version_vector_op.rs b/lib/src/protocol/version_vector_op.rs deleted file mode 100644 index 2f59a2dd6..000000000 --- a/lib/src/protocol/version_vector_op.rs +++ /dev/null @@ -1,21 +0,0 @@ -use crate::{crypto::sign::PublicKey, version_vector::VersionVector}; - -/// Operation on version vector -#[derive(Clone, Copy, Debug)] -pub(crate) enum VersionVectorOp<'a> { - IncrementLocal, - Merge(&'a VersionVector), -} - -impl VersionVectorOp<'_> { - pub fn apply(self, local_id: &PublicKey, target: &mut VersionVector) { - match self { - Self::IncrementLocal => { - target.increment(*local_id); - } - Self::Merge(other) => { - target.merge(other); - } - } - } -} diff --git a/lib/src/repository/tests.rs b/lib/src/repository/tests.rs index 92787919e..1e2da8567 100644 --- a/lib/src/repository/tests.rs +++ b/lib/src/repository/tests.rs @@ -74,7 +74,12 @@ async fn count_leaf_nodes_sanity_checks() { //------------------------------------------------------------------------ // 1 = one for the root with a tombstone entry wait_for(&repo, || async { - count_local_index_leaf_nodes(&repo).await == 1 + let actual = count_local_index_leaf_nodes(&repo).await; + let expected = 1; + + tracing::trace!(actual, expected, "local leaf node count"); + + actual == expected }) .await; } diff --git a/lib/src/repository/vault.rs b/lib/src/repository/vault.rs index 99486c9a1..4b5521489 100644 --- a/lib/src/repository/vault.rs +++ b/lib/src/repository/vault.rs @@ -9,8 +9,7 @@ use crate::{ error::{Error, Result}, event::{EventSender, Payload}, protocol::{ - BlockData, BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence, ProofError, - UntrustedProof, + Block, BlockId, InnerNodeMap, LeafNodeSet, MultiBlockPresence, ProofError, UntrustedProof, }, storage_size::StorageSize, store::{ @@ -121,17 +120,12 @@ impl Vault { } /// Receive a block from other replica. 
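+    ///
+    /// Storing the block requires it to be referenced by the index; receiving an orphaned
+    /// block fails with `store::Error::BlockNotReferenced` (see the `receive_orphaned_block`
+    /// test in `vault_tests.rs`).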
-    pub async fn receive_block(
-        &self,
-        data: &BlockData,
-        nonce: &BlockNonce,
-        promise: Option<BlockPromise>,
-    ) -> Result<()> {
-        let block_id = data.id;
+    pub async fn receive_block(&self, block: &Block, promise: Option<BlockPromise>) -> Result<()> {
+        let block_id = block.id;
         let event_tx = self.event_tx.clone();
         let mut tx = self.store().begin_write().await?;
 
-        let status = match tx.receive_block(&block_id, data, nonce).await {
+        let status = match tx.receive_block(block).await {
             Ok(status) => status,
             Err(error) => {
                 if matches!(error, store::Error::BlockNotReferenced) {
diff --git a/lib/src/repository/vault_tests.rs b/lib/src/repository/vault_tests.rs
index e46442cd4..3bddbb41f 100644
--- a/lib/src/repository/vault_tests.rs
+++ b/lib/src/repository/vault_tests.rs
@@ -10,16 +10,15 @@ use crate::{
     db,
     error::Error,
     event::EventSender,
-    locator::Locator,
     metrics::Metrics,
     progress::Progress,
     protocol::{
-        test_utils::{receive_blocks, receive_nodes, Block, Snapshot},
-        BlockId, MultiBlockPresence, NodeState, Proof, SingleBlockPresence, BLOCK_SIZE,
-        EMPTY_INNER_HASH,
+        test_utils::{receive_blocks, receive_nodes, Snapshot},
+        Block, BlockContent, BlockId, Locator, MultiBlockPresence, NodeState, Proof,
+        SingleBlockPresence, EMPTY_INNER_HASH,
     },
     state_monitor::StateMonitor,
-    store::{self, ReadTransaction},
+    store::{self, Changeset, ReadTransaction},
     test_utils,
     version_vector::VersionVector,
 };
@@ -189,24 +188,16 @@ async fn receive_root_node_with_existing_hash() {
     let remote_id = PublicKey::generate(&mut rng);
 
     // Create one block locally
-    let mut content = vec![0; BLOCK_SIZE];
-    rng.fill(&mut content[..]);
-
-    let block_id = BlockId::from_content(&content);
-    let nonce = rng.gen();
+    let block: Block = rng.gen();
     let locator = rng.gen();
 
     let mut tx = vault.store().begin_write().await.unwrap();
-    tx.link_block(
-        &local_id,
-        &locator,
-        &block_id,
-        SingleBlockPresence::Present,
-        &secrets.write_keys,
-    )
-    .await
-    .unwrap();
-    tx.write_block(&block_id, &content, &nonce)
+    let mut changeset = Changeset::new();
+
+    changeset.link_block(locator, block.id, SingleBlockPresence::Present);
+    changeset.write_block(block);
+    changeset
+        .apply(&mut tx, &local_id, &secrets.write_keys)
         .await
         .unwrap();
     tx.commit().await.unwrap();
@@ -290,23 +281,20 @@ mod receive_and_create_root_node {
         // Insert one present and two missing, so the root block presence is `Some`
         let mut tx = vault.store().begin_write().await.unwrap();
+        let mut changeset = Changeset::new();
 
         for (locator, block_id, presence) in [
             (locator_0, block_id_0_0, SingleBlockPresence::Present),
-            (locator_1, block_1.data.id, SingleBlockPresence::Missing),
+            (locator_1, block_1.id, SingleBlockPresence::Missing),
             (locator_2, block_id_2, SingleBlockPresence::Missing),
         ] {
-            tx.link_block(
-                &local_id,
-                &locator,
-                &block_id,
-                presence,
-                &secrets.write_keys,
-            )
-            .await
-            .unwrap();
+            changeset.link_block(locator, block_id, presence);
         }
 
+        changeset
+            .apply(&mut tx, &local_id, &secrets.write_keys)
+            .await
+            .unwrap();
         tx.commit().await.unwrap();
 
         let root_node_0 = vault
@@ -323,9 +311,7 @@ mod receive_and_create_root_node {
         // Mark one of the missing blocks as present so the block presences are different (but still
        // `Some`).
         let mut tx = vault.store().begin_write().await.unwrap();
-        tx.receive_block(block_1.id(), &block_1.data, &block_1.nonce)
-            .await
-            .unwrap();
+        tx.receive_block(&block_1).await.unwrap();
         tx.commit().await.unwrap();
 
         // Receive the same node we already have.
The hashes and version vectors are equal but the @@ -352,15 +338,12 @@ mod receive_and_create_root_node { task::yield_now().await; } - tx.link_block( - &local_id, - &locator_0, - &block_id_0_1, - SingleBlockPresence::Present, - &secrets.write_keys, - ) - .await - .unwrap(); + let mut changeset = Changeset::new(); + changeset.link_block(locator_0, block_id_0_1, SingleBlockPresence::Present); + changeset + .apply(&mut tx, &local_id, &secrets.write_keys) + .await + .unwrap(); tx.commit().await.unwrap(); }; @@ -594,10 +577,10 @@ async fn receive_valid_blocks() { let mut reader = vault.store().acquire_read().await.unwrap(); for (id, block) in snapshot.blocks() { - let mut content = vec![0; BLOCK_SIZE]; + let mut content = BlockContent::new(); let nonce = reader.read_block(id, &mut content).await.unwrap(); - assert_eq!(&content[..], &block.data.content[..]); + assert_eq!(&content[..], &block.content[..]); assert_eq!(nonce, block.nonce); assert_eq!(BlockId::from_content(&content), *id); } @@ -611,14 +594,12 @@ async fn receive_orphaned_block() { let block_tracker = vault.block_tracker.client(); for block in snapshot.blocks().values() { - vault.block_tracker.require(*block.id()); - block_tracker.offer(*block.id(), OfferState::Approved); + vault.block_tracker.require(block.id); + block_tracker.offer(block.id, OfferState::Approved); let promise = block_tracker.acceptor().try_accept().unwrap(); assert_matches!( - vault - .receive_block(&block.data, &block.nonce, Some(promise)) - .await, + vault.receive_block(block, Some(promise)).await, Err(Error::Store(store::Error::BlockNotReferenced)) ); } @@ -910,15 +891,12 @@ async fn block_ids_local() { let block_id = rand::random(); let mut tx = vault.store().begin_write().await.unwrap(); - tx.link_block( - &branch_id, - &locator, - &block_id, - SingleBlockPresence::Present, - &secrets.write_keys, - ) - .await - .unwrap(); + let mut changeset = Changeset::new(); + changeset.link_block(locator, block_id, SingleBlockPresence::Present); + changeset + .apply(&mut tx, &branch_id, &secrets.write_keys) + .await + .unwrap(); tx.commit().await.unwrap(); let actual = vault.store().block_ids(u32::MAX).next().await.unwrap(); @@ -1038,7 +1016,7 @@ async fn block_ids_multiple_branches() { let actual = vault.store().block_ids(u32::MAX).next().await.unwrap(); let expected = all_blocks .iter() - .map(|(_, block)| block.id()) + .map(|(_, block)| &block.id) .copied() .collect(); @@ -1203,9 +1181,7 @@ async fn receive_snapshot( async fn receive_block(vault: &Vault, block: &Block) { let mut tx = vault.store().begin_write().await.unwrap(); - tx.receive_block(block.id(), &block.data, &block.nonce) - .await - .unwrap(); + tx.receive_block(block).await.unwrap(); tx.commit().await.unwrap(); } diff --git a/lib/src/repository/worker.rs b/lib/src/repository/worker.rs index abddfac6d..2f2c11cc1 100644 --- a/lib/src/repository/worker.rs +++ b/lib/src/repository/worker.rs @@ -119,33 +119,33 @@ async fn maintain( // Merge branches if let Some(local_branch) = local_branch { - success = success - && shared - .vault - .monitor - .merge_job - .run(merge::run(shared, local_branch)) - .await; - } - - // Prune outdated branches and snapshots - success = success - && shared + let job_success = shared .vault .monitor - .prune_job - .run(prune::run(shared, unlock_tx, prune_counter)) + .merge_job + .run(merge::run(shared, local_branch)) .await; + success = success && job_success; + } + + // Prune outdated branches and snapshots + let job_success = shared + .vault + .monitor + .prune_job + 
        .run(prune::run(shared, unlock_tx, prune_counter))
+        .await;
+    success = success && job_success;
 
     // Collect unreachable blocks
     if shared.secrets.can_read() {
-        success = success
-            && shared
-                .vault
-                .monitor
-                .trash_job
-                .run(trash::run(shared, local_branch, unlock_tx))
-                .await;
+        let job_success = shared
+            .vault
+            .monitor
+            .trash_job
+            .run(trash::run(shared, local_branch, unlock_tx))
+            .await;
+        success = success && job_success;
     }
 
     if success {
@@ -410,13 +410,11 @@ mod prune {
 mod trash {
     use super::*;
     use crate::{
-        crypto::sign::{Keypair, PublicKey},
         protocol::BlockId,
-        store::{self, WriteTransaction},
+        store::{Changeset, ReadTransaction, WriteTransaction},
     };
     use futures_util::TryStreamExt;
    use std::collections::BTreeSet;
-    use tracing::Instrument;
 
     pub(super) async fn run(
         shared: &Shared,
@@ -607,7 +605,11 @@ mod trash {
             total_count += batch.len();
 
             if let Some((local_branch, write_keys)) = &local_branch_and_write_keys {
-                remove_local_nodes(&mut tx, local_branch.id(), write_keys, &batch).await?;
+                let mut changeset = Changeset::new();
+                remove_local_nodes(&mut tx, &mut changeset, &batch).await?;
+                changeset
+                    .apply(&mut tx, local_branch.id(), write_keys)
+                    .await?;
             }
 
             remove_blocks(&mut tx, &batch).await?;
@@ -633,24 +635,16 @@ }
 
     async fn remove_local_nodes(
-        tx: &mut WriteTransaction,
-        branch_id: &PublicKey,
-        write_keys: &Keypair,
+        tx: &mut ReadTransaction,
+        changeset: &mut Changeset,
         block_ids: &[BlockId],
     ) -> Result<()> {
         for block_id in block_ids {
             let locators: Vec<_> = tx.load_locators(block_id).try_collect().await?;
-            let span = tracing::info_span!("remove_local_node", ?block_id);
 
             for locator in locators {
-                match tx
-                    .unlink_block(branch_id, &locator, Some(block_id), write_keys)
-                    .instrument(span.clone())
-                    .await
-                {
-                    Ok(()) | Err(store::Error::LocatorNotFound) => (),
-                    Err(error) => return Err(error.into()),
-                }
+                tracing::trace!(?block_id, "unreachable local node removed");
+                changeset.unlink_block(locator, Some(*block_id));
             }
         }
diff --git a/lib/src/store/block.rs b/lib/src/store/block.rs
index 5bf353b7b..15ee4de22 100644
--- a/lib/src/store/block.rs
+++ b/lib/src/store/block.rs
@@ -9,7 +9,7 @@ use crate::{
     crypto::sign::PublicKey,
     db,
     future::try_collect_into,
-    protocol::{BlockData, BlockId, BlockNonce, BLOCK_SIZE},
+    protocol::{Block, BlockContent, BlockId, BlockNonce, BLOCK_SIZE},
 };
 use futures_util::TryStreamExt;
 use sqlx::Row;
@@ -24,8 +24,7 @@ pub(crate) struct ReceiveStatus {
 pub(super) async fn receive(
     write_tx: &mut db::WriteTransaction,
     cache_tx: &mut CacheTransaction,
-    block: &BlockData,
-    nonce: &BlockNonce,
+    block: &Block,
 ) -> Result<ReceiveStatus, Error> {
     if !leaf_node::set_present(write_tx, &block.id).await? {
        return Ok(ReceiveStatus::default());
@@ -47,7 +46,7 @@ pub(super) async fn receive(
         try_collect_into(root_node::load_writer_ids(write_tx, &hash), &mut branches).await?;
     }
 
-    write(write_tx, &block.id, &block.content, nonce).await?;
+    write(write_tx, block).await?;
 
     Ok(ReceiveStatus { branches })
 }
@@ -60,10 +59,10 @@ pub(super) async fn receive(
 pub(super) async fn read(
     conn: &mut db::Connection,
     id: &BlockId,
-    buffer: &mut [u8],
+    content: &mut BlockContent,
 ) -> Result<BlockNonce, Error> {
     assert!(
-        buffer.len() >= BLOCK_SIZE,
+        content.len() >= BLOCK_SIZE,
         "insufficient buffer length for block read"
     );
@@ -80,17 +79,17 @@ pub(super) async fn read(
     let nonce: &[u8] = row.get(0);
     let nonce = BlockNonce::try_from(nonce).map_err(|_| Error::MalformedData)?;
 
-    let content: &[u8] = row.get(1);
-    if content.len() != BLOCK_SIZE {
+    let src_content: &[u8] = row.get(1);
+    if src_content.len() != BLOCK_SIZE {
         tracing::error!(
             expected = BLOCK_SIZE,
-            actual = content.len(),
+            actual = src_content.len(),
             "wrong block length"
         );
         return Err(Error::MalformedData);
     }
 
-    buffer.copy_from_slice(content);
+    content.copy_from_slice(src_content);
 
     Ok(nonce)
 }
@@ -103,14 +102,9 @@ pub(super) async fn read(
 ///
 /// Panics if buffer length is not equal to [`BLOCK_SIZE`].
 ///
-pub(super) async fn write(
-    tx: &mut db::WriteTransaction,
-    id: &BlockId,
-    buffer: &[u8],
-    nonce: &BlockNonce,
-) -> Result<(), Error> {
+pub(super) async fn write(tx: &mut db::WriteTransaction, block: &Block) -> Result<(), Error> {
     assert_eq!(
-        buffer.len(),
+        block.content.len(),
         BLOCK_SIZE,
         "incorrect buffer length for block write"
     );
@@ -120,9 +114,9 @@ pub(super) async fn write(
          VALUES (?, ?, ?)
          ON CONFLICT (id) DO NOTHING",
     )
-    .bind(id)
-    .bind(nonce.as_slice())
-    .bind(buffer)
+    .bind(&block.id)
+    .bind(&block.nonce[..])
+    .bind(&block.content[..])
     .execute(tx)
     .await?;
 
@@ -160,37 +154,34 @@ pub(super) async fn exists(conn: &mut db::Connection, id: &BlockId) -> Result<bool, Error>
             Err(Error::BlockNotFound) => (),
             Err(error) => panic!("unexpected error: {:?}", error),
             Ok(_) => panic!("unexpected success"),
         }
     }
 
@@ -201,23 +192,15 @@
     async fn try_write_existing_block() {
         let (_base_dir, pool) = setup().await;
 
-        let content0 = random_block_content();
-        let id = BlockId::from_content(&content0);
-        let nonce = BlockNonce::default();
+        let block: Block = rand::random();
 
         let mut tx = pool.begin_write().await.unwrap();
-        write(&mut tx, &id, &content0, &nonce).await.unwrap();
-        write(&mut tx, &id, &content0, &nonce).await.unwrap();
+        write(&mut tx, &block).await.unwrap();
+        write(&mut tx, &block).await.unwrap();
     }
 
     async fn setup() -> (TempDir, db::Pool) {
         db::create_temp().await.unwrap()
     }
-
-    fn random_block_content() -> Vec<u8> {
-        let mut content = vec![0; BLOCK_SIZE];
-        rand::thread_rng().fill(&mut content[..]);
-        content
-    }
 }
diff --git a/lib/src/store/block_expiration_tracker.rs b/lib/src/store/block_expiration_tracker.rs
index a98be3a11..f63f7edc0 100644
--- a/lib/src/store/block_expiration_tracker.rs
+++ b/lib/src/store/block_expiration_tracker.rs
@@ -77,10 +77,10 @@ impl BlockExpirationTracker {
         })
     }
 
-    pub fn handle_block_update(&self, block: &BlockId) {
+    pub fn handle_block_update(&self, block_id: &BlockId) {
         // Not inlining these lines to call `SystemTime::now()` only once the `lock` is acquired.
        let mut lock = self.shared.lock().unwrap();
-        lock.handle_block_update(block, SystemTime::now());
+        lock.handle_block_update(block_id, SystemTime::now());
         drop(lock);
         self.watch_tx.send(()).unwrap_or(());
     }
@@ -296,8 +296,7 @@ async fn run_task(
 mod test {
     use super::super::*;
     use super::*;
-    use crate::protocol::{BLOCK_NONCE_SIZE, BLOCK_SIZE};
-    use rand::Rng;
+    use crate::crypto::sign::Keypair;
     use tempfile::TempDir;
 
     #[test]
@@ -383,24 +382,15 @@
     async fn add_block(write_keys: &Keypair, branch_id: &PublicKey, store: &Store) -> BlockId {
         let mut writer = store.begin_write().await.unwrap();
+        let mut changeset = Changeset::new();
 
-        let block_id: BlockId = rand::random();
-        let block_data = random_block_content();
-        let block_nonce: [u8; BLOCK_NONCE_SIZE] = rand::random();
+        let block: Block = rand::random();
+        let block_id = block.id;
 
-        writer
-            .write_block(&block_id, &block_data, &block_nonce)
-            .await
-            .unwrap();
-
-        writer
-            .link_block(
-                branch_id,
-                &rand::random(),
-                &block_id,
-                SingleBlockPresence::Present,
-                write_keys,
-            )
+        changeset.write_block(block);
+        changeset.link_block(rand::random(), block_id, SingleBlockPresence::Present);
+        changeset
+            .apply(&mut writer, branch_id, write_keys)
             .await
            .unwrap();
 
@@ -409,12 +399,6 @@
         block_id
     }
 
-    fn random_block_content() -> Vec<u8> {
-        let mut content = vec![0; BLOCK_SIZE];
-        rand::thread_rng().fill(&mut content[..]);
-        content
-    }
-
     async fn count_blocks(pool: &db::Pool) -> u64 {
         block::count(&mut pool.acquire().await.unwrap())
             .await
diff --git a/lib/src/store/changeset.rs b/lib/src/store/changeset.rs
new file mode 100644
index 000000000..dc01f087d
--- /dev/null
+++ b/lib/src/store/changeset.rs
@@ -0,0 +1,82 @@
+use super::{block, error::Error, patch::Patch, WriteTransaction};
+use crate::{
+    crypto::{
+        sign::{Keypair, PublicKey},
+        Hash,
+    },
+    protocol::{Block, BlockId, SingleBlockPresence},
+    version_vector::VersionVector,
+};
+
+/// Recorded changes to be applied to the store as a single unit.
+#[derive(Default)]
+pub(crate) struct Changeset {
+    links: Vec<(Hash, BlockId, SingleBlockPresence)>,
+    unlinks: Vec<(Hash, Option<BlockId>)>,
+    blocks: Vec<Block>,
+    bump: VersionVector,
+}
+
+impl Changeset {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Applies this changeset to the transaction.
+    pub async fn apply(
+        self,
+        tx: &mut WriteTransaction,
+        branch_id: &PublicKey,
+        write_keys: &Keypair,
+    ) -> Result<(), Error> {
+        let mut patch = Patch::new(tx, *branch_id).await?;
+
+        for (encoded_locator, block_id, block_presence) in self.links {
+            patch
+                .insert(tx, encoded_locator, block_id, block_presence)
+                .await?;
+        }
+
+        for (encoded_locator, expected_block_id) in self.unlinks {
+            patch
+                .remove(tx, &encoded_locator, expected_block_id.as_ref())
+                .await?;
+        }
+
+        patch.save(tx, &self.bump, write_keys).await?;
+
+        for block in self.blocks {
+            if let Some(tracker) = &tx.block_expiration_tracker {
+                tracker.handle_block_update(&block.id);
+            }
+
+            block::write(tx.db(), &block).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Links the given block id into the given branch under the given locator.
+    pub fn link_block(
+        &mut self,
+        encoded_locator: Hash,
+        block_id: BlockId,
+        block_presence: SingleBlockPresence,
+    ) {
+        self.links.push((encoded_locator, block_id, block_presence));
+    }
+
+    pub fn unlink_block(&mut self, encoded_locator: Hash, expected_block_id: Option<BlockId>) {
+        self.unlinks.push((encoded_locator, expected_block_id));
+    }
+
+    /// Writes a block into the store.
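+    ///
+    /// If a block with the same id already exists this is a no-op (the underlying insert
+    /// uses `ON CONFLICT (id) DO NOTHING`), matching the behavior of the replaced
+    /// `WriteTransaction::write_block`.
+    ///
+    /// A typical usage sketch (see `store/tests.rs` for real call sites):
+    ///
+    /// ```ignore
+    /// let mut tx = store.begin_write().await?;
+    /// let mut changeset = Changeset::new();
+    /// changeset.link_block(encoded_locator, block.id, SingleBlockPresence::Present);
+    /// changeset.write_block(block);
+    /// changeset.apply(&mut tx, &branch_id, &write_keys).await?;
+    /// tx.commit().await?;
+    /// ```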
+ pub fn write_block(&mut self, block: Block) { + self.blocks.push(block); + } + + /// Update the root version vector. + pub fn bump(&mut self, merge: &VersionVector) { + self.bump.merge(merge); + } +} diff --git a/lib/src/store/index.rs b/lib/src/store/index.rs index 877cdab50..52be7f92f 100644 --- a/lib/src/store/index.rs +++ b/lib/src/store/index.rs @@ -574,7 +574,7 @@ mod tests { let mut received_blocks = 0; for block in snapshot.blocks().values() { - block::receive(&mut write_tx, &mut cache_tx, &block.data, &block.nonce) + block::receive(&mut write_tx, &mut cache_tx, block) .await .unwrap(); received_blocks += 1; diff --git a/lib/src/store/inner_node.rs b/lib/src/store/inner_node.rs index 26d4f1cad..c886c798f 100644 --- a/lib/src/store/inner_node.rs +++ b/lib/src/store/inner_node.rs @@ -99,6 +99,9 @@ pub(super) async fn save( parent: &Hash, bucket: u8, ) -> Result<(), Error> { + debug_assert_ne!(node.hash, *EMPTY_INNER_HASH); + debug_assert_ne!(node.hash, *EMPTY_LEAF_HASH); + sqlx::query( "INSERT INTO snapshot_inner_nodes ( parent, diff --git a/lib/src/store/leaf_node.rs b/lib/src/store/leaf_node.rs index 11e4fa88e..088d2f734 100644 --- a/lib/src/store/leaf_node.rs +++ b/lib/src/store/leaf_node.rs @@ -157,7 +157,7 @@ pub(super) async fn set_missing( /// Returns true the block changed status from expired to missing pub(super) async fn set_missing_if_expired( tx: &mut db::WriteTransaction, - block: &BlockId, + block_id: &BlockId, ) -> Result { let result = sqlx::query( "UPDATE snapshot_leaf_nodes @@ -165,13 +165,13 @@ pub(super) async fn set_missing_if_expired( WHERE block_id = ? AND block_presence = ?", ) .bind(SingleBlockPresence::Missing) - .bind(block) + .bind(block_id) .bind(SingleBlockPresence::Expired) .execute(tx) .await?; if result.rows_affected() > 0 { - tracing::warn!("Marking 'Expired' block {block:?} as 'Missing'"); + tracing::warn!("Marking 'Expired' block {block_id:?} as 'Missing'"); return Ok(true); } diff --git a/lib/src/store/mod.rs b/lib/src/store/mod.rs index 9d893c4d9..d09313541 100644 --- a/lib/src/store/mod.rs +++ b/lib/src/store/mod.rs @@ -2,11 +2,12 @@ mod block; mod block_expiration_tracker; mod block_ids; mod cache; +mod changeset; mod error; mod index; mod inner_node; mod leaf_node; -mod path; +mod patch; mod quota; mod receive_filter; mod root_node; @@ -17,7 +18,7 @@ mod tests; pub use error::Error; pub(crate) use { - block::ReceiveStatus as BlockReceiveStatus, block_ids::BlockIdsPage, + block::ReceiveStatus as BlockReceiveStatus, block_ids::BlockIdsPage, changeset::Changeset, inner_node::ReceiveStatus as InnerNodeReceiveStatus, leaf_node::ReceiveStatus as LeafNodeReceiveStatus, receive_filter::ReceiveFilter, root_node::ReceiveStatus as RootNodeReceiveStatus, @@ -27,21 +28,17 @@ use self::{ block_expiration_tracker::BlockExpirationTracker, cache::{Cache, CacheTransaction}, index::UpdateSummaryReason, - path::Path, }; use crate::{ collections::HashSet, - crypto::{ - sign::{Keypair, PublicKey}, - CacheHash, Hash, Hashable, - }, + crypto::{sign::PublicKey, CacheHash, Hash, Hashable}, db, debug::DebugPrinter, future::try_collect_into, progress::Progress, protocol::{ - self, BlockData, BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence, Proof, - RootNode, SingleBlockPresence, Summary, VersionVectorOp, INNER_LAYER_COUNT, + get_bucket, Block, BlockContent, BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, + MultiBlockPresence, Proof, RootNode, Summary, INNER_LAYER_COUNT, }, storage_size::StorageSize, }; @@ -52,7 +49,6 @@ use std::{ sync::Arc, 
    time::Duration,
 };
-use tracing::Instrument;
 
 // TODO: Consider creating an async `RwLock` in the `deadlock` module and use it here.
 use tokio::sync::{broadcast, RwLock};
@@ -105,7 +101,11 @@ impl Store {
     }
 
     pub async fn block_expiration(&self) -> Option<Duration> {
-        (*self.block_expiration_tracker.read().await).as_ref().map(|tracker| tracker.block_expiration())
+        self.block_expiration_tracker
+            .read()
+            .await
+            .as_ref()
+            .map(|tracker| tracker.block_expiration())
     }
 
     /// Acquires a `Reader`
@@ -253,13 +253,13 @@ impl Reader {
     pub async fn read_block(
         &mut self,
         id: &BlockId,
-        buffer: &mut [u8],
+        content: &mut BlockContent,
     ) -> Result<BlockNonce, Error> {
         if let Some(expiration_tracker) = &self.block_expiration_tracker {
             expiration_tracker.handle_block_update(id);
         }
 
-        let result = block::read(self.db(), id, buffer).await;
+        let result = block::read(self.db(), id, content).await;
 
         if matches!(result, Err(Error::BlockNotFound)) {
             self.set_as_missing_if_expired(id).await?;
@@ -271,14 +271,14 @@ impl Reader {
     pub async fn read_block_on_peer_request(
         &mut self,
         id: &BlockId,
-        buffer: &mut [u8],
+        content: &mut BlockContent,
         block_tracker: &crate::block_tracker::BlockTracker,
     ) -> Result<BlockNonce, Error> {
         if let Some(expiration_tracker) = &self.block_expiration_tracker {
             expiration_tracker.handle_block_update(id);
         }
 
-        let result = block::read(self.db(), id, buffer).await;
+        let result = block::read(self.db(), id, content).await;
 
         if matches!(result, Err(Error::BlockNotFound)) && self.set_as_missing_if_expired(id).await?
         {
             block_tracker.require(*id);
@@ -381,10 +381,12 @@ impl Reader {
         root_node::exists(self.db(), node).await
     }
 
+    // TODO: use cache and remove `ReadTransaction::load_inner_nodes_with_cache`
     pub async fn load_inner_nodes(&mut self, parent_hash: &Hash) -> Result<InnerNodeMap, Error> {
         inner_node::load_children(self.db(), parent_hash).await
     }
 
+    // TODO: use cache and remove `ReadTransaction::load_leaf_nodes_with_cache`
     pub async fn load_leaf_nodes(&mut self, parent_hash: &Hash) -> Result<LeafNodeSet, Error> {
         leaf_node::load_children(self.db(), parent_hash).await
     }
@@ -431,38 +433,31 @@ impl ReadTransaction {
         root_node: &RootNode,
         encoded_locator: &Hash,
     ) -> Result<BlockId, Error> {
-        let path = self.load_path(root_node, encoded_locator).await?;
-        path.get_leaf().ok_or(Error::LocatorNotFound)
-    }
-
-    async fn load_path(
-        &mut self,
-        root_node: &RootNode,
-        encoded_locator: &Hash,
-    ) -> Result<Path, Error> {
-        let mut path = Path::new(root_node.proof.hash, root_node.summary, *encoded_locator);
-        let mut parent = path.root_hash;
+        // TODO: On cache miss load only the one node we actually need per layer.
 
-        for level in 0..INNER_LAYER_COUNT {
-            path.inner[level] = self.load_inner_nodes_with_cache(&parent).await?;
+        let mut parent_hash = root_node.proof.hash;
 
-            if let Some(node) = path.inner[level].get(path.get_bucket(level)) {
-                parent = node.hash
-            } else {
-                return Ok(path);
-            };
+        for layer in 0..INNER_LAYER_COUNT {
+            parent_hash = self
+                .load_inner_nodes_with_cache(&parent_hash)
+                .await?
+                .get(get_bucket(encoded_locator, layer))
+                .ok_or(Error::LocatorNotFound)?
+                .hash;
         }
 
-        path.leaves = self.load_leaf_nodes_with_cache(&parent).await?;
-
-        Ok(path)
+        self.load_leaf_nodes_with_cache(&parent_hash)
+            .await?
+            .get(encoded_locator)
+            .map(|node| node.block_id)
+            .ok_or(Error::LocatorNotFound)
     }
 
     async fn load_inner_nodes_with_cache(
         &mut self,
         parent_hash: &Hash,
     ) -> Result<InnerNodeMap, Error> {
-        if let Some(nodes) = self.inner.cache.get_inners(parent_hash) {
+        if let Some(nodes) = self.cache.get_inners(parent_hash) {
             return Ok(nodes);
         }
 
@@ -473,7 +468,7 @@ impl ReadTransaction {
         &mut self,
         parent_hash: &Hash,
     ) -> Result<LeafNodeSet, Error> {
-        if let Some(nodes) = self.inner.cache.get_leaves(parent_hash) {
+        if let Some(nodes) = self.cache.get_leaves(parent_hash) {
             return Ok(nodes);
         }
 
@@ -500,78 +495,6 @@ pub(crate) struct WriteTransaction {
 }
 
 impl WriteTransaction {
-    /// Links the given block id into the given branch under the given locator.
-    pub async fn link_block(
-        &mut self,
-        branch_id: &PublicKey,
-        encoded_locator: &Hash,
-        block_id: &BlockId,
-        block_presence: SingleBlockPresence,
-        write_keys: &Keypair,
-    ) -> Result<bool, Error> {
-        let root_node = self.load_or_create_root_node(branch_id, write_keys).await?;
-        let mut path = self.load_path(&root_node, encoded_locator).await?;
-
-        if path.has_leaf(block_id) {
-            return Ok(false);
-        }
-
-        path.set_leaf(block_id, block_presence);
-
-        self.save_path(path, &root_node, write_keys).await?;
-
-        Ok(true)
-    }
-
-    /// Unlinks (removes) the given block id from the given branch and locator. If
-    /// `expected_block_id` is `Some`, then the block is unlinked only if its id matches it,
-    /// otherwise it's removed unconditionally.
-    pub async fn unlink_block(
-        &mut self,
-        branch_id: &PublicKey,
-        encoded_locator: &Hash,
-        expected_block_id: Option<&BlockId>,
-        write_keys: &Keypair,
-    ) -> Result<(), Error> {
-        let root_node = self.load_root_node(branch_id).await?;
-        let mut path = self.load_path(&root_node, encoded_locator).await?;
-
-        let block_id = path
-            .remove_leaf(encoded_locator)
-            .ok_or(Error::LocatorNotFound)?;
-
-        if let Some(expected_block_id) = expected_block_id {
-            if &block_id != expected_block_id {
-                return Ok(());
-            }
-        }
-
-        self.save_path(path, &root_node, write_keys).await?;
-
-        Ok(())
-    }
-
-    /// Writes a block into the store.
-    ///
-    /// If a block with the same id already exists, this is a no-op.
-    ///
-    /// # Panics
-    ///
-    /// Panics if buffer length is not equal to [`BLOCK_SIZE`].
-    ///
-    pub async fn write_block(
-        &mut self,
-        id: &BlockId,
-        buffer: &[u8],
-        nonce: &BlockNonce,
-    ) -> Result<(), Error> {
-        if let Some(tracker) = &self.block_expiration_tracker {
-            tracker.handle_block_update(id);
-        }
-
-        block::write(self.db(), id, buffer, nonce).await
-    }
-
     /// Removes the specified block from the store and marks it as missing in the index.
     pub async fn remove_block(&mut self, id: &BlockId) -> Result<(), Error> {
         let (db, cache) = self.db_and_cache();
@@ -591,35 +514,6 @@ impl WriteTransaction {
         Ok(())
     }
 
-    /// Update the root version vector of the given branch.
-    pub async fn bump(
-        &mut self,
-        branch_id: &PublicKey,
-        op: VersionVectorOp<'_>,
-        write_keys: &Keypair,
-    ) -> Result<(), Error> {
-        let root_node = self.load_or_create_root_node(branch_id, write_keys).await?;
-
-        let mut new_vv = root_node.proof.version_vector.clone();
-        op.apply(branch_id, &mut new_vv);
-
-        // Sometimes `op` is a no-op. This is not an error.
-        if new_vv == root_node.proof.version_vector {
-            return Ok(());
-        }
-
-        let new_proof = Proof::new(
-            root_node.proof.writer_id,
-            new_vv,
-            root_node.proof.hash,
-            write_keys,
-        );
-
-        self.create_root_node(new_proof, root_node.summary)
-            .instrument(tracing::info_span!("bump"))
-            .await
-    }
-
     pub async fn remove_branch(&mut self, root_node: &RootNode) -> Result<(), Error> {
         root_node::remove_older(self.db(), root_node).await?;
         root_node::remove(self.db(), root_node).await?;
@@ -735,24 +629,19 @@ impl WriteTransaction {
     /// is returned.
     pub(crate) async fn receive_block(
         &mut self,
-        block_id: &BlockId,
-        data: &BlockData,
-        nonce: &BlockNonce,
+        block: &Block,
     ) -> Result<BlockReceiveStatus, Error> {
         if let Some(tracker) = &self.block_expiration_tracker {
-            tracker.handle_block_update(block_id);
+            tracker.handle_block_update(&block.id);
         }
 
         let (db, cache) = self.db_and_cache();
-        block::receive(db, cache, data, nonce).await
+        block::receive(db, cache, block).await
     }
 
     pub async fn commit(self) -> Result<(), Error> {
-        let inner = match self.inner.inner.inner {
-            Handle::WriteTransaction(tx) => tx,
-            Handle::Connection(_) | Handle::ReadTransaction(_) => unreachable!(),
-        };
-
+        let inner = self.inner.inner.inner.into_write();
         let cache = self.inner.inner.cache;
+
         if cache.is_dirty() {
             inner.commit_and_then(move || cache.commit()).await?;
         } else {
@@ -799,17 +688,14 @@ impl WriteTransaction {
     /// given closure.
     ///
     /// See `db::WriteTransaction::commit_and_then` for explanation why this is necessary.
-    pub async fn commit_and_then<F, R>(self, f: F) -> Result<R, Error>
+    pub async fn commit_and_then<F, R>(self, f: F) -> Result<R, Error>
     where
         F: FnOnce() -> R + Send + 'static,
         R: Send + 'static,
     {
-        let inner = match self.inner.inner.inner {
-            Handle::WriteTransaction(tx) => tx,
-            Handle::Connection(_) | Handle::ReadTransaction(_) => unreachable!(),
-        };
-
+        let inner = self.inner.inner.inner.into_write();
         let cache = self.inner.inner.cache;
+
         if cache.is_dirty() {
             let f = move || {
                 cache.commit();
@@ -821,74 +707,6 @@ impl WriteTransaction {
         }
     }
 
-    async fn save_path(
-        &mut self,
-        path: Path,
-        old_root_node: &RootNode,
-        write_keys: &Keypair,
-    ) -> Result<(), Error> {
-        let mut parent_hash = Some(path.root_hash);
-        for (i, nodes) in path.inner.into_iter().enumerate() {
-            let bucket = protocol::get_bucket(&path.locator, i);
-            let new_parent_hash = nodes.get(bucket).map(|node| node.hash);
-
-            if let Some(parent_hash) = parent_hash {
-                inner_node::save_all(self.db(), &nodes, &parent_hash).await?;
-                self.inner.inner.cache.put_inners(parent_hash, nodes);
-            }
-
-            parent_hash = new_parent_hash;
-        }
-
-        if let Some(parent_hash) = parent_hash {
-            leaf_node::save_all(self.db(), &path.leaves, &parent_hash).await?;
-            self.inner.inner.cache.put_leaves(parent_hash, path.leaves);
-        }
-
-        let writer_id = old_root_node.proof.writer_id;
-        let new_version_vector = old_root_node
-            .proof
-            .version_vector
-            .clone()
-            .incremented(writer_id);
-        let new_proof = Proof::new(writer_id, new_version_vector, path.root_hash, write_keys);
-
-        self.create_root_node(new_proof, path.root_summary).await
-    }
-
-    async fn create_root_node(
-        &mut self,
-        new_proof: Proof,
-        new_summary: Summary,
-    ) -> Result<(), Error> {
-        let root_node = root_node::create(self.db(), new_proof, new_summary).await?;
-        root_node::remove_older(self.db(), &root_node).await?;
-
-        tracing::trace!(
-            vv = ?root_node.proof.version_vector,
-            hash = ?root_node.proof.hash,
-            branch_id = ?root_node.proof.writer_id,
-            block_presence = ?root_node.summary.block_presence,
-            "Local snapshot created"
-        );
-
-        self.inner.inner.cache.put_root(root_node);
-
-        Ok(())
-    }
-
-    async fn load_or_create_root_node(
-        &mut self,
-        branch_id: &PublicKey,
-        write_keys: &Keypair,
-    ) -> Result<RootNode, Error> {
-        if let Some(node) = self.inner.inner.cache.get_root(branch_id) {
-            return Ok(node);
-        }
-
-        root_node::load_or_create(self.db(), branch_id, write_keys).await
-    }
-
     // Access the underlying database transaction.
     fn db(&mut self) -> &mut db::WriteTransaction {
         self.inner.inner.inner.as_write()
@@ -929,6 +747,13 @@ impl Handle {
             Handle::Connection(_) | Handle::ReadTransaction(_) => unreachable!(),
         }
     }
+
+    fn into_write(self) -> db::WriteTransaction {
+        match self {
+            Handle::WriteTransaction(tx) => tx,
+            Handle::Connection(_) | Handle::ReadTransaction(_) => unreachable!(),
+        }
+    }
 }
 
 impl Deref for Handle {
diff --git a/lib/src/store/patch.rs b/lib/src/store/patch.rs
new file mode 100644
index 000000000..e19680f0a
--- /dev/null
+++ b/lib/src/store/patch.rs
@@ -0,0 +1,291 @@
+use super::{error::Error, inner_node, leaf_node, root_node, ReadTransaction, WriteTransaction};
+use crate::{
+    crypto::{
+        sign::{Keypair, PublicKey},
+        Hash, Hashable,
+    },
+    protocol::{
+        get_bucket, BlockId, InnerNode, InnerNodeMap, LeafNodeSet, NodeState, Proof,
+        SingleBlockPresence, Summary, EMPTY_INNER_HASH, EMPTY_LEAF_HASH, INNER_LAYER_COUNT,
+    },
+    version_vector::VersionVector,
+};
+use std::{
+    cmp::Ordering,
+    collections::{btree_map::Entry, BTreeMap},
+    fmt,
+    ops::Range,
+};
+
+/// Helper structure for creating new snapshots in the index. It represents the set of nodes that
+/// are different compared to the current snapshot.
+pub(super) struct Patch {
+    branch_id: PublicKey,
+    old_vv: VersionVector,
+    root_hash: Hash,
+    root_summary: Summary,
+    inners: BTreeMap<Key, InnerNodeMap>,
+    leaves: BTreeMap<Key, LeafNodeSet>,
+}
+
+impl Patch {
+    pub async fn new(tx: &mut ReadTransaction, branch_id: PublicKey) -> Result<Self, Error> {
+        let (old_vv, root_hash, root_summary) = match tx.load_root_node(&branch_id).await {
+            Ok(node) => {
+                let hash = node.proof.hash;
+                (node.proof.into_version_vector(), hash, node.summary)
+            }
+            Err(Error::BranchNotFound) => {
+                (VersionVector::new(), *EMPTY_INNER_HASH, Summary::INCOMPLETE)
+            }
+            Err(error) => return Err(error),
+        };
+
+        Ok(Self {
+            branch_id,
+            old_vv,
+            root_hash,
+            root_summary,
+            inners: BTreeMap::new(),
+            leaves: BTreeMap::new(),
+        })
+    }
+
+    pub async fn insert(
+        &mut self,
+        tx: &mut ReadTransaction,
+        encoded_locator: Hash,
+        block_id: BlockId,
+        block_presence: SingleBlockPresence,
+    ) -> Result<(), Error> {
+        let nodes = self.fetch(tx, &encoded_locator).await?;
+        nodes.insert(encoded_locator, block_id, block_presence);
+
+        Ok(())
+    }
+
+    pub async fn remove(
+        &mut self,
+        tx: &mut ReadTransaction,
+        encoded_locator: &Hash,
+        expected_block_id: Option<&BlockId>,
+    ) -> Result<(), Error> {
+        let nodes = self.fetch(tx, encoded_locator).await?;
+
+        if let Some(block_id) = expected_block_id {
+            nodes.remove_if(encoded_locator, block_id);
+        } else {
+            nodes.remove(encoded_locator);
+        }
+
+        Ok(())
+    }
+
+    pub async fn save(
+        mut self,
+        tx: &mut WriteTransaction,
+        vv: &VersionVector,
+        write_keys: &Keypair,
+    ) -> Result<(), Error> {
+        self.recalculate();
+        self.save_children(tx).await?;
+        self.save_root(tx, vv, write_keys).await?;
+
+        Ok(())
+    }
+
+    async fn fetch<'a>(
+        &'a mut self,
+        tx: &'_ mut ReadTransaction,
+        encoded_locator: &'_ Hash,
+    ) -> Result<&'a mut LeafNodeSet, Error> {
+        let mut parent_hash = self.root_hash;
+        let mut key = Key::ROOT;
+
+        for layer in 0..INNER_LAYER_COUNT {
+            let bucket = get_bucket(encoded_locator,
layer); + let nodes = match self.inners.entry(key) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + entry.insert(tx.load_inner_nodes_with_cache(&parent_hash).await?) + } + }; + + parent_hash = nodes + .get(bucket) + .map(|node| node.hash) + .unwrap_or_else(|| empty_hash(layer)); + + key = key.child(bucket); + } + + let nodes = match self.leaves.entry(key) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + entry.insert(tx.load_leaf_nodes_with_cache(&parent_hash).await?) + } + }; + + Ok(nodes) + } + + fn recalculate(&mut self) { + for (key, nodes) in &self.leaves { + let (parent_key, bucket) = key.parent_and_bucket(); + let parent_nodes = self.inners.entry(parent_key).or_default(); + + if !nodes.is_empty() { + let hash = nodes.hash(); + let summary = Summary::from_leaves(nodes); + + parent_nodes.insert(bucket, InnerNode::new(hash, summary)); + } else { + parent_nodes.remove(bucket); + } + } + + let mut stash = Vec::new(); + + for layer in (1..INNER_LAYER_COUNT).rev() { + for (key, nodes) in self.inners.range(Key::range(layer)) { + let (parent_key, bucket) = key.parent_and_bucket(); + + if !nodes.is_empty() { + let hash = nodes.hash(); + let summary = Summary::from_inners(nodes); + + stash.push((parent_key, bucket, Some((hash, summary)))); + } else { + stash.push((parent_key, bucket, None)); + } + } + + for (parent_key, bucket, hash_and_summary) in stash.drain(..) { + let parent_nodes = self.inners.entry(parent_key).or_default(); + + if let Some((hash, summary)) = hash_and_summary { + parent_nodes.insert(bucket, InnerNode::new(hash, summary)); + } else { + parent_nodes.remove(bucket); + } + } + } + + if let Some(nodes) = self.inners.get(&Key::ROOT) { + self.root_hash = nodes.hash(); + self.root_summary = Summary::from_inners(nodes).with_state(NodeState::Approved); + } + } + + async fn save_children(&mut self, tx: &mut WriteTransaction) -> Result<(), Error> { + let mut stack = vec![(self.root_hash, Key::ROOT)]; + + while let Some((parent_hash, key)) = stack.pop() { + if (key.layer as usize) < INNER_LAYER_COUNT { + if let Some(nodes) = self.inners.remove(&key) { + inner_node::save_all(tx.db(), &nodes, &parent_hash).await?; + + for (bucket, node) in &nodes { + stack.push((node.hash, key.child(bucket))); + } + + tx.inner.inner.cache.put_inners(parent_hash, nodes); + } + } else if let Some(nodes) = self.leaves.remove(&key) { + leaf_node::save_all(tx.db(), &nodes, &parent_hash).await?; + tx.inner.inner.cache.put_leaves(parent_hash, nodes); + } + } + + debug_assert!(self.inners.values().all(|nodes| nodes.is_empty())); + debug_assert!(self.leaves.values().all(|nodes| nodes.is_empty())); + + Ok(()) + } + + async fn save_root( + self, + tx: &mut WriteTransaction, + vv: &VersionVector, + write_keys: &Keypair, + ) -> Result<(), Error> { + let db = tx.db(); + + let new_vv = match self.old_vv.partial_cmp(vv) { + Some(Ordering::Less) | None => self.old_vv.merged(vv), + Some(Ordering::Equal | Ordering::Greater) => self.old_vv.incremented(self.branch_id), + }; + + let new_proof = Proof::new(self.branch_id, new_vv, self.root_hash, write_keys); + + let root_node = root_node::create(db, new_proof, self.root_summary).await?; + root_node::remove_older(db, &root_node).await?; + + tracing::trace!( + vv = ?root_node.proof.version_vector, + hash = ?root_node.proof.hash, + branch_id = ?root_node.proof.writer_id, + block_presence = ?root_node.summary.block_presence, + "Local snapshot created" + ); + + tx.inner.inner.cache.put_root(root_node); + + Ok(()) + } +} + 
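+/// Position of a node set within the patch: `layer` is the depth below the root and `path`
+/// holds the bucket taken at each traversed layer (entries past `layer` stay zero, which
+/// keeps the keys of one layer contiguous in the `BTreeMap` so `Key::range` can select a
+/// whole layer).
+///
+/// For example (assuming `INNER_LAYER_COUNT` is 3): `Key::ROOT.child(5).child(9)` is
+/// `Key { layer: 2, path: [5, 9, 0] }`, and calling `parent_and_bucket` on it returns
+/// `(Key { layer: 1, path: [5, 0, 0] }, 9)`.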
+#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
+struct Key {
+    layer: u8,
+    path: [u8; INNER_LAYER_COUNT],
+}
+
+impl Key {
+    const ROOT: Self = Self {
+        layer: 0,
+        path: [0; INNER_LAYER_COUNT],
+    };
+
+    fn child(mut self, bucket: u8) -> Self {
+        self.path[self.layer as usize] = bucket;
+        self.layer += 1;
+        self
+    }
+
+    fn parent_and_bucket(mut self) -> (Self, u8) {
+        self.layer = self.layer.checked_sub(1).expect("root key has no parent");
+        let bucket = self.path[self.layer as usize];
+        self.path[self.layer as usize] = 0;
+
+        (self, bucket)
+    }
+
+    // Range of Keys that corresponds to all the nodes at the given layer.
+    fn range(layer: usize) -> Range<Key> {
+        let a = Key {
+            layer: layer as u8,
+            path: [0; INNER_LAYER_COUNT],
+        };
+        let b = Key {
+            layer: layer as u8 + 1,
+            path: [0; INNER_LAYER_COUNT],
+        };
+
+        a..b
+    }
+}
+
+impl fmt::Debug for Key {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{:?}", &self.path[..self.layer as usize])
+    }
+}
+
+fn empty_hash(layer: usize) -> Hash {
+    if layer < INNER_LAYER_COUNT - 1 {
+        *EMPTY_INNER_HASH
+    } else {
+        *EMPTY_LEAF_HASH
+    }
+}
diff --git a/lib/src/store/path.rs b/lib/src/store/path.rs
deleted file mode 100644
index eca45afb5..000000000
--- a/lib/src/store/path.rs
+++ /dev/null
@@ -1,125 +0,0 @@
-use crate::{
-    crypto::{Hash, Hashable},
-    protocol::{
-        self, BlockId, InnerNode, InnerNodeMap, LeafNodeModifyStatus, LeafNodeSet, NodeState,
-        SingleBlockPresence, Summary, EMPTY_INNER_HASH, INNER_LAYER_COUNT,
-    },
-};
-
-///
-/// Path represents a (possibly incomplete) path in a snapshot from the root to the leaf.
-/// Unlike a traditional tree path with only the relevant nodes, this one also contains for each
-/// inner layer all siblings of the inner node that would be in the traditional path.
-///
-/// //                    root
-/// //                   /    \
-/// //                  a0     a1     |
-/// //                 /  \           | inner: [[a0, a1], [b0, b1]]
-/// //                b0   b1         |
-/// //               /  \
-/// //              c0   c1           | leaves: [c0, c1]
-///
-/// The purpose of this is to be able to modify the path (complete it if it's incomplete, modify
-/// and/or remove the leaf) and then recalculate all hashes.
-///
-#[derive(Debug)]
-pub(crate) struct Path {
-    pub locator: Hash,
-    pub root_hash: Hash,
-    pub root_summary: Summary,
-    pub inner: Vec<InnerNodeMap>,
-    pub leaves: LeafNodeSet,
-}
-
-impl Path {
-    pub fn new(root_hash: Hash, root_summary: Summary, locator: Hash) -> Self {
-        let inner = vec![InnerNodeMap::default(); INNER_LAYER_COUNT];
-
-        Self {
-            locator,
-            root_hash,
-            root_summary,
-            inner,
-            leaves: LeafNodeSet::default(),
-        }
-    }
-
-    pub fn get_leaf(&self) -> Option<BlockId> {
-        self.leaves.get(&self.locator).map(|node| node.block_id)
-    }
-
-    pub fn has_leaf(&self, block_id: &BlockId) -> bool {
-        self.leaves.iter().any(|l| &l.block_id == block_id)
-    }
-
-    // Sets the leaf node to the given block id. Returns the previous block id, if any.
-    pub fn set_leaf(
-        &mut self,
-        block_id: &BlockId,
-        block_presence: SingleBlockPresence,
-    ) -> Option<BlockId> {
-        match self.leaves.modify(&self.locator, block_id, block_presence) {
-            LeafNodeModifyStatus::Updated(old_block_id) => {
-                self.recalculate(INNER_LAYER_COUNT);
-                Some(old_block_id)
-            }
-            LeafNodeModifyStatus::Inserted => {
-                self.recalculate(INNER_LAYER_COUNT);
-                None
-            }
-            LeafNodeModifyStatus::Unchanged => None,
-        }
-    }
-
-    pub fn remove_leaf(&mut self, locator: &Hash) -> Option<BlockId> {
-        let block_id = self.leaves.remove(locator)?.block_id;
-        self.recalculate(INNER_LAYER_COUNT);
-        Some(block_id)
-    }
-
-    pub fn get_bucket(&self, inner_layer: usize) -> u8 {
-        protocol::get_bucket(&self.locator, inner_layer)
-    }
-
-    /// Recalculate layers from start_layer all the way to the root.
-    fn recalculate(&mut self, start_layer: usize) {
-        for inner_layer in (0..start_layer).rev() {
-            let bucket = self.get_bucket(inner_layer);
-
-            if let Some(hash) = self.compute_hash_for_layer(inner_layer + 1) {
-                let summary = self.compute_summary_for_layer(inner_layer + 1);
-                self.inner[inner_layer].insert(bucket, InnerNode::new(hash, summary));
-            } else {
-                self.inner[inner_layer].remove(bucket);
-            }
-        }
-
-        self.root_hash = self.compute_hash_for_layer(0).unwrap_or(*EMPTY_INNER_HASH);
-        self.root_summary = self.compute_summary_for_layer(0);
-        self.root_summary.state = NodeState::Approved;
-    }
-
-    // Assumes layers higher than `layer` have their hashes already computed
-    fn compute_hash_for_layer(&self, layer: usize) -> Option<Hash> {
-        if layer == INNER_LAYER_COUNT {
-            if self.leaves.is_empty() {
-                None
-            } else {
-                Some(self.leaves.hash())
-            }
-        } else if self.inner[layer].is_empty() {
-            None
-        } else {
-            Some(self.inner[layer].hash())
-        }
-    }
-
-    // Assumes layers higher than `layer` have their summaries already computed
-    fn compute_summary_for_layer(&self, layer: usize) -> Summary {
-        if layer == INNER_LAYER_COUNT {
-            Summary::from_leaves(&self.leaves)
-        } else {
-            Summary::from_inners(&self.inner[layer])
-        }
-    }
-}
diff --git a/lib/src/store/quota.rs b/lib/src/store/quota.rs
index f3faf603c..d16f69001 100644
--- a/lib/src/store/quota.rs
+++ b/lib/src/store/quota.rs
@@ -111,7 +111,7 @@ mod tests {
     use crate::{
         crypto::sign::{Keypair, PublicKey},
         protocol::SingleBlockPresence,
-        store::Store,
+        store::{Changeset, Store},
     };
     use tempfile::TempDir;
 
@@ -129,25 +129,21 @@ mod tests {
         let branch_id = PublicKey::random();
 
         let mut tx = store.begin_write().await.unwrap();
+        let mut changeset = Changeset::new();
 
         assert_eq!(count_referenced_blocks(tx.db(), &[]).await.unwrap(), 0);
 
+        changeset.link_block(rand::random(), rand::random(), SingleBlockPresence::Present);
+        changeset
+            .apply(&mut tx, &branch_id, &write_keys)
+            .await
+            .unwrap();
+        tx.commit().await.unwrap();
 
-        tx.link_block(
-            &branch_id,
-            &rand::random(),
-            &rand::random(),
-            SingleBlockPresence::Present,
-            &write_keys,
-        )
-        .await
-        .unwrap();
-
-        let root_hash = tx.load_root_node(&branch_id).await.unwrap().proof.hash;
+        let mut r = store.acquire_read().await.unwrap();
+        let root_hash = r.load_root_node(&branch_id).await.unwrap().proof.hash;
 
         assert_eq!(
-            count_referenced_blocks(tx.db(), &[root_hash])
-                .await
-                .unwrap(),
+            count_referenced_blocks(r.db(), &[root_hash]).await.unwrap(),
             1
         );
     }
@@ -160,35 +156,28 @@ mod tests {
         let branch_a_id = PublicKey::random();
         let branch_b_id = PublicKey::random();
 
+        let shared_locator = rand::random();
+        let shared_block_id = rand::random();
+
         let mut tx = pool.begin_write().await.unwrap();
 
-        // unique blocks
        for branch_id in [&branch_a_id, &branch_b_id] {
-            tx.link_block(
-                branch_id,
-                &rand::random(),
-                &rand::random(),
-                SingleBlockPresence::Present,
-                &write_keys,
-            )
-            .await
-            .unwrap();
-        }
+            let mut changeset = Changeset::new();
 
-        // shared blocks
-        let shared_locator = rand::random();
-        let shared_block_id = rand::random();
+            // unique block
+            changeset.link_block(rand::random(), rand::random(), SingleBlockPresence::Present);
 
-        for branch_id in [&branch_a_id, &branch_b_id] {
-            tx.link_block(
-                branch_id,
-                &shared_locator,
-                &shared_block_id,
+            // shared blocks
+            changeset.link_block(
+                shared_locator,
+                shared_block_id,
                 SingleBlockPresence::Present,
-                &write_keys,
-            )
-            .await
-            .unwrap();
+            );
+
+            changeset
+                .apply(&mut tx, branch_id, &write_keys)
+                .await
+                .unwrap();
         }
 
         let root_hash_a = tx.load_root_node(&branch_a_id).await.unwrap().proof.hash;
diff --git a/lib/src/store/root_node.rs b/lib/src/store/root_node.rs
index 530c78a88..3ca035186 100644
--- a/lib/src/store/root_node.rs
+++ b/lib/src/store/root_node.rs
@@ -1,9 +1,6 @@
 use super::error::Error;
 use crate::{
-    crypto::{
-        sign::{Keypair, PublicKey},
-        Hash,
-    },
+    crypto::{sign::PublicKey, Hash},
     db,
     debug::DebugPrinter,
     protocol::{MultiBlockPresence, NodeState, Proof, RootNode, SingleBlockPresence, Summary},
@@ -114,18 +111,6 @@ pub(super) async fn create(
     })
 }
 
-pub(super) async fn load_or_create(
-    conn: &mut db::Connection,
-    branch_id: &PublicKey,
-    write_keys: &Keypair,
-) -> Result<RootNode, Error> {
-    match load(conn, branch_id).await {
-        Ok(root_node) => Ok(root_node),
-        Err(Error::BranchNotFound) => Ok(RootNode::empty(*branch_id, write_keys)),
-        Err(error) => Err(error),
-    }
-}
-
 /// Returns the latest approved root node of the specified branch.
 pub(super) async fn load(
     conn: &mut db::Connection,
@@ -617,6 +602,7 @@ pub(super) fn load_all_by_writer_in_any_state<'a>(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::crypto::sign::Keypair;
     use assert_matches::assert_matches;
     use tempfile::TempDir;
 
diff --git a/lib/src/store/tests.rs b/lib/src/store/tests.rs
index 7e6c5ba92..8d73aef8b 100644
--- a/lib/src/store/tests.rs
+++ b/lib/src/store/tests.rs
@@ -1,6 +1,9 @@
 use super::*;
 use crate::{
-    crypto::cipher::SecretKey, locator::Locator, protocol::EMPTY_INNER_HASH, test_utils, BLOCK_SIZE,
+    crypto::{cipher::SecretKey, sign::Keypair},
+    protocol::{Locator, SingleBlockPresence, EMPTY_INNER_HASH},
+    test_utils,
+    version_vector::VersionVector,
 };
 use proptest::{arbitrary::any, collection::vec};
 use rand::{
@@ -25,16 +28,13 @@ async fn link_and_find_block() {
     let encoded_locator = locator.encode(&read_key);
 
     let mut tx = store.begin_write().await.unwrap();
+    let mut changeset = Changeset::new();
+    changeset.link_block(encoded_locator, block_id, SingleBlockPresence::Present);
+    changeset
+        .apply(&mut tx, &branch_id, &write_keys)
+        .await
+        .unwrap();
 
-    tx.link_block(
-        &branch_id,
-        &encoded_locator,
-        &block_id,
-        SingleBlockPresence::Present,
-        &write_keys,
-    )
-    .await
-    .unwrap();
 
     let r = tx.find_block(&branch_id, &encoded_locator).await.unwrap();
     assert_eq!(r, block_id);
@@ -55,26 +55,15 @@ async fn rewrite_locator() {
         let encoded_locator = locator.encode(&read_key);
 
         let mut tx = store.begin_write().await.unwrap();
+        let mut changeset = Changeset::new();
 
-        tx.link_block(
-            &branch_id,
-            &encoded_locator,
-            &b1,
-            SingleBlockPresence::Present,
-            &write_keys,
-        )
-        .await
-        .unwrap();
+        changeset.link_block(encoded_locator, b1, SingleBlockPresence::Present);
+        changeset.link_block(encoded_locator, b2, SingleBlockPresence::Present);
 
-
tx.link_block( - &branch_id, - &encoded_locator, - &b2, - SingleBlockPresence::Present, - &write_keys, - ) - .await - .unwrap(); + changeset + .apply(&mut tx, &branch_id, &write_keys) + .await + .unwrap(); let r = tx.find_block(&branch_id, &encoded_locator).await.unwrap(); assert_eq!(r, b2); @@ -98,18 +87,16 @@ async fn remove_locator() { let encoded_locator = locator.encode(&read_key); let mut tx = store.begin_write().await.unwrap(); + let mut changeset = Changeset::new(); assert_eq!(0, count_child_nodes(&mut tx).await.unwrap()); - tx.link_block( - &branch_id, - &encoded_locator, - &b, - SingleBlockPresence::Present, - &write_keys, - ) - .await - .unwrap(); + changeset.link_block(encoded_locator, b, SingleBlockPresence::Present); + changeset + .apply(&mut tx, &branch_id, &write_keys) + .await + .unwrap(); + let r = tx.find_block(&branch_id, &encoded_locator).await.unwrap(); assert_eq!(r, b); @@ -118,7 +105,10 @@ async fn remove_locator() { count_child_nodes(&mut tx).await.unwrap(), ); - tx.unlink_block(&branch_id, &encoded_locator, None, &write_keys) + let mut changeset = Changeset::new(); + changeset.unlink_block(encoded_locator, None); + changeset + .apply(&mut tx, &branch_id, &write_keys) .await .unwrap(); @@ -141,50 +131,50 @@ async fn remove_block() { let branch_id_0 = PublicKey::random(); let branch_id_1 = PublicKey::random(); - let block_id = rand::random(); - let buffer = vec![0; BLOCK_SIZE]; + let block: Block = rand::random(); + let block_id = block.id; let mut tx = store.begin_write().await.unwrap(); + let mut changeset = Changeset::new(); - tx.write_block(&block_id, &buffer, &BlockNonce::default()) - .await - .unwrap(); + changeset.write_block(block); let locator0 = Locator::head(rand::random()); let locator0 = locator0.encode(&read_key); - tx.link_block( - &branch_id_0, - &locator0, - &block_id, - SingleBlockPresence::Present, - &write_keys, - ) - .await - .unwrap(); + changeset.link_block(locator0, block_id, SingleBlockPresence::Present); + changeset + .apply(&mut tx, &branch_id_0, &write_keys) + .await + .unwrap(); let locator1 = Locator::head(rand::random()); let locator1 = locator1.encode(&read_key); - tx.link_block( - &branch_id_1, - &locator1, - &block_id, - SingleBlockPresence::Present, - &write_keys, - ) - .await - .unwrap(); + let mut changeset = Changeset::new(); + changeset.link_block(locator1, block_id, SingleBlockPresence::Present); + changeset + .apply(&mut tx, &branch_id_1, &write_keys) + .await + .unwrap(); assert!(tx.block_exists(&block_id).await.unwrap()); - tx.unlink_block(&branch_id_0, &locator0, None, &write_keys) + let mut changeset = Changeset::new(); + changeset.unlink_block(locator0, None); + changeset + .apply(&mut tx, &branch_id_0, &write_keys) .await .unwrap(); + assert!(tx.block_exists(&block_id).await.unwrap()); - tx.unlink_block(&branch_id_1, &locator1, None, &write_keys) + let mut changeset = Changeset::new(); + changeset.unlink_block(locator1, None); + changeset + .apply(&mut tx, &branch_id_1, &write_keys) .await .unwrap(); - assert!(!tx.block_exists(&block_id).await.unwrap(),); + + assert!(!tx.block_exists(&block_id).await.unwrap()); } #[tokio::test(flavor = "multi_thread")] @@ -200,44 +190,35 @@ async fn overwrite_block() { let locator = Locator::head(rng.gen()); let locator = locator.encode(&read_key); - let mut buffer = vec![0; BLOCK_SIZE]; - - rng.fill(&mut buffer[..]); - let id0 = BlockId::from_content(&buffer); + let block0: Block = rng.gen(); + let block0_id = block0.id; let mut tx = store.begin_write().await.unwrap(); - tx.link_block( 
@@ -200,44 +190,35 @@ async fn overwrite_block() {
     let locator = Locator::head(rng.gen());
     let locator = locator.encode(&read_key);
 
-    let mut buffer = vec![0; BLOCK_SIZE];
-
-    rng.fill(&mut buffer[..]);
-    let id0 = BlockId::from_content(&buffer);
+    let block0: Block = rng.gen();
+    let block0_id = block0.id;
 
     let mut tx = store.begin_write().await.unwrap();
 
-    tx.link_block(
-        &branch_id,
-        &locator,
-        &id0,
-        SingleBlockPresence::Present,
-        &write_keys,
-    )
-    .await
-    .unwrap();
-    tx.write_block(&id0, &buffer, &rng.gen()).await.unwrap();
+    let mut changeset = Changeset::new();
+    changeset.link_block(locator, block0.id, SingleBlockPresence::Present);
+    changeset.write_block(block0);
+    changeset
+        .apply(&mut tx, &branch_id, &write_keys)
+        .await
+        .unwrap();
 
-    assert!(tx.block_exists(&id0).await.unwrap());
+    assert!(tx.block_exists(&block0_id).await.unwrap());
     assert_eq!(tx.count_blocks().await.unwrap(), 1);
 
-    rng.fill(&mut buffer[..]);
-    let id1 = BlockId::from_content(&buffer);
+    let block1: Block = rng.gen();
+    let block1_id = block1.id;
 
-    tx.write_block(&id1, &buffer, &rng.gen()).await.unwrap();
-
-    tx.link_block(
-        &branch_id,
-        &locator,
-        &id1,
-        SingleBlockPresence::Present,
-        &write_keys,
-    )
-    .await
-    .unwrap();
+    let mut changeset = Changeset::new();
+    changeset.write_block(block1);
+    changeset.link_block(locator, block1_id, SingleBlockPresence::Present);
+    changeset
+        .apply(&mut tx, &branch_id, &write_keys)
+        .await
+        .unwrap();
 
-    assert!(!tx.block_exists(&id0).await.unwrap());
-    assert!(tx.block_exists(&id1).await.unwrap());
+    assert!(!tx.block_exists(&block0_id).await.unwrap());
+    assert!(tx.block_exists(&block1_id).await.unwrap());
     assert_eq!(tx.count_blocks().await.unwrap(), 1);
 }
 
@@ -263,14 +244,17 @@ async fn fallback() {
         (id3, SingleBlockPresence::Missing),
     ] {
         let mut tx = store.begin_write().await.unwrap();
+        let mut changeset = Changeset::new();
 
         // TODO: `link_block` auto-prunes so this doesn't work. We need to simulate receiving
        // remote snapshots here instead.
-        tx.link_block(&branch_0_id, &locator, &block_id, presence, &write_keys)
-            .await
-            .unwrap();
+        changeset.link_block(locator, block_id, presence); // TODO: actually create the present blocks
+        changeset
+            .apply(&mut tx, &branch_0_id, &write_keys)
+            .await
+            .unwrap();
 
         tx.commit().await.unwrap();
     }
 
@@ -314,6 +298,7 @@ async fn empty_nodes_are_not_stored_case(leaf_count: usize, rng_seed: u64) {
     let write_keys = Keypair::generate(&mut rng);
 
     let mut locators = Vec::new();
+    let mut tx = store.begin_write().await.unwrap();
 
     // Add blocks
@@ -321,15 +306,12 @@ async fn empty_nodes_are_not_stored_case(leaf_count: usize, rng_seed: u64) {
         let locator = rng.gen();
         let block_id = rng.gen();
 
-        tx.link_block(
-            &branch_id,
-            &locator,
-            &block_id,
-            SingleBlockPresence::Present,
-            &write_keys,
-        )
-        .await
-        .unwrap();
+        let mut changeset = Changeset::new();
+        changeset.link_block(locator, block_id, SingleBlockPresence::Present);
+        changeset
+            .apply(&mut tx, &branch_id, &write_keys)
+            .await
+            .unwrap();
 
         locators.push(locator);
 
@@ -340,7 +322,10 @@ async fn empty_nodes_are_not_stored_case(leaf_count: usize, rng_seed: u64) {
     locators.shuffle(&mut rng);
 
     for locator in locators {
-        tx.unlink_block(&branch_id, &locator, None, &write_keys)
+        let mut changeset = Changeset::new();
+        changeset.unlink_block(locator, None);
+        changeset
+            .apply(&mut tx, &branch_id, &write_keys)
             .await
             .unwrap();
 
@@ -380,15 +365,12 @@ async fn prune_case(ops: Vec<PruneTestOp>, rng_seed: u64) {
                 let block_id = rng.gen();
 
                 let mut tx = store.begin_write().await.unwrap();
-                tx.link_block(
-                    &branch_id,
-                    &locator,
-                    &block_id,
-                    SingleBlockPresence::Present,
-                    &write_keys,
-                )
-                .await
-                .unwrap();
+                let mut changeset = Changeset::new();
+                changeset.link_block(locator, block_id, SingleBlockPresence::Present);
+                changeset
+                    .apply(&mut tx, &branch_id, &write_keys)
+                    .await
+                    .unwrap();
                 tx.commit().await.unwrap();
 
                 expected.insert(locator, block_id);
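Note, from `overwrite_block` above: one `Changeset` can stage a block write together with the link that references it, so both land in a single `apply`. The `Bump` arm in the next hunk passes an empty `VersionVector` where the old code used `VersionVectorOp::IncrementLocal`, which suggests `apply` itself increments the local version on every application — an inference from this diff, not a verified guarantee. The batched shape, for reference (`block` and `encoded_locator` stand in for the test values above):

    // Stage a block and its link together; one apply() commits both.
    // Note the ordering: write_block(block) consumes the block, so its id
    // is read (by link_block) before the move, as in overwrite_block above.
    let mut changeset = Changeset::new();
    changeset.link_block(encoded_locator, block.id, SingleBlockPresence::Present);
    changeset.write_block(block);
    changeset.apply(&mut tx, &branch_id, &write_keys).await?;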
@@ -399,7 +381,10 @@ async fn prune_case(ops: Vec<PruneTestOp>, rng_seed: u64) {
                 };
 
                 let mut tx = store.begin_write().await.unwrap();
-                tx.unlink_block(&branch_id, &locator, None, &write_keys)
+                let mut changeset = Changeset::new();
+                changeset.unlink_block(locator, None);
+                changeset
+                    .apply(&mut tx, &branch_id, &write_keys)
                     .await
                     .unwrap();
                 tx.commit().await.unwrap();
@@ -408,7 +393,10 @@
             }
             PruneTestOp::Bump => {
                 let mut tx = store.begin_write().await.unwrap();
-                tx.bump(&branch_id, VersionVectorOp::IncrementLocal, &write_keys)
+                let mut changeset = Changeset::new();
+                changeset.bump(&VersionVector::new());
+                changeset
+                    .apply(&mut tx, &branch_id, &write_keys)
                     .await
                     .unwrap();
                 tx.commit().await.unwrap();
diff --git a/lib/src/version_vector.rs b/lib/src/version_vector.rs
index bc3be78fc..459102aed 100644
--- a/lib/src/version_vector.rs
+++ b/lib/src/version_vector.rs
@@ -21,8 +21,8 @@ pub struct VersionVector(BTreeMap<PublicKey, u64>);
 
 impl VersionVector {
     /// Creates an empty version vector.
-    pub fn new() -> Self {
-        Self::default()
+    pub const fn new() -> Self {
+        Self(BTreeMap::new())
     }
 
     pub fn first(writer_id: PublicKey) -> Self {
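Note on the `version_vector.rs` hunk: replacing `Self::default()` with a direct `Self(BTreeMap::new())` is what lets `new` become a `const fn` — `BTreeMap::new` has been const since Rust 1.66, while `Default::default` is not const. That makes an empty vector usable in const contexts; an illustrative (not from this diff) example:

    // Illustrative: an empty version vector available at compile time,
    // enabled by the const fn new() introduced in this hunk.
    const EMPTY: VersionVector = VersionVector::new();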