Skip to content

Commit

Permalink
Manually step the Tendermint machine when we synced a block over the …
Browse files Browse the repository at this point in the history
…network
  • Loading branch information
kayabaNerve committed Nov 14, 2022
1 parent 138866f commit 8c51bc0
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 32 deletions.
99 changes: 68 additions & 31 deletions substrate/tendermint/client/src/authority/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use log::{warn, error};

use futures::{
SinkExt, StreamExt,
lock::Mutex,
channel::mpsc::{self, UnboundedSender},
};

Expand All @@ -26,7 +27,7 @@ use sp_consensus::{Error, BlockOrigin, Proposer, Environment};
use sc_consensus::import_queue::IncomingBlock;

use sc_service::ImportQueue;
use sc_client_api::{BlockBackend, Finalizer};
use sc_client_api::{BlockBackend, Finalizer, BlockchainEvents};
use sc_network::{ProtocolName, NetworkBlock};
use sc_network_gossip::GossipEngine;

Expand Down Expand Up @@ -64,7 +65,7 @@ struct ActiveAuthority<T: TendermintValidator> {
>,

// Block producer
env: T::Environment,
env: Arc<Mutex<T::Environment>>,
announce: T::Network,
}

Expand All @@ -74,6 +75,35 @@ pub struct TendermintAuthority<T: TendermintValidator> {
active: Option<ActiveAuthority<T>>,
}

async fn get_proposal<T: TendermintValidator>(
env: &Arc<Mutex<T::Environment>>,
import: &TendermintImport<T>,
header: &<T::Block as Block>::Header,
stub: bool
) -> T::Block {
let proposer =
env.lock().await.init(header).await.expect("Failed to create a proposer for the new block");

proposer
.propose(
import.inherent_data(*header.parent_hash()).await,
Digest::default(),
if stub {
Duration::ZERO
} else {
// The first processing time is to build the block.
// The second is for it to be downloaded (assumes a block won't take longer to download
// than it'll take to process)
// The third is for it to actually be processed
Duration::from_secs((T::BLOCK_PROCESSING_TIME_IN_SECONDS / 3).into())
},
Some(T::PROPOSED_BLOCK_SIZE_LIMIT),
)
.await
.expect("Failed to crate a new block proposal")
.block
}

impl<T: TendermintValidator> TendermintAuthority<T> {
/// Create a new TendermintAuthority.
pub fn new(import: TendermintImport<T>) -> Self {
Expand Down Expand Up @@ -114,32 +144,8 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
)
}

pub(crate) async fn get_proposal(&mut self, header: &<T::Block as Block>::Header) -> T::Block {
let parent = *header.parent_hash();

let proposer = self
.active
.as_mut()
.unwrap()
.env
.init(header)
.await
.expect("Failed to create a proposer for the new block");

proposer
.propose(
self.import.inherent_data(parent).await,
Digest::default(),
// The first processing time is to build the block.
// The second is for it to be downloaded (assumes a block won't take longer to download
// than it'll take to process)
// The third is for it to actually be processed
Duration::from_secs((T::BLOCK_PROCESSING_TIME_IN_SECONDS / 3).into()),
Some(T::PROPOSED_BLOCK_SIZE_LIMIT),
)
.await
.expect("Failed to crate a new block proposal")
.block
async fn get_proposal(&mut self, header: &<T::Block as Block>::Header) -> T::Block {
get_proposal(&self.active.as_mut().unwrap().env, &self.import, header, false).await
}

/// Act as a network authority, proposing and voting on blocks. This should be spawned on a task
Expand Down Expand Up @@ -175,6 +181,12 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
let (new_number_tx, mut new_number_rx) = mpsc::unbounded();
let (gossip_tx, mut gossip_rx) = mpsc::unbounded();

// Clone the import object
let import = self.import.clone();

// Move the env into an Arc
let env = Arc::new(Mutex::new(env));

// Create the Tendermint machine
let TendermintHandle { mut step, mut messages, machine } = {
// Set this struct as active
Expand All @@ -185,7 +197,7 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
new_number: new_number_tx,
gossip: gossip_tx,

env,
env: env.clone(),
announce: network,
});

Expand All @@ -201,16 +213,41 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
// Start receiving messages about the Tendermint process for this block
let mut recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number));

// Get finality events from Substrate
let mut finality = import.client.finality_notification_stream();

loop {
futures::select_biased! {
// GossipEngine closed down
_ = gossip => break,

// Synced a block from the network
notif = finality.next() => {
if let Some(notif) = notif {
let justifications = import.client.justifications(notif.hash).unwrap().unwrap();
step.send((
Commit::decode(&mut justifications.get(CONSENSUS_ID).unwrap().as_ref()).unwrap(),
// This will fail if syncing occurs radically faster than machine stepping takes
// TODO: Set true when initial syncing
get_proposal(&env, &import, &notif.header, false).await
)).await.unwrap();

let new_number = match (*notif.header.number()).try_into() {
Ok(number) => number,
Err(_) => panic!("BlockNumber exceeded u64"),
};
*number.write().unwrap() = new_number;
recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number))
} else {
break;
}
},

// Machine reached a new height
new_number = new_number_rx.next() => {
if let Some(new_number) = new_number {
*number.write().unwrap() = new_number;
recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number));
recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number))
} else {
break;
}
Expand Down Expand Up @@ -239,7 +276,7 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
continue;
}
}
).await.unwrap()
).await.unwrap();
} else {
break;
}
Expand Down
4 changes: 3 additions & 1 deletion substrate/tendermint/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use sp_blockchain::HeaderBackend;
use sp_api::{StateBackend, StateBackendFor, TransactionFor, ApiExt, ProvideRuntimeApi};
use sp_consensus::{Error, Environment};

use sc_client_api::{BlockBackend, Backend, Finalizer};
use sc_client_api::{BlockBackend, Backend, Finalizer, BlockchainEvents};
use sc_block_builder::BlockBuilderApi;
use sc_consensus::{BlockImport, BasicQueue};

Expand Down Expand Up @@ -81,6 +81,7 @@ pub trait TendermintClient: Send + Sync + 'static {
+ BlockBackend<Self::Block>
+ BlockImport<Self::Block, Transaction = Self::BackendTransaction>
+ Finalizer<Self::Block, Self::Backend>
+ BlockchainEvents<Self::Block>
+ ProvideRuntimeApi<Self::Block, Api = Self::Api>
+ 'static;
}
Expand All @@ -100,6 +101,7 @@ pub trait TendermintClientMinimal: Send + Sync + 'static {
+ BlockBackend<Self::Block>
+ BlockImport<Self::Block, Transaction = TransactionFor<Self::Client, Self::Block>>
+ Finalizer<Self::Block, Self::Backend>
+ BlockchainEvents<Self::Block>
+ ProvideRuntimeApi<Self::Block, Api = Self::Api>
+ 'static;
}
Expand Down

0 comments on commit 8c51bc0

Please sign in to comment.