Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
core/network: Make network worker return Dht events on poll
Browse files Browse the repository at this point in the history
Instead of network worker implement the Future trait, have it implement
the Stream interface returning Dht events.

For now these events are ignored in build_network_future but will be
used by the core/authority-discovery module in subsequent commits.
  • Loading branch information
mxinden committed Aug 21, 2019
1 parent f72e1cb commit d34eb4f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 12 deletions.
1 change: 1 addition & 0 deletions core/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub use service::{
NetworkStateInfo,
};
pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization};
pub use protocol::event::{Event, DhtEvent};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, PeerId};
#[doc(inline)]
Expand Down
2 changes: 2 additions & 0 deletions core/network/src/protocol/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use libp2p::kad::record::Key;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
pub enum DhtEvent {
/// The value was found.
ValueFound(Vec<(Key, Vec<u8>)>),
Expand All @@ -35,6 +36,7 @@ pub enum DhtEvent {
}

/// Type for events generated by networking layer.
#[derive(Debug, Clone)]
pub enum Event {
/// Event generated by a DHT.
Dht(DhtEvent),
Expand Down
13 changes: 7 additions & 6 deletions core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,11 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
light_client_rqs: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
}

impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
type Item = ();
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for NetworkWorker<B, S, H> {
type Item = Event;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Poll the import queue for actions to perform.
let _ = futures03::future::poll_fn(|cx| {
self.import_queue.poll_actions(cx, &mut NetworkLink {
Expand All @@ -636,7 +636,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
// Process the next message coming from the `NetworkService`.
let msg = match self.from_worker.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::NotReady),
Ok(Async::NotReady) => break,
};

Expand Down Expand Up @@ -677,8 +677,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome,
Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => {
self.network_service.user_protocol_mut()
.on_event(Event::Dht(ev));
CustomMessageOutcome::None
.on_event(Event::Dht(ev.clone()));

return Ok(Async::Ready(Some(Event::Dht(ev))));
},
Ok(Async::Ready(None)) => CustomMessageOutcome::None,
Err(err) => {
Expand Down
12 changes: 6 additions & 6 deletions core/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use exit_future::Signal;
use futures::prelude::*;
use futures03::stream::{StreamExt as _, TryStreamExt as _};
use keystore::Store as Keystore;
use network::{NetworkState, NetworkStateInfo};
use network::{NetworkState, NetworkStateInfo, Event, DhtEvent};
use log::{log, info, warn, debug, error, Level};
use codec::{Encode, Decode};
use sr_primitives::generic::BlockId;
Expand Down Expand Up @@ -708,11 +708,11 @@ fn build_network_future<
}

// Main network polling.
match network.poll() {
Ok(Async::NotReady) => {}
Err(err) => warn!(target: "service", "Error in network: {:?}", err),
Ok(Async::Ready(())) => warn!(target: "service", "Network service finished"),
}
while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| {
warn!(target: "service", "Error in network: {:?}", err);
}) {
// Ignore for now.
};

// Now some diagnostic for performances.
let polling_dur = before_polling.elapsed();
Expand Down

0 comments on commit d34eb4f

Please sign in to comment.