Skip to content

Commit

Permalink
Merge branch 'block-id-cache'
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Aug 29, 2024
2 parents 170da27 + f02e809 commit b61d36c
Show file tree
Hide file tree
Showing 54 changed files with 2,068 additions and 1,671 deletions.
55 changes: 33 additions & 22 deletions bindings/dart/lib/ouisync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,9 @@ class Session {
Future<String?> get natBehavior =>
_client.invoke<String?>('network_nat_behavior');

Future<TrafficStats> get trafficStats => _client
.invoke<List<Object?>>('network_traffic_stats')
.then((list) => TrafficStats.decode(list));

/// Gets a stream that yields lists of known peers.
Stream<List<PeerInfo>> get onPeersChange async* {
await for (final _ in networkEvents) {
yield await peers;
}
}
Future<NetworkStats> get networkStats => _client
.invoke<List<Object?>>('network_stats')
.then((list) => NetworkStats.decode(list));

Future<List<PeerInfo>> get peers => _client
.invoke<List<Object?>>('network_known_peers')
Expand Down Expand Up @@ -251,12 +244,14 @@ class PeerInfo {
final PeerSource source;
final PeerStateKind state;
final String? runtimeId;
final NetworkStats stats;

PeerInfo({
required this.addr,
required this.source,
required this.state,
this.runtimeId,
this.stats = const NetworkStats(),
});

static PeerInfo decode(Object? raw) {
Expand All @@ -278,11 +273,14 @@ class PeerInfo {
throw Exception('invalid peer info state');
}

final stats = NetworkStats.decode(list[3] as List<Object?>);

return PeerInfo(
addr: addr,
source: source,
state: state,
runtimeId: runtimeId,
stats: stats,
);
}

Expand All @@ -294,21 +292,29 @@ class PeerInfo {
'$runtimeType(addr: $addr, source: $source, state: $state, runtimeId: $runtimeId)';
}

class TrafficStats {
final int send;
final int recv;

const TrafficStats({required this.send, required this.recv});

static TrafficStats decode(List<Object?> raw) {
final send = raw[0] as int;
final recv = raw[1] as int;
class NetworkStats {
final int bytesTx;
final int bytesRx;
final int throughputTx;
final int throughputRx;

const NetworkStats({
this.bytesTx = 0,
this.bytesRx = 0,
this.throughputTx = 0,
this.throughputRx = 0,
});

return TrafficStats(send: send, recv: recv);
}
static NetworkStats decode(List<Object?> raw) => NetworkStats(
bytesTx: raw[0] as int,
bytesRx: raw[1] as int,
throughputTx: raw[2] as int,
throughputRx: raw[3] as int,
);

@override
String toString() => '$runtimeType(send: $send, recv: $recv)';
String toString() =>
'$runtimeType(bytesTx: $bytesTx, bytesRx: $bytesRx, throughputTx: $throughputTx, throughputRx: $throughputRx)';
}

/// A handle to a Ouisync repository.
Expand Down Expand Up @@ -594,6 +600,11 @@ class Repository {

/// Unmount the repository.
Future<void> unmount() => _client.invoke<void>("repository_unmount", _handle);

/// Fetch the per-repository network statistics.
Future<NetworkStats> get networkStats => _client
.invoke<List<Object?>>('repository_stats', _handle)
.then((list) => NetworkStats.decode(list));
}

sealed class AccessChange {
Expand Down
11 changes: 6 additions & 5 deletions bindings/dart/test/multiple_nodes_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void main() {
final addr = (await session1.quicListenerLocalAddressV4)!;

final expect = expectLater(
session2.onPeersChange,
session2.networkEvents.asyncMap((_) => session2.peers),
emitsThrough(contains(isA<PeerInfo>()
.having((peer) => peer.addr, 'addr', equals('quic/$addr'))
.having(
Expand All @@ -81,7 +81,7 @@ void main() {
await expect;
});

test('traffic stats', () async {
test('network stats', () async {
final addr = (await session1.quicListenerLocalAddressV4)!;
await session2.addUserProvidedPeer('quic/$addr');

Expand All @@ -99,8 +99,9 @@ void main() {
await repo2.events.first;
}

final stats = await session2.trafficStats;
expect(stats.send, greaterThan(0));
expect(stats.recv, greaterThan(65536)); // at least two blocks received
final stats = await session2.networkStats;

expect(stats.bytesTx, greaterThan(0));
expect(stats.bytesRx, greaterThan(65536)); // at least two blocks received
});
}
8 changes: 5 additions & 3 deletions bridge/src/logger/common.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate};
use std::path::Path;
use std::{env, path::Path};
use tracing_subscriber::EnvFilter;

pub(super) fn create_log_filter() -> EnvFilter {
EnvFilter::builder()
// TODO: Allow changing the log level at runtime or at least at init
// time (via a command-line option or so)
.with_default_directive("ouisync=debug".parse().unwrap())
.from_env_lossy()
.parse_lossy(
env::var(EnvFilter::DEFAULT_ENV)
.unwrap_or_else(|_| "ouisync=debug,deadlock=warn".to_string()),
)
}

pub(super) fn create_file_writer(path: &Path) -> FileRotate<AppendCount> {
Expand Down
25 changes: 11 additions & 14 deletions cli/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,11 @@ impl fmt::Display for PeerInfoDisplay<'_> {
if let PeerState::Active { id, since } = &self.0.state {
write!(
f,
" {} {} {} {} {}",
" {} {} {} {}",
id.as_public_key(),
format_time(*since),
self.0.stats.send,
self.0.stats.recv,
format_time(self.0.stats.recv_at),
self.0.stats.bytes_tx,
self.0.stats.bytes_rx,
)?;
}

Expand All @@ -598,7 +597,7 @@ fn format_time(time: SystemTime) -> String {
#[cfg(test)]
mod tests {
use super::*;
use ouisync_lib::{PeerSource, PeerState, SecretRuntimeId, TrafficStats};
use ouisync_lib::{PeerSource, PeerState, SecretRuntimeId, Stats};
use rand::{rngs::StdRng, SeedableRng};
use std::net::Ipv4Addr;

Expand All @@ -614,7 +613,7 @@ mod tests {
addr: PeerAddr::Quic(addr),
source: PeerSource::Dht,
state: PeerState::Connecting,
stats: TrafficStats::default(),
stats: Stats::default(),
})
.to_string(),
"127.0.0.1 1248 quic dht connecting"
Expand All @@ -630,12 +629,11 @@ mod tests {
.unwrap()
.into(),
},
stats: TrafficStats {
send: 1024,
recv: 4096,
recv_at: DateTime::parse_from_rfc3339("2024-06-12T14:00:00Z")
.unwrap()
.into(),
stats: Stats {
bytes_tx: 1024,
bytes_rx: 4096,
throughput_tx: 0,
throughput_rx: 0,
},
})
.to_string(),
Expand All @@ -647,8 +645,7 @@ mod tests {
ee1aa49a4459dfe813a3cf6eb882041230c7b2558469de81f87c9bf23bf10a03 \
2024-06-12T02:30:00Z \
1024 \
4096 \
2024-06-12T14:00:00Z"
4096"
);
}
}
48 changes: 24 additions & 24 deletions deadlock/src/expect_short_lifetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use super::timer::{Id, Timer};
use once_cell::sync::Lazy;
use std::{
backtrace::Backtrace,
fmt,
panic::Location,
thread::{self, JoinHandle},
time::{Duration, Instant},
};
use tracing::Span;

/// Attach this to objects that are expected to be short-lived to be warned when they live longer
/// than expected.
Expand All @@ -17,13 +17,13 @@ pub struct ExpectShortLifetime {

impl ExpectShortLifetime {
#[track_caller]
pub fn new(max_lifetime: Duration) -> Self {
Self::new_in(max_lifetime, Location::caller())
pub fn new(deadline: Duration) -> Self {
Self::new_in(deadline, Location::caller())
}

pub fn new_in(max_lifetime: Duration, location: &'static Location<'static>) -> Self {
let context = Context::new(location);
let id = schedule(max_lifetime, context);
pub fn new_in(deadline: Duration, location: &'static Location<'static>) -> Self {
let context = Context::new(location, deadline);
let id = schedule(deadline, context);

Self {
id,
Expand All @@ -39,33 +39,23 @@ impl Drop for ExpectShortLifetime {
}

struct Context {
start_time: Instant,
deadline: Duration,
span: Span,
location: &'static Location<'static>,
backtrace: Backtrace,
}

impl Context {
fn new(location: &'static Location<'static>) -> Self {
fn new(location: &'static Location<'static>, deadline: Duration) -> Self {
Self {
start_time: Instant::now(),
deadline,
span: Span::current(),
location,
backtrace: Backtrace::capture(),
}
}
}

impl fmt::Display for Context {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"elapsed: {:?}, started at: {}\nbacktrace:\n{}",
self.start_time.elapsed(),
self.location,
self.backtrace
)
}
}

static TIMER: Timer<Context> = Timer::new();
static WATCHING_THREAD: Lazy<JoinHandle<()>> = Lazy::new(|| thread::spawn(watching_thread));

Expand All @@ -80,7 +70,7 @@ fn schedule(duration: Duration, context: Context) -> Id {
fn cancel(id: Id, start: Instant) {
if TIMER.cancel(id).is_none() {
tracing::warn!(
"🐒🐒🐒 Previously reported task (id: {}) eventually completed in {:?} 🐒🐒🐒",
"🐒🐒🐒 Previously reported task {} eventually completed in {:?} 🐒🐒🐒",
id,
start.elapsed(),
);
Expand All @@ -91,10 +81,20 @@ fn watching_thread() {
loop {
let (id, context) = TIMER.wait();

let Context {
deadline,
span,
location,
backtrace,
} = context;

tracing::warn!(
"🐒🐒🐒 Task taking too long (id: {}) 🐒🐒🐒\n{}\n",
parent: span,
"🐒🐒🐒 Task {} (started in {}) is taking longer than {:?} 🐒🐒🐒\n{}",
id,
context
location,
deadline,
backtrace,
);
}
}
5 changes: 4 additions & 1 deletion ffi/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ impl ouisync_bridge::transport::Handler for Handler {
.await?
.into()
}
Request::RepositoryStats(repository) => {
repository::stats(&self.state, repository).await?.into()
}
Request::DirectoryCreate { repository, path } => {
directory::create(&self.state, repository, path)
.await?
Expand Down Expand Up @@ -412,7 +415,7 @@ impl ouisync_bridge::transport::Handler for Handler {
Request::NetworkExternalAddrV4 => self.state.network.external_addr_v4().await.into(),
Request::NetworkExternalAddrV6 => self.state.network.external_addr_v6().await.into(),
Request::NetworkNatBehavior => self.state.network.nat_behavior().await.into(),
Request::NetworkTrafficStats => self.state.network.traffic_stats().into(),
Request::NetworkStats => self.state.network.stats().into(),
Request::NetworkShutdown => {
self.state.network.shutdown().await;
().into()
Expand Down
Loading

0 comments on commit b61d36c

Please sign in to comment.