Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
core/authority-discovery: Implement module logic itself
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 21, 2019
1 parent 94a238a commit 7de7db7
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 0 deletions.
13 changes: 13 additions & 0 deletions core/authority-discovery/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,17 @@ pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
RetrievingAuthorityId,
VerifyingDhtPayload,
HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError),
CallingRuntime(client::error::Error),
SigningDhtPayload,
/// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure it
/// is actually an authority, we match the hash against the hash of the authority id of all other authorities. This
/// error is the result of the above failing.
MatchingHashedAuthorityIdWithAuthorityId,
SettingPeersetPriorityGroup(String),
Encoding(prost::EncodeError),
Decoding(prost::DecodeError),
ParsingMultiaddress(libp2p::core::multiaddr::Error),
}
196 changes: 196 additions & 0 deletions core/authority-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,178 @@ where
phantom_authority_id: PhantomData,
}
}

fn publish_own_ext_addresses(&mut self) -> Result<()> {
let id = BlockId::hash(self.client.info().best_hash);

let authority_id = self
.client
.runtime_api()
.authority_id(&id)
.map_err(Error::CallingRuntime)?
.ok_or(Error::RetrievingAuthorityId)?;

let addresses = self
.network
.external_addresses()
.into_iter()
.map(|mut a| {
a.push(libp2p::core::multiaddr::Protocol::P2p(
self.network.peer_id().into(),
));
a
})
.map(|a| a.to_string())
.collect();

let mut serialized_addresses = vec![];
{
let mut a = schema::AuthorityAddresses::default();
a.addresses = addresses;
a.encode(&mut serialized_addresses)
.map_err(Error::Encoding)?;
};

let sig = self
.client
.runtime_api()
.sign(&id, serialized_addresses.clone(), authority_id.clone())
.map_err(Error::CallingRuntime)?
.ok_or(Error::SigningDhtPayload)?;

let mut signed_addresses = vec![];
{
let mut a = schema::SignedAuthorityAddresses::default();
a.addresses = serialized_addresses;
a.signature = sig;
a.encode(&mut signed_addresses).map_err(Error::Encoding)?;
};

self.network
.put_value(hash_authority_id(authority_id.as_ref())?, signed_addresses);

Ok(())
}

fn request_addresses_of_others(&mut self) -> Result<()> {
let id = BlockId::hash(self.client.info().best_hash);

let authorities = self
.client
.runtime_api()
.authorities(&id)
.map_err(Error::CallingRuntime)?;

for authority_id in authorities.iter() {
self.network
.get_value(&hash_authority_id(authority_id.as_ref())?);
}

Ok(())
}

fn handle_dht_events(&mut self) -> Result<()> {
while let Ok(Async::Ready(Some(event))) = self.dht_event_rx.poll() {
match event {
DhtEvent::ValueFound(v) => {
if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
debug!(target: "sub-authority-discovery", "Value for hash '{:?}' found on Dht.", hashes);
}

self.handle_dht_value_found_event(v)?;
}
DhtEvent::ValueNotFound(hash) => {
warn!(target: "sub-authority-discovery", "Value for hash '{:?}' not found on Dht.", hash)
}
DhtEvent::ValuePut(hash) => {
debug!(target: "sub-authority-discovery", "Successfully put hash '{:?}' on Dht.", hash)
}
DhtEvent::ValuePutFailed(hash) => {
warn!(target: "sub-authority-discovery", "Failed to put hash '{:?}' on Dht.", hash)
}
}
}

Ok(())
}

fn handle_dht_value_found_event(
&mut self,
values: Vec<(libp2p::kad::record::Key, Vec<u8>)>,
) -> Result<()> {
println!("==== dht found handling, cache: {:?}", self.address_cache);
let id = BlockId::hash(self.client.info().best_hash);

// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure
// it is actually an authority, we match the hash against the hash of the authority id of all other authorities.
let authorities = self.client.runtime_api().authorities(&id)?;
self.purge_old_authorities_from_cache(&authorities);

let authorities = authorities
.into_iter()
.map(|a| hash_authority_id(a.as_ref()).map(|h| (h, a)))
.collect::<Result<HashMap<_, _>>>()?;

for (key, value) in values.iter() {
// Check if the event origins from an authority in the current
// authority set.
let authority_pub_key: &AuthorityId = authorities
.get(key)
.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;

let signed_addresses =
schema::SignedAuthorityAddresses::decode(value).map_err(Error::Decoding)?;

let is_verified = self
.client
.runtime_api()
.verify(
&id,
signed_addresses.addresses.clone(),
signed_addresses.signature.clone(),
authority_pub_key.clone(),
)
.map_err(Error::CallingRuntime)?;

if !is_verified {
return Err(Error::VerifyingDhtPayload);
}

let addresses: Vec<libp2p::Multiaddr> =
schema::AuthorityAddresses::decode(signed_addresses.addresses)
.map(|a| a.addresses)
.map_err(Error::Decoding)?
.into_iter()
.map(|a| a.parse())
.collect::<std::result::Result<_, _>>()
.map_err(Error::ParsingMultiaddress)?;

self.address_cache
.insert(authority_pub_key.clone(), addresses);
}

// Let's update the peerset priority group with the all the addresses we
// have in our cache.

let addresses = HashSet::from_iter(
self.address_cache
.iter()
.map(|(_peer_id, addresses)| addresses.clone())
.flatten(),
);

self.network
.set_priority_group("authorities".to_string(), addresses)
.map_err(Error::SettingPeersetPriorityGroup)?;

Ok(())
}

fn purge_old_authorities_from_cache(&mut self, authorities: &Vec<AuthorityId>) {
self.address_cache
.retain(|peer_id, _addresses| authorities.contains(peer_id))
}
}

impl<AuthorityId, Client, B, S, H> futures::Future
Expand All @@ -163,8 +335,32 @@ where
type Error = ();

fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
let mut inner = || -> Result<()> {
// Process incoming events before triggering new ones.
self.handle_dht_events()?;

while let Ok(Async::Ready(_)) = self.interval.poll() {
self.publish_own_ext_addresses()?;

self.request_addresses_of_others()?;
}

Ok(())
};

match inner() {
Ok(()) => {}
Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e),
}

// Make sure to always return NotReady as this is a long running task
// with the same lifetime of the node itself.
Ok(futures::Async::NotReady)
}
}

fn hash_authority_id(id: &[u8]) -> Result<(libp2p::kad::record::Key)> {
libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, id)
.map(|k| libp2p::kad::record::Key::new(&k))
.map_err(Error::HashingAuthorityId)
}

0 comments on commit 7de7db7

Please sign in to comment.