Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

network/service: Let get_value return a future returning a DhtEvent #3292

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/network/src/protocol/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use libp2p::multihash::Multihash;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone, PartialEq)]
pub enum DhtEvent {
/// The value was found.
ValueFound(Vec<(Multihash, Vec<u8>)>),
Expand Down
68 changes: 62 additions & 6 deletions core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::config::{Params, TransportConfig};
use crate::error::Error;
use crate::protocol::{self, Protocol, Context, CustomMessageOutcome, PeerInfo};
use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::protocol::{event::Event, on_demand::{AlwaysBadChecker, RequestData}};
use crate::protocol::{event::{Event, DhtEvent}, on_demand::{AlwaysBadChecker, RequestData}};
use crate::protocol::specialization::NetworkSpecialization;
use crate::protocol::sync::SyncState;

Expand Down Expand Up @@ -242,6 +242,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
import_queue: params.import_queue,
from_worker,
on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()),
dht_event_subscribers: HashMap::new(),
})
}

Expand Down Expand Up @@ -301,6 +302,48 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
self.network_service.user_protocol_mut().on_block_finalized(hash, &header);
}

fn notify_dht_event_subscribers(&mut self, e: DhtEvent) {
match e {
DhtEvent::ValueFound(vs) => {
// ValueFound event contains multiple (Multihash, Vec<u8>) tuples. Let's sort them into buckets by
// Multihash first and then pass them on to the right subscribers.
let mut value_buckets = HashMap::<Multihash, Vec<Vec<u8>>>::new();

for (h, v) in vs.into_iter() {
value_buckets.entry(h).and_modify(|values| values.push(v));
}

for (h, vs) in value_buckets.into_iter() {
let subscribers = self.dht_event_subscribers.remove(&h);

match subscribers {
Some(subscribers) => {
for s in subscribers.into_iter() {
s.send(DhtEvent::ValueFound(vs.clone().into_iter().map(|v| (h.clone(), v)).collect())).unwrap();
}
}
None => {},
}
}
},
DhtEvent::ValueNotFound(h) => {
let subscribers = self.dht_event_subscribers.remove(&h);

match subscribers {
Some(subscribers) => {
for s in subscribers.into_iter() {
s.send(DhtEvent::ValueNotFound(h.clone())).unwrap();
}
}
None => {
},
}
},
// TODO: Should we also offer a subscription way for putting values?
DhtEvent::ValuePut(_) | DhtEvent::ValuePutFailed(_) => {},
}
}

/// Get network state.
///
/// **Note**: Use this only for debugging. This API is unstable. There are warnings literaly
Expand Down Expand Up @@ -451,10 +494,13 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
///
/// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it to
/// `on_event` on the network specialization.
pub fn get_value(&self, key: &Multihash) {
pub fn get_value(&self, key: &Multihash) -> impl futures::Future<Item=DhtEvent, Error=futures::sync::oneshot::Canceled> {
let (tx, rx) = futures::sync::oneshot::channel::<DhtEvent>();
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::GetValue(key.clone()));
.unbounded_send(ServerToWorkerMsg::GetValue(( tx, key.clone() )));

rx
}

/// Start putting a value in the DHT.
Expand Down Expand Up @@ -561,7 +607,7 @@ enum ServerToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
ExecuteWithSpec(Box<dyn FnOnce(&mut S, &mut dyn Context<B>) + Send>),
ExecuteWithGossip(Box<dyn FnOnce(&mut ConsensusGossip<B>, &mut dyn Context<B>) + Send>),
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, GossipMessageRecipient),
GetValue(Multihash),
GetValue((futures::sync::oneshot::Sender<DhtEvent>, Multihash)),
PutValue(Multihash, Vec<u8>),
AddKnownAddress(PeerId, Multiaddr),
}
Expand All @@ -587,6 +633,9 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
from_worker: mpsc::UnboundedReceiver<ServerToWorkerMsg<B, S>>,
/// Receiver for queries from the on-demand that must be processed.
on_demand_in: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
/// DHT event subscribers.
// TODO: We probably need to differentiate in *get* and *put* event subscribers.
dht_event_subscribers: HashMap<Multihash, Vec<futures::sync::oneshot::Sender<DhtEvent>>>,
}

impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
Expand Down Expand Up @@ -636,8 +685,14 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
self.network_service.user_protocol_mut().request_justification(&hash, number),
ServerToWorkerMsg::PropagateExtrinsics =>
self.network_service.user_protocol_mut().propagate_extrinsics(),
ServerToWorkerMsg::GetValue(key) =>
self.network_service.get_value(&key),
ServerToWorkerMsg::GetValue(( tx, key )) => {
self.dht_event_subscribers.entry(key.clone())
.or_insert(vec![])
.push(tx);


self.network_service.get_value(&key);
}
ServerToWorkerMsg::PutValue(key, value) =>
self.network_service.put_value(key, value),
ServerToWorkerMsg::AddKnownAddress(peer_id, addr) =>
Expand All @@ -653,6 +708,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome,
Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => {
self.notify_dht_event_subscribers(ev.clone());
self.network_service.user_protocol_mut()
.on_event(Event::Dht(ev));
CustomMessageOutcome::None
Expand Down
40 changes: 40 additions & 0 deletions core/network/src/test/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ use consensus::BlockOrigin;
use futures03::TryFutureExt as _;
use std::time::Duration;
use tokio::runtime::current_thread;
use libp2p::multihash::Hash;
use crate::protocol::event::DhtEvent;
use crate::service::NetworkStateInfo;
use futures::{sync::oneshot, future::{Future}};
use super::*;

fn test_ancestor_search_when_common_is(n: usize) {
Expand Down Expand Up @@ -527,3 +531,39 @@ fn light_peer_imports_header_from_announce() {
let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true);
import_with_announce(&mut net, &mut runtime, known_stale_hash);
}

// TODO: This function should probably go into a separate file.
#[test]
fn test_network_worker_get_value() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(1);
let h = libp2p::multihash::encode(Hash::SHA2256, b"hello world").unwrap();

let mut expect_dht_event = |net: &mut TestNet, mut f: Box<Future<Item=_, Error=_>>, expected_event| {
let mut deadline = futures_timer::Delay::new(Duration::from_secs(5)).compat();

runtime.block_on(futures::future::poll_fn::<(), String, _>(|| {
net.poll();

match f.poll() {
Ok(futures::Async::Ready(received_event)) => {
if received_event != expected_event {
panic!(format!("expected event {:?}, got {:?}", expected_event, received_event));
}

return Ok(futures::Async::Ready(()));
}
Ok(futures::Async::NotReady) => {},
Err(e) => panic!(format!("expected no error on poll: {}", e)),
};

deadline.poll()
.map_err(|e| e.to_string())
.and_then(|_| Err(format!("waiting for dht event {:?} timed out", expected_event)))
})).unwrap();
};

let get_fut = Box::new(net.peer(0).network_service().get_value(&h));
expect_dht_event(&mut net, get_fut, DhtEvent::ValueNotFound(h.clone()));
}