diff --git a/substrate/tendermint/client/src/authority/mod.rs b/substrate/tendermint/client/src/authority/mod.rs index e4972fef8..31c2c597f 100644 --- a/substrate/tendermint/client/src/authority/mod.rs +++ b/substrate/tendermint/client/src/authority/mod.rs @@ -10,6 +10,7 @@ use log::{warn, error}; use futures::{ SinkExt, StreamExt, + lock::Mutex, channel::mpsc::{self, UnboundedSender}, }; @@ -26,7 +27,7 @@ use sp_consensus::{Error, BlockOrigin, Proposer, Environment}; use sc_consensus::import_queue::IncomingBlock; use sc_service::ImportQueue; -use sc_client_api::{BlockBackend, Finalizer}; +use sc_client_api::{BlockBackend, Finalizer, BlockchainEvents}; use sc_network::{ProtocolName, NetworkBlock}; use sc_network_gossip::GossipEngine; @@ -64,7 +65,7 @@ struct ActiveAuthority { >, // Block producer - env: T::Environment, + env: Arc>, announce: T::Network, } @@ -74,6 +75,35 @@ pub struct TendermintAuthority { active: Option>, } +async fn get_proposal( + env: &Arc>, + import: &TendermintImport, + header: &::Header, + stub: bool +) -> T::Block { + let proposer = + env.lock().await.init(header).await.expect("Failed to create a proposer for the new block"); + + proposer + .propose( + import.inherent_data(*header.parent_hash()).await, + Digest::default(), + if stub { + Duration::ZERO + } else { + // The first processing time is to build the block. + // The second is for it to be downloaded (assumes a block won't take longer to download + // than it'll take to process) + // The third is for it to actually be processed + Duration::from_secs((T::BLOCK_PROCESSING_TIME_IN_SECONDS / 3).into()) + }, + Some(T::PROPOSED_BLOCK_SIZE_LIMIT), + ) + .await + .expect("Failed to crate a new block proposal") + .block +} + impl TendermintAuthority { /// Create a new TendermintAuthority. pub fn new(import: TendermintImport) -> Self { @@ -114,32 +144,8 @@ impl TendermintAuthority { ) } - pub(crate) async fn get_proposal(&mut self, header: &::Header) -> T::Block { - let parent = *header.parent_hash(); - - let proposer = self - .active - .as_mut() - .unwrap() - .env - .init(header) - .await - .expect("Failed to create a proposer for the new block"); - - proposer - .propose( - self.import.inherent_data(parent).await, - Digest::default(), - // The first processing time is to build the block. - // The second is for it to be downloaded (assumes a block won't take longer to download - // than it'll take to process) - // The third is for it to actually be processed - Duration::from_secs((T::BLOCK_PROCESSING_TIME_IN_SECONDS / 3).into()), - Some(T::PROPOSED_BLOCK_SIZE_LIMIT), - ) - .await - .expect("Failed to crate a new block proposal") - .block + async fn get_proposal(&mut self, header: &::Header) -> T::Block { + get_proposal(&self.active.as_mut().unwrap().env, &self.import, header, false).await } /// Act as a network authority, proposing and voting on blocks. This should be spawned on a task @@ -175,6 +181,12 @@ impl TendermintAuthority { let (new_number_tx, mut new_number_rx) = mpsc::unbounded(); let (gossip_tx, mut gossip_rx) = mpsc::unbounded(); + // Clone the import object + let import = self.import.clone(); + + // Move the env into an Arc + let env = Arc::new(Mutex::new(env)); + // Create the Tendermint machine let TendermintHandle { mut step, mut messages, machine } = { // Set this struct as active @@ -185,7 +197,7 @@ impl TendermintAuthority { new_number: new_number_tx, gossip: gossip_tx, - env, + env: env.clone(), announce: network, }); @@ -201,16 +213,41 @@ impl TendermintAuthority { // Start receiving messages about the Tendermint process for this block let mut recv = gossip.messages_for(TendermintGossip::::topic(new_number)); + // Get finality events from Substrate + let mut finality = import.client.finality_notification_stream(); + loop { futures::select_biased! { // GossipEngine closed down _ = gossip => break, + // Synced a block from the network + notif = finality.next() => { + if let Some(notif) = notif { + let justifications = import.client.justifications(notif.hash).unwrap().unwrap(); + step.send(( + Commit::decode(&mut justifications.get(CONSENSUS_ID).unwrap().as_ref()).unwrap(), + // This will fail if syncing occurs radically faster than machine stepping takes + // TODO: Set true when initial syncing + get_proposal(&env, &import, ¬if.header, false).await + )).await.unwrap(); + + let new_number = match (*notif.header.number()).try_into() { + Ok(number) => number, + Err(_) => panic!("BlockNumber exceeded u64"), + }; + *number.write().unwrap() = new_number; + recv = gossip.messages_for(TendermintGossip::::topic(new_number)) + } else { + break; + } + }, + // Machine reached a new height new_number = new_number_rx.next() => { if let Some(new_number) = new_number { *number.write().unwrap() = new_number; - recv = gossip.messages_for(TendermintGossip::::topic(new_number)); + recv = gossip.messages_for(TendermintGossip::::topic(new_number)) } else { break; } @@ -239,7 +276,7 @@ impl TendermintAuthority { continue; } } - ).await.unwrap() + ).await.unwrap(); } else { break; } diff --git a/substrate/tendermint/client/src/lib.rs b/substrate/tendermint/client/src/lib.rs index 4ebf6e475..90c50244d 100644 --- a/substrate/tendermint/client/src/lib.rs +++ b/substrate/tendermint/client/src/lib.rs @@ -7,7 +7,7 @@ use sp_blockchain::HeaderBackend; use sp_api::{StateBackend, StateBackendFor, TransactionFor, ApiExt, ProvideRuntimeApi}; use sp_consensus::{Error, Environment}; -use sc_client_api::{BlockBackend, Backend, Finalizer}; +use sc_client_api::{BlockBackend, Backend, Finalizer, BlockchainEvents}; use sc_block_builder::BlockBuilderApi; use sc_consensus::{BlockImport, BasicQueue}; @@ -81,6 +81,7 @@ pub trait TendermintClient: Send + Sync + 'static { + BlockBackend + BlockImport + Finalizer + + BlockchainEvents + ProvideRuntimeApi + 'static; } @@ -100,6 +101,7 @@ pub trait TendermintClientMinimal: Send + Sync + 'static { + BlockBackend + BlockImport> + Finalizer + + BlockchainEvents + ProvideRuntimeApi + 'static; }