Skip to content

Commit

Permalink
Merge branch 'choking-and-protocol-analyzer'
Browse files Browse the repository at this point in the history
  • Loading branch information
inetic committed Dec 4, 2023
2 parents f8e2d8d + d43e833 commit b9fc79e
Show file tree
Hide file tree
Showing 14 changed files with 1,185 additions and 153 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"utils/btdht",
"utils/stress-test",
"utils/swarm",
"utils/protocol-analyzer",
"vfs"
]
resolver = "2"
Expand Down
1 change: 1 addition & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,4 @@ tokio = { workspace = true, features = ["process", "test-util"] }

[features]
simulation = ["rand/simulation", "turmoil"]
analyze-protocol = []
323 changes: 323 additions & 0 deletions lib/src/network/choke.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
use std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, Weak,
},
};
use tokio::{
select,
sync::{watch, Mutex as AsyncMutex},
time::{Duration, Instant},
};

const MAX_UNCHOKED_COUNT: usize = 3;
const PERMIT_DURATION_TIMEOUT: Duration = Duration::from_secs(30);
const PERMIT_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(3);

pub(crate) struct Manager {
inner: Arc<Mutex<ManagerInner>>,
}

impl Manager {
pub fn new() -> Self {
let on_change_tx = watch::channel(()).0;

Self {
inner: Arc::new(Mutex::new(ManagerInner {
next_choker_id: AtomicUsize::new(0),
on_change_tx,
choked: Default::default(),
unchoked: Default::default(),
})),
}
}

pub fn new_choker(&self) -> Choker {
let mut inner = self.inner.lock().unwrap();

let choker_id = inner.next_choker_id.fetch_add(1, Ordering::Relaxed);

let choker_inner = Arc::new(AsyncMutex::new(ChokerInner {
manager_inner: Arc::downgrade(&self.inner),
id: choker_id,
on_change_rx: inner.on_change_tx.subscribe(),
}));

if inner.unchoked.len() < MAX_UNCHOKED_COUNT {
inner.unchoked.insert(choker_id, UnchokedState::default());
} else {
inner.choked.insert(choker_id, ChokedState::Uninterested);
}

Choker {
inner: choker_inner,
}
}
}

#[derive(Eq, PartialEq)]
enum ChokedState {
Interested,
Uninterested,
}

#[derive(Clone, Copy)]
struct UnchokedState {
unchoke_started: Instant,
time_of_last_permit: Instant,
}

impl UnchokedState {
fn is_evictable(&self) -> bool {
Instant::now() >= self.evictable_at()
}

fn evictable_at(&self) -> Instant {
let i1 = self.unchoke_started + PERMIT_DURATION_TIMEOUT;
let i2 = self.time_of_last_permit + PERMIT_INACTIVITY_TIMEOUT;
if i1 < i2 {
i1
} else {
i2
}
}
}

impl Default for UnchokedState {
fn default() -> Self {
let now = Instant::now();
Self {
unchoke_started: now,
time_of_last_permit: now,
}
}
}

struct ManagerInner {
next_choker_id: AtomicUsize,
on_change_tx: watch::Sender<()>,
choked: HashMap<usize, ChokedState>,
unchoked: HashMap<usize, UnchokedState>,
}

#[derive(Debug)]
enum GetPermitResult {
Granted,
AwaitUntil(Instant),
}

