diff --git a/lib/Cargo.toml b/lib/Cargo.toml
index 69a365f2f..4d44bb4dc 100644
--- a/lib/Cargo.toml
+++ b/lib/Cargo.toml
@@ -44,6 +44,7 @@ noise-rust-crypto = { version = "0.5.0", default-features = false, features = ["
 num_enum = { workspace = true }
 once_cell = { workspace = true }
 parse-size = { version = "1.0.0", features = ["std"] }
+pin-project-lite = "0.2.13"
 rand = { workspace = true }
 ref-cast = "1.0.14"
 rupnp = { version = "1.1.0", default-features = false, features = [] }
diff --git a/lib/src/network/client.rs b/lib/src/network/client.rs
index 9defdc0c3..857518adc 100644
--- a/lib/src/network/client.rs
+++ b/lib/src/network/client.rs
@@ -108,9 +108,11 @@ impl Client {
                     result?;
                     break;
                 }
-                branch_to_reload = reload_index_rx.recv() => {
-                    if let Ok(branch_to_reload) = branch_to_reload {
-                        self.reload_index(&branch_to_reload);
+                branches_to_reload = reload_index_rx.changed() => {
+                    if let Ok(branches_to_reload) = branches_to_reload {
+                        for branch_to_reload in &branches_to_reload {
+                            self.reload_index(&branch_to_reload);
+                        }
                     }
                 }
             }
diff --git a/lib/src/network/server.rs b/lib/src/network/server.rs
index e39c5b181..2731a1e1a 100644
--- a/lib/src/network/server.rs
+++ b/lib/src/network/server.rs
@@ -1,3 +1,5 @@
+use std::{pin::pin, time::Duration};
+
 use super::{
     debug_payload::{DebugRequestPayload, DebugResponsePayload},
     message::{Content, Request, Response, ResponseDisambiguator},
@@ -5,15 +7,22 @@ use super::{
 use crate::{
     crypto::{sign::PublicKey, Hash},
     error::{Error, Result},
-    event::Payload,
+    event::{Event, Payload},
     protocol::{BlockContent, BlockId, RootNode},
     repository::Vault,
     store,
+    sync::stream::Throttle,
+};
+use futures_util::{
+    stream::{self, FuturesUnordered},
+    Stream, StreamExt, TryStreamExt,
 };
-use futures_util::{stream::FuturesUnordered, StreamExt, TryStreamExt};
 use tokio::{
     select,
-    sync::{broadcast::error::RecvError, mpsc},
+    sync::{
+        broadcast::{self, error::RecvError},
+        mpsc,
+    },
 };
 use tracing::instrument;
@@ -242,23 +251,17 @@ impl<'a> Monitor<'a> {
     }

     async fn run(self) -> Result<()> {
-        let mut subscription = self.vault.event_tx.subscribe();
-
-        // send initial branches
         self.handle_all_branches_changed().await?;

-        loop {
-            match subscription.recv().await.map(|event| event.payload) {
-                Ok(
-                    Payload::BranchChanged(branch_id) | Payload::BlockReceived { branch_id, .. },
-                ) => self.handle_branch_changed(branch_id).await?,
-                Ok(Payload::MaintenanceCompleted) => continue,
-                Err(RecvError::Lagged(_)) => {
-                    tracing::warn!("event receiver lagged");
-                    self.handle_all_branches_changed().await?
-                }
-
-                Err(RecvError::Closed) => break,
+        let mut events = pin!(Throttle::new(
+            events(self.vault.event_tx.subscribe()),
+            Duration::from_secs(1)
+        ));
+
+        while let Some(event) = events.next().await {
+            match event {
+                BranchChanged::One(branch_id) => self.handle_branch_changed(branch_id).await?,
+                BranchChanged::All => self.handle_all_branches_changed().await?,
             }
         }

@@ -348,3 +351,27 @@ impl Sender {
         self.0.send(Content::Response(response)).await.is_ok()
     }
 }
+
+fn events(rx: broadcast::Receiver<Event>) -> impl Stream<Item = BranchChanged> {
+    stream::unfold(rx, |mut rx| async move {
+        loop {
+            match rx.recv().await {
+                Ok(Event { payload, .. }) => match payload {
+                    Payload::BranchChanged(branch_id)
+                    | Payload::BlockReceived { branch_id, .. } => {
+                        return Some((BranchChanged::One(branch_id), rx))
+                    }
+                    Payload::MaintenanceCompleted => continue,
+                },
+                Err(RecvError::Lagged(_)) => return Some((BranchChanged::All, rx)),
+                Err(RecvError::Closed) => return None,
+            }
+        }
+    })
+}
+
+#[derive(Eq, PartialEq, Clone, Debug, Hash)]
+enum BranchChanged {
+    One(PublicKey),
+    All,
+}
diff --git a/lib/src/network/tests.rs b/lib/src/network/tests.rs
index 49f169e4c..05f4f51c4 100644
--- a/lib/src/network/tests.rs
+++ b/lib/src/network/tests.rs
@@ -109,7 +109,7 @@ async fn transfer_snapshot_between_two_replicas_case(
 // TODO: Make it faster and increase the cases.
 #[proptest(cases = 8)]
 fn transfer_blocks_between_two_replicas(
-    #[strategy(1usize..32)] block_count: usize,
+    #[strategy(1usize..62)] block_count: usize,
     #[strategy(test_utils::rng_seed_strategy())] rng_seed: u64,
 ) {
     test_utils::run(transfer_blocks_between_two_replicas_case(
@@ -145,8 +145,10 @@ async fn transfer_blocks_between_two_replicas_case(block_count: usize, rng_seed:

             a_vault.receive_block(block, Some(promise)).await.unwrap();
             tracing::info!(?id, "write block");
+        }

-            // Then wait until replica B receives and writes it too.
+        // Then wait until replica B receives and writes it too.
+        for (id, _block) in snapshot.blocks() {
             wait_until_block_exists(&b_vault, id).await;
         }
     };
diff --git a/lib/src/repository/mod.rs b/lib/src/repository/mod.rs
index 49f22a6e8..420e5b90c 100644
--- a/lib/src/repository/mod.rs
+++ b/lib/src/repository/mod.rs
@@ -44,12 +44,13 @@ use crate::{
     state_monitor::StateMonitor,
     storage_size::StorageSize,
     store,
-    sync::broadcast::ThrottleReceiver,
+    sync::stream::Throttle,
 };
 use camino::Utf8Path;
 use futures_util::{future, TryStreamExt};
+use futures_util::{stream, StreamExt};
 use scoped_task::ScopedJoinHandle;
-use std::{io, path::Path, sync::Arc};
+use std::{io, path::Path, pin::pin, sync::Arc};
 use tokio::{
     fs,
     sync::broadcast::{self, error::RecvError},
@@ -858,14 +859,17 @@ async fn generate_and_store_writer_id(
 async fn report_sync_progress(vault: Vault) {
     let mut prev_progress = Progress { value: 0, total: 0 };

-    let mut event_rx = ThrottleReceiver::new(vault.event_tx.subscribe(), Duration::from_secs(1));
-
-    loop {
-        match event_rx.recv().await {
-            Ok(_) | Err(RecvError::Lagged(_)) => (),
-            Err(RecvError::Closed) => break,
+    let events = stream::unfold(vault.event_tx.subscribe(), |mut rx| async move {
+        match rx.recv().await {
+            Ok(_) | Err(RecvError::Lagged(_)) => Some(((), rx)),
+            Err(RecvError::Closed) => None,
         }
+    });
+    let events = Throttle::new(events, Duration::from_secs(1));
+    let mut events = pin!(events);

+    while events.next().await.is_some() {
         let next_progress = match vault.store().sync_progress().await {
             Ok(progress) => progress,
             Err(error) => {
diff --git a/lib/src/store/block_expiration_tracker.rs b/lib/src/store/block_expiration_tracker.rs
index f63f7edc0..f83487ca9 100644
--- a/lib/src/store/block_expiration_tracker.rs
+++ b/lib/src/store/block_expiration_tracker.rs
@@ -268,13 +268,17 @@ async fn run_task(
         .execute(&mut tx)
         .await?;

-    if update_result.rows_affected() > 0 {
-        sqlx::query("DELETE FROM blocks WHERE id = ?")
-            .bind(&block)
-            .execute(&mut tx)
-            .await?;
+    if update_result.rows_affected() == 0 {
+        return Ok(());
     }

+    sqlx::query("DELETE FROM blocks WHERE id = ?")
+        .bind(&block)
+        .execute(&mut tx)
+        .await?;
+
+    tracing::warn!("Block {block:?} has expired");
+
     // We need to remove the block from `shared` here while the database is locked for writing.
     // If we did it after the commit, someone could re-add the block into the database between the
     // commit and the removal. That would result in there being a
diff --git a/lib/src/store/mod.rs b/lib/src/store/mod.rs
index ca90835d1..962758b37 100644
--- a/lib/src/store/mod.rs
+++ b/lib/src/store/mod.rs
@@ -41,6 +41,7 @@ use crate::{
         MultiBlockPresence, Proof, RootNode, Summary, INNER_LAYER_COUNT,
     },
     storage_size::StorageSize,
+    sync::broadcast_hash_set,
 };
 use futures_util::{Stream, TryStreamExt};
 use std::{
@@ -50,25 +51,25 @@ use std::{
     time::Duration,
 };
 // TODO: Consider creating an async `RwLock` in the `deadlock` module and use it here.
-use tokio::sync::{broadcast, RwLock};
+use tokio::sync::RwLock;

 /// Data store
 #[derive(Clone)]
 pub(crate) struct Store {
     db: db::Pool,
     cache: Arc<Cache>,
-    pub client_reload_index_tx: broadcast::Sender<PublicKey>,
+    pub client_reload_index_tx: broadcast_hash_set::Sender<PublicKey>,
     block_expiration_tracker: Arc<RwLock<Option<Arc<BlockExpirationTracker>>>>,
 }

 impl Store {
     pub fn new(db: db::Pool) -> Self {
+        let client_reload_index_tx = broadcast_hash_set::channel().0;
+
         Self {
             db,
             cache: Arc::new(Cache::new()),
-            // TODO: The broadcast channel is not the best structure for this use case. Ideally
-            // we'd have something like the `watch` but that can be edited until it's received.
-            client_reload_index_tx: broadcast::channel(1024).0,
+            client_reload_index_tx,
             block_expiration_tracker: Arc::new(RwLock::new(None)),
         }
     }
@@ -240,7 +241,7 @@ impl Store {
 pub(crate) struct Reader {
     inner: Handle,
     cache: CacheTransaction,
-    client_reload_index_tx: broadcast::Sender<PublicKey>,
+    client_reload_index_tx: broadcast_hash_set::Sender<PublicKey>,
     block_expiration_tracker: Option<Arc<BlockExpirationTracker>>,
 }

@@ -321,7 +322,7 @@ impl Reader {
         tx.commit().await?;

         for branch_id in branches {
-            self.client_reload_index_tx.send(branch_id).unwrap_or(0);
+            self.client_reload_index_tx.insert(&branch_id);
         }

         Ok(true)
diff --git a/lib/src/sync.rs b/lib/src/sync.rs
index a7d5a7cbb..eb74ea97c 100644
--- a/lib/src/sync.rs
+++ b/lib/src/sync.rs
@@ -9,69 +9,12 @@ use std::{
 use tokio::sync::watch;
 use tokio_stream::wrappers::WatchStream;

-/// MPMC broadcast channel
-pub mod broadcast {
-    use tokio::{
-        select,
-        sync::broadcast,
-        time::{self, Duration, Instant},
-    };
-
-    /// Adapter for `Receiver` which limits the rate at which messages are received. The messages
-    /// are not buffered - if the rate limit is exceeded, all but the last message are discarded.
-    pub(crate) struct ThrottleReceiver<T> {
-        rx: broadcast::Receiver<T>,
-        interval: Duration,
-        last_recv: Instant,
-    }
-
-    impl<T> ThrottleReceiver<T>
-    where
-        T: Clone,
-    {
-        pub fn new(inner: broadcast::Receiver<T>, interval: Duration) -> Self {
-            Self {
-                rx: inner,
-                interval,
-                last_recv: Instant::now()
-                    .checked_sub(interval)
-                    .expect("Interval should be smaller than the time since epoch"),
-            }
-        }
-
-        pub async fn recv(&mut self) -> Result<T, broadcast::error::RecvError> {
-            let mut item = None;
-            let end = self.last_recv + self.interval;
-
-            if Instant::now() < end {
-                loop {
-                    select! {
-                        _ = time::sleep_until(end) => break,
-                        result = self.rx.recv() => {
-                            item = Some(result?);
-                        }
-                    }
-                }
-            }
-
-            let item = if let Some(item) = item {
-                item
-            } else {
-                self.rx.recv().await?
-            };
-
-            self.last_recv = Instant::now();
-
-            Ok(item)
-        }
-    }
-}
-
 /// Similar to tokio::sync::watch, but has no initial value. Because there is no initial value the
 /// API must be slightly different. In particular, we don't have the `borrow` function.
 pub(crate) mod uninitialized_watch {
     use futures_util::{stream, Stream};
     use tokio::sync::watch as w;
+    pub use w::error::RecvError;

     pub struct Sender<T>(w::Sender<Option<T>>);
@@ -212,3 +155,418 @@ pub(crate) mod atomic_slot {
         }
     }
 }
+
+/// Similar to `tokio::sync::broadcast` but does not enqueue the values. Instead, it "accumulates"
+/// them into a set. Advantages include that one doesn't need to specify the recv buffer size and
+/// the `insert` (analogous to `broadcast::send`) is non-async and never fails.
+pub(crate) mod broadcast_hash_set {
+    use super::uninitialized_watch;
+    pub use super::uninitialized_watch::RecvError;
+    use std::{
+        collections::{HashMap, HashSet},
+        hash::Hash,
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc, Mutex,
+        },
+    };
+
+    #[derive(Clone)]
+    pub struct Sender<T> {
+        shared: Arc<Shared<T>>,
+    }
+
+    impl<T: Clone + Eq + Hash> Sender<T> {
+        pub fn insert(&self, value: &T) {
+            let mut receivers = self.shared.receivers.lock().unwrap();
+
+            for (_id, receiver) in receivers.iter_mut() {
+                if receiver.1.insert(value.clone()) {
+                    receiver.0.send(()).unwrap_or(());
+                }
+            }
+        }
+
+        pub fn subscribe(&self) -> Receiver<T> {
+            let id = self.shared.id_generator.fetch_add(1, Ordering::Relaxed);
+            let (watch_tx, watch_rx) = uninitialized_watch::channel();
+
+            let mut receivers = self.shared.receivers.lock().unwrap();
+
+            receivers.insert(id, (watch_tx, HashSet::new()));
+
+            Receiver {
+                id,
+                shared: self.shared.clone(),
+                watch_rx,
+            }
+        }
+    }
+
+    pub struct Receiver<T> {
+        id: usize,
+        shared: Arc<Shared<T>>,
+        watch_rx: uninitialized_watch::Receiver<()>,
+    }
+
+    impl<T: Clone + Eq + Hash> Receiver<T> {
+        pub async fn changed(&mut self) -> Result<HashSet<T>, RecvError> {
+            self.watch_rx.changed().await?;
+
+            let mut receivers = self.shared.receivers.lock().unwrap();
+
+            // Unwrap is OK because the entry must exist for as long as this `Receiver` exists.
+            let old_set = &mut receivers.get_mut(&self.id).unwrap().1;
+            let mut new_set = HashSet::new();
+
+            if !old_set.is_empty() {
+                std::mem::swap(&mut new_set, old_set);
+            }
+
+            Ok(new_set)
+        }
+    }
+
+    impl<T> Drop for Receiver<T> {
+        fn drop(&mut self) {
+            self.shared.receivers.lock().unwrap().remove(&self.id);
+        }
+    }
+
+    struct Shared<T> {
+        id_generator: AtomicUsize,
+        receivers: Mutex<HashMap<usize, (uninitialized_watch::Sender<()>, HashSet<T>)>>,
+    }
+
+    pub fn channel<T: Clone + Eq + Hash>() -> (Sender<T>, Receiver<T>) {
+        let id_generator = AtomicUsize::new(0);
+        let mut receivers = HashMap::new();
+
+        let id = id_generator.fetch_add(1, Ordering::Relaxed);
+        let (watch_tx, watch_rx) = uninitialized_watch::channel();
+
+        receivers.insert(id, (watch_tx, HashSet::new()));
+
+        let shared = Arc::new(Shared {
+            id_generator,
+            receivers: Mutex::new(receivers),
+        });
+
+        (
+            Sender {
+                shared: shared.clone(),
+            },
+            Receiver {
+                id,
+                shared,
+                watch_rx,
+            },
+        )
+    }
+}
+
+pub(crate) mod stream {
+    use futures_util::{stream::Fuse, Stream, StreamExt};
+    use pin_project_lite::pin_project;
+    use std::{
+        collections::{hash_map, BTreeMap, HashMap},
+        fmt::Debug,
+        future::Future,
+        hash::Hash,
+        pin::Pin,
+        task::{ready, Context, Poll},
+    };
+    use tokio::time::{self, Duration, Instant, Sleep};
+
+    type EntryId = u64;
+
+    struct Delay {
+        until: Instant,
+        next: Option<EntryId>,
+    }
+
+    pin_project! {
+        /// Rate-limiting stream adapter.
+        ///
+        /// ```ignore
+        /// in:       |a|a|a|a|a| | | | | |a|a|a|a| | | |
+        /// throttle: |a| |a| |a| | | | | |a| |a| |a| | |
+        /// ```
+        ///
+        /// Multiple occurrences of the same item within the rate-limit period are reduced to a
+        /// single one but distinct items are preserved:
+        ///
+        /// ```ignore
+        /// in:       |a|b|a|b| | | | |
+        /// throttle: |a| |a|b| | | | |
+        /// ```
+        pub struct Throttle<S>
+        where
+            S: Stream,
+            S::Item: Hash,
+        {
+            #[pin]
+            inner: Fuse<S>,
+            period: Duration,
+            ready: BTreeMap<EntryId, S::Item>,
+            delays: HashMap<S::Item, Delay>,
+            #[pin]
+            sleep: Option<Sleep>,
+            next_id: EntryId,
+        }
+    }
+
+    impl<S> Throttle<S>
+    where
+        S: Stream,
+        S::Item: Eq + Hash,
+    {
+        pub fn new(inner: S, period: Duration) -> Self {
+            Self {
+                inner: inner.fuse(),
+                period,
+                ready: BTreeMap::new(),
+                delays: HashMap::new(),
+                sleep: None,
+                next_id: 0,
+            }
+        }
+
+        fn is_ready(ready: &BTreeMap<EntryId, S::Item>, item: &S::Item) -> bool {
+            for v in ready.values() {
+                if v == item {
+                    return true;
+                }
+            }
+            false
+        }
+    }
+
+    impl<S> Stream for Throttle<S>
+    where
+        S: Stream,
+        S::Item: Hash + Eq + Clone + Debug,
+    {
+        type Item = S::Item;
+
+        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+            let mut this = self.project();
+            let now = Instant::now();
+            let mut inner_is_finished = false;
+
+            // Take entries from `inner` into `items` ASAP so we can timestamp them.
+            loop {
+                match this.inner.as_mut().poll_next(cx) {
+                    Poll::Ready(Some(item)) => {
+                        let is_ready = Self::is_ready(&this.ready, &item);
+
+                        match this.delays.entry(item.clone()) {
+                            hash_map::Entry::Occupied(mut entry) => {
+                                if !is_ready {
+                                    let delay = entry.get_mut();
+
+                                    if delay.next.is_none() {
+                                        let entry_id = *this.next_id;
+                                        *this.next_id += 1;
+                                        delay.next = Some(entry_id);
+                                    }
+                                }
+                            }
+                            hash_map::Entry::Vacant(entry) => {
+                                let entry_id = *this.next_id;
+                                *this.next_id += 1;
+
+                                if !is_ready {
+                                    this.ready.insert(entry_id, item.clone());
+                                }
+
+                                entry.insert(Delay {
+                                    until: now + *this.period,
+                                    next: None,
+                                });
+                            }
+                        }
+                    }
+                    Poll::Ready(None) => {
+                        inner_is_finished = true;
+                        break;
+                    }
+                    Poll::Pending => {
+                        break;
+                    }
+                }
+            }
+
+            loop {
+                if let Some(first_entry) = this.ready.first_entry() {
+                    return Poll::Ready(Some(first_entry.remove()));
+                }
+
+                if let Some(sleep) = this.sleep.as_mut().as_pin_mut() {
+                    ready!(sleep.poll(cx));
+                    this.sleep.set(None);
+                }
+
+                let mut first: Option<(&S::Item, &mut Delay)> = None;
+
+                for (item, delay) in this.delays.iter_mut() {
+                    if let Some((_, first_delay)) = &first {
+                        if (delay.until, delay.next) < (first_delay.until, first_delay.next) {
+                            first = Some((item, delay));
+                        }
+                    } else {
+                        first = Some((item, delay));
+                    }
+                }
+
+                let (first_item, first_delay) = match &mut first {
+                    Some(first) => (&first.0, &mut first.1),
+                    None => {
+                        return if inner_is_finished {
+                            Poll::Ready(None)
+                        } else {
+                            Poll::Pending
+                        }
+                    }
+                };
+
+                if first_delay.until <= now {
+                    let first_item = (*first_item).clone();
+
+                    if first_delay.next.is_some() {
+                        first_delay.until = now + *this.period;
+                        first_delay.next = None;
+                        return Poll::Ready(Some(first_item));
+                    } else {
+                        this.delays.remove(&first_item);
+                    }
+                } else {
+                    this.sleep.set(Some(time::sleep_until(first_delay.until)));
+                }
+            }
+        }
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use super::*;
+        use futures_util::{future, stream, StreamExt};
+        use std::fmt::Debug;
+        use tokio::{sync::mpsc, time::Instant};
+
+        #[tokio::test(start_paused = true)]
+        async fn rate_limit_equal_items() {
+            let input = [(ms(0), 0), (ms(0), 0)];
+            let (tx, rx) = mpsc::channel(1);
+            let expected = [(0, ms(0)), (0, ms(1000))];
+            future::join(
+                produce(tx, input),
+                verify(Throttle::new(into_stream(rx), ms(1000)), expected),
+            )
+            .await;
+
+            //--------------------------------------------------------
+
+            let input = [(ms(0), 0), (ms(100), 0)];
+            let (tx, rx) = mpsc::channel(1);
+            let expected = [(0, ms(0)), (0, ms(1000))];
+            future::join(
+                produce(tx, input),
+                verify(Throttle::new(into_stream(rx), ms(1000)), expected),
+            )
+            .await;
+
+            //--------------------------------------------------------
+
+            let input = [(ms(0), 0), (ms(0), 0), (ms(1001), 0)];
+            let (tx, rx) = mpsc::channel(1);
+            let expected = [(0, ms(0)), (0, ms(1000)), (0, ms(2000))];
+            future::join(
+                produce(tx, input),
+                verify(Throttle::new(into_stream(rx), ms(1000)), expected),
+            )
+            .await;
+
+            //--------------------------------------------------------
+
+            let input = [
+                (ms(0), 0),
+                (ms(100), 0),
+                (ms(100), 0),
+                (ms(1500), 0),
+                (ms(0), 0),
+            ];
+
+            let (tx, rx) = mpsc::channel(1);
+            let expected = [(0, ms(0)), (0, ms(1000)), (0, ms(2000))];
+            future::join(
+                produce(tx, input),
+                verify(Throttle::new(into_stream(rx), ms(1000)), expected),
+            )
+            .await;
+        }
+
+        #[tokio::test(start_paused = true)]
+        async fn rate_limit_inequal_items() {
+            let input = [(ms(0), 0), (ms(0), 1)];
+
+            let (tx, rx) = mpsc::channel(1);
+            let expected = [(0, ms(0)), (1, ms(0))];
+            future::join(
+                produce(tx, input),
+                verify(Throttle::new(into_stream(rx), ms(1000)), expected),
+            )
+            .await;
+
+            //--------------------------------------------------------
+
+            let input = [(ms(0), 0), (ms(0), 1), (ms(0), 0), (ms(0), 1)];
+
+            let (tx, rx) = mpsc::channel(1);
+            let expected = [(0, ms(0)), (1, ms(0)), (0, ms(1000)), (1, ms(1000))];
+            future::join(
+                produce(tx, input),
+                verify(Throttle::new(into_stream(rx), ms(1000)), expected),
+            )
+            .await;
+
+            //--------------------------------------------------------
+
+            let input = [(ms(0), 0), (ms(0), 1), (ms(0), 1), (ms(0), 0)];
+
+            let (tx, rx) = mpsc::channel(1);
+            let expected = [(0, ms(0)), (1, ms(0)), (1, ms(1000)), (0, ms(1000))];
+            future::join(
+                produce(tx, input),
+                verify(Throttle::new(into_stream(rx), ms(1000)), expected),
+            )
+            .await;
+        }
+
+        fn ms(ms: u64) -> Duration {
+            Duration::from_millis(ms)
+        }
+
+        fn into_stream<T>(rx: mpsc::Receiver<T>) -> impl Stream<Item = T> {
+            stream::unfold(rx, |mut rx| async move { Some((rx.recv().await?, rx)) })
+        }
+
+        async fn produce<T>(tx: mpsc::Sender<T>, input: impl IntoIterator<Item = (Duration, T)>) {
+            for (delay, item) in input {
+                time::sleep(delay).await;
+                tx.send(item).await.ok().unwrap();
+            }
+        }
+
+        async fn verify<T: Eq + Debug>(
+            stream: impl Stream<Item = T>,
+            expected: impl IntoIterator<Item = (T, Duration)>,
+        ) {
+            let start = Instant::now();
+            let actual: Vec<_> = stream.map(|item| (item, start.elapsed())).collect().await;
+            let expected: Vec<_> = expected.into_iter().collect();
+
+            assert_eq!(actual, expected);
+        }
+    }
+}
diff --git a/lib/tests/sync.rs b/lib/tests/sync.rs
index b043bd414..d1ba1d521 100644
--- a/lib/tests/sync.rs
+++ b/lib/tests/sync.rs
@@ -976,25 +976,40 @@ fn content_stays_available_during_sync() {
 #[test]
 fn redownload_expired_blocks() {
+    use std::time::SystemTime;
+
     let mut env = Env::new();
     let (origin_has_it_tx, mut origin_has_it_rx) = mpsc::channel(1);
     let (cache_had_it_tx, mut cache_had_it_rx) = mpsc::channel(1);
     let (finish_origin_tx, mut finish_origin_rx) = mpsc::channel(1);
     let (finish_cache_tx, mut finish_cache_rx) = mpsc::channel(1);

-    env.actor("origin", async move {
-        let (_network, repo, _reg) = actor::setup().await;
+    let test_content = Arc::new(common::random_content(2 * 1024 * 1024));

-        // Create a file
-        let mut file = repo.create_file("test.txt").await.unwrap();
-        file.write_all(b"test content").await.unwrap();
-        file.flush().await.unwrap();
+    async fn wait_for_block_count(repo: &Repository, block_count: u64) {
+        common::eventually(&repo, || {
+            async { repo.count_blocks().await.unwrap() == block_count }
+                .instrument(tracing::Span::current())
+        })
+        .await;
+    }
+
+    env.actor("origin", {
+        let test_content = test_content.clone();
+        async move {
+            let (_network, repo, _reg) = actor::setup().await;

-        let block_count = repo.count_blocks().await.unwrap();
+            // Create a file
+            let mut file = repo.create_file("test.txt").await.unwrap();
+            file.write_all(&test_content).await.unwrap();
+            file.flush().await.unwrap();
+
+            let block_count = repo.count_blocks().await.unwrap();

-        origin_has_it_tx.send(block_count).await.unwrap();
+            origin_has_it_tx.send(block_count).await.unwrap();

-        finish_origin_rx.recv().await;
+            finish_origin_rx.recv().await;
+        }
     });

     env.actor("cache", async move {
@@ -1002,42 +1017,66 @@ fn redownload_expired_blocks() {
         let repo = actor::create_repo_with_mode(DEFAULT_REPO, AccessMode::Blind).await;
         let _reg = network.register(repo.handle()).await;

-        repo.set_block_expiration(Some(Duration::from_secs(1)))
-            .await
-            .unwrap();
-        let block_count = origin_has_it_rx.recv().await.unwrap();
+        let block_count = origin_has_it_rx.recv().await.unwrap();
+        let start = SystemTime::now();

         network.add_user_provided_peer(&actor::lookup_addr("origin").await);

         // Wait until we have the blocks. This is a blind replica, so we can't check that the file
         // exists and has the required content.
-        common::eventually(&repo, || {
-            async { repo.count_blocks().await.unwrap() == block_count }
-                .instrument(tracing::Span::current())
-        })
-        .await;
+        wait_for_block_count(&repo, block_count).await;
+
+        let normal_sync_duration = SystemTime::now().duration_since(start).unwrap();
+
+        let expiration_duration = Duration::from_secs(5);
+
+        repo.set_block_expiration(Some(expiration_duration))
+            .await
+            .unwrap();

         // Wait for the blocks to expire. TODO: We could also wait for the block count to drop to
         // zero, but currently the expiration tracker does not trigger a repo change.
-        sleep(Duration::from_secs(3)).await;
+        sleep(3 * expiration_duration).await;

-        cache_had_it_tx.send(()).await.unwrap();
+        cache_had_it_tx
+            .send((block_count, normal_sync_duration))
+            .await
+            .unwrap();

         finish_cache_rx.recv().await;
     });

     env.actor("reader", async move {
         let network = actor::create_network(Proto::Tcp).await;
-        let repo = actor::create_repo_with_mode(DEFAULT_REPO, AccessMode::Read).await;
+        let repo = actor::create_repo_with_mode(DEFAULT_REPO, AccessMode::Blind).await;
         let _reg = network.register(repo.handle()).await;

-        cache_had_it_rx.recv().await;
+        // Use `start` to measure how long it took to sync the data from the expired cache.
+        let (block_count, normal_sync_duration) = cache_had_it_rx.recv().await.unwrap();
+        let start = SystemTime::now();

         network.add_user_provided_peer(&actor::lookup_addr("cache").await);

-        common::expect_entry_exists(&repo, "test.txt", EntryType::File).await;
-        common::expect_file_content(&repo, "test.txt", b"test content").await;
+        wait_for_block_count(&repo, block_count).await;
+
+        let expired_sync_duration = SystemTime::now().duration_since(start).unwrap();
+
+        // At time of writing this test, syncing of the expired blocks takes about 2.6 times longer
+        // than normal sync. This is perhaps understandable because the 'cache' node needs to
+        // download the blocks from 'origin' again and there is also some traffic caused by the
+        // `reader` node requesting blocks from the `cache` node and it responding with "not found"
+        // at first.
+        //
+        // Here we check that the expired sync takes less than 3.5 times as long as the normal
+        // sync (as opposed to the measured ratio of about 2.6), because when this test runs
+        // together with other tests the actual timings are less predictable.
+        assert!(
+            (expired_sync_duration.as_millis() as f64)
+                < (3.5 * normal_sync_duration.as_millis() as f64),
+            "Sync of expired blocks took more than 3.5x longer than normal sync \
+             (normal:{normal_sync_duration:?}, expired:{expired_sync_duration:?}, ratio:{})",
+            expired_sync_duration.as_millis() as f64 / normal_sync_duration.as_millis() as f64,
+        );

         finish_origin_tx.send(()).await.unwrap();
         finish_cache_tx.send(()).await.unwrap();
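
Not part of the patch: a minimal, hypothetical sketch of how the new `sync::stream::Throttle` adapter is driven, following the same `stream::unfold` + `pin!` pattern used by `report_sync_progress` and the unit tests above. The function name `drain_throttled` and the `u32` item type are made up for illustration; inside the crate the throttled items are repository events or branch ids.

```rust
use std::{pin::pin, time::Duration};

use futures_util::{stream, StreamExt};
use tokio::sync::mpsc;

use crate::sync::stream::Throttle;

// Hypothetical consumer: let at most one occurrence of each distinct value
// through per second, coalescing bursts in between.
async fn drain_throttled(rx: mpsc::Receiver<u32>) {
    // Adapt the mpsc receiver into a `Stream` (same trick as `into_stream` in the tests).
    let events = stream::unfold(rx, |mut rx| async move { Some((rx.recv().await?, rx)) });

    // `Throttle` is not `Unpin`, so pin it before polling it as a `Stream`.
    let mut events = pin!(Throttle::new(events, Duration::from_secs(1)));

    while let Some(value) = events.next().await {
        println!("got {value}");
    }
}
```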