diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index bf6ee78c..656c403d 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - protocol::libp2p::kademlia::{PeerRecord, QueryId, Record, RecordKey}, + protocol::libp2p::kademlia::{KademliaPeer, PeerRecord, QueryId, Record, RecordKey}, PeerId, }; @@ -134,6 +134,15 @@ pub(crate) enum KademliaCommand { query_id: QueryId, }, + /// Get providers from DHT. + GetProviders { + /// Provided key. + key: RecordKey, + + /// Query ID for the query. + query_id: QueryId, + }, + /// Register as a content provider for `key`. StartProviding { /// Provided key. @@ -190,6 +199,16 @@ pub enum KademliaEvent { records: RecordsType, }, + /// `GET_PROVIDERS` query succeeded. + GetProvidersSuccess { + /// Query ID. + query_id: QueryId, + + /// Found providers with cached addresses. Returned providers are sorted by distane to the + /// provided key. + providers: Vec, + }, + /// `PUT_VALUE` query succeeded. // TODO: this is never emitted. Implement + add `AddProviderSuccess`. PutRecordSuccess { @@ -284,6 +303,8 @@ impl KademliaHandle { } /// Store record to DHT to the given peers. + /// + /// Returns [`Err`] only if `Kademlia` is terminating. pub async fn put_record_to_peers( &mut self, record: Record, @@ -305,6 +326,8 @@ impl KademliaHandle { } /// Get record from DHT. + /// + /// Returns [`Err`] only if `Kademlia` is terminating. pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { let query_id = self.next_query_id(); let _ = self @@ -322,6 +345,7 @@ impl KademliaHandle { /// Register as a content provider on the DHT. /// /// Register the local peer ID & its `public_addresses` as a provider for a given `key`. + /// Returns [`Err`] only if `Kademlia` is terminating. pub async fn start_providing( &mut self, key: RecordKey, @@ -340,6 +364,16 @@ impl KademliaHandle { query_id } + /// Get providers from DHT. + /// + /// Returns [`Err`] only if `Kademlia` is terminating. + pub async fn get_providers(&mut self, key: RecordKey) -> QueryId { + let query_id = self.next_query_id(); + let _ = self.cmd_tx.send(KademliaCommand::GetProviders { key, query_id }).await; + + query_id + } + /// Store the record in the local store. Used in combination with /// [`IncomingRecordValidationMode::Manual`]. pub async fn store_record(&mut self, record: Record) { diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index bba2b285..f8f4965f 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -187,14 +187,13 @@ impl KademliaMessage { }; let mut buf = BytesMut::with_capacity(message.encoded_len()); - message.encode(&mut buf).expect("Vec to provide needed capacity"); + message.encode(&mut buf).expect("BytesMut to provide needed capacity"); buf.freeze() } /// Create `GET_PROVIDERS` request for `key`. - #[allow(unused)] - pub fn get_providers_request(key: RecordKey) -> Vec { + pub fn get_providers_request(key: RecordKey) -> Bytes { let message = schema::kademlia::Message { key: key.to_vec(), cluster_level_raw: 10, @@ -202,20 +201,17 @@ impl KademliaMessage { ..Default::default() }; - let mut buf = Vec::with_capacity(message.encoded_len()); - message.encode(&mut buf).expect("Vec to provide needed capacity"); + let mut buf = BytesMut::with_capacity(message.encoded_len()); + message.encode(&mut buf).expect("BytesMut to provide needed capacity"); - buf + buf.freeze() } /// Create `GET_PROVIDERS` response. pub fn get_providers_response( - key: RecordKey, providers: Vec, closer_peers: &[KademliaPeer], ) -> Vec { - debug_assert!(providers.iter().all(|p| p.key == key)); - let provider_peers = providers .into_iter() .map(|p| { @@ -229,7 +225,6 @@ impl KademliaMessage { .collect(); let message = schema::kademlia::Message { - key: key.to_vec(), cluster_level_raw: 10, r#type: schema::kademlia::MessageType::GetProviders.into(), closer_peers: closer_peers.iter().map(Into::into).collect(), diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 18c1b17d..a70dfb7c 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -599,11 +599,8 @@ impl Kademlia { .routing_table .closest(Key::from(key.to_vec()), self.replication_factor); - let message = KademliaMessage::get_providers_response( - key.clone(), - providers, - &closer_peers, - ); + let message = + KademliaMessage::get_providers_response(providers, &closer_peers); self.executor.send_message(peer, message.into(), substream); } (None, None) => tracing::debug!( @@ -829,6 +826,19 @@ impl Kademlia { .await; Ok(()) } + QueryAction::GetProvidersQueryDone { + query_id, + providers, + } => { + let _ = self + .event_tx + .send(KademliaEvent::GetProvidersSuccess { + query_id, + providers, + }) + .await; + Ok(()) + } QueryAction::QueryFailed { query } => { tracing::debug!(target: LOG_TARGET, ?query, "query failed"); @@ -963,7 +973,8 @@ impl Kademlia { "store record to DHT", ); - // For `PUT_VALUE` requests originating locally we are always the publisher. + // For `PUT_VALUE` requests originating locally we are always the + // publisher. record.publisher = Some(self.local_key.clone().into_preimage()); // Make sure TTL is set. @@ -1060,14 +1071,19 @@ impl Kademlia { (Some(record), Quorum::One) => { let _ = self .event_tx - .send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) }) + .send(KademliaEvent::GetRecordSuccess { + query_id, + records: RecordsType::LocalStore(record.clone()), + }) .await; } (record, _) => { self.engine.start_get_record( query_id, key.clone(), - self.routing_table.closest(Key::new(key.clone()), self.replication_factor).into(), + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), quorum, if record.is_some() { 1 } else { 0 }, ); @@ -1075,6 +1091,17 @@ impl Kademlia { } } + Some(KademliaCommand::GetProviders { key, query_id }) => { + tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT"); + + self.engine.start_get_providers( + query_id, + key.clone(), + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), + ); + } Some(KademliaCommand::AddKnownPeer { peer, addresses }) => { tracing::trace!( target: LOG_TARGET, @@ -1088,7 +1115,10 @@ impl Kademlia { addresses.clone(), self.peers .get(&peer) - .map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected), + .map_or( + ConnectionType::NotConnected, + |_| ConnectionType::Connected, + ), ); self.service.add_known_address(&peer, addresses.into_iter()); @@ -1101,7 +1131,8 @@ impl Kademlia { ); // Make sure TTL is set. - record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = + record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); self.store.put(record); } diff --git a/src/protocol/libp2p/kademlia/query/get_providers.rs b/src/protocol/libp2p/kademlia/query/get_providers.rs new file mode 100644 index 00000000..d679265c --- /dev/null +++ b/src/protocol/libp2p/kademlia/query/get_providers.rs @@ -0,0 +1,267 @@ +// Copyright 2024 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; + +use crate::{ + protocol::libp2p::kademlia::{ + message::KademliaMessage, + query::{QueryAction, QueryId}, + record::Key as RecordKey, + types::{ConnectionType, Distance, KademliaPeer, Key}, + }, + types::multiaddr::Multiaddr, + PeerId, +}; + +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; + +/// Logging target for the file. +const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::get_providers"; + +/// The configuration needed to instantiate a new [`GetProvidersContext`]. +#[derive(Debug)] +pub struct GetProvidersConfig { + /// Local peer ID. + pub local_peer_id: PeerId, + + /// Parallelism factor. + pub parallelism_factor: usize, + + /// Query ID. + pub query: QueryId, + + /// Target key. + pub target: Key, +} + +#[derive(Debug)] +pub struct GetProvidersContext { + /// Query immutable config. + pub config: GetProvidersConfig, + + /// Cached Kademlia message to send. + kad_message: Bytes, + + /// Peers from whom the `QueryEngine` is waiting to hear a response. + pub pending: HashMap, + + /// Queried candidates. + /// + /// These are the peers for whom the query has already been sent + /// and who have either returned their closest peers or failed to answer. + pub queried: HashSet, + + /// Candidates. + pub candidates: BTreeMap, + + /// Found providers. + pub found_providers: Vec, +} + +impl GetProvidersContext { + /// Create new [`GetProvidersContext`]. + pub fn new(config: GetProvidersConfig, in_peers: VecDeque) -> Self { + let mut candidates = BTreeMap::new(); + + for candidate in &in_peers { + let distance = config.target.distance(&candidate.key); + candidates.insert(distance, candidate.clone()); + } + + let kad_message = + KademliaMessage::get_providers_request(config.target.clone().into_preimage()); + + Self { + config, + kad_message, + candidates, + pending: HashMap::new(), + queried: HashSet::new(), + found_providers: Vec::new(), + } + } + + /// Get the found providers. + pub fn found_providers(self) -> Vec { + // Merge addresses of different provider records of the same peer. + let mut providers = HashMap::>::new(); + self.found_providers.into_iter().for_each(|provider| { + providers + .entry(provider.peer) + .or_default() + .extend(provider.addresses.into_iter()) + }); + + // Convert into `Vec` + let mut providers = providers + .into_iter() + .map(|(peer, addresses)| KademliaPeer { + key: Key::from(peer.clone()), + peer, + addresses: addresses.into_iter().collect(), + connection: ConnectionType::NotConnected, + }) + .collect::>(); + + // Sort by the provider distance to the target key. + providers.sort_unstable_by(|p1, p2| { + p1.key.distance(&self.config.target).cmp(&p2.key.distance(&self.config.target)) + }); + + providers + } + + /// Register response failure for `peer`. + pub fn register_response_failure(&mut self, peer: PeerId) { + let Some(peer) = self.pending.remove(&peer) else { + tracing::debug!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetProvidersContext`: pending peer doesn't exist", + ); + return; + }; + + self.queried.insert(peer.peer); + } + + /// Register `GET_PROVIDERS` response from `peer`. + pub fn register_response( + &mut self, + peer: PeerId, + providers: impl IntoIterator, + closer_peers: impl IntoIterator, + ) { + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetProvidersContext`: received response from peer", + ); + + let Some(peer) = self.pending.remove(&peer) else { + tracing::debug!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetProvidersContext`: received response from peer but didn't expect it", + ); + return; + }; + + self.found_providers.extend(providers.into_iter()); + + // Add the queried peer to `queried` and all new peers which haven't been + // queried to `candidates` + self.queried.insert(peer.peer); + + let to_query_candidate = closer_peers.into_iter().filter_map(|peer| { + // Peer already produced a response. + if self.queried.contains(&peer.peer) { + return None; + } + + // Peer was queried, awaiting response. + if self.pending.contains_key(&peer.peer) { + return None; + } + + // Local node. + if self.config.local_peer_id == peer.peer { + return None; + } + + Some(peer) + }); + + for candidate in to_query_candidate { + let distance = self.config.target.distance(&candidate.key); + self.candidates.insert(distance, candidate); + } + } + + /// Get next action for `peer`. + // TODO: remove this and store the next action to `PeerAction` + pub fn next_peer_action(&mut self, peer: &PeerId) -> Option { + self.pending.contains_key(peer).then_some(QueryAction::SendMessage { + query: self.config.query, + peer: *peer, + message: self.kad_message.clone(), + }) + } + + /// Schedule next peer for outbound `GET_VALUE` query. + fn schedule_next_peer(&mut self) -> Option { + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + "`GetProvidersContext`: get next peer", + ); + + let (_, candidate) = self.candidates.pop_first()?; + let peer = candidate.peer; + + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetProvidersContext`: current candidate", + ); + self.pending.insert(candidate.peer, candidate); + + Some(QueryAction::SendMessage { + query: self.config.query, + peer, + message: self.kad_message.clone(), + }) + } + + /// Check if the query cannot make any progress. + /// + /// Returns true when there are no pending responses and no candidates to query. + fn is_done(&self) -> bool { + self.pending.is_empty() && self.candidates.is_empty() + } + + /// Get next action for a `GET_PROVIDERS` query. + pub fn next_action(&mut self) -> Option { + if self.is_done() { + // If we cannot make progress, return the final result. + // A query failed when we are not able to find any providers. + if self.found_providers.is_empty() { + Some(QueryAction::QueryFailed { + query: self.config.query, + }) + } else { + Some(QueryAction::QuerySucceeded { + query: self.config.query, + }) + } + } else if self.pending.len() == self.config.parallelism_factor { + // At this point, we either have pending responses or candidates to query; and we need + // more records. Ensure we do not exceed the parallelism factor. + None + } else { + self.schedule_next_peer() + } + } +} diff --git a/src/protocol/libp2p/kademlia/query/get_record.rs b/src/protocol/libp2p/kademlia/query/get_record.rs index 722ee101..12ea8293 100644 --- a/src/protocol/libp2p/kademlia/query/get_record.rs +++ b/src/protocol/libp2p/kademlia/query/get_record.rs @@ -135,7 +135,12 @@ impl GetRecordContext { /// Register response failure for `peer`. pub fn register_response_failure(&mut self, peer: PeerId) { let Some(peer) = self.pending.remove(&peer) else { - tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist"); + tracing::debug!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetRecordContext`: pending peer doesn't exist", + ); return; }; @@ -149,10 +154,20 @@ impl GetRecordContext { record: Option, peers: Vec, ) { - tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer"); + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetRecordContext`: received response from peer", + ); let Some(peer) = self.pending.remove(&peer) else { - tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it"); + tracing::debug!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetRecordContext`: received response from peer but didn't expect it", + ); return; }; @@ -206,12 +221,21 @@ impl GetRecordContext { /// Schedule next peer for outbound `GET_VALUE` query. fn schedule_next_peer(&mut self) -> Option { - tracing::trace!(target: LOG_TARGET, query = ?self.config.query, "get next peer"); + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + "`GetRecordContext`: get next peer", + ); let (_, candidate) = self.candidates.pop_first()?; let peer = candidate.peer; - tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate"); + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetRecordContext`: current candidate", + ); self.pending.insert(candidate.peer, candidate); Some(QueryAction::SendMessage { diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 34f6e84e..8a247e45 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -23,6 +23,7 @@ use crate::{ message::KademliaMessage, query::{ find_node::{FindNodeConfig, FindNodeContext}, + get_providers::{GetProvidersConfig, GetProvidersContext}, get_record::{GetRecordConfig, GetRecordContext}, }, record::{Key as RecordKey, ProviderRecord, Record}, @@ -40,6 +41,7 @@ use self::find_many_nodes::FindManyNodesContext; mod find_many_nodes; mod find_node; +mod get_providers; mod get_record; /// Logging target for the file. @@ -90,6 +92,12 @@ enum QueryType { /// Context for the `FIND_NODE` query. context: FindNodeContext, }, + + /// `GET_PROVIDERS` query. + GetProviders { + /// Context for the `GET_PROVIDERS` query. + context: GetProvidersContext, + }, } /// Query action. @@ -147,6 +155,15 @@ pub enum QueryAction { records: Vec, }, + /// `GET_PROVIDERS` query succeeded. + GetProvidersQueryDone { + /// Query ID. + query_id: QueryId, + + /// Found providers. + providers: Vec, + }, + /// Query succeeded. QuerySucceeded { /// ID of the query that succeeded. @@ -358,6 +375,39 @@ impl QueryEngine { query_id } + /// Start `GET_PROVIDERS` query. + pub fn start_get_providers( + &mut self, + query_id: QueryId, + key: RecordKey, + candidates: VecDeque, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + ?key, + num_peers = ?candidates.len(), + "start `GET_PROVIDERS` query", + ); + + let target = Key::new(key); + let config = GetProvidersConfig { + local_peer_id: self.local_peer_id, + parallelism_factor: self.parallelism_factor, + query: query_id, + target, + }; + + self.queries.insert( + query_id, + QueryType::GetProviders { + context: GetProvidersContext::new(config, candidates), + }, + ); + + query_id + } + /// Register response failure from a queried peer. pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure"); @@ -381,6 +431,9 @@ impl QueryEngine { Some(QueryType::AddProvider { context, .. }) => { context.register_response_failure(peer); } + Some(QueryType::GetProviders { context }) => { + context.register_response_failure(peer); + } } } @@ -422,6 +475,16 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::GetProviders { context }) => match message { + KademliaMessage::GetProviders { + key: _, + providers, + peers, + } => { + context.register_response(peer, providers, peers); + } + _ => unreachable!(), + }, } } @@ -439,6 +502,7 @@ impl QueryEngine { Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer), + Some(QueryType::GetProviders { context }) => context.next_peer_action(peer), } } @@ -467,6 +531,10 @@ impl QueryEngine { provider, peers: context.responses.into_values().collect::>(), }, + QueryType::GetProviders { context } => QueryAction::GetProvidersQueryDone { + query_id: context.config.query, + providers: context.found_providers(), + }, } } @@ -487,6 +555,7 @@ impl QueryEngine { QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), QueryType::AddProvider { context, .. } => context.next_action(), + QueryType::GetProviders { context } => context.next_action(), }; match action {