Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
inetic committed Sep 7, 2023
1 parent bb70de1 commit 08ba64a
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 122 deletions.
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ noise-rust-crypto = { version = "0.5.0", default-features = false, features = ["
num_enum = { workspace = true }
once_cell = { workspace = true }
parse-size = { version = "1.0.0", features = ["std"] }
pin-project-lite = "0.2.9"
pin-project-lite = "0.2.13"
rand = { workspace = true }
ref-cast = "1.0.14"
rupnp = { version = "1.1.0", default-features = false, features = [] }
Expand Down
2 changes: 1 addition & 1 deletion lib/src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ fn events(rx: broadcast::Receiver<Event>) -> impl Stream<Item = BranchChanged> {
})
}

#[derive(Eq, PartialEq, Clone, Debug)]
#[derive(Eq, PartialEq, Clone, Debug, Hash)]
enum BranchChanged {
One(PublicKey),
All,
Expand Down
259 changes: 139 additions & 120 deletions lib/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,22 +267,22 @@ pub(crate) mod broadcast_hash_set {

pub(crate) mod stream {
use futures_util::{stream::Fuse, Stream, StreamExt};
use indexmap::{IndexMap, IndexSet};
use pin_project_lite::pin_project;
use std::{
collections::BTreeMap,
collections::{hash_map, BTreeMap, HashMap},
fmt::Debug,
future::Future,
hash::Hash,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::time::{self, Duration, Instant, Sleep};

#[derive(Debug)]
struct State<Item: Eq + Debug> {
item: Item,
delay_until: Instant,
accumulated: bool,
type EntryId = u64;

struct Delay {
until: Instant,
next: Option<EntryId>,
}

pin_project! {
Expand All @@ -303,175 +303,147 @@ pub(crate) mod stream {
pub struct Throttle<S>
where
S: Stream,
S::Item: Eq + Debug,
S::Item: Hash,
{
#[pin]
inner: Fuse<S>,
period: Duration,
items: BTreeMap<u64, State<S::Item>>,
ready: BTreeMap<EntryId, S::Item>,
delays: HashMap<S::Item, Delay>,
#[pin]
sleep: Option<Sleep>,
next_id: u64,
next_id: EntryId,
}
}

impl<S> Throttle<S>
where
S: Stream,
S::Item: Eq + Debug,
S::Item: Eq + Hash,
{
pub fn new(inner: S, period: Duration) -> Self {
Self {
inner: inner.fuse(),
period,
items: BTreeMap::new(),
ready: BTreeMap::new(),
delays: HashMap::new(),
sleep: None,
next_id: 0,
}
}

fn find_by_item<'a>(
items: &'a mut BTreeMap<u64, State<S::Item>>,
item: &'a S::Item,
) -> Option<(u64, &'a mut State<S::Item>)> {
for (id, state) in items.iter_mut() {
if &state.item == item {
return Some((*id, state));
fn is_ready(ready: &BTreeMap<EntryId, S::Item>, item: &S::Item) -> bool {
for v in ready.values() {
if v == item {
return true;
}
}
None
false
}
}

impl<S> Stream for Throttle<S>
where
S: Stream,
S::Item: Eq + Clone + Debug,
S::Item: Hash + Eq + Clone + Debug,
{
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
println!("------------------ L{}", line!());
let mut this = self.project();
let now = Instant::now();
let mut inner_is_finished = false;

// Take entries from `inner` into `items` ASAP so we can timestamp them.
loop {
println!("------------------ L{}", line!());
match this.inner.as_mut().poll_next(cx) {
Poll::Ready(Some(item)) => {
println!("------------------ L{} {:?}", line!(), item);
match Self::find_by_item(&mut this.items, &item) {
Some((_id, state)) => {
println!("------------------ L{} {:?}", line!(), state);
state.accumulated = true;
let is_ready = Self::is_ready(&this.ready, &item);

match this.delays.entry(item.clone()) {
hash_map::Entry::Occupied(mut entry) => {
if !is_ready {
let delay = entry.get_mut();

if delay.next.is_none() {
let entry_id = *this.next_id;
*this.next_id += 1;
delay.next = Some(entry_id);
}
}
}
None => {
println!("------------------ L{}", line!());
let id = *this.next_id;
hash_map::Entry::Vacant(entry) => {
let entry_id = *this.next_id;
*this.next_id += 1;

this.items.insert(
id,
State {
item,
delay_until: now + *this.period,
accumulated: true,
},
);
if !is_ready {
this.ready.insert(entry_id, item.clone());
}

entry.insert(Delay {
until: now + *this.period,
next: None,
});
}
};
}
}
Poll::Ready(None) => {
println!("------------------ L{} inner finished", line!());
inner_is_finished = true;
break;
}
Poll::Pending => {
println!("------------------ L{} inner pending", line!());
break;
}
}
}

println!("------------------ L{}", line!());
loop {
println!("------------------ L{}", line!());
if let Some(first_entry) = this.ready.first_entry() {
return Poll::Ready(Some(first_entry.remove()));
}

if let Some(sleep) = this.sleep.as_mut().as_pin_mut() {
println!("------------------ L{} sleep?", line!());
ready!(sleep.poll(cx));
println!("------------------ L{} sleep done", line!());
this.sleep.set(None);
}

if let Some(first_entry) = this.items.first_entry() {
let mut state = first_entry.remove();
println!("------------------ L{} first entry {:?}", line!(), state);

if state.delay_until <= now {
if state.accumulated {
state.accumulated = false;
state.delay_until = now + *this.period;
let mut first: Option<(&S::Item, &mut Delay)> = None;

let id = *this.next_id;
*this.next_id += 1;

let item = state.item.clone();
this.items.insert(id, state);
for (item, delay) in this.delays.iter_mut() {
if let Some((_, first_delay)) = &first {
if (delay.until, delay.next) < (first_delay.until, first_delay.next) {
first = Some((item, delay));
}
} else {
first = Some((item, delay));
}
}

return Poll::Ready(Some(item));
let (first_item, first_delay) = match &mut first {
Some(first) => (&first.0, &mut first.1),
None => {
return if inner_is_finished {
Poll::Ready(None)
} else {
continue;
Poll::Pending
}
}
};

if first_delay.until <= now {
let first_item = (*first_item).clone();

if first_delay.next.is_some() {
first_delay.until = now + *this.period;
first_delay.next = None;
return Poll::Ready(Some(first_item));
} else {
this.sleep.set(Some(time::sleep_until(state.delay_until)));
continue;
this.delays.remove(&first_item);
}
} else {
if inner_is_finished {
return Poll::Ready(None);
}
return Poll::Pending;
this.sleep.set(Some(time::sleep_until(first_delay.until)));
}
}

//loop {
// if this.sleep.is_none() {
// if let Some(item) = this.items.pop() {
// return Poll::Ready(Some(item));
// }
// }

// match this.inner.as_mut().poll_next(cx) {
// Poll::Ready(Some(item)) => {
// if this.sleep.is_none() {
// this.sleep.set(Some(time::sleep(*this.period)));
// return Poll::Ready(Some(item));
// } else {
// this.items.insert(item);
// }
// }
// Poll::Ready(None) => {
// if this.sleep.is_some() {
// this.sleep.set(None);
// this.items.reverse(); // keep the original order
// continue;
// } else {
// return Poll::Ready(None);
// }
// }
// Poll::Pending => (),
// }

// if let Some(sleep) = this.sleep.as_mut().as_pin_mut() {
// ready!(sleep.poll(cx));
// this.sleep.set(None);
// this.items.reverse(); // keep the original order
// } else {
// return Poll::Pending;
// }
//}
}
}

Expand All @@ -484,6 +456,39 @@ pub(crate) mod stream {

#[tokio::test(start_paused = true)]
async fn rate_limit_equal_items() {
let input = [(ms(0), 0), (ms(0), 0)];
let (tx, rx) = mpsc::channel(1);
let expected = [(0, ms(0)), (0, ms(1000))];
future::join(
produce(tx, input),
verify(Throttle::new(into_stream(rx), ms(1000)), expected),
)
.await;

//--------------------------------------------------------

let input = [(ms(0), 0), (ms(100), 0)];
let (tx, rx) = mpsc::channel(1);
let expected = [(0, ms(0)), (0, ms(1000))];
future::join(
produce(tx, input),
verify(Throttle::new(into_stream(rx), ms(1000)), expected),
)
.await;

//--------------------------------------------------------

let input = [(ms(0), 0), (ms(0), 0), (ms(1001), 0)];
let (tx, rx) = mpsc::channel(1);
let expected = [(0, ms(0)), (0, ms(1000)), (0, ms(2000))];
future::join(
produce(tx, input),
verify(Throttle::new(into_stream(rx), ms(1000)), expected),
)
.await;

//--------------------------------------------------------

let input = [
(ms(0), 0),
(ms(100), 0),
Expand All @@ -492,19 +497,6 @@ pub(crate) mod stream {
(ms(0), 0),
];

// unlimitted
let (tx, rx) = mpsc::channel(1);
let expected = [
(0, ms(0)),
(0, ms(100)),
(0, ms(200)),
(0, ms(1700)),
(0, ms(1700)),
];
future::join(produce(tx, input), verify(into_stream(rx), expected)).await;

println!("----------------- start L{}", line!());
// throttled
let (tx, rx) = mpsc::channel(1);
let expected = [(0, ms(0)), (0, ms(1000)), (0, ms(2000))];
future::join(
Expand All @@ -516,12 +508,39 @@ pub(crate) mod stream {

#[tokio::test(start_paused = true)]
async fn rate_limit_inequal_items() {
let input = [(ms(0), 0), (ms(0), 1)];

let (tx, rx) = mpsc::channel(1);
let expected = [(0, ms(0)), (1, ms(0))];
future::join(
produce(tx, input),
verify(Throttle::new(into_stream(rx), ms(1000)), expected),
)
.await;

//--------------------------------------------------------

let input = [(ms(0), 0), (ms(0), 1), (ms(0), 0), (ms(0), 1)];

// unlimitted
let (tx, rx) = mpsc::channel(1);
let expected = [(0, ms(0)), (1, ms(0)), (0, ms(0)), (1, ms(0))];
future::join(produce(tx, input), verify(into_stream(rx), expected)).await;
let expected = [(0, ms(0)), (1, ms(0)), (0, ms(1000)), (1, ms(1000))];
future::join(
produce(tx, input),
verify(Throttle::new(into_stream(rx), ms(1000)), expected),
)
.await;

//--------------------------------------------------------

let input = [(ms(0), 0), (ms(0), 1), (ms(0), 1), (ms(0), 0)];

let (tx, rx) = mpsc::channel(1);
let expected = [(0, ms(0)), (1, ms(0)), (1, ms(1000)), (0, ms(1000))];
future::join(
produce(tx, input),
verify(Throttle::new(into_stream(rx), ms(1000)), expected),
)
.await;
}

fn ms(ms: u64) -> Duration {
Expand Down

0 comments on commit 08ba64a

Please sign in to comment.