Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
client/finality-grandpa: Make round_communication use bounded channel (
Browse files Browse the repository at this point in the history
…#4691)

* clinet/finality-grandpa: Make round_communication use bounded channel

`round_communication` returns a `Sink` and a `Stream` for outgoing and
incoming messages. The messages send into the `Sink` are forwarded down
to the network as well as send back into the `Stream` to ensure the node
processes its own messages.

So far, to send messages into the `Sink` back into the `Stream`, an
unbounded channel was used. This patch updates `round_communication` and
`OutgoingMessages` to use a bounded channel.

This is part of a greater effort to reduce the number of owners of
components within `finality-grandpa` and `network` as well as to reduce
the amount of unbounded channels. For details see d4fbb89 and
f0c1852.

* client/finality-grandpa: Import futures03::future::ready at the top

* client/finality-grandpa: Make tests use compat of future 03

* client/finality-grandpa: Do not import ready into scope

Instead of importing futures03::future::ready into the scope, only
import futures::future03 into scope and call ready as furure03::ready.
  • Loading branch information
mxinden authored and rphmeier committed Jan 23, 2020
1 parent 7bd0dbf commit 5afc777
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 47 deletions.
89 changes: 48 additions & 41 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
//! In the future, there will be a fallback for allowing sending the same message
//! under certain conditions that are used to un-stick the protocol.
use futures::{prelude::*, sync::mpsc};
use futures::prelude::*;
use futures03::{
channel::mpsc as mpsc03,
compat::Compat,
future::{Future as Future03},
stream::StreamExt,
future::{self as future03, Future as Future03},
sink::Sink as Sink03,
stream::{Stream as Stream03, StreamExt},
};
use log::{debug, trace};
use parking_lot::Mutex;
Expand Down Expand Up @@ -276,8 +277,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
local_key: Option<AuthorityPair>,
has_voted: HasVoted<B>,
) -> (
impl Stream<Item=SignedMessage<B>,Error=Error>,
impl Sink<SinkItem=Message<B>,SinkError=Error>,
impl Stream03<Item=SignedMessage<B>> + Unpin,
OutgoingMessages<B>,
) {
self.note_round(
round,
Expand All @@ -295,22 +296,20 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
});

let topic = round_topic::<B>(round.0, set_id.0);
let incoming = Compat::new(self.gossip_engine.messages_for(topic)
.map(|item| Ok::<_, ()>(item)))
.filter_map(|notification| {
let incoming = self.gossip_engine.messages_for(topic)
.filter_map(move |notification| {
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
if let Err(ref e) = decoded {
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
}
decoded.ok()
})
.and_then(move |msg| {
match msg {
GossipMessage::Vote(msg) => {

match decoded {
Err(ref e) => {
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
return future03::ready(None);
}
Ok(GossipMessage::Vote(msg)) => {
// check signature.
if !voters.contains_key(&msg.message.id) {
debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
return Ok(None);
return future03::ready(None);
}

if voters.len() <= TELEMETRY_VOTERS_LIMIT {
Expand Down Expand Up @@ -339,18 +338,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
};
}

Ok(Some(msg.message))
future03::ready(Some(msg.message))
}
_ => {
debug!(target: "afg", "Skipping unknown message type");
return Ok(None);
return future03::ready(None);
}
}
})
.filter_map(|x| x)
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")));
});

let (tx, out_rx) = mpsc::unbounded();
let (tx, out_rx) = mpsc03::channel(0);
let outgoing = OutgoingMessages::<B> {
round: round.0,
set_id: set_id.0,
Expand All @@ -360,14 +357,10 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
has_voted,
};

let out_rx = out_rx.map_err(move |()| Error::Network(
format!("Failed to receive on unbounded receiver for round {}", round.0)
));

// Combine incoming votes from external GRANDPA nodes with outgoing
// votes from our own GRANDPA voter to have a single
// vote-import-pipeline.
let incoming = incoming.select(out_rx);
let incoming = futures03::stream::select(incoming, out_rx);

(incoming, outgoing)
}
Expand Down Expand Up @@ -690,21 +683,29 @@ pub(crate) fn check_message_sig_with_buffer<Block: BlockT>(
/// use the same raw message and key to sign. This is currently true for
/// `ed25519` and `BLS` signatures (which we might use in the future), care must
/// be taken when switching to different key types.
struct OutgoingMessages<Block: BlockT> {
pub(crate) struct OutgoingMessages<Block: BlockT> {
round: RoundNumber,
set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
sender: mpsc03::Sender<SignedMessage<Block>>,
network: GossipEngine<Block>,
has_voted: HasVoted<Block>,
}

impl<Block: BlockT> Sink for OutgoingMessages<Block>
impl<B: BlockT> Unpin for OutgoingMessages<B> {}

impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block>
{
type SinkItem = Message<Block>;
type SinkError = Error;
type Error = Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_ready(Pin::new(&mut self.sender), cx)
.map(|elem| { elem.map_err(|e| {
Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
})})
}

fn start_send(&mut self, mut msg: Message<Block>) -> StartSend<Message<Block>, Error> {
fn start_send(mut self: Pin<&mut Self>, mut msg: Message<Block>) -> Result<(), Self::Error> {
// if we've voted on this round previously under the same key, send that vote instead
match &mut msg {
finality_grandpa::Message::PrimaryPropose(ref mut vote) =>
Expand Down Expand Up @@ -760,17 +761,23 @@ impl<Block: BlockT> Sink for OutgoingMessages<Block>
self.network.gossip_message(topic, message.encode(), false);

// forward the message to the inner sender.
let _ = self.sender.unbounded_send(signed);
}
return self.sender.start_send(signed).map_err(|e| {
Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
});
};

Ok(AsyncSink::Ready)
Ok(())
}

fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Poll03::Ready(Ok(()))
}

fn close(&mut self) -> Poll<(), Error> {
// ignore errors since we allow this inner sender to be closed already.
self.sender.close().or_else(|_| Ok(Async::Ready(())))
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_close(Pin::new(&mut self.sender), cx)
.map(|elem| { elem.map_err(|e| {
Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
})})
}
}

