From 728fcad07b3ddce411b10ddb014fcfe8ecb09322 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 1 Aug 2019 17:00:45 +0200 Subject: [PATCH] network/service: Let `get_value` return a future returning a DhtEvent So far to retrieve a value from the DHT, one would need to: 1. Trigger a DHT event query via `NetworkWorker.get_value`. 2. Somehow retrieve the event from `NetworkWorker.poll`, e.g. by having it return the responses as a stream. Instead of the above, this commit suggests having `NetworkService.get_value` return a future, eventually resolving to the DHT response. Internally this is coordinated via `futures::sync::oneshot`, maintaining a list of subscribers for a given DHT key, notifying each one when a new event comes in. --- core/network/src/protocol/event.rs | 1 + core/network/src/service.rs | 68 +++++++++++++++++++++++++++--- core/network/src/test/sync.rs | 40 ++++++++++++++++++ 3 files changed, 103 insertions(+), 6 deletions(-) diff --git a/core/network/src/protocol/event.rs b/core/network/src/protocol/event.rs index 2edbb0fbf7563..a9331c148983d 100644 --- a/core/network/src/protocol/event.rs +++ b/core/network/src/protocol/event.rs @@ -20,6 +20,7 @@ use libp2p::multihash::Multihash; /// Events generated by DHT as a response to get_value and put_value requests. +#[derive(Debug, Clone, PartialEq)] pub enum DhtEvent { /// The value was found. ValueFound(Vec<(Multihash, Vec)>), diff --git a/core/network/src/service.rs b/core/network/src/service.rs index acd3bbeab7b10..3fc6cc1f2de1a 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -47,7 +47,7 @@ use crate::config::{Params, TransportConfig}; use crate::error::Error; use crate::protocol::{self, Protocol, Context, CustomMessageOutcome, PeerInfo}; use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; -use crate::protocol::{event::Event, on_demand::{AlwaysBadChecker, RequestData}}; +use crate::protocol::{event::{Event, DhtEvent}, on_demand::{AlwaysBadChecker, RequestData}}; use crate::protocol::specialization::NetworkSpecialization; use crate::protocol::sync::SyncState; @@ -242,6 +242,7 @@ impl, H: ExHashT> NetworkWorker import_queue: params.import_queue, from_worker, on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()), + dht_event_subscribers: HashMap::new(), }) } @@ -301,6 +302,48 @@ impl, H: ExHashT> NetworkWorker self.network_service.user_protocol_mut().on_block_finalized(hash, &header); } + fn notify_dht_event_subscribers(&mut self, e: DhtEvent) { + match e { + DhtEvent::ValueFound(vs) => { + // ValueFound event contains multiple (Multihash, Vec) tuples. Let's sort them into buckets by + // Multihash first and then pass them on to the right subscribers. + let mut value_buckets = HashMap::>>::new(); + + for (h, v) in vs.into_iter() { + value_buckets.entry(h).and_modify(|values| values.push(v)); + } + + for (h, vs) in value_buckets.into_iter() { + let subscribers = self.dht_event_subscribers.remove(&h); + + match subscribers { + Some(subscribers) => { + for s in subscribers.into_iter() { + s.send(DhtEvent::ValueFound(vs.clone().into_iter().map(|v| (h.clone(), v)).collect())).unwrap(); + } + } + None => {}, + } + } + }, + DhtEvent::ValueNotFound(h) => { + let subscribers = self.dht_event_subscribers.remove(&h); + + match subscribers { + Some(subscribers) => { + for s in subscribers.into_iter() { + s.send(DhtEvent::ValueNotFound(h.clone())).unwrap(); + } + } + None => { + }, + } + }, + // TODO: Should we also offer a subscription way for putting values? + DhtEvent::ValuePut(_) | DhtEvent::ValuePutFailed(_) => {}, + } + } + /// Get network state. /// /// **Note**: Use this only for debugging. This API is unstable. There are warnings literaly @@ -451,10 +494,13 @@ impl, H: ExHashT> NetworkServic /// /// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it to /// `on_event` on the network specialization. - pub fn get_value(&self, key: &Multihash) { + pub fn get_value(&self, key: &Multihash) -> impl futures::Future { + let (tx, rx) = futures::sync::oneshot::channel::(); let _ = self .to_worker - .unbounded_send(ServerToWorkerMsg::GetValue(key.clone())); + .unbounded_send(ServerToWorkerMsg::GetValue(( tx, key.clone() ))); + + rx } /// Start putting a value in the DHT. @@ -561,7 +607,7 @@ enum ServerToWorkerMsg> { ExecuteWithSpec(Box) + Send>), ExecuteWithGossip(Box, &mut dyn Context) + Send>), GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec, GossipMessageRecipient), - GetValue(Multihash), + GetValue((futures::sync::oneshot::Sender, Multihash)), PutValue(Multihash, Vec), AddKnownAddress(PeerId, Multiaddr), } @@ -587,6 +633,9 @@ pub struct NetworkWorker, H: Ex from_worker: mpsc::UnboundedReceiver>, /// Receiver for queries from the on-demand that must be processed. on_demand_in: Option>>, + /// DHT event subscribers. + // TODO: We probably need to differentiate in *get* and *put* event subscribers. + dht_event_subscribers: HashMap>>, } impl, H: ExHashT> Future for NetworkWorker { @@ -636,8 +685,14 @@ impl, H: ExHashT> Future for Ne self.network_service.user_protocol_mut().request_justification(&hash, number), ServerToWorkerMsg::PropagateExtrinsics => self.network_service.user_protocol_mut().propagate_extrinsics(), - ServerToWorkerMsg::GetValue(key) => - self.network_service.get_value(&key), + ServerToWorkerMsg::GetValue(( tx, key )) => { + self.dht_event_subscribers.entry(key.clone()) + .or_insert(vec![]) + .push(tx); + + + self.network_service.get_value(&key); + } ServerToWorkerMsg::PutValue(key, value) => self.network_service.put_value(key, value), ServerToWorkerMsg::AddKnownAddress(peer_id, addr) => @@ -653,6 +708,7 @@ impl, H: ExHashT> Future for Ne Ok(Async::NotReady) => break, Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { + self.notify_dht_event_subscribers(ev.clone()); self.network_service.user_protocol_mut() .on_event(Event::Dht(ev)); CustomMessageOutcome::None diff --git a/core/network/src/test/sync.rs b/core/network/src/test/sync.rs index f3a8f0c8ea428..00b480e23afbe 100644 --- a/core/network/src/test/sync.rs +++ b/core/network/src/test/sync.rs @@ -20,6 +20,10 @@ use consensus::BlockOrigin; use futures03::TryFutureExt as _; use std::time::Duration; use tokio::runtime::current_thread; +use libp2p::multihash::Hash; +use crate::protocol::event::DhtEvent; +use crate::service::NetworkStateInfo; +use futures::{sync::oneshot, future::{Future}}; use super::*; fn test_ancestor_search_when_common_is(n: usize) { @@ -527,3 +531,39 @@ fn light_peer_imports_header_from_announce() { let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true); import_with_announce(&mut net, &mut runtime, known_stale_hash); } + +// TODO: This function should probably go into a separate file. +#[test] +fn test_network_worker_get_value() { + let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); + let mut net = TestNet::new(1); + let h = libp2p::multihash::encode(Hash::SHA2256, b"hello world").unwrap(); + + let mut expect_dht_event = |net: &mut TestNet, mut f: Box>, expected_event| { + let mut deadline = futures_timer::Delay::new(Duration::from_secs(5)).compat(); + + runtime.block_on(futures::future::poll_fn::<(), String, _>(|| { + net.poll(); + + match f.poll() { + Ok(futures::Async::Ready(received_event)) => { + if received_event != expected_event { + panic!(format!("expected event {:?}, got {:?}", expected_event, received_event)); + } + + return Ok(futures::Async::Ready(())); + } + Ok(futures::Async::NotReady) => {}, + Err(e) => panic!(format!("expected no error on poll: {}", e)), + }; + + deadline.poll() + .map_err(|e| e.to_string()) + .and_then(|_| Err(format!("waiting for dht event {:?} timed out", expected_event))) + })).unwrap(); + }; + + let get_fut = Box::new(net.peer(0).network_service().get_value(&h)); + expect_dht_event(&mut net, get_fut, DhtEvent::ValueNotFound(h.clone())); +}