Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check header timestamp in parlia task #46

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions crates/bsc/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ use secp256k1::{
};
use sha3::{Digest, Keccak256};
use std::{
clone::Clone,
collections::{HashMap, VecDeque},
fmt::{Debug, Formatter},
num::NonZeroUsize,
sync::Arc,
time::SystemTime,
};

use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
Mutex, RwLockReadGuard, RwLockWriteGuard,
Expand Down Expand Up @@ -434,10 +436,34 @@ impl Parlia {

/// Header and Block validation
impl Parlia {
fn present_timestamp(&self) -> u64 {
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
}

fn validate_header_with_predicted_timestamp(
&self,
header: &SealedHeader,
predicted_timestamp: u64,
) -> Result<(), ConsensusError> {
if header.timestamp < predicted_timestamp {
return Err(ConsensusError::TimestampNotExpected {
timestamp: header.timestamp,
predicted_timestamp,
});
}
let present_timestamp = self.present_timestamp();
if predicted_timestamp > present_timestamp {
return Err(ConsensusError::TimestampIsInFuture {
timestamp: predicted_timestamp,
present_timestamp,
});
}
self.validate_header(header)
}

fn validate_header(&self, header: &SealedHeader) -> Result<(), ConsensusError> {
// Don't waste time checking blocks from the future
let present_timestamp =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
let present_timestamp = self.present_timestamp();
if header.timestamp > present_timestamp {
return Err(ConsensusError::TimestampIsInFuture {
timestamp: header.timestamp,
Expand Down Expand Up @@ -573,14 +599,14 @@ pub struct ParliaEngineBuilder<Client, Engine: EngineTypes> {
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
network_block_event_rx: Arc<Mutex<UnboundedReceiver<EngineMessage>>>,
fetch_client: FetchClient,
_client: Client,
client: Client,
}

// === impl ParliaEngineBuilder ===

impl<Client, Engine> ParliaEngineBuilder<Client, Engine>
where
Client: BlockReaderIdExt,
Client: BlockReaderIdExt + Clone + 'static,
Engine: EngineTypes + 'static,
{
/// Creates a new builder instance to configure all parts.
Expand All @@ -601,7 +627,7 @@ where
Self {
chain_spec,
cfg,
_client: client,
client,
storage: Storage::new(latest_header),
to_engine,
network_block_event_rx,
Expand All @@ -619,12 +645,13 @@ where
to_engine,
network_block_event_rx,
fetch_client,
_client,
client,
} = self;
let parlia_client = ParliaClient::new(storage.clone(), fetch_client);
ParliaEngineTask::start(
chain_spec.clone(),
Parlia::new(chain_spec, cfg.clone()),
client,
to_engine,
network_block_event_rx,
storage,
Expand Down
51 changes: 39 additions & 12 deletions crates/bsc/consensus/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use reth_engine_primitives::EngineTypes;
use reth_network::message::EngineMessage;
use reth_network_p2p::{headers::client::HeadersClient, priority::Priority};
use reth_primitives::{Block, BlockBody, BlockHashOrNumber, B256};
use reth_provider::BlockReaderIdExt;
use reth_rpc_types::engine::ForkchoiceState;
use std::{
clone::Clone,
fmt,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
Expand Down Expand Up @@ -45,14 +47,17 @@ struct BlockInfo {
}

/// A Future that listens for new headers and puts into storage
pub(crate) struct ParliaEngineTask<Engine: EngineTypes> {
pub(crate) struct ParliaEngineTask<Engine: EngineTypes, Client: BlockReaderIdExt> {
/// The configured chain spec
chain_spec: Arc<ChainSpec>,
/// The coneensus instance
consensus: Parlia,
/// The client used to read the block and header from the inserted chain
client: Client,
/// The client used to fetch headers
block_fetcher: ParliaClient,
fetch_header_timeout_duration: u64,
/// The interval of the block producing
block_interval: u64,
/// Shared storage to insert new headers
storage: Storage,
/// The engine to send messages to the beacon engine
Expand All @@ -66,27 +71,31 @@ pub(crate) struct ParliaEngineTask<Engine: EngineTypes> {
}

// === impl ParliaEngineTask ===
impl<Engine: EngineTypes + 'static> ParliaEngineTask<Engine> {
impl<Engine: EngineTypes + 'static, Client: BlockReaderIdExt + Clone + 'static>
ParliaEngineTask<Engine, Client>
{
/// Creates a new instance of the task
#[allow(clippy::too_many_arguments)]
pub(crate) fn start(
chain_spec: Arc<ChainSpec>,
consensus: Parlia,
client: Client,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
network_block_event_rx: Arc<Mutex<UnboundedReceiver<EngineMessage>>>,
storage: Storage,
block_fetcher: ParliaClient,
fetch_header_timeout_duration: u64,
block_interval: u64,
) {
let (fork_choice_tx, fork_choice_rx) = mpsc::unbounded_channel();
let this = Self {
chain_spec,
consensus,
client,
to_engine,
network_block_event_rx,
storage,
block_fetcher,
fetch_header_timeout_duration,
block_interval,
fork_choice_tx,
fork_choice_rx: Arc::new(Mutex::new(fork_choice_rx)),
};
Expand All @@ -98,12 +107,15 @@ impl<Engine: EngineTypes + 'static> ParliaEngineTask<Engine> {
/// Start listening to the network block event
fn start_block_event_listening(&self) {
let engine_rx = self.network_block_event_rx.clone();
let mut interval = interval(Duration::from_secs(10));
let block_interval = self.block_interval;
let mut interval = interval(Duration::from_secs(block_interval));
let chain_spec = self.chain_spec.clone();
let storage = self.storage.clone();
let client = self.client.clone();
let block_fetcher = self.block_fetcher.clone();
let consensus = self.consensus.clone();
let fork_choice_tx = self.fork_choice_tx.clone();
let fetch_header_timeout_duration = Duration::from_secs(self.fetch_header_timeout_duration);
let fetch_header_timeout_duration = Duration::from_secs(block_interval);

tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -157,8 +169,6 @@ impl<Engine: EngineTypes + 'static> ParliaEngineTask<Engine> {
}

// skip if number is lower than best number
// TODO: if there is a big number incoming, will cause the sync broken, need a
// better solution to handle this
if info.block_number <= best_header.number {
continue;
}
Expand Down Expand Up @@ -203,9 +213,23 @@ impl<Engine: EngineTypes + 'static> ParliaEngineTask<Engine> {
continue;
}

// verify header
let trusted_header = client
.latest_header()
.ok()
.flatten()
.unwrap_or_else(|| chain_spec.sealed_genesis_header());

// verify header and timestamp
// predict timestamp is the trusted header timestamp plus the block interval times
// the difference between the latest header number and the trusted
// header number the timestamp of latest header should be bigger
// than the predicted timestamp and less than the current timestamp.
let predicted_timestamp = trusted_header.timestamp +
block_interval * (latest_header.number - trusted_header.number);
let sealed_header = latest_header.clone().seal_slow();
let is_valid_header = match consensus.validate_header(&sealed_header) {
let is_valid_header = match consensus
.validate_header_with_predicted_timestamp(&sealed_header, predicted_timestamp)
{
Ok(_) => true,
Err(err) => {
debug!(target: "consensus::parlia", %err, "Parlia verify header failed");
Expand Down Expand Up @@ -308,13 +332,16 @@ impl<Engine: EngineTypes + 'static> ParliaEngineTask<Engine> {
}
}

impl<Engine: EngineTypes> fmt::Debug for ParliaEngineTask<Engine> {
impl<Engine: EngineTypes, Client: BlockReaderIdExt> fmt::Debug
for ParliaEngineTask<Engine, Client>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("chain_spec")
.field("chain_spec", &self.chain_spec)
.field("consensus", &self.consensus)
.field("storage", &self.storage)
.field("block_fetcher", &self.block_fetcher)
.field("block_interval", &self.block_interval)
.finish_non_exhaustive()
}
}
9 changes: 9 additions & 0 deletions crates/consensus/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,15 @@ pub enum ConsensusError {
#[error("mismatched parent hash: {0}")]
ParentHashMismatch(GotExpectedBoxed<B256>),

/// Error when the block timestamp is not expected compared to the predicted timestamp.
#[error("block timestamp {timestamp} is not expected compared to {predicted_timestamp}")]
TimestampNotExpected {
/// The block's timestamp.
timestamp: u64,
/// The predicted timestamp.
predicted_timestamp: u64,
},

/// Error when the block timestamp is in the future compared to our clock time.
#[error("block timestamp {timestamp} is in the future compared to our clock time {present_timestamp}")]
TimestampIsInFuture {
Expand Down