Merge branch 'expiration-speedup'
inetic committed Sep 7, 2023
2 parents 2c0b1d5 + 8ba72ea commit a36180c
Showing 9 changed files with 562 additions and 124 deletions.
1 change: 1 addition & 0 deletions lib/Cargo.toml
@@ -44,6 +44,7 @@ noise-rust-crypto = { version = "0.5.0", default-features = false, features = ["
num_enum = { workspace = true }
once_cell = { workspace = true }
parse-size = { version = "1.0.0", features = ["std"] }
pin-project-lite = "0.2.13"
rand = { workspace = true }
ref-cast = "1.0.14"
rupnp = { version = "1.1.0", default-features = false, features = [] }
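The new pin-project-lite dependency is a small macro crate for writing manual `Stream`/`Future` combinators without unsafe pin projection; it is presumably pulled in for the new `sync::stream::Throttle` adapter used in the files below. A minimal sketch of the pattern it enables (the wrapper type here is illustrative, not code from this commit):

```rust
use futures_util::Stream;
use pin_project_lite::pin_project;
use std::{
    pin::Pin,
    task::{Context, Poll},
};

pin_project! {
    // Illustrative pass-through adapter; a real combinator (e.g. a throttle)
    // would keep timers or other state next to the pinned inner stream.
    pub struct PassThrough<S> {
        #[pin]
        inner: S,
    }
}

impl<S: Stream> Stream for PassThrough<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `project()` gives safe pinned access to `inner`, no unsafe code needed.
        self.project().inner.poll_next(cx)
    }
}
```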
8 changes: 5 additions & 3 deletions lib/src/network/client.rs
@@ -108,9 +108,11 @@ impl Client {
result?;
break;
}
branch_to_reload = reload_index_rx.recv() => {
if let Ok(branch_to_reload) = branch_to_reload {
self.reload_index(&branch_to_reload);
branches_to_reload = reload_index_rx.changed() => {
if let Ok(branches_to_reload) = branches_to_reload {
for branch_to_reload in &branches_to_reload {
self.reload_index(&branch_to_reload);
}
}
}
}
63 changes: 45 additions & 18 deletions lib/src/network/server.rs
@@ -1,19 +1,28 @@
use std::{pin::pin, time::Duration};

use super::{
debug_payload::{DebugRequestPayload, DebugResponsePayload},
message::{Content, Request, Response, ResponseDisambiguator},
};
use crate::{
crypto::{sign::PublicKey, Hash},
error::{Error, Result},
event::Payload,
event::{Event, Payload},
protocol::{BlockContent, BlockId, RootNode},
repository::Vault,
store,
sync::stream::Throttle,
};
use futures_util::{
stream::{self, FuturesUnordered},
Stream, StreamExt, TryStreamExt,
};
use futures_util::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use tokio::{
select,
sync::{broadcast::error::RecvError, mpsc},
sync::{
broadcast::{self, error::RecvError},
mpsc,
},
};
use tracing::instrument;

@@ -242,23 +251,17 @@ impl<'a> Monitor<'a> {
}

async fn run(self) -> Result<()> {
let mut subscription = self.vault.event_tx.subscribe();

// send initial branches
self.handle_all_branches_changed().await?;

loop {
match subscription.recv().await.map(|event| event.payload) {
Ok(
Payload::BranchChanged(branch_id) | Payload::BlockReceived { branch_id, .. },
) => self.handle_branch_changed(branch_id).await?,
Ok(Payload::MaintenanceCompleted) => continue,
Err(RecvError::Lagged(_)) => {
tracing::warn!("event receiver lagged");
self.handle_all_branches_changed().await?
}

Err(RecvError::Closed) => break,
let mut events = pin!(Throttle::new(
events(self.vault.event_tx.subscribe()),
Duration::from_secs(1)
));

while let Some(event) = events.next().await {
match event {
BranchChanged::One(branch_id) => self.handle_branch_changed(branch_id).await?,
BranchChanged::All => self.handle_all_branches_changed().await?,
}
}

@@ -348,3 +351,27 @@ impl Sender {
self.0.send(Content::Response(response)).await.is_ok()
}
}

fn events(rx: broadcast::Receiver<Event>) -> impl Stream<Item = BranchChanged> {
stream::unfold(rx, |mut rx| async move {
loop {
match rx.recv().await {
Ok(Event { payload, .. }) => match payload {
Payload::BranchChanged(branch_id)
| Payload::BlockReceived { branch_id, .. } => {
return Some((BranchChanged::One(branch_id), rx))
}
Payload::MaintenanceCompleted => continue,
},
Err(RecvError::Lagged(_)) => return Some((BranchChanged::All, rx)),
Err(RecvError::Closed) => return None,
}
}
})
}

#[derive(Eq, PartialEq, Clone, Debug, Hash)]
enum BranchChanged {
One(PublicKey),
All,
}
6 changes: 4 additions & 2 deletions lib/src/network/tests.rs
@@ -109,7 +109,7 @@ async fn transfer_snapshot_between_two_replicas_case(
// TODO: Make it faster and increase the cases.
#[proptest(cases = 8)]
fn transfer_blocks_between_two_replicas(
#[strategy(1usize..32)] block_count: usize,
#[strategy(1usize..62)] block_count: usize,
#[strategy(test_utils::rng_seed_strategy())] rng_seed: u64,
) {
test_utils::run(transfer_blocks_between_two_replicas_case(
@@ -145,8 +145,10 @@ async fn transfer_blocks_between_two_replicas_case(block_count: usize, rng_seed:

a_vault.receive_block(block, Some(promise)).await.unwrap();
tracing::info!(?id, "write block");
}

// Then wait until replica B receives and writes it too.
// Then wait until replica B receives and writes it too.
for (id, _block) in snapshot.blocks() {
wait_until_block_exists(&b_vault, id).await;
}
};
18 changes: 11 additions & 7 deletions lib/src/repository/mod.rs
@@ -44,12 +44,13 @@ use crate::{
state_monitor::StateMonitor,
storage_size::StorageSize,
store,
sync::broadcast::ThrottleReceiver,
sync::stream::Throttle,
};
use camino::Utf8Path;
use futures_util::{future, TryStreamExt};
use futures_util::{stream, StreamExt};
use scoped_task::ScopedJoinHandle;
use std::{io, path::Path, sync::Arc};
use std::{io, path::Path, pin::pin, sync::Arc};
use tokio::{
fs,
sync::broadcast::{self, error::RecvError},
@@ -858,14 +859,17 @@ async fn generate_and_store_writer_id(

async fn report_sync_progress(vault: Vault) {
let mut prev_progress = Progress { value: 0, total: 0 };
let mut event_rx = ThrottleReceiver::new(vault.event_tx.subscribe(), Duration::from_secs(1));

loop {
match event_rx.recv().await {
Ok(_) | Err(RecvError::Lagged(_)) => (),
Err(RecvError::Closed) => break,
let events = stream::unfold(vault.event_tx.subscribe(), |mut rx| async move {
match rx.recv().await {
Ok(_) | Err(RecvError::Lagged(_)) => Some(((), rx)),
Err(RecvError::Closed) => None,
}
});
let events = Throttle::new(events, Duration::from_secs(1));
let mut events = pin!(events);

while events.next().await.is_some() {
let next_progress = match vault.store().sync_progress().await {
Ok(progress) => progress,
Err(error) => {
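The new `pin::pin` imports are needed because `stream::unfold` (and the `Throttle` wrapper around it) produce opaque, non-`Unpin` streams, while `StreamExt::next` requires `Unpin`; `pin!` pins them on the stack without boxing. A minimal illustration, unrelated to the repository's types:

```rust
use futures_util::{stream, StreamExt};
use std::pin::pin;

#[tokio::main]
async fn main() {
    // `unfold` with an async block yields an anonymous, non-Unpin stream...
    let ticks = stream::unfold(0u32, |n| async move { (n < 3).then(|| (n, n + 1)) });

    // ...so it must be pinned before `.next()` can be called in a loop.
    let mut ticks = pin!(ticks);

    while let Some(n) = ticks.next().await {
        println!("tick {n}");
    }
}
```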
14 changes: 9 additions & 5 deletions lib/src/store/block_expiration_tracker.rs
@@ -268,13 +268,17 @@ async fn run_task(
.execute(&mut tx)
.await?;

if update_result.rows_affected() > 0 {
sqlx::query("DELETE FROM blocks WHERE id = ?")
.bind(&block)
.execute(&mut tx)
.await?;
if update_result.rows_affected() == 0 {
return Ok(());
}

sqlx::query("DELETE FROM blocks WHERE id = ?")
.bind(&block)
.execute(&mut tx)
.await?;

tracing::warn!("Block {block:?} has expired");

// We need to remove the block from `shared` here while the database is locked for writing.
// If we did it after the commit, it could happen that after the commit but between the
// removal someone re-adds the block into the database. That would result in there being a
15 changes: 8 additions & 7 deletions lib/src/store/mod.rs
@@ -41,6 +41,7 @@ use crate::{
MultiBlockPresence, Proof, RootNode, Summary, INNER_LAYER_COUNT,
},
storage_size::StorageSize,
sync::broadcast_hash_set,
};
use futures_util::{Stream, TryStreamExt};
use std::{
@@ -50,25 +51,25 @@ use std::{
time::Duration,
};
// TODO: Consider creating an async `RwLock` in the `deadlock` module and use it here.
use tokio::sync::{broadcast, RwLock};
use tokio::sync::RwLock;

/// Data store
#[derive(Clone)]
pub(crate) struct Store {
db: db::Pool,
cache: Arc<Cache>,
pub client_reload_index_tx: broadcast::Sender<PublicKey>,
pub client_reload_index_tx: broadcast_hash_set::Sender<PublicKey>,
block_expiration_tracker: Arc<RwLock<Option<Arc<BlockExpirationTracker>>>>,
}

impl Store {
pub fn new(db: db::Pool) -> Self {
let client_reload_index_tx = broadcast_hash_set::channel().0;

Self {
db,
cache: Arc::new(Cache::new()),
// TODO: The broadcast channel is not the best structure for this use case. Ideally
// we'd have something like the `watch` but that can be edited until it's received.
client_reload_index_tx: broadcast::channel(1024).0,
client_reload_index_tx,
block_expiration_tracker: Arc::new(RwLock::new(None)),
}
}
@@ -240,7 +241,7 @@ impl Store {
pub(crate) struct Reader {
inner: Handle,
cache: CacheTransaction,
client_reload_index_tx: broadcast::Sender<PublicKey>,
client_reload_index_tx: broadcast_hash_set::Sender<PublicKey>,
block_expiration_tracker: Option<Arc<BlockExpirationTracker>>,
}

@@ -321,7 +322,7 @@ impl Reader {
tx.commit().await?;

for branch_id in branches {
self.client_reload_index_tx.send(branch_id).unwrap_or(0);
self.client_reload_index_tx.insert(&branch_id);
}

Ok(true)
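The removed `TODO` asked for "something like the `watch` [channel] but that can be edited until it's received", which is what the new `broadcast_hash_set` channel provides: senders insert keys into a pending set, and a receiver wakes up and drains the whole accumulated batch at once (see the `reload_index_rx.changed()` loop in client.rs above). The module itself is not part of this diff; the following is a rough sketch of the idea using a `watch` channel purely as a wake-up signal. Names and details are assumptions, not the crate's actual implementation, and for simplicity it supports a single receiver, whereas a real broadcast version would keep one pending set per receiver.

```rust
use std::{
    collections::HashSet,
    hash::Hash,
    sync::{Arc, Mutex},
};
use tokio::sync::watch;

pub struct Sender<T> {
    values: Arc<Mutex<HashSet<T>>>,
    notify: watch::Sender<()>,
}

pub struct Receiver<T> {
    values: Arc<Mutex<HashSet<T>>>,
    notify: watch::Receiver<()>,
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let values = Arc::new(Mutex::new(HashSet::new()));
    let (tx, rx) = watch::channel(());

    (
        Sender { values: values.clone(), notify: tx },
        Receiver { values, notify: rx },
    )
}

impl<T: Eq + Hash + Clone> Sender<T> {
    // Unlike `broadcast::Sender::send`, inserting the same key twice before it
    // is received collapses into a single entry, so bursts cannot lag the receiver.
    pub fn insert(&self, value: &T) {
        self.values.lock().unwrap().insert(value.clone());
        self.notify.send(()).ok();
    }
}

impl<T: Eq + Hash + Clone> Receiver<T> {
    // Wait until something was inserted, then take the whole batch at once.
    pub async fn changed(&mut self) -> Result<HashSet<T>, watch::error::RecvError> {
        loop {
            self.notify.changed().await?;
            let batch = std::mem::take(&mut *self.values.lock().unwrap());
            if !batch.is_empty() {
                return Ok(batch);
            }
        }
    }
}
```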