
feat(kad): Implement ability to acknowledge AddProvider before reading further messages from substream #3468

Closed
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions protocols/kad/CHANGELOG.md
@@ -10,8 +10,11 @@

- Bump MSRV to 1.65.0.

- Implement ability to acknowledge `AddProvider` before reading further messages from substream. See [PR 3468].

[PR 3239]: https://github.com/libp2p/rust-libp2p/pull/3239
[PR 3287]: https://github.com/libp2p/rust-libp2p/pull/3287
[PR 3468]: https://github.com/libp2p/rust-libp2p/pull/3468

# 0.42.1

1 change: 1 addition & 0 deletions protocols/kad/Cargo.toml
@@ -20,6 +20,7 @@ futures = "0.3.26"
log = "0.4"
libp2p-core = { version = "0.39.0", path = "../../core" }
libp2p-swarm = { version = "0.42.0", path = "../../swarm" }
parking_lot = "0.12.0"
prost = "0.11"
rand = "0.8"
sha2 = "0.10.0"
34 changes: 26 additions & 8 deletions protocols/kad/src/behaviour.rs
@@ -24,8 +24,8 @@ mod test;

use crate::addresses::Addresses;
use crate::handler::{
KademliaHandlerConfig, KademliaHandlerEvent, KademliaHandlerIn, KademliaHandlerProto,
KademliaRequestId,
InboundStreamEventGuard, KademliaHandlerConfig, KademliaHandlerEvent, KademliaHandlerIn,
KademliaHandlerProto, KademliaRequestId,
};
use crate::jobs::*;
use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
@@ -53,6 +53,7 @@ use smallvec::SmallVec;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::fmt;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::vec;
use std::{borrow::Cow, time::Duration};
@@ -1740,7 +1741,12 @@ where
}

/// Processes a provider record received from a peer.
fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
fn provider_received(
&mut self,
key: record::Key,
provider: KadPeer,
guard: Arc<InboundStreamEventGuard>,
) {
if &provider.node_id != self.kbuckets.local_key().preimage() {
let record = ProviderRecord {
key,
@@ -1758,7 +1764,10 @@
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::InboundRequest {
request: InboundRequest::AddProvider { record: None },
request: InboundRequest::AddProvider {
record: None,
guard: None,
},
},
));
}
@@ -1768,6 +1777,7 @@
KademliaEvent::InboundRequest {
request: InboundRequest::AddProvider {
record: Some(record),
guard: Some(guard),
},
},
));
@@ -2140,13 +2150,17 @@
}
}

KademliaHandlerEvent::AddProvider { key, provider } => {
KademliaHandlerEvent::AddProvider {
key,
provider,
guard,
} => {
// Only accept a provider record from a legitimate peer.
if provider.node_id != source {
return;
}

self.provider_received(key, provider);
self.provider_received(key, provider, guard);
}

KademliaHandlerEvent::GetRecord { key, request_id } => {
@@ -2583,8 +2597,12 @@ pub enum InboundRequest {
/// If filtering [`KademliaStoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is
/// included.
///
/// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`] for details..
AddProvider { record: Option<ProviderRecord> },
/// See [`KademliaStoreInserts`] and [`KademliaConfig::set_record_filtering`] for details.
AddProvider {
record: Option<ProviderRecord>,
/// Guard corresponding to the inbound stream that generated this event.
guard: Option<Arc<InboundStreamEventGuard>>,
},
/// Request to retrieve a record.
GetRecord {
num_closer_peers: usize,
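
The `guard` travels from the inbound substream through `KademliaHandlerEvent::AddProvider` into this `InboundRequest::AddProvider`, so the application can defer acknowledgement until it has finished processing the record. A minimal consumer-side sketch (the `drive` and `validate` names are assumptions for illustration, and the swarm is presumed to be configured with `KademliaStoreInserts::FilterBoth` via `KademliaConfig::set_record_filtering` so that `record` is `Some`):

```rust
use futures::StreamExt;
use libp2p_kad::record::store::{MemoryStore, RecordStore};
use libp2p_kad::record::ProviderRecord;
use libp2p_kad::{InboundRequest, Kademlia, KademliaEvent};
use libp2p_swarm::{Swarm, SwarmEvent};

/// Hypothetical event loop: validate each provider record while the sending
/// substream is paused, then drop the guard to acknowledge the message.
async fn drive(mut swarm: Swarm<Kademlia<MemoryStore>>) {
    loop {
        match swarm.select_next_some().await {
            SwarmEvent::Behaviour(KademliaEvent::InboundRequest {
                request: InboundRequest::AddProvider { record, guard },
            }) => {
                // `record` is `Some` only with `KademliaStoreInserts::FilterBoth`.
                if let Some(record) = record {
                    if validate(&record).await {
                        let _ = swarm.behaviour_mut().store_mut().add_provider(record);
                    }
                }
                // Dropping the last clone of the guard wakes the handler; only
                // then will it read the next message from that substream.
                drop(guard);
            }
            _ => {}
        }
    }
}

/// Stand-in for application-specific validation (an assumption of this sketch).
async fn validate(_record: &ProviderRecord) -> bool {
    true
}
```

Because the guard is an `Arc`, it can also be cloned into a background task; the substream resumes reading only once the last clone is dropped.
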
79 changes: 76 additions & 3 deletions protocols/kad/src/handler.rs
@@ -36,7 +36,10 @@ use libp2p_swarm::{
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use log::trace;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::{
error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration,
@@ -168,6 +171,28 @@
Poisoned,
}

/// When this structure is dropped, the corresponding inbound stream will be able to read the
/// next message; until then it is paused.
///
/// Hold onto this structure until processing of the event is finished to temporarily pause new
/// incoming requests from the same inbound stream.
#[derive(Debug)]
pub struct InboundStreamEventGuard {
ready: Arc<AtomicBool>,
waker: Mutex<Option<Waker>>,
}

impl Drop for InboundStreamEventGuard {
fn drop(&mut self) {
self.ready.store(true, Ordering::Release);
self.waker
.lock()
.take()
.expect("Only called once in Drop impl")
.wake();
}
}
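
Internally this is a small drop-to-wake primitive: the handler keeps only a `Weak` to the guard and re-registers its waker on every poll, while dropping the last `Arc` flips the ready flag and wakes the task. A self-contained sketch of the same pattern, under hypothetical names (`Guard`, `WaitForDrop`) and slightly simplified from the handler's state machine:

```rust
use parking_lot::Mutex;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, Waker};

struct Guard {
    ready: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.ready.store(true, Ordering::Release);
        // Wake whoever registered last; `None` means nobody has polled yet.
        if let Some(waker) = self.waker.lock().take() {
            waker.wake();
        }
    }
}

/// A future that resolves once the associated `Guard` has been dropped.
struct WaitForDrop(Weak<Guard>);

impl Future for WaitForDrop {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        match self.0.upgrade() {
            // All strong references are gone: the guard was dropped.
            None => Poll::Ready(()),
            Some(guard) => {
                // Register our waker first, then re-check `ready` to close
                // the race with a concurrent drop on another task.
                *guard.waker.lock() = Some(cx.waker().clone());
                if guard.ready.load(Ordering::Acquire) {
                    Poll::Ready(())
                } else {
                    Poll::Pending
                }
            }
        }
    }
}

fn main() {
    let guard = Arc::new(Guard {
        ready: AtomicBool::new(false),
        waker: Mutex::new(None),
    });
    let wait = WaitForDrop(Arc::downgrade(&guard));
    drop(guard); // Acknowledge: `wait` now resolves.
    futures::executor::block_on(wait);
}
```

Holding only a `Weak` in the substream state means a guard that is dropped (or never stored) can never wedge the stream: a failed `upgrade` resumes reading immediately.
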

/// State of an active inbound substream.
enum InboundSubstreamState<TUserData> {
/// Waiting for a request from the remote.
@@ -183,6 +208,11 @@ enum InboundSubstreamState<TUserData> {
KadInStreamSink<NegotiatedSubstream>,
Option<Waker>,
),
PendingProcessing {
connection_id: UniqueConnecId,
weak_guard: Weak<InboundStreamEventGuard>,
substream: KadInStreamSink<NegotiatedSubstream>,
},
/// Waiting to send an answer back to the remote.
PendingSend(
UniqueConnecId,
@@ -241,6 +271,7 @@ impl<TUserData> InboundSubstreamState<TUserData> {
) {
InboundSubstreamState::WaitingMessage { substream, .. }
| InboundSubstreamState::WaitingBehaviour(_, substream, _)
| InboundSubstreamState::PendingProcessing { substream, .. }
| InboundSubstreamState::PendingSend(_, substream, _)
| InboundSubstreamState::PendingFlush(_, substream)
| InboundSubstreamState::Closing(substream) => {
@@ -316,6 +347,8 @@ pub enum KademliaHandlerEvent<TUserData> {
key: record::Key,
/// The peer that is the provider of the value for `key`.
provider: KadPeer,
/// Guard corresponding to the inbound stream that generated this event.
guard: Arc<InboundStreamEventGuard>,
},

/// Request to get a value from the dht records
@@ -1043,13 +1076,23 @@
)));
}
Poll::Ready(Some(Ok(KadRequestMsg::AddProvider { key, provider }))) => {
*this = InboundSubstreamState::WaitingMessage {
first: false,
let ready = Arc::new(AtomicBool::new(false));
let guard = Arc::new(InboundStreamEventGuard {
ready,
waker: Mutex::new(Some(cx.waker().clone())),
});
*this = InboundSubstreamState::PendingProcessing {
connection_id,
weak_guard: Arc::downgrade(&guard),
substream,
};

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(
KademliaHandlerEvent::AddProvider { key, provider },
KademliaHandlerEvent::AddProvider {
key,
provider,
guard,
},
)));
}
Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => {
@@ -1101,6 +1144,36 @@

return Poll::Pending;
}
InboundSubstreamState::PendingProcessing {
connection_id,
weak_guard,
substream,
} => {
if let Some(guard) = weak_guard.upgrade() {
// Re-register our waker, then check whether processing has finished.
// `old_waker` is only `None` if the guard's `Drop` impl already took
// it, so either condition means the event has been handled.
let old_waker = guard.waker.lock().replace(cx.waker().clone());
if old_waker.is_none() || guard.ready.load(Ordering::Acquire) {
*this = InboundSubstreamState::WaitingMessage {
first: false,
connection_id,
substream,
};
} else {
*this = InboundSubstreamState::PendingProcessing {
connection_id,
weak_guard,
substream,
};

return Poll::Pending;
}
} else {
// The guard has already been dropped; resume reading the next message.
*this = InboundSubstreamState::WaitingMessage {
first: false,
connection_id,
substream,
};
}
}
InboundSubstreamState::PendingSend(id, mut substream, msg) => {
match substream.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) {