Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

grandpa: remove the periodic block announcer #4062

Merged
merged 2 commits into from
Nov 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions core/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N,
validator: Arc<GossipValidator<B>>,
neighbor_sender: periodic::NeighborPacketSender<B>,
announce_sender: periodic::BlockAnnounceSender<B>,
}

impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
Expand Down Expand Up @@ -341,19 +340,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}

let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone());
let (announce_job, announce_sender) = periodic::block_announce_worker(service.clone());
let reporting_job = report_stream.consume(service.clone());

let bridge = NetworkBridge { service, validator, neighbor_sender, announce_sender };
let bridge = NetworkBridge { service, validator, neighbor_sender };

let startup_work = futures::future::lazy(move || {
// lazily spawn these jobs onto their own tasks. the lazy future has access
// to tokio globals, which aren't available outside.
let mut executor = tokio_executor::DefaultExecutor::current();
executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(()))))
.expect("failed to spawn grandpa rebroadcast job task");
executor.spawn(Box::new(announce_job.select(on_exit.clone()).then(|_| Ok(()))))
.expect("failed to spawn grandpa block announce job task");
executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(()))))
.expect("failed to spawn grandpa reporting job task");
Ok(())
Expand Down Expand Up @@ -470,7 +466,6 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
network: self.service.clone(),
locals,
sender: tx,
announce_sender: self.announce_sender.clone(),
has_voted,
};

Expand Down Expand Up @@ -676,7 +671,6 @@ impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
service: self.service.clone(),
validator: Arc::clone(&self.validator),
neighbor_sender: self.neighbor_sender.clone(),
announce_sender: self.announce_sender.clone(),
}
}
}
Expand Down Expand Up @@ -723,7 +717,6 @@ struct OutgoingMessages<Block: BlockT, N: Network<Block>> {
set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
announce_sender: periodic::BlockAnnounceSender<Block>,
network: N,
has_voted: HasVoted<Block>,
}
Expand Down Expand Up @@ -781,8 +774,8 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
"block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id,
);

// send the target block hash to the background block announcer
self.announce_sender.send(target_hash, Vec::new());
// announce the block we voted on to our peers.
self.network.announce(target_hash, Vec::new());

// propagate the message to peers
let topic = round_topic::<Block>(self.round, self.set_id);
Expand Down
142 changes: 0 additions & 142 deletions core/finality-grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

//! Periodic rebroadcast of neighbor packets.

use std::collections::VecDeque;
use std::time::{Instant, Duration};

use codec::Encode;
Expand All @@ -32,11 +31,6 @@ use super::{gossip::{NeighborPacket, GossipMessage}, Network};
// how often to rebroadcast, if no other
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);

/// The number of block hashes that we have previously voted on that we should
/// keep around for announcement. The current value should be enough for 3
/// rounds assuming we have prevoted and precommited on different blocks.
const LATEST_VOTED_BLOCKS_TO_ANNOUNCE: usize = 6;

fn rebroadcast_instant() -> Instant {
Instant::now() + REBROADCAST_AFTER
}
Expand Down Expand Up @@ -114,139 +108,3 @@ pub(super) fn neighbor_packet_worker<B, N>(net: N) -> (

(work, NeighborPacketSender(tx))
}

/// A background worker for performing block announcements.
struct BlockAnnouncer<B: BlockT, N> {
net: N,
block_rx: mpsc::UnboundedReceiver<(B::Hash, Vec<u8>)>,
latest_voted_blocks: VecDeque<B::Hash>,
reannounce_after: Duration,
delay: Delay,
}

/// A background worker for announcing block hashes to peers. The worker keeps
/// track of `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` and periodically announces these
/// blocks to all peers if no new blocks to announce are noted (i.e. presumably
/// GRANDPA progress is stalled).
pub(super) fn block_announce_worker<B: BlockT, N: Network<B>>(net: N) -> (
impl Future<Item = (), Error = ()>,
BlockAnnounceSender<B>,
) {
block_announce_worker_aux(net, REBROADCAST_AFTER)
}

#[cfg(test)]
pub(super) fn block_announce_worker_with_delay<B: BlockT, N: Network<B>>(
net: N,
reannounce_after: Duration,
) -> (
impl Future<Item = (), Error = ()>,
BlockAnnounceSender<B>,
) {
block_announce_worker_aux(net, reannounce_after)
}

fn block_announce_worker_aux<B: BlockT, N: Network<B>>(
net: N,
reannounce_after: Duration,
) -> (
impl Future<Item = (), Error = ()>,
BlockAnnounceSender<B>,
) {
let latest_voted_blocks = VecDeque::with_capacity(LATEST_VOTED_BLOCKS_TO_ANNOUNCE);

let (block_tx, block_rx) = mpsc::unbounded();

let announcer = BlockAnnouncer {
net,
block_rx,
latest_voted_blocks,
reannounce_after,
delay: Delay::new(Instant::now() + reannounce_after),
};

(announcer, BlockAnnounceSender(block_tx))
}


