diff --git a/client/finality-grandpa/rpc/Cargo.toml b/client/finality-grandpa/rpc/Cargo.toml index 28197405c8db7..6f3014644eaa4 100644 --- a/client/finality-grandpa/rpc/Cargo.toml +++ b/client/finality-grandpa/rpc/Cargo.toml @@ -8,9 +8,10 @@ edition = "2018" license = "GPL-3.0-or-later WITH Classpath-exception-2.0" [dependencies] +sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" } sc-rpc = { version = "2.0.0-rc6", path = "../../rpc" } +sp-core = { version = "2.0.0-rc6", path = "../../../primitives/core" } sp-runtime = { version = "2.0.0-rc6", path = "../../../primitives/runtime" } -sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" } finality-grandpa = { version = "0.12.3", features = ["derive-codec"] } jsonrpc-core = "14.2.0" jsonrpc-core-client = "14.2.0" diff --git a/client/finality-grandpa/rpc/src/lib.rs b/client/finality-grandpa/rpc/src/lib.rs index c00c95c5f776f..5606da42d5947 100644 --- a/client/finality-grandpa/rpc/src/lib.rs +++ b/client/finality-grandpa/rpc/src/lib.rs @@ -406,7 +406,7 @@ mod tests { // Notify with a header and justification let justification = create_justification(); - let _ = justification_sender.notify(justification.clone()).unwrap(); + justification_sender.notify(|| Ok(justification.clone())).unwrap(); // Inspect what we received let recv = receiver.take(1).wait().flatten().collect::>(); @@ -418,7 +418,7 @@ mod tests { let recv_sub_id: String = serde_json::from_value(json_map["subscription"].take()).unwrap(); - let recv_justification: Vec = + let recv_justification: sp_core::Bytes = serde_json::from_value(json_map["result"].take()).unwrap(); let recv_justification: GrandpaJustification = Decode::decode(&mut &recv_justification[..]).unwrap(); diff --git a/client/finality-grandpa/rpc/src/notification.rs b/client/finality-grandpa/rpc/src/notification.rs index 831f4681549a7..fd03a622b2196 100644 --- a/client/finality-grandpa/rpc/src/notification.rs +++ b/client/finality-grandpa/rpc/src/notification.rs @@ -23,10 +23,10 @@ use sc_finality_grandpa::GrandpaJustification; /// An encoded justification proving that the given header has been finalized #[derive(Clone, Serialize, Deserialize)] -pub struct JustificationNotification(Vec); +pub struct JustificationNotification(sp_core::Bytes); impl From> for JustificationNotification { fn from(notification: GrandpaJustification) -> Self { - JustificationNotification(notification.encode()) + JustificationNotification(notification.encode().into()) } } diff --git a/client/finality-grandpa/src/environment.rs b/client/finality-grandpa/src/environment.rs index a7a29fe0e8a65..d862372770518 100644 --- a/client/finality-grandpa/src/environment.rs +++ b/client/finality-grandpa/src/environment.rs @@ -645,7 +645,8 @@ pub(crate) fn ancestry( client: &Arc, base: Block::Hash, block: Block::Hash, -) -> Result, GrandpaError> where +) -> Result, GrandpaError> +where Client: HeaderMetadata, { if base == block { return Err(GrandpaError::NotDescendent) } @@ -671,15 +672,14 @@ pub(crate) fn ancestry( Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect()) } -impl - voter::Environment> -for Environment +impl voter::Environment> + for Environment where Block: 'static, B: Backend, C: crate::ClientForGrandpa + 'static, C::Api: GrandpaApi, - N: NetworkT + 'static + Send + Sync, + N: NetworkT + 'static + Send + Sync, SC: SelectChain + 'static, VR: VotingRule, NumberFor: BlockNumberOps, @@ -1023,7 +1023,7 @@ where number, (round, commit).into(), false, - &self.justification_sender, + self.justification_sender.as_ref(), ) } @@ -1088,9 +1088,10 @@ pub(crate) fn finalize_block( number: NumberFor, justification_or_commit: JustificationOrCommit, initial_sync: bool, - justification_sender: &Option>, -) -> Result<(), CommandOrError>> where - Block: BlockT, + justification_sender: Option<&GrandpaJustificationSender>, +) -> Result<(), CommandOrError>> +where + Block: BlockT, BE: Backend, Client: crate::ClientForGrandpa, { @@ -1154,6 +1155,18 @@ pub(crate) fn finalize_block( } } + // send a justification notification if a sender exists and in case of error log it. + fn notify_justification( + justification_sender: Option<&GrandpaJustificationSender>, + justification: impl FnOnce() -> Result, Error>, + ) { + if let Some(sender) = justification_sender { + if let Err(err) = sender.notify(justification) { + warn!(target: "afg", "Error creating justification for subscriber: {:?}", err); + } + } + } + // NOTE: this code assumes that honest voters will never vote past a // transition block, thus we don't have to worry about the case where // we have a transition with `effective_block = N`, but we finalize @@ -1161,7 +1174,10 @@ pub(crate) fn finalize_block( // justifications for transition blocks which will be requested by // syncing clients. let justification = match justification_or_commit { - JustificationOrCommit::Justification(justification) => Some(justification), + JustificationOrCommit::Justification(justification) => { + notify_justification(justification_sender, || Ok(justification.clone())); + Some(justification.encode()) + }, JustificationOrCommit::Commit((round_number, commit)) => { let mut justification_required = // justification is always required when block that enacts new authorities @@ -1181,29 +1197,31 @@ pub(crate) fn finalize_block( } } + // NOTE: the code below is a bit more verbose because we + // really want to avoid creating a justification if it isn't + // needed (e.g. if there's no subscribers), and also to avoid + // creating it twice. depending on the vote tree for the round, + // creating a justification might require multiple fetches of + // headers from the database. + let justification = || GrandpaJustification::from_commit( + &client, + round_number, + commit, + ); + if justification_required { - let justification = GrandpaJustification::from_commit( - &client, - round_number, - commit, - )?; + let justification = justification()?; + notify_justification(justification_sender, || Ok(justification.clone())); - Some(justification) + Some(justification.encode()) } else { + notify_justification(justification_sender, justification); + None } }, }; - // Notify any registered listeners in case we have a justification - if let Some(sender) = justification_sender { - if let Some(ref justification) = justification { - let _ = sender.notify(justification.clone()); - } - } - - let justification = justification.map(|j| j.encode()); - debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash); // ideally some handle to a synchronization oracle would be used diff --git a/client/finality-grandpa/src/import.rs b/client/finality-grandpa/src/import.rs index d5b0a650096ca..04df95a3187e1 100644 --- a/client/finality-grandpa/src/import.rs +++ b/client/finality-grandpa/src/import.rs @@ -619,7 +619,6 @@ where Client: crate::ClientForGrandpa, NumberFor: finality_grandpa::BlockNumberOps, { - /// Import a block justification and finalize the block. /// /// If `enacts_change` is set to true, then finalizing this block *must* @@ -653,7 +652,7 @@ where number, justification.into(), initial_sync, - &Some(self.justification_sender.clone()), + Some(&self.justification_sender), ); match result { diff --git a/client/finality-grandpa/src/notification.rs b/client/finality-grandpa/src/notification.rs index 16f705f0eeb1f..8415583051902 100644 --- a/client/finality-grandpa/src/notification.rs +++ b/client/finality-grandpa/src/notification.rs @@ -20,9 +20,10 @@ use std::sync::Arc; use parking_lot::Mutex; use sp_runtime::traits::Block as BlockT; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; +use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use crate::justification::GrandpaJustification; +use crate::Error; // Stream of justifications returned when subscribing. type JustificationStream = TracingUnboundedReceiver>; @@ -54,10 +55,22 @@ impl GrandpaJustificationSender { /// Send out a notification to all subscribers that a new justification /// is available for a block. - pub fn notify(&self, notification: GrandpaJustification) -> Result<(), ()> { - self.subscribers.lock().retain(|n| { - !n.is_closed() && n.unbounded_send(notification.clone()).is_ok() - }); + pub fn notify( + &self, + justification: impl FnOnce() -> Result, Error>, + ) -> Result<(), Error> { + let mut subscribers = self.subscribers.lock(); + + // do an initial prune on closed subscriptions + subscribers.retain(|n| !n.is_closed()); + + // if there's no subscribers we avoid creating + // the justification which is a costly operation + if !subscribers.is_empty() { + let justification = justification()?; + subscribers.retain(|n| n.unbounded_send(justification.clone()).is_ok()); + } + Ok(()) } } diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index 8fb536a369751..6a9955aa86d81 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -74,11 +74,10 @@ fn grandpa_observer( last_finalized_number: NumberFor, commits: S, note_round: F, -) -> impl Future>>> where +) -> impl Future>>> +where NumberFor: BlockNumberOps, - S: Stream< - Item = Result, CommandOrError>>, - >, + S: Stream, CommandOrError>>>, F: Fn(u64), BE: Backend, Client: crate::ClientForGrandpa, @@ -130,7 +129,7 @@ fn grandpa_observer( finalized_number, (round, commit).into(), false, - &justification_sender, + justification_sender.as_ref(), ) { Ok(_) => {}, Err(e) => return future::err(e),