Skip to content

Commit

Permalink
kad: Providers part 5: GET_PROVIDERS query (#236)
Browse files Browse the repository at this point in the history
Implement iterative `GET_PROVIDERS` query.
  • Loading branch information
dmitry-markin authored Sep 30, 2024
1 parent 6ffcdd2 commit 0f865fb
Show file tree
Hide file tree
Showing 6 changed files with 446 additions and 26 deletions.
36 changes: 35 additions & 1 deletion src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
protocol::libp2p::kademlia::{PeerRecord, QueryId, Record, RecordKey},
protocol::libp2p::kademlia::{KademliaPeer, PeerRecord, QueryId, Record, RecordKey},
PeerId,
};

Expand Down Expand Up @@ -134,6 +134,15 @@ pub(crate) enum KademliaCommand {
query_id: QueryId,
},

/// Get providers from DHT.
GetProviders {
/// Provided key.
key: RecordKey,

/// Query ID for the query.
query_id: QueryId,
},

/// Register as a content provider for `key`.
StartProviding {
/// Provided key.
Expand Down Expand Up @@ -190,6 +199,16 @@ pub enum KademliaEvent {
records: RecordsType,
},

/// `GET_PROVIDERS` query succeeded.
GetProvidersSuccess {
/// Query ID.
query_id: QueryId,

/// Found providers with cached addresses. Returned providers are sorted by distance to the
/// provided key.
providers: Vec<KademliaPeer>,
},

/// `PUT_VALUE` query succeeded.
// TODO: this is never emitted. Implement + add `AddProviderSuccess`.
PutRecordSuccess {
Expand Down Expand Up @@ -284,6 +303,8 @@ impl KademliaHandle {
}

/// Store record to DHT to the given peers.
///
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn put_record_to_peers(
&mut self,
record: Record,
Expand All @@ -305,6 +326,8 @@ impl KademliaHandle {
}

/// Get record from DHT.
///
/// The command is silently dropped if `Kademlia` is terminating; the returned
/// [`QueryId`] is generated locally and is always returned.
pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
let query_id = self.next_query_id();
let _ = self
Expand All @@ -322,6 +345,7 @@ impl KademliaHandle {
/// Register as a content provider on the DHT.
///
/// Register the local peer ID & its `public_addresses` as a provider for a given `key`.
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn start_providing(
&mut self,
key: RecordKey,
Expand All @@ -340,6 +364,16 @@ impl KademliaHandle {
query_id
}

/// Get providers from DHT.
///
/// The command is silently dropped if `Kademlia` is terminating; the returned
/// [`QueryId`] is generated locally and is always returned.
pub async fn get_providers(&mut self, key: RecordKey) -> QueryId {
let query_id = self.next_query_id();
let _ = self.cmd_tx.send(KademliaCommand::GetProviders { key, query_id }).await;

query_id
}

/// Store the record in the local store. Used in combination with
/// [`IncomingRecordValidationMode::Manual`].
pub async fn store_record(&mut self, record: Record) {
Expand Down
15 changes: 5 additions & 10 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,35 +187,31 @@ impl KademliaMessage {
};

let mut buf = BytesMut::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
message.encode(&mut buf).expect("BytesMut to provide needed capacity");

buf.freeze()
}

/// Create `GET_PROVIDERS` request for `key`.
#[allow(unused)]
pub fn get_providers_request(key: RecordKey) -> Vec<u8> {
pub fn get_providers_request(key: RecordKey) -> Bytes {
let message = schema::kademlia::Message {
key: key.to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::GetProviders.into(),
..Default::default()
};

let mut buf = Vec::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
let mut buf = BytesMut::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("BytesMut to provide needed capacity");

buf
buf.freeze()
}

/// Create `GET_PROVIDERS` response.
pub fn get_providers_response(
key: RecordKey,
providers: Vec<ProviderRecord>,
closer_peers: &[KademliaPeer],
) -> Vec<u8> {
debug_assert!(providers.iter().all(|p| p.key == key));

let provider_peers = providers
.into_iter()
.map(|p| {
Expand All @@ -229,7 +225,6 @@ impl KademliaMessage {
.collect();

let message = schema::kademlia::Message {
key: key.to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::GetProviders.into(),
closer_peers: closer_peers.iter().map(Into::into).collect(),
Expand Down
51 changes: 41 additions & 10 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,8 @@ impl Kademlia {
.routing_table
.closest(Key::from(key.to_vec()), self.replication_factor);

let message = KademliaMessage::get_providers_response(
key.clone(),
providers,
&closer_peers,
);
let message =
KademliaMessage::get_providers_response(providers, &closer_peers);
self.executor.send_message(peer, message.into(), substream);
}
(None, None) => tracing::debug!(
Expand Down Expand Up @@ -829,6 +826,19 @@ impl Kademlia {
.await;
Ok(())
}
QueryAction::GetProvidersQueryDone {
query_id,
providers,
} => {
let _ = self
.event_tx
.send(KademliaEvent::GetProvidersSuccess {
query_id,
providers,
})
.await;
Ok(())
}
QueryAction::QueryFailed { query } => {
tracing::debug!(target: LOG_TARGET, ?query, "query failed");

Expand Down Expand Up @@ -963,7 +973,8 @@ impl Kademlia {
"store record to DHT",
);

// For `PUT_VALUE` requests originating locally we are always the publisher.
// For `PUT_VALUE` requests originating locally we are always the
// publisher.
record.publisher = Some(self.local_key.clone().into_preimage());

// Make sure TTL is set.
Expand Down Expand Up @@ -1060,21 +1071,37 @@ impl Kademlia {
(Some(record), Quorum::One) => {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) })
.send(KademliaEvent::GetRecordSuccess {
query_id,
records: RecordsType::LocalStore(record.clone()),
})
.await;
}
(record, _) => {
self.engine.start_get_record(
query_id,
key.clone(),
self.routing_table.closest(Key::new(key.clone()), self.replication_factor).into(),
self.routing_table
.closest(Key::new(key), self.replication_factor)
.into(),
quorum,
if record.is_some() { 1 } else { 0 },
);
}
}

}
Some(KademliaCommand::GetProviders { key, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT");

self.engine.start_get_providers(
query_id,
key.clone(),
self.routing_table
.closest(Key::new(key), self.replication_factor)
.into(),
);
}
Some(KademliaCommand::AddKnownPeer { peer, addresses }) => {
tracing::trace!(
target: LOG_TARGET,
Expand All @@ -1088,7 +1115,10 @@ impl Kademlia {
addresses.clone(),
self.peers
.get(&peer)
.map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected),
.map_or(
ConnectionType::NotConnected,
|_| ConnectionType::Connected,
),
);
self.service.add_known_address(&peer, addresses.into_iter());

Expand All @@ -1101,7 +1131,8 @@ impl Kademlia {
);

// Make sure TTL is set.
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
record.expires =
record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

self.store.put(record);
}
Expand Down
Loading

0 comments on commit 0f865fb

Please sign in to comment.