Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/network-gossip: Integrate GossipEngine tasks into Future impl #4767

Merged
merged 2 commits into from
Feb 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ pub fn new_full(config: Configuration<GenesisConfig>)
grandpa_link,
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
},
(true, false) => {
Expand All @@ -172,7 +171,6 @@ pub fn new_full(config: Configuration<GenesisConfig>)
on_exit: service.on_exit(),
telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
executor: service.spawn_task_handle(),
};

// the GRANDPA voter task is considered infallible, i.e.
Expand Down
2 changes: 0 additions & 2 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ macro_rules! new_full {
grandpa_link,
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
},
(true, false) => {
Expand All @@ -229,7 +228,6 @@ macro_rules! new_full {
on_exit: service.on_exit(),
telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
executor: service.spawn_task_handle(),
};
// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
Expand Down
14 changes: 9 additions & 5 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,14 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
service: N,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
executor: &impl futures::task::Spawn,
) -> Self {
let (validator, report_stream) = GossipValidator::new(
config,
set_state.clone(),
);

let validator = Arc::new(validator);
let gossip_engine = GossipEngine::new(service.clone(), executor, GRANDPA_ENGINE_ID, validator.clone());
let gossip_engine = GossipEngine::new(service.clone(), GRANDPA_ENGINE_ID, validator.clone());

{
// register all previous votes with the gossip service so that they're
Expand Down Expand Up @@ -374,10 +373,9 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|to, neighbor| self.neighbor_sender.send(to, neighbor),
);

let service = self.gossip_engine.clone();
let topic = global_topic::<B>(set_id.0);
let incoming = incoming_global(
service,
self.gossip_engine.clone(),
topic,
voters,
self.validator.clone(),
Expand Down Expand Up @@ -419,7 +417,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
type Output = Result<(), Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
Poll::Ready(Some((to, packet))) => {
Expand All @@ -444,6 +442,12 @@ impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
}
}

match self.gossip_engine.poll_unpin(cx) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GossipEngine needs to be polled by the NetworkBridge.

// The gossip engine future finished. We should do the same.
Poll::Ready(()) => return Poll::Ready(Ok(())),
Poll::Pending => {},
}

Poll::Pending
}
}
Expand Down
32 changes: 19 additions & 13 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ fn voter_set_state() -> SharedVoterSetState<Block> {
}

// needs to run in a tokio runtime.
pub(crate) fn make_test_network(executor: &impl futures::task::Spawn) -> (
pub(crate) fn make_test_network() -> (
impl Future<Output = Tester>,
TestNetwork,
) {
Expand All @@ -187,7 +187,6 @@ pub(crate) fn make_test_network(executor: &impl futures::task::Spawn) -> (
net.clone(),
config(),
voter_set_state(),
executor,
);

(
Expand Down Expand Up @@ -261,8 +260,7 @@ fn good_commit_leads_to_relay() {
let id = sc_network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let test = make_test_network(&threads_pool).0
let test = make_test_network().0
.then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::FULL);
Expand All @@ -281,6 +279,7 @@ fn good_commit_leads_to_relay() {
}

let commit_to_send = encoded_commit.clone();
let network_bridge = tester.net_handle.clone();

// asking for global communication will cause the test network
// to send us an event asking us for a stream. use it to
Expand Down Expand Up @@ -325,15 +324,19 @@ fn good_commit_leads_to_relay() {

// once the message is sent and commit is "handled" we should have
// a repropagation event coming from the network.
future::join(send_message, handle_commit).then(move |(tester, ())| {
let fut = future::join(send_message, handle_commit).then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::WriteNotification(_, data) => {
data == encoded_commit
}
_ => false,
})
})
.map(|_| ())
.map(|_| ());

// Poll both the future sending and handling the commit, as well as the underlying
// NetworkBridge. Complete once the former completes.
future::select(fut, network_bridge)
});

futures::executor::block_on(test);
Expand Down Expand Up @@ -385,8 +388,7 @@ fn bad_commit_leads_to_report() {
let id = sc_network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let test = make_test_network(&threads_pool).0
let test = make_test_network().0
.map(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::FULL);
Expand All @@ -405,6 +407,7 @@ fn bad_commit_leads_to_report() {
}

let commit_to_send = encoded_commit.clone();
let network_bridge = tester.net_handle.clone();

// asking for global communication will cause the test network
// to send us an event asking us for a stream. use it to
Expand All @@ -427,7 +430,7 @@ fn bad_commit_leads_to_report() {
_ => false,
});