impl<B: BlockT, N> BlockAnnouncer<B, N> {
fn note_block(&mut self, block: B::Hash) -> bool {
if !self.latest_voted_blocks.contains(&block) {
if self.latest_voted_blocks.len() >= LATEST_VOTED_BLOCKS_TO_ANNOUNCE {
self.latest_voted_blocks.pop_front();
}

self.latest_voted_blocks.push_back(block);

true
} else {
false
}
}

fn reset_delay(&mut self) {
self.delay.reset(Instant::now() + self.reannounce_after);
}
}

impl<B: BlockT, N: Network<B>> Future for BlockAnnouncer<B, N> {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// note any new blocks to announce and announce them
loop {
match self.block_rx.poll().expect("unbounded receivers do not error; qed") {
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some(block)) => {
if self.note_block(block.0) {
self.net.announce(block.0, block.1);
self.reset_delay();
}
},
Async::NotReady => break,
}
}

// check the reannouncement delay timer, has to be done in a loop
// because it needs to be polled after re-scheduling.
loop {
match self.delay.poll() {
Err(e) => {
warn!(target: "afg", "Error in periodic block announcer timer: {:?}", e);
self.reset_delay();
},
// after the delay fires announce all blocks that we have
// stored. note that this only happens if we don't receive any
// new blocks above for the duration of `reannounce_after`.
Ok(Async::Ready(())) => {
self.reset_delay();

debug!(
target: "afg",
"Re-announcing latest voted blocks due to lack of progress: {:?}",
self.latest_voted_blocks,
);

for block in self.latest_voted_blocks.iter() {
self.net.announce(*block, Vec::new());
}
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
}
}
}
}

/// A sender used to send block hashes to announce to a background job.
#[derive(Clone)]
pub(super) struct BlockAnnounceSender<B: BlockT>(mpsc::UnboundedSender<(B::Hash, Vec<u8>)>);

impl<B: BlockT> BlockAnnounceSender<B> {
/// Send a block hash for the background worker to announce.
pub fn send(&self, block: B::Hash, associated_data: Vec<u8>) {
if let Err(err) = self.0.unbounded_send((block, associated_data)) {
debug!(target: "afg", "Failed to send block to background announcer: {:?}", err);
}
}
}
76 changes: 0 additions & 76 deletions core/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,79 +504,3 @@ fn peer_with_higher_view_leads_to_catch_up_request() {

current_thread::block_on_all(test).unwrap();
}

#[test]
fn periodically_reannounce_voted_blocks_on_stall() {
use futures::try_ready;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;

let (tester, net) = make_test_network();
let (announce_worker, announce_sender) = super::periodic::block_announce_worker_with_delay(
net,
Duration::from_secs(1),
);

let hashes = Arc::new(Mutex::new(Vec::new()));

fn wait_all(tester: Tester, hashes: &[Hash]) -> impl Future<Item = Tester, Error = ()> {
struct WaitAll {
remaining_hashes: Arc<Mutex<HashSet<Hash>>>,
events_fut: Box<dyn Future<Item = Tester, Error = ()>>,
}

impl Future for WaitAll {
type Item = Tester;
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let tester = try_ready!(self.events_fut.poll());

if self.remaining_hashes.lock().is_empty() {
return Ok(Async::Ready(tester));
}

let remaining_hashes = self.remaining_hashes.clone();
self.events_fut = Box::new(tester.filter_network_events(move |event| match event {
Event::Announce(h) =>
remaining_hashes.lock().remove(&h) || panic!("unexpected announce"),
_ => false,
}));

self.poll()
}
}

WaitAll {
remaining_hashes: Arc::new(Mutex::new(hashes.iter().cloned().collect())),
events_fut: Box::new(futures::future::ok(tester)),
}
}

let test = tester
.and_then(move |tester| {
current_thread::spawn(announce_worker);
Ok(tester)
})
.and_then(|tester| {
// announce 12 blocks
for _ in 0..=12 {
let hash = Hash::random();
hashes.lock().push(hash);
announce_sender.send(hash, Vec::new());
}

// we should see an event for each of those announcements
wait_all(tester, &hashes.lock())
})
.and_then(|tester| {
// after a period of inactivity we should see the last
// `LATEST_VOTED_BLOCKS_TO_ANNOUNCE` being rebroadcast
wait_all(tester, &hashes.lock()[7..=12])
});

let mut runtime = current_thread::Runtime::new().unwrap();
runtime.block_on(test).unwrap();
}