Skip to content

Commit

Permalink
poc: Refactor Kad Engine to poll based implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Dec 17, 2024
1 parent ed801e4 commit a248900
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 10 deletions.
24 changes: 16 additions & 8 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl Kademlia {

match pending_action.take() {
None => {
tracing::trace!(
tracing::warn!(
target: LOG_TARGET,
?peer,
?substream_id,
Expand Down Expand Up @@ -877,14 +877,22 @@ impl Kademlia {
tracing::debug!(target: LOG_TARGET, "starting kademlia event loop");

loop {
// poll `QueryEngine` for next actions.
while let Some(action) = self.engine.next_action() {
if let Err((query, peer)) = self.on_query_action(action).await {
self.disconnect_peer(peer, Some(query)).await;
}
}
// // poll `QueryEngine` for next actions.
// while let Some(action) = self.engine.next_action() {
// if let Err((query, peer)) = self.on_query_action(action).await {
// self.disconnect_peer(peer, Some(query)).await;
// }
// }

tokio::select! {
action = self.engine.next() => {
if let Some(action) = action {
if let Err((query, peer)) = self.on_query_action(action).await {
self.disconnect_peer(peer, Some(query)).await;
}
}
},

event = self.service.next() => match event {
Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
if let Err(error) = self.on_connection_established(peer) {
Expand Down Expand Up @@ -966,7 +974,7 @@ impl Kademlia {
"failed to read message from substream",
);

self.disconnect_peer(peer, query_id).await;
// self.disconnect_peer(peer, query_id).await;
}
}
},
Expand Down
37 changes: 36 additions & 1 deletion src/protocol/libp2p/kademlia/query/find_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use bytes::Bytes;
use futures::Stream;

use crate::{
protocol::libp2p::kademlia::{
Expand All @@ -29,7 +30,11 @@ use crate::{
PeerId,
};

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
pin::Pin,
task::{Context, Poll},
};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_node";
Expand Down Expand Up @@ -91,6 +96,9 @@ pub struct FindNodeContext<T: Clone + Into<Vec<u8>>> {
/// These represent the number of peers added to the `Self::pending` minus the number of peers
/// that have failed to respond within the `Self::peer_timeout`
pending_responses: usize,

is_done: bool,
waker: Option<std::task::Waker>,
}

impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
Expand All @@ -116,6 +124,9 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {

peer_timeout: DEFAULT_PEER_TIMEOUT,
pending_responses: 0,

is_done: false,
waker: None,
}
}

Expand Down Expand Up @@ -298,6 +309,30 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
}
}

impl<T: Clone + Into<Vec<u8>> + Unpin> Stream for FindNodeContext<T> {
type Item = QueryAction;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.is_done {
return Poll::Ready(None);
}

let action = self.next_action();
match action {
Some(QueryAction::QueryFailed { .. }) | Some(QueryAction::QuerySucceeded { .. }) => {
self.is_done = true;
}
None => {
self.waker = Some(cx.waker().clone());
return Poll::Pending;
}
_ => (),
};

Poll::Ready(action)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
34 changes: 33 additions & 1 deletion src/protocol/libp2p/kademlia/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ use crate::{
};

use bytes::Bytes;
use futures::{Stream, StreamExt};

use std::collections::{HashMap, VecDeque};
use std::{
collections::{HashMap, VecDeque},
pin::Pin,
task::{Context, Poll},
};

use self::find_many_nodes::FindManyNodesContext;

Expand Down Expand Up @@ -599,6 +604,33 @@ impl QueryEngine {
}
}

impl Stream for QueryEngine {
type Item = QueryAction;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
for (_, state) in self.queries.iter_mut() {
let result = match state {
QueryType::FindNode { context } => context.poll_next_unpin(cx),
_ => continue,
};

match result {
Poll::Ready(Some(QueryAction::QuerySucceeded { query })) => {
return Poll::Ready(Some(self.on_query_succeeded(query)));
}
Poll::Ready(Some(QueryAction::QueryFailed { query })) => {
return Poll::Ready(Some(self.on_query_failed(query)));
}
Poll::Ready(Some(action)) => return Poll::Ready(Some(action)),
Poll::Ready(None) => panic!("Should never happen, we handle the result"),
Poll::Pending => {}
}
}

Poll::Pending
}
}

#[cfg(test)]
mod tests {
use multihash::{Code, Multihash};
Expand Down

0 comments on commit a248900

Please sign in to comment.