From 3d878f9018d2fe619f9b2e4b65552c4f67642275 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Thu, 31 Aug 2023 13:26:56 +0200 Subject: [PATCH 1/8] redownload_expired_blocks test now checks sync time It currently takes too long and thus the test fails. --- lib/src/store/block_expiration_tracker.rs | 14 ++++-- lib/tests/sync.rs | 60 ++++++++++++++++------- 2 files changed, 52 insertions(+), 22 deletions(-) diff --git a/lib/src/store/block_expiration_tracker.rs b/lib/src/store/block_expiration_tracker.rs index f63f7edc0..f83487ca9 100644 --- a/lib/src/store/block_expiration_tracker.rs +++ b/lib/src/store/block_expiration_tracker.rs @@ -268,13 +268,17 @@ async fn run_task( .execute(&mut tx) .await?; - if update_result.rows_affected() > 0 { - sqlx::query("DELETE FROM blocks WHERE id = ?") - .bind(&block) - .execute(&mut tx) - .await?; + if update_result.rows_affected() == 0 { + return Ok(()); } + sqlx::query("DELETE FROM blocks WHERE id = ?") + .bind(&block) + .execute(&mut tx) + .await?; + + tracing::warn!("Block {block:?} has expired"); + // We need to remove the block from `shared` here while the database is locked for writing. // If we did it after the commit, it could happen that after the commit but between the // removal someone re-adds the block into the database. 
That would result in there being a diff --git a/lib/tests/sync.rs b/lib/tests/sync.rs index b043bd414..a4b1714dc 100644 --- a/lib/tests/sync.rs +++ b/lib/tests/sync.rs @@ -976,25 +976,33 @@ fn content_stays_available_during_sync() { #[test] fn redownload_expired_blocks() { + use std::time::SystemTime; + let mut env = Env::new(); let (origin_has_it_tx, mut origin_has_it_rx) = mpsc::channel(1); let (cache_had_it_tx, mut cache_had_it_rx) = mpsc::channel(1); let (finish_origin_tx, mut finish_origin_rx) = mpsc::channel(1); let (finish_cache_tx, mut finish_cache_rx) = mpsc::channel(1); - env.actor("origin", async move { - let (_network, repo, _reg) = actor::setup().await; + let test_content = Arc::new(common::random_content(2 * 1024 * 1024)); - // Create a file - let mut file = repo.create_file("test.txt").await.unwrap(); - file.write_all(b"test content").await.unwrap(); - file.flush().await.unwrap(); + env.actor("origin", { + let test_content = test_content.clone(); + async move { + let (_network, repo, _reg) = actor::setup().await; - let block_count = repo.count_blocks().await.unwrap(); + // Create a file + let mut file = repo.create_file("test.txt").await.unwrap(); + //file.write_all(b"test content").await.unwrap(); + file.write_all(&test_content).await.unwrap(); + file.flush().await.unwrap(); + + let block_count = repo.count_blocks().await.unwrap(); - origin_has_it_tx.send(block_count).await.unwrap(); + origin_has_it_tx.send(block_count).await.unwrap(); - finish_origin_rx.recv().await; + finish_origin_rx.recv().await; + } }); env.actor("cache", async move { @@ -1002,12 +1010,10 @@ fn redownload_expired_blocks() { let repo = actor::create_repo_with_mode(DEFAULT_REPO, AccessMode::Blind).await; let _reg = network.register(repo.handle()).await; - repo.set_block_expiration(Some(Duration::from_secs(1))) - .await - .unwrap(); - let block_count = origin_has_it_rx.recv().await.unwrap(); + let start = SystemTime::now(); + 
network.add_user_provided_peer(&actor::lookup_addr("origin").await); // Wait to have the blocks, this is a blind replica, so we can't check the file exists and @@ -1018,11 +1024,22 @@ fn redownload_expired_blocks() { }) .await; + let normal_sync_duration = SystemTime::now().duration_since(start).unwrap(); + + let expiration_duration = Duration::from_secs(5); + + repo.set_block_expiration(Some(expiration_duration)) + .await + .unwrap(); + // Wait for the blocks to expire. TODO: We could also wait for the block count to drop to // zero, but currently the expiration tracker does not trigger a repo change. - sleep(Duration::from_secs(3)).await; + sleep(3 * expiration_duration).await; - cache_had_it_tx.send(()).await.unwrap(); + cache_had_it_tx + .send((normal_sync_duration, SystemTime::now())) + .await + .unwrap(); finish_cache_rx.recv().await; }); @@ -1032,12 +1049,21 @@ fn redownload_expired_blocks() { let repo = actor::create_repo_with_mode(DEFAULT_REPO, AccessMode::Read).await; let _reg = network.register(repo.handle()).await; - cache_had_it_rx.recv().await; + // Use `start` to measure how long it took to sync the data from the expired cache. 
+ let (normal_sync_duration, start) = cache_had_it_rx.recv().await.unwrap(); network.add_user_provided_peer(&actor::lookup_addr("cache").await); common::expect_entry_exists(&repo, "test.txt", EntryType::File).await; - common::expect_file_content(&repo, "test.txt", b"test content").await; + common::expect_file_content(&repo, "test.txt", &test_content).await; + + let expired_sync_duration = SystemTime::now().duration_since(start).unwrap(); + + assert!( + expired_sync_duration < 3 * normal_sync_duration, + "Sync of expired blocks is more than 3x higher than normal sync \ + (normal:{normal_sync_duration:?}, expired:{expired_sync_duration:?})" + ); finish_origin_tx.send(()).await.unwrap(); finish_cache_tx.send(()).await.unwrap(); From 0e6b1c3145667578e57677c92f5cdbd57f396052 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Tue, 5 Sep 2023 14:56:41 +0200 Subject: [PATCH 2/8] Add `broadcast_hash_set` channel --- lib/src/sync.rs | 110 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/lib/src/sync.rs b/lib/src/sync.rs index a7d5a7cbb..ff49d8b02 100644 --- a/lib/src/sync.rs +++ b/lib/src/sync.rs @@ -72,6 +72,7 @@ pub mod broadcast { pub(crate) mod uninitialized_watch { use futures_util::{stream, Stream}; use tokio::sync::watch as w; + pub use w::error::RecvError; pub struct Sender(w::Sender>); @@ -212,3 +213,112 @@ pub(crate) mod atomic_slot { } } } + +/// Similar to `tokio::sync::broadcast` but does not enqueue the values. Instead, it "accumulates" +/// them into a set. Advantages include that one doesn't need to specify the recv buffer size and +/// the `insert` (analogue to `broadcast::send`) is non-async and never fails. 
+pub(crate) mod broadcast_hash_set { + use super::uninitialized_watch; + pub use super::uninitialized_watch::RecvError; + use std::{ + collections::{HashMap, HashSet}, + hash::Hash, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + }; + + #[derive(Clone)] + pub struct Sender { + shared: Arc>, + } + + impl Sender { + pub fn insert(&self, value: &T) { + let mut receivers = self.shared.receivers.lock().unwrap(); + + for (_id, receiver) in receivers.iter_mut() { + if receiver.1.insert(value.clone()) { + receiver.0.send(()).unwrap_or(()); + } + } + } + + pub fn subscribe(&self) -> Receiver { + let id = self.shared.id_generator.fetch_add(1, Ordering::Relaxed); + let (watch_tx, watch_rx) = uninitialized_watch::channel(); + + let mut receivers = self.shared.receivers.lock().unwrap(); + + receivers.insert(id, (watch_tx, HashSet::new())); + + Receiver { + id, + shared: self.shared.clone(), + watch_rx, + } + } + } + + pub struct Receiver { + id: usize, + shared: Arc>, + watch_rx: uninitialized_watch::Receiver<()>, + } + + impl Receiver { + pub async fn changed(&mut self) -> Result, RecvError> { + self.watch_rx.changed().await?; + + let mut receivers = self.shared.receivers.lock().unwrap(); + + // Unwrap is OK because the entry must exists for as long as this `Receiver` exists. 
+ let old_set = &mut receivers.get_mut(&self.id).unwrap().1; + let mut new_set = HashSet::new(); + + if !old_set.is_empty() { + std::mem::swap(&mut new_set, old_set); + } + + Ok(new_set) + } + } + + impl Drop for Receiver { + fn drop(&mut self) { + self.shared.receivers.lock().unwrap().remove(&self.id); + } + } + + struct Shared { + id_generator: AtomicUsize, + receivers: Mutex, HashSet)>>, + } + + pub fn channel() -> (Sender, Receiver) { + let id_generator = AtomicUsize::new(0); + let mut receivers = HashMap::new(); + + let id = id_generator.fetch_add(1, Ordering::Relaxed); + let (watch_tx, watch_rx) = uninitialized_watch::channel(); + + receivers.insert(id, (watch_tx, HashSet::new())); + + let shared = Arc::new(Shared { + id_generator, + receivers: Mutex::new(receivers), + }); + + ( + Sender { + shared: shared.clone(), + }, + Receiver { + id, + shared, + watch_rx, + }, + ) + } +} From 8a2f9ac6575c506f707a57791bbc63edc59cb1bd Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Tue, 5 Sep 2023 15:09:03 +0200 Subject: [PATCH 3/8] Use replace broadcast channel in Store for broadcast_hash_set --- lib/src/network/client.rs | 8 +++++--- lib/src/store/mod.rs | 15 ++++++++------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/lib/src/network/client.rs b/lib/src/network/client.rs index 9defdc0c3..857518adc 100644 --- a/lib/src/network/client.rs +++ b/lib/src/network/client.rs @@ -108,9 +108,11 @@ impl Client { result?; break; } - branch_to_reload = reload_index_rx.recv() => { - if let Ok(branch_to_reload) = branch_to_reload { - self.reload_index(&branch_to_reload); + branches_to_reload = reload_index_rx.changed() => { + if let Ok(branches_to_reload) = branches_to_reload { + for branch_to_reload in &branches_to_reload { + self.reload_index(&branch_to_reload); + } } } } diff --git a/lib/src/store/mod.rs b/lib/src/store/mod.rs index ca90835d1..962758b37 100644 --- a/lib/src/store/mod.rs +++ b/lib/src/store/mod.rs @@ -41,6 +41,7 @@ use crate::{ 
MultiBlockPresence, Proof, RootNode, Summary, INNER_LAYER_COUNT, }, storage_size::StorageSize, + sync::broadcast_hash_set, }; use futures_util::{Stream, TryStreamExt}; use std::{ @@ -50,25 +51,25 @@ use std::{ time::Duration, }; // TODO: Consider creating an async `RwLock` in the `deadlock` module and use it here. -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::RwLock; /// Data store #[derive(Clone)] pub(crate) struct Store { db: db::Pool, cache: Arc, - pub client_reload_index_tx: broadcast::Sender, + pub client_reload_index_tx: broadcast_hash_set::Sender, block_expiration_tracker: Arc>>>, } impl Store { pub fn new(db: db::Pool) -> Self { + let client_reload_index_tx = broadcast_hash_set::channel().0; + Self { db, cache: Arc::new(Cache::new()), - // TODO: The broadcast channel is not the best structure for this use case. Ideally - // we'd have something like the `watch` but that can be edited until it's received. - client_reload_index_tx: broadcast::channel(1024).0, + client_reload_index_tx, block_expiration_tracker: Arc::new(RwLock::new(None)), } } @@ -240,7 +241,7 @@ impl Store { pub(crate) struct Reader { inner: Handle, cache: CacheTransaction, - client_reload_index_tx: broadcast::Sender, + client_reload_index_tx: broadcast_hash_set::Sender, block_expiration_tracker: Option>, } @@ -321,7 +322,7 @@ impl Reader { tx.commit().await?; for branch_id in branches { - self.client_reload_index_tx.send(branch_id).unwrap_or(0); + self.client_reload_index_tx.insert(&branch_id); } Ok(true) From 0bd588ba4ac0b5e0e124ecd3cb430129d7c66d1c Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Tue, 5 Sep 2023 15:18:21 +0200 Subject: [PATCH 4/8] redownload_expired_blocks test: make reader replica blind The tests is faster that way and more focused. 
--- lib/tests/sync.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/lib/tests/sync.rs b/lib/tests/sync.rs index a4b1714dc..f495212e3 100644 --- a/lib/tests/sync.rs +++ b/lib/tests/sync.rs @@ -986,6 +986,14 @@ fn redownload_expired_blocks() { let test_content = Arc::new(common::random_content(2 * 1024 * 1024)); + async fn wait_for_block_count(repo: &Repository, block_count: u64) { + common::eventually(&repo, || { + async { repo.count_blocks().await.unwrap() == block_count } + .instrument(tracing::Span::current()) + }) + .await; + } + env.actor("origin", { let test_content = test_content.clone(); async move { @@ -993,7 +1001,6 @@ fn redownload_expired_blocks() { // Create a file let mut file = repo.create_file("test.txt").await.unwrap(); - //file.write_all(b"test content").await.unwrap(); file.write_all(&test_content).await.unwrap(); file.flush().await.unwrap(); @@ -1013,16 +1020,11 @@ fn redownload_expired_blocks() { let block_count = origin_has_it_rx.recv().await.unwrap(); let start = SystemTime::now(); - network.add_user_provided_peer(&actor::lookup_addr("origin").await); // Wait to have the blocks, this is a blind replica, so we can't check the file exists and // has the required content. 
- common::eventually(&repo, || { - async { repo.count_blocks().await.unwrap() == block_count } - .instrument(tracing::Span::current()) - }) - .await; + wait_for_block_count(&repo, block_count).await; let normal_sync_duration = SystemTime::now().duration_since(start).unwrap(); @@ -1037,7 +1039,7 @@ fn redownload_expired_blocks() { sleep(3 * expiration_duration).await; cache_had_it_tx - .send((normal_sync_duration, SystemTime::now())) + .send((block_count, normal_sync_duration)) .await .unwrap(); @@ -1046,16 +1048,16 @@ fn redownload_expired_blocks() { env.actor("reader", async move { let network = actor::create_network(Proto::Tcp).await; - let repo = actor::create_repo_with_mode(DEFAULT_REPO, AccessMode::Read).await; + let repo = actor::create_repo_with_mode(DEFAULT_REPO, AccessMode::Blind).await; let _reg = network.register(repo.handle()).await; // Use `start` to measure how long it took to sync the data from the expired cache. - let (normal_sync_duration, start) = cache_had_it_rx.recv().await.unwrap(); + let (block_count, normal_sync_duration) = cache_had_it_rx.recv().await.unwrap(); + let start = SystemTime::now(); network.add_user_provided_peer(&actor::lookup_addr("cache").await); - common::expect_entry_exists(&repo, "test.txt", EntryType::File).await; - common::expect_file_content(&repo, "test.txt", &test_content).await; + wait_for_block_count(&repo, block_count).await; let expired_sync_duration = SystemTime::now().duration_since(start).unwrap(); From b5c97dbc44322826ae83420ec3aee6cf994d928b Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Wed, 6 Sep 2023 09:51:59 +0200 Subject: [PATCH 5/8] Throttle root messages sent by the server --- lib/Cargo.toml | 1 + lib/src/network/server.rs | 63 ++++++--- lib/src/repository/mod.rs | 18 ++- lib/src/sync.rs | 280 ++++++++++++++++++++++++++++++-------- lib/tests/sync.rs | 17 ++- 5 files changed, 293 insertions(+), 86 deletions(-) diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 69a365f2f..0bf54685b 100644 --- 
a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -44,6 +44,7 @@ noise-rust-crypto = { version = "0.5.0", default-features = false, features = [" num_enum = { workspace = true } once_cell = { workspace = true } parse-size = { version = "1.0.0", features = ["std"] } +pin-project-lite = "0.2.9" rand = { workspace = true } ref-cast = "1.0.14" rupnp = { version = "1.1.0", default-features = false, features = [] } diff --git a/lib/src/network/server.rs b/lib/src/network/server.rs index e39c5b181..1b6d891cc 100644 --- a/lib/src/network/server.rs +++ b/lib/src/network/server.rs @@ -1,3 +1,5 @@ +use std::{pin::pin, time::Duration}; + use super::{ debug_payload::{DebugRequestPayload, DebugResponsePayload}, message::{Content, Request, Response, ResponseDisambiguator}, @@ -5,15 +7,22 @@ use super::{ use crate::{ crypto::{sign::PublicKey, Hash}, error::{Error, Result}, - event::Payload, + event::{Event, Payload}, protocol::{BlockContent, BlockId, RootNode}, repository::Vault, store, + sync::stream::RateLimit, +}; +use futures_util::{ + stream::{self, FuturesUnordered}, + Stream, StreamExt, TryStreamExt, }; -use futures_util::{stream::FuturesUnordered, StreamExt, TryStreamExt}; use tokio::{ select, - sync::{broadcast::error::RecvError, mpsc}, + sync::{ + broadcast::{self, error::RecvError}, + mpsc, + }, }; use tracing::instrument; @@ -242,23 +251,17 @@ impl<'a> Monitor<'a> { } async fn run(self) -> Result<()> { - let mut subscription = self.vault.event_tx.subscribe(); - - // send initial branches self.handle_all_branches_changed().await?; - loop { - match subscription.recv().await.map(|event| event.payload) { - Ok( - Payload::BranchChanged(branch_id) | Payload::BlockReceived { branch_id, .. }, - ) => self.handle_branch_changed(branch_id).await?, - Ok(Payload::MaintenanceCompleted) => continue, - Err(RecvError::Lagged(_)) => { - tracing::warn!("event receiver lagged"); - self.handle_all_branches_changed().await? 
- } - - Err(RecvError::Closed) => break, + let mut events = pin!(RateLimit::throttle( + events(self.vault.event_tx.subscribe()), + Duration::from_secs(1) + )); + + while let Some(event) = events.next().await { + match event { + BranchChanged::One(branch_id) => self.handle_branch_changed(branch_id).await?, + BranchChanged::All => self.handle_all_branches_changed().await?, } } @@ -348,3 +351,27 @@ impl Sender { self.0.send(Content::Response(response)).await.is_ok() } } + +fn events(rx: broadcast::Receiver) -> impl Stream { + stream::unfold(rx, |mut rx| async move { + loop { + match rx.recv().await { + Ok(Event { payload, .. }) => match payload { + Payload::BranchChanged(branch_id) + | Payload::BlockReceived { branch_id, .. } => { + return Some((BranchChanged::One(branch_id), rx)) + } + Payload::MaintenanceCompleted => continue, + }, + Err(RecvError::Lagged(_)) => return Some((BranchChanged::All, rx)), + Err(RecvError::Closed) => return None, + } + } + }) +} + +#[derive(Eq, PartialEq, Hash)] +enum BranchChanged { + One(PublicKey), + All, +} diff --git a/lib/src/repository/mod.rs b/lib/src/repository/mod.rs index 49f22a6e8..fec0ea045 100644 --- a/lib/src/repository/mod.rs +++ b/lib/src/repository/mod.rs @@ -44,12 +44,13 @@ use crate::{ state_monitor::StateMonitor, storage_size::StorageSize, store, - sync::broadcast::ThrottleReceiver, + sync::stream::RateLimit, }; use camino::Utf8Path; use futures_util::{future, TryStreamExt}; +use futures_util::{stream, StreamExt}; use scoped_task::ScopedJoinHandle; -use std::{io, path::Path, sync::Arc}; +use std::{io, path::Path, pin::pin, sync::Arc}; use tokio::{ fs, sync::broadcast::{self, error::RecvError}, @@ -858,14 +859,17 @@ async fn generate_and_store_writer_id( async fn report_sync_progress(vault: Vault) { let mut prev_progress = Progress { value: 0, total: 0 }; - let mut event_rx = ThrottleReceiver::new(vault.event_tx.subscribe(), Duration::from_secs(1)); - loop { - match event_rx.recv().await { - Ok(_) | 
Err(RecvError::Lagged(_)) => (), - Err(RecvError::Closed) => break, + let events = stream::unfold(vault.event_tx.subscribe(), |mut rx| async move { + match rx.recv().await { + Ok(_) | Err(RecvError::Lagged(_)) => Some(((), rx)), + Err(RecvError::Closed) => None, } + }); + let events = RateLimit::throttle(events, Duration::from_secs(1)); + let mut events = pin!(events); + while events.next().await.is_some() { let next_progress = match vault.store().sync_progress().await { Ok(progress) => progress, Err(error) => { diff --git a/lib/src/sync.rs b/lib/src/sync.rs index ff49d8b02..c9462e358 100644 --- a/lib/src/sync.rs +++ b/lib/src/sync.rs @@ -9,64 +9,6 @@ use std::{ use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; -/// MPMC broadcast channel -pub mod broadcast { - use tokio::{ - select, - sync::broadcast, - time::{self, Duration, Instant}, - }; - - /// Adapter for `Receiver` which limits the rate at which messages are received. The messages - /// are not buffered - if the rate limit is exceeded, all but the last message are discarded. - pub(crate) struct ThrottleReceiver { - rx: broadcast::Receiver, - interval: Duration, - last_recv: Instant, - } - - impl ThrottleReceiver - where - T: Clone, - { - pub fn new(inner: broadcast::Receiver, interval: Duration) -> Self { - Self { - rx: inner, - interval, - last_recv: Instant::now() - .checked_sub(interval) - .expect("Interval should be smaller than the time since epoch"), - } - } - - pub async fn recv(&mut self) -> Result { - let mut item = None; - let end = self.last_recv + self.interval; - - if Instant::now() < end { - loop { - select! { - _ = time::sleep_until(end) => break, - result = self.rx.recv() => { - item = Some(result?); - } - } - } - } - - let item = if let Some(item) = item { - item - } else { - self.rx.recv().await? - }; - - self.last_recv = Instant::now(); - - Ok(item) - } - } -} - /// Similar to tokio::sync::watch, but has no initial value. 
Because there is no initial value the /// API must be sligthly different. In particular, we don't have the `borrow` function. pub(crate) mod uninitialized_watch { @@ -322,3 +264,225 @@ pub(crate) mod broadcast_hash_set { ) } } + +pub(crate) mod stream { + use futures_util::{stream::Fuse, Stream, StreamExt}; + use indexmap::IndexSet; + use pin_project_lite::pin_project; + use std::{ + future::Future, + hash::Hash, + pin::Pin, + task::{ready, Context, Poll}, + }; + use tokio::time::{self, Duration, Sleep}; + + pin_project! { + /// Rate-limitting stream adapter. + /// + /// ```ignore + /// in: |a|a|a|a|a| | | | | |a|a|a|a| | | | + /// debounce: | | | | | | |a| | | | | | | | |a| | + /// throttle: |a| |a| |a| | | | | |a| |a| |a| | | + /// ``` + /// + /// Multiple occurences of the same item within the rate-limit period are reduced to a + /// single one but distinct items are preserved: + /// + /// ```ignore + /// in: |a|b|a|b| | | | | + /// debounced: | | | | | |a|b| | + /// throttle: |a| |a|b| | | | | + /// ``` + pub struct RateLimit + where + S: Stream, + { + #[pin] + inner: Fuse, + strategy: RateLimitStrategy, + period: Duration, + items: IndexSet, + #[pin] + sleep: Option, + } + } + + impl RateLimit + where + S: Stream, + { + pub fn new(inner: S, strategy: RateLimitStrategy, period: Duration) -> Self { + Self { + inner: inner.fuse(), + strategy, + period, + items: IndexSet::new(), + sleep: None, + } + } + + pub fn debounce(inner: S, period: Duration) -> Self { + Self::new(inner, RateLimitStrategy::Debounce, period) + } + + pub fn throttle(inner: S, period: Duration) -> Self { + Self::new(inner, RateLimitStrategy::Throttle, period) + } + } + + impl Stream for RateLimit + where + S: Stream, + S::Item: Hash + Eq, + { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + loop { + if this.sleep.is_none() { + if let Some(item) = this.items.pop() { + return Poll::Ready(Some(item)); + } + } + + 
match this.inner.as_mut().poll_next(cx) { + Poll::Ready(Some(item)) => match this.strategy { + RateLimitStrategy::Debounce => { + this.sleep.set(Some(time::sleep(*this.period))); + this.items.insert(item); + } + RateLimitStrategy::Throttle => { + if this.sleep.is_none() { + this.sleep.set(Some(time::sleep(*this.period))); + return Poll::Ready(Some(item)); + } else { + this.items.insert(item); + } + } + }, + Poll::Ready(None) => { + if this.sleep.is_some() { + this.sleep.set(None); + this.items.reverse(); // keep the original order + continue; + } else { + return Poll::Ready(None); + } + } + Poll::Pending => (), + } + + if let Some(sleep) = this.sleep.as_mut().as_pin_mut() { + ready!(sleep.poll(cx)); + this.sleep.set(None); + this.items.reverse(); // keep the original order + } else { + return Poll::Pending; + } + } + } + } + + #[derive(Copy, Clone, Debug)] + pub enum RateLimitStrategy { + Debounce, + Throttle, + } + + #[cfg(test)] + mod tests { + use super::*; + use futures_util::{future, stream, StreamExt}; + use std::fmt::Debug; + use tokio::{sync::mpsc, time::Instant}; + + #[tokio::test(start_paused = true)] + async fn rate_limit_equal_items() { + let input = [ + (ms(0), 0), + (ms(100), 0), + (ms(100), 0), + (ms(1500), 0), + (ms(0), 0), + ]; + + // unlimitted + let (tx, rx) = mpsc::channel(1); + let expected = [ + (0, ms(0)), + (0, ms(100)), + (0, ms(200)), + (0, ms(1700)), + (0, ms(1700)), + ]; + future::join(produce(tx, input), verify(into_stream(rx), expected)).await; + + // debounced + let (tx, rx) = mpsc::channel(1); + let expected = [(0, ms(1200)), (0, ms(2700))]; + future::join( + produce(tx, input), + verify(RateLimit::debounce(into_stream(rx), ms(1000)), expected), + ) + .await; + + // FIXME: + // // throttled + // let (tx, rx) = mpsc::channel(1); + // let expected = [(0, ms(0)), (0, ms(1000)), (0, ms(1700)), (0, ms(2700))]; + // future::join( + // produce(tx, input), + // verify(RateLimit::throttle(into_stream(rx), ms(1000)), expected), + // ) + // 
.await; + } + + #[tokio::test(start_paused = true)] + async fn rate_limit_inequal_items() { + let input = [(ms(0), 0), (ms(0), 1), (ms(0), 0), (ms(0), 1)]; + + // unlimitted + let (tx, rx) = mpsc::channel(1); + let expected = [(0, ms(0)), (1, ms(0)), (0, ms(0)), (1, ms(0))]; + future::join(produce(tx, input), verify(into_stream(rx), expected)).await; + + // debounced + let (tx, rx) = mpsc::channel(1); + let expected = [(0, ms(1000)), (1, ms(1000))]; + future::join( + produce(tx, input), + verify(RateLimit::debounce(into_stream(rx), ms(1000)), expected), + ) + .await; + } + + fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) + } + + fn into_stream(rx: mpsc::Receiver) -> impl Stream { + stream::unfold(rx, |mut rx| async move { Some((rx.recv().await?, rx)) }) + } + + async fn produce(tx: mpsc::Sender, input: impl IntoIterator) { + for (delay, item) in input { + time::sleep(delay).await; + tx.send(item).await.ok().unwrap(); + } + } + + async fn verify( + stream: impl Stream, + expected: impl IntoIterator, + ) { + let start = Instant::now(); + let actual: Vec<_> = stream.map(|item| (item, start.elapsed())).collect().await; + let expected: Vec<_> = expected.into_iter().collect(); + + assert_eq!(actual, expected); + } + } +} diff --git a/lib/tests/sync.rs b/lib/tests/sync.rs index f495212e3..d1ba1d521 100644 --- a/lib/tests/sync.rs +++ b/lib/tests/sync.rs @@ -1061,10 +1061,21 @@ fn redownload_expired_blocks() { let expired_sync_duration = SystemTime::now().duration_since(start).unwrap(); + // At time of writing this test, syncing of the expired blocks takes about 2.6 times longer + // than normal sync. This is perhaps understandable because the 'cache' node needs to + // download the blocks from 'origin' again and there is also some traffic caused by the + // `reader` node requesting blocks from the `cache` node and it responding with "not found" + // at first. 
+ // + // Here we check that the expired sync is less than 3.5 times the normal sync (as opposed to + // the above ratio 2.6), that's because when this test runs with other tests, the actual + // speed value is less predictable. assert!( - expired_sync_duration < 3 * normal_sync_duration, - "Sync of expired blocks is more than 3x higher than normal sync \ - (normal:{normal_sync_duration:?}, expired:{expired_sync_duration:?})" + (expired_sync_duration.as_millis() as f64) + < (3.5 * normal_sync_duration.as_millis() as f64), + "Sync of expired blocks is more than 3.5x higher than normal sync \ + (normal:{normal_sync_duration:?}, expired:{expired_sync_duration:?}, ratio:{})", + expired_sync_duration.as_millis() as f64 / normal_sync_duration.as_millis() as f64, ); finish_origin_tx.send(()).await.unwrap(); From 106e99db8db637794add3262fb64444f6dd4c120 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Wed, 6 Sep 2023 10:12:18 +0200 Subject: [PATCH 6/8] Remove the Debounce rate limiting strategy it was not used --- lib/src/network/server.rs | 4 +-- lib/src/repository/mod.rs | 4 +-- lib/src/sync.rs | 62 +++++++-------------------------------- 3 files changed, 14 insertions(+), 56 deletions(-) diff --git a/lib/src/network/server.rs b/lib/src/network/server.rs index 1b6d891cc..056629d45 100644 --- a/lib/src/network/server.rs +++ b/lib/src/network/server.rs @@ -11,7 +11,7 @@ use crate::{ protocol::{BlockContent, BlockId, RootNode}, repository::Vault, store, - sync::stream::RateLimit, + sync::stream::Throttle, }; use futures_util::{ stream::{self, FuturesUnordered}, @@ -253,7 +253,7 @@ impl<'a> Monitor<'a> { async fn run(self) -> Result<()> { self.handle_all_branches_changed().await?; - let mut events = pin!(RateLimit::throttle( + let mut events = pin!(Throttle::new( events(self.vault.event_tx.subscribe()), Duration::from_secs(1) )); diff --git a/lib/src/repository/mod.rs b/lib/src/repository/mod.rs index fec0ea045..420e5b90c 100644 --- a/lib/src/repository/mod.rs +++ 
b/lib/src/repository/mod.rs @@ -44,7 +44,7 @@ use crate::{ state_monitor::StateMonitor, storage_size::StorageSize, store, - sync::stream::RateLimit, + sync::stream::Throttle, }; use camino::Utf8Path; use futures_util::{future, TryStreamExt}; @@ -866,7 +866,7 @@ async fn report_sync_progress(vault: Vault) { Err(RecvError::Closed) => None, } }); - let events = RateLimit::throttle(events, Duration::from_secs(1)); + let events = Throttle::new(events, Duration::from_secs(1)); let mut events = pin!(events); while events.next().await.is_some() { diff --git a/lib/src/sync.rs b/lib/src/sync.rs index c9462e358..333dfd9e1 100644 --- a/lib/src/sync.rs +++ b/lib/src/sync.rs @@ -282,7 +282,6 @@ pub(crate) mod stream { /// /// ```ignore /// in: |a|a|a|a|a| | | | | |a|a|a|a| | | | - /// debounce: | | | | | | |a| | | | | | | | |a| | /// throttle: |a| |a| |a| | | | | |a| |a| |a| | | /// ``` /// @@ -291,16 +290,14 @@ pub(crate) mod stream { /// /// ```ignore /// in: |a|b|a|b| | | | | - /// debounced: | | | | | |a|b| | /// throttle: |a| |a|b| | | | | /// ``` - pub struct RateLimit + pub struct Throttle where S: Stream, { #[pin] inner: Fuse, - strategy: RateLimitStrategy, period: Duration, items: IndexSet, #[pin] @@ -308,30 +305,21 @@ pub(crate) mod stream { } } - impl RateLimit + impl Throttle where S: Stream, { - pub fn new(inner: S, strategy: RateLimitStrategy, period: Duration) -> Self { + pub fn new(inner: S, period: Duration) -> Self { Self { inner: inner.fuse(), - strategy, period, items: IndexSet::new(), sleep: None, } } - - pub fn debounce(inner: S, period: Duration) -> Self { - Self::new(inner, RateLimitStrategy::Debounce, period) - } - - pub fn throttle(inner: S, period: Duration) -> Self { - Self::new(inner, RateLimitStrategy::Throttle, period) - } } - impl Stream for RateLimit + impl Stream for Throttle where S: Stream, S::Item: Hash + Eq, @@ -349,20 +337,14 @@ pub(crate) mod stream { } match this.inner.as_mut().poll_next(cx) { - Poll::Ready(Some(item)) => match 
this.strategy { - RateLimitStrategy::Debounce => { + Poll::Ready(Some(item)) => { + if this.sleep.is_none() { this.sleep.set(Some(time::sleep(*this.period))); + return Poll::Ready(Some(item)); + } else { this.items.insert(item); } - RateLimitStrategy::Throttle => { - if this.sleep.is_none() { - this.sleep.set(Some(time::sleep(*this.period))); - return Poll::Ready(Some(item)); - } else { - this.items.insert(item); - } - } - }, + } Poll::Ready(None) => { if this.sleep.is_some() { this.sleep.set(None); @@ -386,12 +368,6 @@ pub(crate) mod stream { } } - #[derive(Copy, Clone, Debug)] - pub enum RateLimitStrategy { - Debounce, - Throttle, - } - #[cfg(test)] mod tests { use super::*; @@ -420,22 +396,13 @@ pub(crate) mod stream { ]; future::join(produce(tx, input), verify(into_stream(rx), expected)).await; - // debounced - let (tx, rx) = mpsc::channel(1); - let expected = [(0, ms(1200)), (0, ms(2700))]; - future::join( - produce(tx, input), - verify(RateLimit::debounce(into_stream(rx), ms(1000)), expected), - ) - .await; - // FIXME: // // throttled // let (tx, rx) = mpsc::channel(1); // let expected = [(0, ms(0)), (0, ms(1000)), (0, ms(1700)), (0, ms(2700))]; // future::join( // produce(tx, input), - // verify(RateLimit::throttle(into_stream(rx), ms(1000)), expected), + // verify(Throttle::throttle(into_stream(rx), ms(1000)), expected), // ) // .await; } @@ -448,15 +415,6 @@ pub(crate) mod stream { let (tx, rx) = mpsc::channel(1); let expected = [(0, ms(0)), (1, ms(0)), (0, ms(0)), (1, ms(0))]; future::join(produce(tx, input), verify(into_stream(rx), expected)).await; - - // debounced - let (tx, rx) = mpsc::channel(1); - let expected = [(0, ms(1000)), (1, ms(1000))]; - future::join( - produce(tx, input), - verify(RateLimit::debounce(into_stream(rx), ms(1000)), expected), - ) - .await; } fn ms(ms: u64) -> Duration { From 580eeb90134f2c3b1509f95d891623c0f7755084 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Thu, 7 Sep 2023 12:47:12 +0200 Subject: [PATCH 7/8] Refactor 
sync::stream::Throttle and adds tests --- lib/Cargo.toml | 2 +- lib/src/network/server.rs | 2 +- lib/src/sync.rs | 222 +++++++++++++++++++++++++++++--------- 3 files changed, 176 insertions(+), 50 deletions(-) diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 0bf54685b..4d44bb4dc 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -44,7 +44,7 @@ noise-rust-crypto = { version = "0.5.0", default-features = false, features = [" num_enum = { workspace = true } once_cell = { workspace = true } parse-size = { version = "1.0.0", features = ["std"] } -pin-project-lite = "0.2.9" +pin-project-lite = "0.2.13" rand = { workspace = true } ref-cast = "1.0.14" rupnp = { version = "1.1.0", default-features = false, features = [] } diff --git a/lib/src/network/server.rs b/lib/src/network/server.rs index 056629d45..2731a1e1a 100644 --- a/lib/src/network/server.rs +++ b/lib/src/network/server.rs @@ -370,7 +370,7 @@ fn events(rx: broadcast::Receiver) -> impl Stream { }) } -#[derive(Eq, PartialEq, Hash)] +#[derive(Eq, PartialEq, Clone, Debug, Hash)] enum BranchChanged { One(PublicKey), All, diff --git a/lib/src/sync.rs b/lib/src/sync.rs index 333dfd9e1..eb74ea97c 100644 --- a/lib/src/sync.rs +++ b/lib/src/sync.rs @@ -267,15 +267,23 @@ pub(crate) mod broadcast_hash_set { pub(crate) mod stream { use futures_util::{stream::Fuse, Stream, StreamExt}; - use indexmap::IndexSet; use pin_project_lite::pin_project; use std::{ + collections::{hash_map, BTreeMap, HashMap}, + fmt::Debug, future::Future, hash::Hash, pin::Pin, task::{ready, Context, Poll}, }; - use tokio::time::{self, Duration, Sleep}; + use tokio::time::{self, Duration, Instant, Sleep}; + + type EntryId = u64; + + struct Delay { + until: Instant, + next: Option, + } pin_project! { /// Rate-limitting stream adapter. 
@@ -295,74 +303,145 @@ pub(crate) mod stream { pub struct Throttle where S: Stream, + S::Item: Hash, { #[pin] inner: Fuse, period: Duration, - items: IndexSet, + ready: BTreeMap, + delays: HashMap, #[pin] sleep: Option, + next_id: EntryId, } } impl Throttle where S: Stream, + S::Item: Eq + Hash, { pub fn new(inner: S, period: Duration) -> Self { Self { inner: inner.fuse(), period, - items: IndexSet::new(), + ready: BTreeMap::new(), + delays: HashMap::new(), sleep: None, + next_id: 0, + } + } + + fn is_ready(ready: &BTreeMap, item: &S::Item) -> bool { + for v in ready.values() { + if v == item { + return true; + } } + false } } impl Stream for Throttle where S: Stream, - S::Item: Hash + Eq, + S::Item: Hash + Eq + Clone + Debug, { type Item = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); + let now = Instant::now(); + let mut inner_is_finished = false; + // Take entries from `inner` into `items` ASAP so we can timestamp them. loop { - if this.sleep.is_none() { - if let Some(item) = this.items.pop() { - return Poll::Ready(Some(item)); - } - } - match this.inner.as_mut().poll_next(cx) { Poll::Ready(Some(item)) => { - if this.sleep.is_none() { - this.sleep.set(Some(time::sleep(*this.period))); - return Poll::Ready(Some(item)); - } else { - this.items.insert(item); + let is_ready = Self::is_ready(&this.ready, &item); + + match this.delays.entry(item.clone()) { + hash_map::Entry::Occupied(mut entry) => { + if !is_ready { + let delay = entry.get_mut(); + + if delay.next.is_none() { + let entry_id = *this.next_id; + *this.next_id += 1; + delay.next = Some(entry_id); + } + } + } + hash_map::Entry::Vacant(entry) => { + let entry_id = *this.next_id; + *this.next_id += 1; + + if !is_ready { + this.ready.insert(entry_id, item.clone()); + } + + entry.insert(Delay { + until: now + *this.period, + next: None, + }); + } } } Poll::Ready(None) => { - if this.sleep.is_some() { - this.sleep.set(None); - this.items.reverse(); 
// keep the original order - continue; - } else { - return Poll::Ready(None); - } + inner_is_finished = true; + break; } - Poll::Pending => (), + Poll::Pending => { + break; + } + } + } + + loop { + if let Some(first_entry) = this.ready.first_entry() { + return Poll::Ready(Some(first_entry.remove())); } if let Some(sleep) = this.sleep.as_mut().as_pin_mut() { ready!(sleep.poll(cx)); this.sleep.set(None); - this.items.reverse(); // keep the original order + } + + let mut first: Option<(&S::Item, &mut Delay)> = None; + + for (item, delay) in this.delays.iter_mut() { + if let Some((_, first_delay)) = &first { + if (delay.until, delay.next) < (first_delay.until, first_delay.next) { + first = Some((item, delay)); + } + } else { + first = Some((item, delay)); + } + } + + let (first_item, first_delay) = match &mut first { + Some(first) => (&first.0, &mut first.1), + None => { + return if inner_is_finished { + Poll::Ready(None) + } else { + Poll::Pending + } + } + }; + + if first_delay.until <= now { + let first_item = (*first_item).clone(); + + if first_delay.next.is_some() { + first_delay.until = now + *this.period; + first_delay.next = None; + return Poll::Ready(Some(first_item)); + } else { + this.delays.remove(&first_item); + } } else { - return Poll::Pending; + this.sleep.set(Some(time::sleep_until(first_delay.until))); } } } @@ -377,6 +456,39 @@ pub(crate) mod stream { #[tokio::test(start_paused = true)] async fn rate_limit_equal_items() { + let input = [(ms(0), 0), (ms(0), 0)]; + let (tx, rx) = mpsc::channel(1); + let expected = [(0, ms(0)), (0, ms(1000))]; + future::join( + produce(tx, input), + verify(Throttle::new(into_stream(rx), ms(1000)), expected), + ) + .await; + + //-------------------------------------------------------- + + let input = [(ms(0), 0), (ms(100), 0)]; + let (tx, rx) = mpsc::channel(1); + let expected = [(0, ms(0)), (0, ms(1000))]; + future::join( + produce(tx, input), + verify(Throttle::new(into_stream(rx), ms(1000)), expected), + ) + .await; 
+ + //-------------------------------------------------------- + + let input = [(ms(0), 0), (ms(0), 0), (ms(1001), 0)]; + let (tx, rx) = mpsc::channel(1); + let expected = [(0, ms(0)), (0, ms(1000)), (0, ms(2000))]; + future::join( + produce(tx, input), + verify(Throttle::new(into_stream(rx), ms(1000)), expected), + ) + .await; + + //-------------------------------------------------------- + let input = [ (ms(0), 0), (ms(100), 0), @@ -385,36 +497,50 @@ pub(crate) mod stream { (ms(0), 0), ]; - // unlimitted let (tx, rx) = mpsc::channel(1); - let expected = [ - (0, ms(0)), - (0, ms(100)), - (0, ms(200)), - (0, ms(1700)), - (0, ms(1700)), - ]; - future::join(produce(tx, input), verify(into_stream(rx), expected)).await; - - // FIXME: - // // throttled - // let (tx, rx) = mpsc::channel(1); - // let expected = [(0, ms(0)), (0, ms(1000)), (0, ms(1700)), (0, ms(2700))]; - // future::join( - // produce(tx, input), - // verify(Throttle::throttle(into_stream(rx), ms(1000)), expected), - // ) - // .await; + let expected = [(0, ms(0)), (0, ms(1000)), (0, ms(2000))]; + future::join( + produce(tx, input), + verify(Throttle::new(into_stream(rx), ms(1000)), expected), + ) + .await; } #[tokio::test(start_paused = true)] async fn rate_limit_inequal_items() { + let input = [(ms(0), 0), (ms(0), 1)]; + + let (tx, rx) = mpsc::channel(1); + let expected = [(0, ms(0)), (1, ms(0))]; + future::join( + produce(tx, input), + verify(Throttle::new(into_stream(rx), ms(1000)), expected), + ) + .await; + + //-------------------------------------------------------- + let input = [(ms(0), 0), (ms(0), 1), (ms(0), 0), (ms(0), 1)]; - // unlimitted let (tx, rx) = mpsc::channel(1); - let expected = [(0, ms(0)), (1, ms(0)), (0, ms(0)), (1, ms(0))]; - future::join(produce(tx, input), verify(into_stream(rx), expected)).await; + let expected = [(0, ms(0)), (1, ms(0)), (0, ms(1000)), (1, ms(1000))]; + future::join( + produce(tx, input), + verify(Throttle::new(into_stream(rx), ms(1000)), expected), + ) + 
.await; + + //-------------------------------------------------------- + + let input = [(ms(0), 0), (ms(0), 1), (ms(0), 1), (ms(0), 0)]; + + let (tx, rx) = mpsc::channel(1); + let expected = [(0, ms(0)), (1, ms(0)), (1, ms(1000)), (0, ms(1000))]; + future::join( + produce(tx, input), + verify(Throttle::new(into_stream(rx), ms(1000)), expected), + ) + .await; } fn ms(ms: u64) -> Duration { From 8ba72ea886745f5c623b07c2c25714e0ca7a77a2 Mon Sep 17 00:00:00 2001 From: Peter Jankuliak Date: Thu, 7 Sep 2023 15:20:26 +0200 Subject: [PATCH 8/8] Write all blocks in transfer_blocks_between_two_replicas at once To make the test go faster. Especially now that RootNode updates are throttled, the test was running at the speed of one block per throttle period (1s), which caused it to time out. --- lib/src/network/tests.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/src/network/tests.rs b/lib/src/network/tests.rs index 49f169e4c..05f4f51c4 100644 --- a/lib/src/network/tests.rs +++ b/lib/src/network/tests.rs @@ -109,7 +109,7 @@ async fn transfer_snapshot_between_two_replicas_case( // TODO: Make it faster and increase the cases. #[proptest(cases = 8)] fn transfer_blocks_between_two_replicas( - #[strategy(1usize..32)] block_count: usize, + #[strategy(1usize..62)] block_count: usize, #[strategy(test_utils::rng_seed_strategy())] rng_seed: u64, ) { test_utils::run(transfer_blocks_between_two_replicas_case( @@ -145,8 +145,10 @@ async fn transfer_blocks_between_two_replicas_case(block_count: usize, rng_seed: a_vault.receive_block(block, Some(promise)).await.unwrap(); tracing::info!(?id, "write block"); + } - // Then wait until replica B receives and writes it too. + // Then wait until replica B receives and writes it too. + for (id, _block) in snapshot.blocks() { wait_until_block_exists(&b_vault, id).await; } };