Skip to content

Commit

Permalink
Merge branch 'client-store-optimize'
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Aug 13, 2024
2 parents 79e63c0 + b60649e commit fb5e7cf
Show file tree
Hide file tree
Showing 71 changed files with 2,255 additions and 2,305 deletions.
2 changes: 1 addition & 1 deletion bridge/src/dht_contacts.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use deadlock::AsyncMutex;
use indexmap::set::IndexSet;
use ouisync_lib::network::dht_discovery::DhtContactsStoreTrait;
use ouisync_lib::DhtContactsStoreTrait;
use std::{
cmp::Eq,
collections::HashSet,
Expand Down
2 changes: 1 addition & 1 deletion bridge/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::{ConfigKey, ConfigStore};
use ouisync_lib::network::{peer_addr::PeerAddr, Network};
use ouisync_lib::{Network, PeerAddr};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;

Expand Down
2 changes: 1 addition & 1 deletion cli/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use hyper_rustls::TlsAcceptor;
use metrics::{Gauge, Key, KeyName, Label, Level, Metadata, Recorder, Unit};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusRecorder};
use ouisync_bridge::config::{ConfigError, ConfigKey};
use ouisync_lib::{network::PeerState, PeerInfoCollector, PublicRuntimeId};
use ouisync_lib::{PeerInfoCollector, PeerState, PublicRuntimeId};
use scoped_task::ScopedAbortHandle;
use std::{
collections::HashMap,
Expand Down
12 changes: 3 additions & 9 deletions cli/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use chrono::{DateTime, SecondsFormat, Utc};
use clap::{builder::BoolishValueParser, Subcommand, ValueEnum};
use ouisync_lib::{
network::{PeerSource, PeerState},
AccessMode, PeerAddr, PeerInfo, StorageSize,
};
use ouisync_lib::{AccessMode, PeerAddr, PeerInfo, PeerSource, PeerState, StorageSize};
use serde::{Deserialize, Serialize};
use std::{
fmt, iter,
Expand Down Expand Up @@ -276,7 +273,7 @@ pub(crate) enum Request {
/// Repository token
#[arg(short, long)]
token: String,
}
},
}

