Skip to content

Commit

Permalink
Apply choker before a request is received
Browse files Browse the repository at this point in the history
As opposed to when response is being sent.
  • Loading branch information
inetic committed Dec 4, 2023
1 parent 2c50156 commit d43e833
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 31 deletions.
36 changes: 25 additions & 11 deletions lib/src/network/choke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ enum GetPermitResult {
}

impl ManagerInner {
/// Does this:
/// * If the `choker_id` is already unchoked it is granted a permit. Otherwise
/// * if there is a free slot in `unchoked`, adds `choker_id` into it and grants it a permit.
/// Otherwise
/// * check if some of the `unchoked` chokers can be evicted, if so evict them and
/// **some** choked choker takes its place. If the unchoked choker is `choker_id` then it
/// is granted a permit. Othewise
/// * we calculate when the soonest unchoked choker is evictable and `choker_id` will
/// need to recheck at that time.
fn get_permit(&mut self, choker_id: usize) -> GetPermitResult {
if let Some(state) = self.unchoked.get_mut(&choker_id) {
// It's unchoked, update permit and return.
Expand Down Expand Up @@ -143,7 +152,7 @@ impl ManagerInner {
GetPermitResult::AwaitUntil(until)
}

// Return true if one was evicted from `unchoked` and inserted into `choked`.
// Return true if some choker was evicted from `unchoked` and inserted into `choked`.
fn try_evict_from_unchoked(&mut self) -> bool {
let to_evict = if let Some((id, state)) = self.soonest_evictable() {
if state.is_evictable() {
Expand Down Expand Up @@ -211,9 +220,9 @@ pub(crate) struct Choker {
}

impl Choker {
/// Returns `false` when the `Manager` has already been destroyed.
pub async fn can_send(&self) -> bool {
self.inner.lock().await.can_send().await
/// Halts forever when the `Manager` has already been destroyed.
pub async fn wait_until_unchoked(&self) {
self.inner.lock().await.wait_until_unchoked().await
}

#[cfg(test)]
Expand All @@ -229,24 +238,29 @@ struct ChokerInner {
}

impl ChokerInner {
pub async fn can_send(&mut self) -> bool {
pub async fn wait_until_unchoked(&mut self) {
use std::future::pending;

loop {
self.on_change_rx.borrow_and_update();

let result = self.try_get_permit();

let sleep_until = match result {
None => return false,
None => {
let () = pending().await;
unreachable!();
}
Some(result) => match result {
GetPermitResult::Granted => return true,
GetPermitResult::Granted => return,
GetPermitResult::AwaitUntil(sleep_until) => sleep_until,
},
};

select! {
result = self.on_change_rx.changed() => {
if result.is_err() {
return false;
let () = pending().await;
}
},
_ = tokio::time::sleep_until(sleep_until) => {
Expand Down Expand Up @@ -299,11 +313,11 @@ mod tests {
Some(GetPermitResult::AwaitUntil(_))
);

assert!(tokio::time::timeout(
tokio::time::timeout(
PERMIT_INACTIVITY_TIMEOUT + Duration::from_millis(200),
chokers[MAX_UNCHOKED_COUNT].can_send()
chokers[MAX_UNCHOKED_COUNT].wait_until_unchoked(),
)
.await
.unwrap());
.unwrap();
}
}
138 changes: 118 additions & 20 deletions lib/src/network/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{pin::pin, time::Duration};
use std::{collections::HashSet, pin::pin, sync::Mutex as SyncMutex, time::Duration};

use super::{
choke,
Expand All @@ -22,7 +22,7 @@ use tokio::{
select,
sync::{
broadcast::{self, error::RecvError},
mpsc,
mpsc, watch,
},
};
use tracing::instrument;
Expand All @@ -31,6 +31,7 @@ pub(crate) struct Server {
vault: Vault,
tx: Sender,
rx: Receiver,
choker: choke::Choker,
}

impl Server {
Expand All @@ -42,15 +43,21 @@ impl Server {
) -> Self {
Self {
vault,
tx: Sender(tx, choker),
tx: Sender(tx),
rx,
choker,
}
}

pub async fn run(&mut self) -> Result<()> {
let Self { vault, tx, rx } = self;
let responder = Responder::new(vault, tx);
let monitor = Monitor::new(vault, tx);
let Self {
vault,
tx,
rx,
choker,
} = self;
let responder = Responder::new(vault, tx, choker);
let monitor = Monitor::new(vault, tx, choker);

select! {
result = responder.run(rx) => result,
Expand All @@ -63,11 +70,12 @@ impl Server {
struct Responder<'a> {
vault: &'a Vault,
tx: &'a Sender,
choker: &'a choke::Choker,
}

impl<'a> Responder<'a> {
fn new(vault: &'a Vault, tx: &'a Sender) -> Self {
Self { vault, tx }
fn new(vault: &'a Vault, tx: &'a Sender, choker: &'a choke::Choker) -> Self {
Self { vault, tx, choker }
}

async fn run(self, rx: &'a mut Receiver) -> Result<()> {
Expand All @@ -82,6 +90,8 @@ impl<'a> Responder<'a> {
request = rx.recv() => {
match request {
Some(request) => {
self.choker.wait_until_unchoked().await;

let handler = self
.vault
.monitor
Expand Down Expand Up @@ -241,15 +251,50 @@ impl<'a> Responder<'a> {
}
}

/// When we receive events that a branch has changed, we accumulate those events into this
/// structure and use it once we receive a permit from the choker.
enum BranchAccumulator {
All,
Some(HashSet<PublicKey>),
None,
}

impl BranchAccumulator {
fn insert_one_branch(&mut self, branch_id: PublicKey) {
match self {
Self::All => (),
Self::Some(branches) => {
branches.insert(branch_id);
}
Self::None => {
let mut branches = HashSet::new();
branches.insert(branch_id);
*self = BranchAccumulator::Some(branches);
}
}
}

fn insert_all_branches(&mut self) {
*self = BranchAccumulator::All;
}

fn take(&mut self) -> Self {
let mut ret = BranchAccumulator::None;
std::mem::swap(self, &mut ret);
ret
}
}

/// Monitors the repository for changes and notifies the peer.
struct Monitor<'a> {
vault: &'a Vault,
tx: &'a Sender,
choker: &'a choke::Choker,
}

impl<'a> Monitor<'a> {
fn new(vault: &'a Vault, tx: &'a Sender) -> Self {
Self { vault, tx }
fn new(vault: &'a Vault, tx: &'a Sender, choker: &'a choke::Choker) -> Self {
Self { vault, tx, choker }
}

async fn run(self) -> Result<()> {
Expand All @@ -260,18 +305,75 @@ impl<'a> Monitor<'a> {
Duration::from_secs(1)
));

self.handle_all_branches_changed().await?;
// Explanation of the code below: Because we're using the choker, we can't handle events
// that we receive on `events` right a way. We need to wait for the choker to give us a
// permit. In the mean while we "accumulate" events from `events` into the `accumulator`
// and once we receive a permit from the `choker` we process them all at once. Note that
// this allows us to process each branch event only once even if we received multiple
// events per particular branch.

let accumulator = SyncMutex::new(BranchAccumulator::All);
let (notify_tx, mut notify_rx) = watch::channel(());

// We already have a value in the `accumulator` so make use it's used in `work`.
notify_tx.send(()).unwrap_or(());

while let Some(event) = events.next().await {
match event {
BranchChanged::One(branch_id) => self.handle_branch_changed(branch_id).await?,
BranchChanged::All => self.handle_all_branches_changed().await?,
let mut work = pin!(async {
loop {
if notify_rx.changed().await.is_err() {
break;
}
self.choker.wait_until_unchoked().await;
let accum = accumulator.lock().unwrap().take();
if self.apply_accumulator(accum).await.is_err() {
break;
}
}
});

loop {
select! {
event = events.next() => {
let event = match event {
Some(event) => event,
None => break,
};

let mut accum = accumulator.lock().unwrap();

match event {
BranchChanged::One(branch_id) => {
accum.insert_one_branch(branch_id);
},
BranchChanged::All => {
accum.insert_all_branches();
}
}

notify_tx.send(()).unwrap_or(());
},
_ = &mut work => break,
}
}

Ok(())
}

async fn apply_accumulator(&self, accumulator: BranchAccumulator) -> Result<()> {
match accumulator {
BranchAccumulator::All => {
self.handle_all_branches_changed().await?;
}
BranchAccumulator::Some(branches) => {
for branch in branches {
self.handle_branch_changed(branch).await?;
}
}
BranchAccumulator::None => (),
}
Ok(())
}

async fn handle_all_branches_changed(&self) -> Result<()> {
let root_nodes = self.load_root_nodes().await?;
for root_node in root_nodes {
Expand Down Expand Up @@ -363,14 +465,10 @@ impl<'a> Monitor<'a> {

type Receiver = mpsc::Receiver<Request>;

struct Sender(mpsc::Sender<Content>, choke::Choker);
struct Sender(mpsc::Sender<Content>);

impl Sender {
async fn send(&self, response: Response) -> bool {
if !self.1.can_send().await {
// `choke::Manager` has been destroyed
return false;
}
self.0.send(Content::Response(response)).await.is_ok()
}
}
Expand Down

0 comments on commit d43e833

Please sign in to comment.