// when the commit comes in, we'll tell the callback it was good.
// when the commit comes in, we'll tell the callback it was bad.
let handle_commit = commits_in.into_future()
.map(|(item, _)| {
match item.unwrap() {
Expand All @@ -440,15 +443,19 @@ fn bad_commit_leads_to_report() {

// once the message is sent and commit is "handled" we should have
// a report event coming from the network.
future::join(send_message, handle_commit).then(move |(tester, ())| {
let fut = future::join(send_message, handle_commit).then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::Report(who, cost_benefit) => {
who == id && cost_benefit == super::cost::INVALID_COMMIT
}
_ => false,
})
})
.map(|_| ())
.map(|_| ());

// Poll both the future sending and handling the commit, as well as the underlying
// NetworkBridge. Complete once the former completes.
future::select(fut, network_bridge)
});

futures::executor::block_on(test);
Expand All @@ -458,8 +465,7 @@ fn bad_commit_leads_to_report() {
fn peer_with_higher_view_leads_to_catch_up_request() {
let id = sc_network::PeerId::random();

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let (tester, mut net) = make_test_network(&threads_pool);
let (tester, mut net) = make_test_network();
let test = tester
.map(move |tester| {
// register a peer with authority role.
Expand Down
16 changes: 5 additions & 11 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ fn register_finality_tracker_inherent_data_provider<B, E, Block: BlockT, RA>(
}

/// Parameters used to run Grandpa.
pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X, Sp> {
pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X> {
/// Configuration for the GRANDPA service.
pub config: Config,
/// A link to the block import worker.
Expand All @@ -531,14 +531,12 @@ pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X, Sp> {
pub telemetry_on_connect: Option<futures::channel::mpsc::UnboundedReceiver<()>>,
/// A voting rule used to potentially restrict target votes.
pub voting_rule: VR,
/// How to spawn background tasks.
pub executor: Sp,
}

/// Run a GRANDPA voter as a task. Provide configuration and a link to a
/// block import worker that has already been instantiated with `block_import`.
pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X>,
) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block> + 'static,
Expand All @@ -551,7 +549,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
RA: Send + Sync + 'static,
X: futures::Future<Output=()> + Clone + Send + Unpin + 'static,
Client<B, E, Block, RA>: AuxStore,
Sp: futures::task::Spawn + 'static,
{
let GrandpaParams {
config,
Expand All @@ -561,7 +558,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
on_exit,
telemetry_on_connect,
voting_rule,
executor,
} = grandpa_params;

let LinkHalf {
Expand All @@ -575,7 +571,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
network,
config.clone(),
persistent_data.set_state.clone(),
&executor,
);

register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
Expand Down Expand Up @@ -863,8 +858,8 @@ where
}

#[deprecated(since = "1.1.0", note = "Please switch to run_grandpa_voter.")]
pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X>,
) -> sp_blockchain::Result<impl Future<Output=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block> + 'static,
Expand All @@ -877,7 +872,6 @@ pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
X: futures::Future<Output=()> + Clone + Send + Unpin + 'static,
Client<B, E, Block, RA>: AuxStore,
Sp: futures::task::Spawn + 'static,
{
run_grandpa_voter(grandpa_params)
}
Expand Down
9 changes: 2 additions & 7 deletions client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,18 @@ fn grandpa_observer<B, E, Block: BlockT, RA, S, F>(
/// listening for and validating GRANDPA commits instead of following the full
/// protocol. Provide configuration and a link to a block import worker that has
/// already been instantiated with `block_import`.
pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>(
pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC>(
config: Config,
link: LinkHalf<B, E, Block, RA, SC>,
network: N,
on_exit: impl futures::Future<Output=()> + Clone + Send + Unpin + 'static,
executor: Sp,
) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static> where
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
N: NetworkT<Block> + Send + Clone + 'static,
SC: SelectChain<Block> + 'static,
NumberFor<Block>: BlockNumberOps,
RA: Send + Sync + 'static,
Sp: futures::task::Spawn + 'static,
Client<B, E, Block, RA>: AuxStore,
{
let LinkHalf {
Expand All @@ -177,7 +175,6 @@ pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>(
network,
config.clone(),
persistent_data.set_state.clone(),
&executor,
);

let observer_work = ObserverWork::new(
Expand Down Expand Up @@ -392,10 +389,8 @@ mod tests {
/// network.
#[test]
fn observer_work_polls_underlying_network_bridge() {
let thread_pool = ThreadPool::new().unwrap();

// Create a test network.
let (tester_fut, _network) = make_test_network(&thread_pool);
let (tester_fut, _network) = make_test_network();
let mut tester = executor::block_on(tester_fut);

// Create an observer.
Expand Down
Loading