impl ManagerInner {
/// Does this:
/// * If the `choker_id` is already unchoked it is granted a permit. Otherwise
/// * if there is a free slot in `unchoked`, adds `choker_id` into it and grants it a permit.
/// Otherwise
/// * check if some of the `unchoked` chokers can be evicted, if so evict them and
/// **some** choked choker takes its place. If the unchoked choker is `choker_id` then it
/// is granted a permit. Othewise
/// * we calculate when the soonest unchoked choker is evictable and `choker_id` will
/// need to recheck at that time.
fn get_permit(&mut self, choker_id: usize) -> GetPermitResult {
if let Some(state) = self.unchoked.get_mut(&choker_id) {
// It's unchoked, update permit and return.
state.time_of_last_permit = Instant::now();
return GetPermitResult::Granted;
}

// Unwrap OK because if `choker_id` is not in `unchoked`, it must be in `choked`.
*self.choked.get_mut(&choker_id).unwrap() = ChokedState::Interested;

// It's choked, check if we can unchoke something.
if self.unchoked.len() < MAX_UNCHOKED_COUNT || self.try_evict_from_unchoked() {
// Unwrap OK because we know `choked` is not empty (`choker_id` is in it).
let to_unchoke = self.random_choked_and_interested().unwrap();

assert!(self.choked.remove(&to_unchoke).is_some());
self.unchoked.insert(to_unchoke, UnchokedState::default());

if to_unchoke == choker_id {
return GetPermitResult::Granted;
} else {
// TODO: Consider waking up only the one who just got unchoked.
self.on_change_tx.send(()).unwrap_or(());
// Unwrap OK because we know `unchoked` is not empty.
let until = self.soonest_evictable().unwrap().1.evictable_at();
return GetPermitResult::AwaitUntil(until);
}
}

// Unwrap OK because we know `unchoked` is not empty.
let until = self.soonest_evictable().unwrap().1.evictable_at();

GetPermitResult::AwaitUntil(until)
}

// Return true if some choker was evicted from `unchoked` and inserted into `choked`.
fn try_evict_from_unchoked(&mut self) -> bool {
let to_evict = if let Some((id, state)) = self.soonest_evictable() {
if state.is_evictable() {
Some(id)
} else {
None
}
} else {
None
};

if let Some(to_evict) = to_evict {
self.unchoked.remove(&to_evict);
self.choked.insert(to_evict, ChokedState::Uninterested);
true
} else {
false
}
}

fn soonest_evictable(&self) -> Option<(usize, UnchokedState)> {
let mut soonest: Option<(usize, UnchokedState)> = None;
for (id, state) in &self.unchoked {
let evictable_at = state.evictable_at();
if let Some(old_soonest) = soonest {
if evictable_at < old_soonest.1.evictable_at() {
soonest = Some((*id, *state));
}
} else {
soonest = Some((*id, *state));
}
}
soonest
}

fn random_choked_and_interested(&self) -> Option<usize> {
use rand::Rng;

let mut interested = self
.choked
.iter()
.filter(|(_, state)| **state == ChokedState::Interested);

let count = interested.clone().count();

if count == 0 {
return None;
}

interested
.nth(rand::thread_rng().gen_range(0..count))
.map(|(id, _)| *id)
}

fn remove_choker(&mut self, choker_id: usize) {
self.choked.remove(&choker_id);
self.unchoked.remove(&choker_id);
self.on_change_tx.send(()).unwrap_or(());
}
}

#[derive(Clone)]
pub(crate) struct Choker {
inner: Arc<AsyncMutex<ChokerInner>>,
}

impl Choker {
/// Halts forever when the `Manager` has already been destroyed.
pub async fn wait_until_unchoked(&self) {
self.inner.lock().await.wait_until_unchoked().await
}

#[cfg(test)]
async fn try_get_permit(&mut self) -> Option<GetPermitResult> {
self.inner.lock().await.try_get_permit()
}
}

struct ChokerInner {
manager_inner: Weak<Mutex<ManagerInner>>,
id: usize,
on_change_rx: watch::Receiver<()>,
}

impl ChokerInner {
pub async fn wait_until_unchoked(&mut self) {
use std::future::pending;

loop {
self.on_change_rx.borrow_and_update();

let result = self.try_get_permit();

let sleep_until = match result {
None => {
let () = pending().await;
unreachable!();
}
Some(result) => match result {
GetPermitResult::Granted => return,
GetPermitResult::AwaitUntil(sleep_until) => sleep_until,
},
};

select! {
result = self.on_change_rx.changed() => {
if result.is_err() {
let () = pending().await;
}
},
_ = tokio::time::sleep_until(sleep_until) => {
}
}
}
}

fn try_get_permit(&mut self) -> Option<GetPermitResult> {
let result = match self.manager_inner.upgrade() {
Some(inner) => inner.lock().unwrap().get_permit(self.id),
None => return None,
};

Some(result)
}
}

impl Drop for ChokerInner {
fn drop(&mut self) {
if let Some(manager_inner) = self.manager_inner.upgrade() {
manager_inner.lock().unwrap().remove_choker(self.id);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;

#[tokio::test(flavor = "multi_thread")]
async fn sanity() {
let manager = Manager::new();
let mut chokers = Vec::new();

for _ in 0..(MAX_UNCHOKED_COUNT + 1) {
chokers.push(manager.new_choker());
}

for i in 0..MAX_UNCHOKED_COUNT {
assert_matches!(
chokers[i].try_get_permit().await,
Some(GetPermitResult::Granted)
);
}

assert_matches!(
chokers[MAX_UNCHOKED_COUNT].try_get_permit().await,
Some(GetPermitResult::AwaitUntil(_))
);

tokio::time::timeout(
PERMIT_INACTIVITY_TIMEOUT + Duration::from_millis(200),
chokers[MAX_UNCHOKED_COUNT].wait_until_unchoked(),
)
.await
.unwrap();
}
}
Loading

0 comments on commit b9fc79e

Please sign in to comment.