diff --git a/crates/bsc/engine/src/lib.rs b/crates/bsc/engine/src/lib.rs index a34e74768f..52ac259603 100644 --- a/crates/bsc/engine/src/lib.rs +++ b/crates/bsc/engine/src/lib.rs @@ -46,7 +46,8 @@ pub struct ParliaEngineBuilder { network_block_event_rx: Arc>>, fetch_client: FetchClient, provider: Provider, - parlia_provider: P, + parlia: Parlia, + snapshot_reader: SnapshotReader

, } // === impl ParliaEngineBuilder === @@ -72,13 +73,26 @@ where .ok() .flatten() .unwrap_or_else(|| chain_spec.sealed_genesis_header()); + let parlia = Parlia::new(chain_spec.clone(), cfg.clone()); + + let mut finalized_hash = None; + let mut safe_hash = None; + let snapshot_reader = + SnapshotReader::new(Arc::new(parlia_provider), Arc::new(parlia.clone())); + let snapshot_result = snapshot_reader.snapshot(&latest_header, None); + if snapshot_result.is_ok() { + let snap = snapshot_result.unwrap(); + finalized_hash = Some(snap.vote_data.source_hash); + safe_hash = Some(snap.vote_data.target_hash); + } Self { chain_spec, cfg, provider, - parlia_provider, - storage: Storage::new(latest_header), + snapshot_reader, + parlia, + storage: Storage::new(latest_header, finalized_hash, safe_hash), to_engine, network_block_event_rx, fetch_client, @@ -96,16 +110,16 @@ where network_block_event_rx, fetch_client, provider, - parlia_provider, + parlia, + snapshot_reader, } = self; let parlia_client = ParliaClient::new(storage.clone(), fetch_client); - let parlia = Parlia::new(chain_spec.clone(), cfg.clone()); if start_engine_task { ParliaEngineTask::start( chain_spec, - parlia.clone(), + parlia, provider, - SnapshotReader::new(Arc::new(parlia_provider), Arc::new(parlia)), + snapshot_reader, to_engine, network_block_event_rx, storage, @@ -128,7 +142,14 @@ pub(crate) struct Storage { impl Storage { /// Initializes the [Storage] with the given best block. This should be initialized with the /// highest block in the chain, if there is a chain already stored on-disk. - fn new(best_block: SealedHeader) -> Self { + fn new( + best_block: SealedHeader, + finalized_hash: Option, + safe_hash: Option, + ) -> Self { + let best_finalized_hash = finalized_hash.unwrap_or_default(); + let best_safe_hash = safe_hash.unwrap_or_default(); + let mut storage = StorageInner { best_hash: best_block.hash(), best_block: best_block.number, @@ -136,8 +157,8 @@ impl Storage { headers: LimitedHashSet::new(STORAGE_CACHE_NUM), hash_to_number: LimitedHashSet::new(STORAGE_CACHE_NUM), bodies: LimitedHashSet::new(STORAGE_CACHE_NUM), - best_finalized_hash: B256::default(), - best_safe_hash: B256::default(), + best_finalized_hash, + best_safe_hash, }; storage.headers.put(best_block.number, best_block.clone()); storage.hash_to_number.put(best_block.hash(), best_block.number); diff --git a/crates/bsc/engine/src/task.rs b/crates/bsc/engine/src/task.rs index 73689f7cf6..3201c43bd2 100644 --- a/crates/bsc/engine/src/task.rs +++ b/crates/bsc/engine/src/task.rs @@ -20,6 +20,7 @@ use std::{ }; use tokio::sync::Mutex; +use reth_rpc_types::{BlockId, RpcBlockHash}; use tokio::{ signal, sync::{ @@ -40,7 +41,7 @@ enum ForkChoiceMessage { #[derive(Debug, Clone)] struct NewHeaderEvent { header: SealedHeader, - trusted_header: SealedHeader, + local_header: SealedHeader, pipeline_sync: bool, } @@ -147,6 +148,7 @@ impl< loop { let read_storage = storage.read().await; let best_header = read_storage.best_header.clone(); + let finalized_hash = read_storage.best_finalized_hash; drop(read_storage); let mut engine_rx_guard = engine_rx.lock().await; let mut info = BlockInfo { @@ -231,19 +233,25 @@ impl< } } let latest_header = header_option.unwrap(); - - // skip if parent hash is not equal to best hash - if latest_header.number == best_header.number + 1 && - latest_header.parent_hash != best_header.hash() - { - continue; - } - - let trusted_header = client + let finalized_header = client + .sealed_header_by_id(BlockId::Hash(RpcBlockHash::from(finalized_hash))) + .ok() + .flatten() + .unwrap_or_else(|| chain_spec.sealed_genesis_header()); + debug!(target: "consensus::parlia", { finalized_header_number = ?finalized_header.number, finalized_header_hash = ?finalized_header.hash() }, "Latest finalized header"); + let latest_unsafe_header = client .latest_header() .ok() .flatten() .unwrap_or_else(|| chain_spec.sealed_genesis_header()); + debug!(target: "consensus::parlia", { latest_unsafe_header_number = ?latest_unsafe_header.number, latest_unsafe_header_hash = ?latest_unsafe_header.hash() }, "Latest unsafe header"); + + let mut trusted_header = latest_unsafe_header.clone(); + // if parent hash is not equal to latest unsafe hash + // may be a fork chain detected, we need to trust the finalized header + if latest_header.parent_hash != latest_unsafe_header.hash() { + trusted_header = finalized_header.clone(); + } // verify header and timestamp // predict timestamp is the trusted header timestamp plus the block interval times @@ -354,7 +362,7 @@ impl< // and finalized hash. // this can make Block Sync Engine to use pipeline sync mode. pipeline_sync, - trusted_header: trusted_header.clone(), + local_header: latest_unsafe_header.clone(), })); if result.is_err() { error!(target: "consensus::parlia", "Failed to send new block event to @@ -366,7 +374,7 @@ impl< let result = chain_tracker_tx.send(ForkChoiceMessage::NewHeader(NewHeaderEvent { header: sealed_header.clone(), pipeline_sync, - trusted_header: trusted_header.clone(), + local_header: latest_unsafe_header.clone(), })); if result.is_err() { error!(target: "consensus::parlia", "Failed to send new block event to chain tracker"); @@ -476,7 +484,7 @@ impl< } match msg.unwrap() { ForkChoiceMessage::NewHeader(event) => { - let new_header = event.trusted_header; + let new_header = event.local_header; let snap = match snapshot_reader.snapshot(&new_header, None) { Ok(snap) => snap, diff --git a/crates/consensus/beacon/src/engine/hooks/static_file.rs b/crates/consensus/beacon/src/engine/hooks/static_file.rs index b52812b53a..54dd1ccb9a 100644 --- a/crates/consensus/beacon/src/engine/hooks/static_file.rs +++ b/crates/consensus/beacon/src/engine/hooks/static_file.rs @@ -142,6 +142,15 @@ impl EngineHook for StaticFileHook { return Poll::Pending }; + // The chain state may be rewind, if the finalized block number is greater than the tip + // block number. In this case, we should wait until the finalized block number is + // less than or equal to the tip block number. To prevent the static file producer + // from producing static files for the wrong block and incrementing the index number. + if finalized_block_number >= ctx.tip_block_number { + trace!(target: "consensus::engine::hooks::static_file", ?ctx, "Finalized block number is greater than tip number"); + return Poll::Pending + } + // Try to spawn a static_file_producer match self.try_spawn_static_file_producer(finalized_block_number)? { Some(EngineHookEvent::NotReady) => return Poll::Pending,