Expand Down
9 changes: 8 additions & 1 deletion client/finality-grandpa/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ use std::time::Duration;
use log::{debug, warn, info};
use parity_scale_codec::{Decode, Encode};
use futures::prelude::*;
use futures03::future::{FutureExt as _, TryFutureExt as _};
use futures03::{
compat::{Compat, CompatSink},
future::{FutureExt as _, TryFutureExt as _},
stream::StreamExt as _,
};
use futures_timer::Delay;
use parking_lot::RwLock;
use sp_blockchain::{HeaderBackend, Error as ClientError};
Expand Down Expand Up @@ -608,6 +612,9 @@ where
has_voted,
);

let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item)));
let outgoing = CompatSink::new(outgoing);

// schedule incoming messages from the network to be held until
// corresponding blocks are imported.
let incoming = Box::new(UntilVoteTargetImported::new(
Expand Down
25 changes: 20 additions & 5 deletions client/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ use sp_consensus::{
BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, ImportResult, BlockImport,
import_queue::{BoxJustificationImport, BoxFinalityProofImport},
};
use std::collections::{HashMap, HashSet};
use std::result;
use std::{
collections::{HashMap, HashSet},
result,
pin::Pin, task,
};
use parity_scale_codec::Decode;
use sp_runtime::traits::{Header as HeaderT, HasherFor};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HasherFor};
use sp_runtime::generic::{BlockId, DigestItem};
use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public};
use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi};
use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check};
use std::{pin::Pin, task};

use authorities::AuthoritySet;
use finality_proof::{
Expand Down Expand Up @@ -1282,6 +1284,9 @@ fn voter_persists_its_votes() {
HasVoted::No,
);

let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item)));
let round_tx = futures03::compat::CompatSink::new(round_tx);

let round_tx = Arc::new(Mutex::new(round_tx));
let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));

Expand Down Expand Up @@ -1332,7 +1337,17 @@ fn voter_persists_its_votes() {
target_hash: block_30_hash,
};

round_tx.lock().start_send(finality_grandpa::Message::Prevote(prevote)).unwrap();
// One should either be calling `Sink::send` or `Sink::start_send` followed
// by `Sink::poll_complete` to make sure items are being flushed. Given that
// we send in a loop including a delay until items are received, this can be
// ignored for the sake of reduced complexity.
if !round_tx.lock()
.start_send(finality_grandpa::Message::Prevote(prevote))
.unwrap()
.is_ready() {
panic!("expected sink to be ready to write to.");
}

Ok(())
}).map_err(|_| panic!()))

Expand Down

0 comments on commit 5afc777

Please sign in to comment.