#[derive(Clone, Copy, Debug, Serialize, Deserialize, ValueEnum)]
Expand Down Expand Up @@ -566,10 +563,7 @@ fn format_time(time: SystemTime) -> String {
#[cfg(test)]
mod tests {
use super::*;
use ouisync_lib::{
network::{PeerSource, PeerState, TrafficStats},
SecretRuntimeId,
};
use ouisync_lib::{PeerSource, PeerState, SecretRuntimeId, TrafficStats};
use rand::{rngs::StdRng, SeedableRng};
use std::net::Ipv4Addr;

Expand Down
5 changes: 1 addition & 4 deletions cli/src/repository.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use crate::{options::Dirs, protocol::Error, utils, DB_EXTENSION};
use camino::Utf8Path;
use ouisync_bridge::{config::ConfigStore, protocol::remote::v1, transport::RemoteClient};
use ouisync_lib::{
network::{Network, Registration},
Repository,
};
use ouisync_lib::{Network, Registration, Repository};
use ouisync_vfs::MountGuard;
use state_monitor::StateMonitor;
use std::{
Expand Down
2 changes: 1 addition & 1 deletion cli/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ouisync_bridge::{
network::{self, NetworkDefaults},
transport,
};
use ouisync_lib::network::Network;
use ouisync_lib::Network;
use state_monitor::StateMonitor;
use std::{
path::{Path, PathBuf},
Expand Down
2 changes: 1 addition & 1 deletion cli/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ where

// Gracefully terminate the process, unlike `Child::kill` which sends `SIGKILL` and thus doesn't
// allow destructors to run.
#[cfg(any(target_os = "linux", target_os = "osx"))]
#[cfg(any(target_os = "linux", target_os = "macos"))]
fn terminate(process: &Child) {
// SAFETY: we are just sending a `SIGTERM` signal to the process, there should be no reason for
// undefined behaviour here.
Expand Down
9 changes: 3 additions & 6 deletions ffi/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ use crate::{
use camino::Utf8PathBuf;
use ouisync_bridge::network::NetworkDefaults;
use ouisync_lib::{
crypto::PasswordSalt,
network::{NatBehavior, TrafficStats},
AccessChange, AccessMode, LocalSecret, PeerAddr, PeerInfo, Progress, SetLocalSecret,
ShareToken,
crypto::PasswordSalt, AccessChange, AccessMode, LocalSecret, NatBehavior, PeerAddr, PeerInfo,
Progress, SetLocalSecret, ShareToken, TrafficStats,
};
use serde::{Deserialize, Serialize};
use state_monitor::{MonitorId, StateMonitor};
Expand Down Expand Up @@ -606,8 +604,7 @@ mod tests {

use super::*;
use ouisync_lib::{
network::{PeerSource, PeerState},
AccessSecrets, Credentials, PeerInfo, SecretRuntimeId,
AccessSecrets, Credentials, PeerInfo, PeerSource, PeerState, SecretRuntimeId,
};

#[test]
Expand Down
100 changes: 43 additions & 57 deletions ffi/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ use crate::{
use camino::Utf8PathBuf;
use ouisync_bridge::{protocol::Notification, repository, transport::NotificationSender};
use ouisync_lib::{
crypto::Hashable,
network::{self, Registration},
path,
sync::uninitialized_watch,
AccessMode, Credentials, Event, LocalSecret, Payload, Progress, Repository, SetLocalSecret,
ShareToken,
self, crypto::Hashable, path, AccessMode, Credentials, Event, LocalSecret, Payload, Progress,
Registration, Repository, SetLocalSecret, ShareToken,
};
use serde::{Deserialize, Serialize};
use std::{
Expand All @@ -22,7 +18,7 @@ use std::{
sync::{Arc, RwLock as BlockingRwLock},
};
use thiserror::Error;
use tokio::sync::{broadcast::error::RecvError, Notify, RwLock as AsyncRwLock};
use tokio::sync::{broadcast::error::RecvError, watch, RwLock as AsyncRwLock};

pub(crate) struct RepositoryHolder {
pub store_path: PathBuf,
Expand Down Expand Up @@ -241,7 +237,7 @@ pub(crate) async fn set_access_mode(
/// User is responsible for deallocating the returned string.
pub(crate) fn info_hash(state: &State, handle: RepositoryHandle) -> Result<String, Error> {
let holder = state.repositories.get(handle)?;
let info_hash = network::repository_info_hash(holder.repository.secrets().id());
let info_hash = ouisync_lib::repository_info_hash(holder.repository.secrets().id());

Ok(hex::encode(info_hash))
}
Expand Down Expand Up @@ -362,7 +358,7 @@ pub(crate) fn subscribe(
loop {
match notification_rx.recv().await {
Ok(Event {
payload: Payload::BranchChanged(_) | Payload::BlockReceived { .. },
payload: Payload::SnapshotApproved(_) | Payload::BlockReceived { .. },
..
}) => (),
Ok(Event { .. }) => continue,
Expand Down Expand Up @@ -583,56 +579,48 @@ pub(crate) struct MetadataEdit {
/// Registry of opened repositories.
pub(crate) struct Repositories {
inner: BlockingRwLock<Inner>,
pub on_repository_list_changed_tx: uninitialized_watch::Sender<()>,
change_tx: watch::Sender<()>,
}

impl Repositories {
pub fn new() -> Self {
let (on_repository_list_changed_tx, _) = uninitialized_watch::channel();

Self {
inner: BlockingRwLock::new(Inner {
registry: Registry::new(),
index: HashMap::new(),
}),
on_repository_list_changed_tx,
change_tx: watch::Sender::new(()),
}
}

/// Gets or inserts a repository.
pub async fn entry(&self, store_path: PathBuf) -> RepositoryEntry {
let mut change_rx = self.change_tx.subscribe();
change_rx.mark_unchanged(); // skip the initial notification.

loop {
let notify = {
let mut inner = self.inner.write().unwrap();

match inner.index.entry(store_path.clone()) {
Entry::Occupied(entry) => match entry.get() {
IndexEntry::Reserved(notify) => {
// The repo doesn't exists yet but someone is already inserting it.
notify.clone()
}
IndexEntry::Existing(handle) => {
// The repo already exists.
return RepositoryEntry::Occupied(*handle);
}
},
Entry::Vacant(entry) => {
entry.insert(IndexEntry::Reserved(Arc::new(Notify::new())));

// The repo doesn't exist yet and we are the first one to insert it.
return RepositoryEntry::Vacant(RepositoryVacantEntry {
inner: &self.inner,
store_path,
inserted: false,
on_repository_list_changed_tx: self
.on_repository_list_changed_tx
.clone(),
});
}
match self.inner.write().unwrap().index.entry(store_path.clone()) {
Entry::Occupied(entry) => match entry.get() {
// The repo doesn't exists yet but someone is already inserting it. Wait for a
// change notification and try again.
IndexEntry::Reserved => (),
// The repo already exists.
IndexEntry::Existing(handle) => return RepositoryEntry::Occupied(*handle),
},
Entry::Vacant(entry) => {
entry.insert(IndexEntry::Reserved);

// The repo doesn't exist yet and we are the first one to insert it.
return RepositoryEntry::Vacant(RepositoryVacantEntry {
inner: &self.inner,
store_path,
inserted: false,
change_tx: self.change_tx.clone(),
});
}
};
}

notify.notified().await;
change_rx.changed().await.ok();
}
}

Expand All @@ -644,14 +632,14 @@ impl Repositories {
let holder = inner.registry.remove(handle)?;
inner.index.remove(&holder.store_path);

self.on_repository_list_changed_tx.send(()).unwrap_or(());
self.change_tx.send(()).ok();

Some(holder)
}

pub fn remove_all(&self) -> Vec<Arc<RepositoryHolder>> {
let removed = self.inner.write().unwrap().registry.remove_all();
self.on_repository_list_changed_tx.send(()).unwrap_or(());
self.change_tx.send(()).ok();
removed
}

Expand All @@ -668,6 +656,11 @@ impl Repositories {
.map(|(a, b)| (*a, b.clone()))
.collect()
}

/// Subscribe to change notifications.
pub fn subscribe(&self) -> watch::Receiver<()> {
self.change_tx.subscribe()
}
}

pub(crate) enum RepositoryEntry<'a> {
Expand All @@ -679,7 +672,7 @@ pub(crate) struct RepositoryVacantEntry<'a> {
inner: &'a BlockingRwLock<Inner>,
store_path: PathBuf,
inserted: bool,
on_repository_list_changed_tx: uninitialized_watch::Sender<()>,
change_tx: watch::Sender<()>,
}

impl RepositoryVacantEntry<'_> {
Expand All @@ -692,14 +685,12 @@ impl RepositoryVacantEntry<'_> {
unreachable!()
};

let IndexEntry::Reserved(notify) = mem::replace(entry, IndexEntry::Existing(handle)) else {
let IndexEntry::Reserved = mem::replace(entry, IndexEntry::Existing(handle)) else {
unreachable!()
};

self.inserted = true;

notify.notify_waiters();
self.on_repository_list_changed_tx.send(()).unwrap_or(());
self.change_tx.send(()).unwrap_or(());

handle
}
Expand All @@ -711,13 +702,8 @@ impl Drop for RepositoryVacantEntry<'_> {
return;
}

let mut inner = self.inner.write().unwrap();

let Some(IndexEntry::Reserved(notify)) = inner.index.remove(&self.store_path) else {
unreachable!()
};

notify.notify_waiters();
self.inner.write().unwrap().index.remove(&self.store_path);
self.change_tx.send(()).ok();
}
}

Expand All @@ -729,6 +715,6 @@ struct Inner {
}

enum IndexEntry {
Reserved(Arc<Notify>),
Reserved,
Existing(RepositoryHandle),
}
4 changes: 1 addition & 3 deletions ffi/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,7 @@ pub(crate) fn close_blocking(session: Session) {

/// Subscribe to changes in repository list
pub(crate) fn subscribe(state: &State, notification_tx: &NotificationSender) -> TaskHandle {
let mut on_repository_list_changed =
state.repositories.on_repository_list_changed_tx.subscribe();

let mut on_repository_list_changed = state.repositories.subscribe();
let notification_tx = notification_tx.clone();

state.spawn_task(|id| async move {
Expand Down
4 changes: 2 additions & 2 deletions ffi/src/share_token.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{error::Error, state::State};
use ouisync_lib::{network, ShareToken};
use ouisync_lib::{self, ShareToken};

/// Returns the access mode of the given share token.
pub(crate) fn mode(token: ShareToken) -> u8 {
Expand All @@ -9,7 +9,7 @@ pub(crate) fn mode(token: ShareToken) -> u8 {
/// Returns the info-hash of the repository corresponding to the share token formatted as hex
/// string.
pub(crate) fn info_hash(token: ShareToken) -> String {
hex::encode(network::repository_info_hash(token.id()).as_ref())
hex::encode(ouisync_lib::repository_info_hash(token.id()).as_ref())
}

pub(crate) fn suggested_name(token: ShareToken) -> String {
Expand Down
2 changes: 1 addition & 1 deletion ffi/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
repository::Repositories,
};
use ouisync_bridge::{config::ConfigStore, transport};
use ouisync_lib::network::Network;
use ouisync_lib::Network;
use scoped_task::ScopedJoinHandle;
use state_monitor::StateMonitor;
use std::{
Expand Down
Loading

0 comments on commit fb5e7cf

Please sign in to comment.