Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fork block handling in parlia engine and rewinding blocks to the block before the finalized block issue #89

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions crates/bsc/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ pub struct ParliaEngineBuilder<Provider, Engine: EngineTypes, P> {
network_block_event_rx: Arc<Mutex<UnboundedReceiver<EngineMessage>>>,
fetch_client: FetchClient,
provider: Provider,
parlia_provider: P,
parlia: Parlia,
snapshot_reader: SnapshotReader<P>,
}

// === impl ParliaEngineBuilder ===
Expand All @@ -72,13 +73,26 @@ where
.ok()
.flatten()
.unwrap_or_else(|| chain_spec.sealed_genesis_header());
let parlia = Parlia::new(chain_spec.clone(), cfg.clone());

let mut finalized_hash = None;
let mut safe_hash = None;
let snapshot_reader =
SnapshotReader::new(Arc::new(parlia_provider), Arc::new(parlia.clone()));
let snapshot_result = snapshot_reader.snapshot(&latest_header, None);
if snapshot_result.is_ok() {
let snap = snapshot_result.unwrap();
finalized_hash = Some(snap.vote_data.source_hash);
safe_hash = Some(snap.vote_data.target_hash);
}

Self {
chain_spec,
cfg,
provider,
parlia_provider,
storage: Storage::new(latest_header),
snapshot_reader,
parlia,
storage: Storage::new(latest_header, finalized_hash, safe_hash),
to_engine,
network_block_event_rx,
fetch_client,
Expand All @@ -96,16 +110,16 @@ where
network_block_event_rx,
fetch_client,
provider,
parlia_provider,
parlia,
snapshot_reader,
} = self;
let parlia_client = ParliaClient::new(storage.clone(), fetch_client);
let parlia = Parlia::new(chain_spec.clone(), cfg.clone());
if start_engine_task {
ParliaEngineTask::start(
chain_spec,
parlia.clone(),
parlia,
provider,
SnapshotReader::new(Arc::new(parlia_provider), Arc::new(parlia)),
snapshot_reader,
to_engine,
network_block_event_rx,
storage,
Expand All @@ -128,16 +142,23 @@ pub(crate) struct Storage {
impl Storage {
/// Initializes the [Storage] with the given best block. This should be initialized with the
/// highest block in the chain, if there is a chain already stored on-disk.
fn new(best_block: SealedHeader) -> Self {
fn new(
best_block: SealedHeader,
finalized_hash: Option<B256>,
safe_hash: Option<B256>,
) -> Self {
let best_finalized_hash = finalized_hash.unwrap_or_default();
let best_safe_hash = safe_hash.unwrap_or_default();

let mut storage = StorageInner {
best_hash: best_block.hash(),
best_block: best_block.number,
best_header: best_block.clone(),
headers: LimitedHashSet::new(STORAGE_CACHE_NUM),
hash_to_number: LimitedHashSet::new(STORAGE_CACHE_NUM),
bodies: LimitedHashSet::new(STORAGE_CACHE_NUM),
best_finalized_hash: B256::default(),
best_safe_hash: B256::default(),
best_finalized_hash,
best_safe_hash,
};
storage.headers.put(best_block.number, best_block.clone());
storage.hash_to_number.put(best_block.hash(), best_block.number);
Expand Down
34 changes: 21 additions & 13 deletions crates/bsc/engine/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{
};
use tokio::sync::Mutex;

use reth_rpc_types::{BlockId, RpcBlockHash};
use tokio::{
signal,
sync::{
Expand All @@ -40,7 +41,7 @@ enum ForkChoiceMessage {
#[derive(Debug, Clone)]
struct NewHeaderEvent {
header: SealedHeader,
trusted_header: SealedHeader,
local_header: SealedHeader,
pipeline_sync: bool,
}

Expand Down Expand Up @@ -147,6 +148,7 @@ impl<
loop {
let read_storage = storage.read().await;
let best_header = read_storage.best_header.clone();
let finalized_hash = read_storage.best_finalized_hash;
drop(read_storage);
let mut engine_rx_guard = engine_rx.lock().await;
let mut info = BlockInfo {
Expand Down Expand Up @@ -231,19 +233,25 @@ impl<
}
}
let latest_header = header_option.unwrap();

// skip if parent hash is not equal to best hash
if latest_header.number == best_header.number + 1 &&
latest_header.parent_hash != best_header.hash()
{
continue;
}

let trusted_header = client
let finalized_header = client
.sealed_header_by_id(BlockId::Hash(RpcBlockHash::from(finalized_hash)))
.ok()
.flatten()
.unwrap_or_else(|| chain_spec.sealed_genesis_header());
debug!(target: "consensus::parlia", { finalized_header_number = ?finalized_header.number, finalized_header_hash = ?finalized_header.hash() }, "Latest finalized header");
let latest_unsafe_header = client
.latest_header()
.ok()
.flatten()
.unwrap_or_else(|| chain_spec.sealed_genesis_header());
debug!(target: "consensus::parlia", { latest_unsafe_header_number = ?latest_unsafe_header.number, latest_unsafe_header_hash = ?latest_unsafe_header.hash() }, "Latest unsafe header");

let mut trusted_header = latest_unsafe_header.clone();
// if parent hash is not equal to latest unsafe hash
// may be a fork chain detected, we need to trust the finalized header
if latest_header.parent_hash != latest_unsafe_header.hash() {
trusted_header = finalized_header.clone();
}

// verify header and timestamp
// predict timestamp is the trusted header timestamp plus the block interval times
Expand Down Expand Up @@ -354,7 +362,7 @@ impl<
// and finalized hash.
// this can make Block Sync Engine to use pipeline sync mode.
pipeline_sync,
trusted_header: trusted_header.clone(),
local_header: latest_unsafe_header.clone(),
}));
if result.is_err() {
error!(target: "consensus::parlia", "Failed to send new block event to
Expand All @@ -366,7 +374,7 @@ impl<
let result = chain_tracker_tx.send(ForkChoiceMessage::NewHeader(NewHeaderEvent {
header: sealed_header.clone(),
pipeline_sync,
trusted_header: trusted_header.clone(),
local_header: latest_unsafe_header.clone(),
}));
if result.is_err() {
error!(target: "consensus::parlia", "Failed to send new block event to chain tracker");
Expand Down Expand Up @@ -476,7 +484,7 @@ impl<
}
match msg.unwrap() {
ForkChoiceMessage::NewHeader(event) => {
let new_header = event.trusted_header;
let new_header = event.local_header;

let snap = match snapshot_reader.snapshot(&new_header, None) {
Ok(snap) => snap,
Expand Down
9 changes: 9 additions & 0 deletions crates/consensus/beacon/src/engine/hooks/static_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ impl<DB: Database + 'static> EngineHook for StaticFileHook<DB> {
return Poll::Pending
};

// The chain state may be rewind, if the finalized block number is greater than the tip
// block number. In this case, we should wait until the finalized block number is
// less than or equal to the tip block number. To prevent the static file producer
// from producing static files for the wrong block and incrementing the index number.
if finalized_block_number >= ctx.tip_block_number {
trace!(target: "consensus::engine::hooks::static_file", ?ctx, "Finalized block number is greater than tip number");
return Poll::Pending
}

// Try to spawn a static_file_producer
match self.try_spawn_static_file_producer(finalized_block_number)? {
Some(EngineHookEvent::NotReady) => return Poll::Pending,
Expand Down