diff --git a/fendermint/vm/topdown/src/lib.rs b/fendermint/vm/topdown/src/lib.rs index 98878a1d1..71730ba65 100644 --- a/fendermint/vm/topdown/src/lib.rs +++ b/fendermint/vm/topdown/src/lib.rs @@ -13,6 +13,7 @@ pub mod voting; pub mod observation; pub mod observe; +pub mod syncer; pub mod vote; use async_stm::Stm; @@ -28,6 +29,7 @@ use std::time::Duration; pub use crate::cache::{SequentialAppendError, SequentialKeyCache, ValueIter}; pub use crate::error::Error; pub use crate::finality::CachedFinalityProvider; +use crate::observation::Observation; pub use crate::toggle::Toggle; pub type BlockHeight = u64; @@ -108,6 +110,13 @@ impl Config { } } +/// On-chain data structure representing a topdown checkpoint agreed to by a +/// majority of subnet validators. DAG-CBOR encoded, embedded in CertifiedCheckpoint. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum Checkpoint { + V1(Observation), +} + /// The finality view for IPC parent at certain height. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct IPCParentFinality { @@ -193,3 +202,23 @@ pub(crate) fn is_null_round_error(err: &anyhow::Error) -> bool { pub(crate) fn is_null_round_str(s: &str) -> bool { s.contains(NULL_ROUND_ERR_MSG) } + +impl Checkpoint { + pub fn target_height(&self) -> BlockHeight { + match self { + Checkpoint::V1(b) => b.parent_height, + } + } + + pub fn target_hash(&self) -> &Bytes { + match self { + Checkpoint::V1(b) => &b.parent_hash, + } + } + + pub fn cumulative_effects_comm(&self) -> &Bytes { + match self { + Checkpoint::V1(b) => &b.cumulative_effects_comm, + } + } +} diff --git a/fendermint/vm/topdown/src/observation.rs b/fendermint/vm/topdown/src/observation.rs index eea36ffde..d59c9d9fa 100644 --- a/fendermint/vm/topdown/src/observation.rs +++ b/fendermint/vm/topdown/src/observation.rs @@ -1,25 +1,44 @@ // Copyright 2022-2024 Protocol Labs // SPDX-License-Identifier: Apache-2.0, MIT -use crate::{BlockHeight, Bytes}; +use crate::syncer::error::Error; +use crate::syncer::store::ParentViewStore; +use crate::{BlockHash, BlockHeight, Bytes, Checkpoint}; use anyhow::anyhow; use arbitrary::Arbitrary; +use cid::Cid; use fendermint_crypto::secp::RecoverableECDSASignature; use fendermint_crypto::SecretKey; use fendermint_vm_genesis::ValidatorKey; +use fvm_ipld_encoding::DAG_CBOR; +use multihash::Code; +use multihash::MultihashDigest; use serde::{Deserialize, Serialize}; +use std::cmp::min; use std::fmt::{Display, Formatter}; +use crate::syncer::payload::ParentBlockView; + +/// Default topdown observation height range +const DEFAULT_MAX_OBSERVATION_RANGE: BlockHeight = 100; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ObservationConfig { + /// The max number of blocks one should make the topdown observation from the previous + /// committed checkpoint + pub max_observation_range: Option, +} + /// The content that validators gossip among each other. #[derive(Serialize, Deserialize, Hash, Debug, Clone, Eq, PartialEq, Arbitrary)] pub struct Observation { - pub(crate) parent_height: u64, + pub(crate) parent_subnet_height: u64, /// The hash of the chain unit at that height. Usually a block hash, but could /// be another entity (e.g. tipset CID), depending on the parent chain /// and our interface to it. For example, if the parent is a Filecoin network, /// this would be a tipset CID coerced into a block hash if queried through /// the Eth API, or the tipset CID as-is if accessed through the Filecoin API. - pub(crate) parent_hash: Bytes, + pub(crate) parent_subnet_hash: Bytes, /// A rolling/cumulative commitment to topdown effects since the beginning of /// time, including the ones in this block. pub(crate) cumulative_effects_comm: Bytes, @@ -41,6 +60,53 @@ pub struct CertifiedObservation { signature: RecoverableECDSASignature, } +/// Check in the store to see if there is a new observation available. +/// Caller should make sure: +/// - the store has votes since the last committed checkpoint +/// - the votes have at least 1 non-null block +pub fn deduce_new_observation( + store: &S, + checkpoint: &Checkpoint, + config: &ObservationConfig, +) -> Result { + let Some(latest_height) = store.max_parent_view_height()? else { + tracing::info!("no observation yet as height not available"); + return Err(Error::BlockStoreEmpty); + }; + + if latest_height < checkpoint.target_height() { + tracing::info!("committed vote height more than latest parent view"); + return Err(Error::CommittedParentHeightNotPurged); + } + + let max_observation_height = checkpoint.target_height() + config.max_observation_range(); + let candidate_height = min(max_observation_height, latest_height); + tracing::debug!( + max_observation_height, + candidate_height, + "propose observation height" + ); + + // aggregate commitment for the observation + let mut agg = LinearizedParentBlockView::from(checkpoint); + for h in checkpoint.target_height() + 1..=candidate_height { + let Some(p) = store.get(h)? else { + tracing::debug!(height = h, "not parent block view"); + return Err(Error::MissingBlockView(h, candidate_height)); + }; + + agg.append(p)?; + } + + let observation = agg.into_observation()?; + tracing::info!( + height = observation.parent_subnet_height, + "new observation derived" + ); + + Ok(observation) +} + impl TryFrom<&[u8]> for CertifiedObservation { type Error = anyhow::Error; @@ -54,6 +120,10 @@ impl CertifiedObservation { &self.observation } + pub fn observation_signature(&self) -> &RecoverableECDSASignature { + &self.observation_signature + } + pub fn ensure_valid(&self) -> anyhow::Result { let to_sign = fvm_ipld_encoding::to_vec(&self.observation)?; let (pk1, _) = self.observation_signature.recover(&to_sign)?; @@ -97,8 +167,8 @@ impl CertifiedObservation { impl Observation { pub fn new(parent_height: BlockHeight, parent_hash: Bytes, commitment: Bytes) -> Self { Self { - parent_height, - parent_hash, + parent_subnet_height: parent_height, + parent_subnet_hash: parent_hash, cumulative_effects_comm: commitment, } } @@ -109,8 +179,8 @@ impl Display for Observation { write!( f, "Observation(parent_height={}, parent_hash={}, commitment={})", - self.parent_height, - hex::encode(&self.parent_hash), + self.parent_subnet_height, + hex::encode(&self.parent_subnet_hash), hex::encode(&self.cumulative_effects_comm), ) } @@ -118,6 +188,80 @@ impl Display for Observation { impl Observation { pub fn parent_height(&self) -> BlockHeight { - self.parent_height + self.parent_subnet_height } } + +impl ObservationConfig { + pub fn max_observation_range(&self) -> BlockHeight { + self.max_observation_range + .unwrap_or(DEFAULT_MAX_OBSERVATION_RANGE) + } +} + +pub(crate) struct LinearizedParentBlockView { + parent_height: u64, + parent_hash: Option, + cumulative_effects_comm: Bytes, +} + +impl From<&Checkpoint> for LinearizedParentBlockView { + fn from(value: &Checkpoint) -> Self { + LinearizedParentBlockView { + parent_height: value.target_height(), + parent_hash: Some(value.target_hash().clone()), + cumulative_effects_comm: value.cumulative_effects_comm().clone(), + } + } +} + +impl From<&Observation> for LinearizedParentBlockView { + fn from(value: &Observation) -> Self { + LinearizedParentBlockView { + parent_height: value.parent_subnet_height, + parent_hash: Some(value.parent_subnet_hash.clone()), + cumulative_effects_comm: value.cumulative_effects_comm.clone(), + } + } +} + +impl LinearizedParentBlockView { + fn new_commitment(&mut self, to_append: Bytes) { + let bytes = [ + self.cumulative_effects_comm.as_slice(), + to_append.as_slice(), + ] + .concat(); + let cid = Cid::new_v1(DAG_CBOR, Code::Blake2b256.digest(&bytes)); + self.cumulative_effects_comm = cid.to_bytes(); + } + + pub fn append(&mut self, view: ParentBlockView) -> Result<(), Error> { + if self.parent_height + 1 != view.parent_height { + return Err(Error::NotSequential); + } + + self.parent_height += 1; + + self.new_commitment(view.effects_commitment()?); + + if let Some(p) = view.payload { + self.parent_hash = Some(p.parent_hash); + } + + Ok(()) + } + + pub fn into_observation(self) -> Result { + let Some(hash) = self.parent_hash else { + return Err(Error::CannotCommitObservationAtNullBlock( + self.parent_height, + )); + }; + Ok(Observation::new( + self.parent_height, + hash, + self.cumulative_effects_comm, + )) + } +} \ No newline at end of file diff --git a/fendermint/vm/topdown/src/proxy.rs b/fendermint/vm/topdown/src/proxy.rs index 94a8e3177..882b11bb0 100644 --- a/fendermint/vm/topdown/src/proxy.rs +++ b/fendermint/vm/topdown/src/proxy.rs @@ -13,6 +13,7 @@ use ipc_api::subnet_id::SubnetID; use ipc_observability::emit; use ipc_provider::manager::{GetBlockHashResult, TopDownQueryPayload}; use ipc_provider::IpcProvider; +use std::sync::Arc; use std::time::Instant; use tracing::instrument; @@ -42,6 +43,35 @@ pub trait ParentQueryProxy { ) -> anyhow::Result>>; } +#[async_trait] +impl ParentQueryProxy for Arc

{ + async fn get_chain_head_height(&self) -> Result { + self.as_ref().get_chain_head_height().await + } + + async fn get_genesis_epoch(&self) -> Result { + self.as_ref().get_genesis_epoch().await + } + + async fn get_block_hash(&self, height: BlockHeight) -> Result { + self.as_ref().get_block_hash(height).await + } + + async fn get_top_down_msgs( + &self, + height: BlockHeight, + ) -> Result>> { + self.as_ref().get_top_down_msgs(height).await + } + + async fn get_validator_changes( + &self, + height: BlockHeight, + ) -> Result>> { + self.as_ref().get_validator_changes(height).await + } +} + /// The proxy to the subnet's parent pub struct IPCProviderProxy { ipc_provider: IpcProvider, diff --git a/fendermint/vm/topdown/src/syncer/error.rs b/fendermint/vm/topdown/src/syncer/error.rs new file mode 100644 index 000000000..ca92175ab --- /dev/null +++ b/fendermint/vm/topdown/src/syncer/error.rs @@ -0,0 +1,28 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::BlockHeight; +use thiserror::Error; + +/// The errors for top down checkpointing +#[derive(Error, Debug, Eq, PartialEq, Clone)] +pub enum Error { + #[error("Incoming items are not order sequentially")] + NotSequential, + #[error("The parent view update with block height is not sequential")] + NonSequentialParentViewInsert, + #[error("Parent chain reorg detected")] + ParentChainReorgDetected, + #[error("Cannot query parent at height {1}: {0}")] + CannotQueryParent(String, BlockHeight), + #[error("Parent block view store is empty")] + BlockStoreEmpty, + #[error("Committed block height not purged yet")] + CommittedParentHeightNotPurged, + #[error("Cannot serialize parent block view payload to bytes")] + CannotSerializeParentBlockView, + #[error("Cannot create commitment at null parent block {0}")] + CannotCommitObservationAtNullBlock(BlockHeight), + #[error("Missing block view at height {0} for target observation height {0}")] + MissingBlockView(BlockHeight, BlockHeight), +} diff --git a/fendermint/vm/topdown/src/syncer/mod.rs b/fendermint/vm/topdown/src/syncer/mod.rs new file mode 100644 index 000000000..3def2e614 --- /dev/null +++ b/fendermint/vm/topdown/src/syncer/mod.rs @@ -0,0 +1,173 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::observation::{LinearizedParentBlockView, Observation, ObservationConfig}; +use crate::syncer::store::ParentViewStore; +use crate::{BlockHeight, Checkpoint}; +use anyhow::anyhow; +use async_trait::async_trait; +use ipc_api::cross::IpcEnvelope; +use ipc_api::staking::StakingChangeRequest; +use serde::Deserialize; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::select; +use tokio::sync::{broadcast, mpsc}; + +pub mod error; +pub mod payload; +pub mod poll; +pub mod store; + +pub type QuorumCertContent = (Observation, Vec, Vec); + +#[derive(Clone, Debug)] +pub enum TopDownSyncEvent { + /// The fendermint node is syncing with peers + NodeSyncing, + NewProposal(Box), +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ParentSyncerConfig { + pub request_channel_size: usize, + /// The event broadcast channel buffer size + pub broadcast_channel_size: usize, + /// The number of blocks to delay before reporting a height as final on the parent chain. + /// To propose a certain number of epochs delayed from the latest height, we see to be + /// conservative and avoid other from rejecting the proposal because they don't see the + /// height as final yet. + pub chain_head_delay: BlockHeight, + /// Parent syncing cron period, in millis + pub polling_interval_millis: Duration, + /// Max number of requests to process in the reactor loop + pub max_requests_per_loop: usize, + /// Max number of un-finalized parent blocks that should be stored in the store + pub max_store_blocks: BlockHeight, + /// Attempts to sync as many block as possible till the finalized chain head + pub sync_many: bool, + + pub observation: ObservationConfig, +} + +#[derive(Clone)] +pub struct ParentSyncerReactorClient { + tx: mpsc::Sender, + checkpoint: Arc>, + store: S, +} + +impl ParentSyncerReactorClient { + pub fn new( + request_channel_size: usize, + store: S, + ) -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(request_channel_size); + let checkpoint = Arc::new(Mutex::new(Checkpoint::v1(0, vec![], vec![]))); + ( + Self { + tx, + checkpoint, + store, + }, + rx, + ) + } +} + +pub fn start_polling_reactor( + mut rx: mpsc::Receiver, + mut poller: P, + config: ParentSyncerConfig, +) { + let polling_interval = config.polling_interval_millis; + tokio::spawn(async move { + loop { + select! { + _ = tokio::time::sleep(polling_interval) => { + if let Err(e) = poller.try_poll().await { + tracing::error!(err = e.to_string(), "cannot sync with parent"); + } + } + req = rx.recv() => { + let Some(req) = req else { break }; + match req { + ParentSyncerRequest::Finalized(cp) => { + if let Err(e) = poller.finalize(cp) { + tracing::error!(err = e.to_string(), "cannot finalize syncer") + } + }, + } + } + } + } + }); +} + +/// Polls the parent block view +#[async_trait] +pub trait ParentPoller { + type Store: ParentViewStore + Send + Sync + 'static + Clone; + + fn subscribe(&self) -> broadcast::Receiver; + + fn store(&self) -> Self::Store; + + /// The target block height is finalized, purge all the parent view before the target height + fn finalize(&mut self, checkpoint: Checkpoint) -> anyhow::Result<()>; + + /// Try to poll the next parent height + async fn try_poll(&mut self) -> anyhow::Result<()>; +} + +impl ParentSyncerReactorClient { + fn set_checkpoint(&self, cp: Checkpoint) { + let mut checkpoint = self.checkpoint.lock().unwrap(); + *checkpoint = cp.clone(); + } + /// Marks the height as finalized. + /// There is no need to wait for ack from the reactor + pub async fn finalize_parent_height(&self, cp: Checkpoint) -> anyhow::Result<()> { + self.set_checkpoint(cp.clone()); + self.tx.send(ParentSyncerRequest::Finalized(cp)).await?; + Ok(()) + } + + pub fn prepare_quorum_cert_content( + &self, + end_height: BlockHeight, + ) -> anyhow::Result { + let latest_checkpoint = self.checkpoint.lock().unwrap().clone(); + + let mut xnet_msgs = vec![]; + let mut validator_changes = vec![]; + let mut linear = LinearizedParentBlockView::from(&latest_checkpoint); + + let start = latest_checkpoint.target_height() + 1; + for h in start..=end_height { + let Some(v) = self.store.get(h)? else { + return Err(anyhow!("parent block view store does not have data at {h}")); + }; + + if let Err(e) = linear.append(v.clone()) { + return Err(anyhow!("parent block view cannot be appended: {e}")); + } + + if let Some(payload) = v.payload { + xnet_msgs.extend(payload.xnet_msgs); + validator_changes.extend(payload.validator_changes); + } + } + + let ob = linear + .into_observation() + .map_err(|e| anyhow!("cannot convert linearized parent view into observation: {e}"))?; + + Ok((ob, xnet_msgs, validator_changes)) + } +} + +pub enum ParentSyncerRequest { + /// A new parent height is finalized + Finalized(Checkpoint), +} \ No newline at end of file diff --git a/fendermint/vm/topdown/src/syncer/payload.rs b/fendermint/vm/topdown/src/syncer/payload.rs new file mode 100644 index 000000000..0d855cafc --- /dev/null +++ b/fendermint/vm/topdown/src/syncer/payload.rs @@ -0,0 +1,66 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::syncer::error::Error; +use crate::{BlockHash, BlockHeight, Bytes}; +use cid::Cid; +use fvm_ipld_encoding::DAG_CBOR; +use ipc_api::cross::IpcEnvelope; +use ipc_api::staking::StakingChangeRequest; +use multihash::Code; +use multihash::MultihashDigest; + +#[derive(Clone, Debug)] +pub struct ParentBlockViewPayload { + pub parent_hash: BlockHash, + /// Encodes cross-net messages. + pub xnet_msgs: Vec, + /// Encodes validator membership change commands. + pub validator_changes: Vec, +} + +#[derive(Clone, Debug)] +pub struct ParentBlockView { + pub parent_height: BlockHeight, + /// If the payload is None, this means the parent height is a null block + pub payload: Option, +} + +impl ParentBlockView { + pub fn null_block(h: BlockHeight) -> Self { + Self { + parent_height: h, + payload: None, + } + } + + pub fn nonnull_block( + h: BlockHeight, + parent_hash: BlockHash, + xnet_msgs: Vec, + validator_changes: Vec, + ) -> Self { + Self { + parent_height: h, + payload: Some(ParentBlockViewPayload { + parent_hash, + xnet_msgs, + validator_changes, + }), + } + } + + pub fn effects_commitment(&self) -> Result { + let Some(ref p) = self.payload else { + return Ok(Cid::default().to_bytes()); + }; + + let bytes = + fvm_ipld_encoding::to_vec(&(&p.xnet_msgs, &p.validator_changes)).map_err(|e| { + tracing::error!(err = e.to_string(), "cannot serialize parent block view"); + Error::CannotSerializeParentBlockView + })?; + let cid = Cid::new_v1(DAG_CBOR, Code::Blake2b256.digest(&bytes)); + Ok(cid.to_bytes()) + } +} diff --git a/fendermint/vm/topdown/src/syncer/poll.rs b/fendermint/vm/topdown/src/syncer/poll.rs new file mode 100644 index 000000000..6bce1ae60 --- /dev/null +++ b/fendermint/vm/topdown/src/syncer/poll.rs @@ -0,0 +1,316 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::observation::deduce_new_observation; +use crate::observe::ParentFinalityAcquired; +use crate::proxy::ParentQueryProxy; +use crate::syncer::error::Error; +use crate::syncer::payload::ParentBlockView; +use crate::syncer::store::ParentViewStore; +use crate::syncer::{ParentPoller, ParentSyncerConfig, TopDownSyncEvent}; +use crate::{is_null_round_str, BlockHash, BlockHeight, Checkpoint}; +use anyhow::anyhow; +use async_trait::async_trait; +use ipc_observability::emit; +use ipc_observability::serde::HexEncodableBlockHash; +use libp2p::futures::TryFutureExt; +use tokio::sync::broadcast; +use tokio::sync::broadcast::Receiver; +use tracing::instrument; + +pub struct ParentPoll { + config: ParentSyncerConfig, + parent_proxy: P, + store: S, + event_broadcast: broadcast::Sender, + last_finalized: Checkpoint, +} + +#[async_trait] +impl ParentPoller for ParentPoll + where + S: ParentViewStore + Send + Sync + 'static + Clone, + P: Send + Sync + 'static + ParentQueryProxy, +{ + type Store = S; + + fn subscribe(&self) -> Receiver { + self.event_broadcast.subscribe() + } + + fn store(&self) -> Self::Store { + self.store.clone() + } + + /// The target block height is finalized, purge all the parent view before the target height + fn finalize(&mut self, checkpoint: Checkpoint) -> anyhow::Result<()> { + let Some(min_height) = self.store.min_parent_view_height()? else { + return Ok(()); + }; + for h in min_height..=checkpoint.target_height() { + self.store.purge(h)?; + } + + self.last_finalized = checkpoint; + + Ok(()) + } + + /// Insert the height into cache when we see a new non null block + async fn try_poll(&mut self) -> anyhow::Result<()> { + let Some(chain_head) = self.finalized_chain_head().await? else { + return Ok(()); + }; + + let (mut latest_height_fetched, mut first_non_null_parent_hash) = + self.latest_nonnull_data()?; + tracing::debug!(chain_head, latest_height_fetched, "syncing heights"); + + if latest_height_fetched > chain_head { + tracing::warn!( + chain_head, + latest_height_fetched, + "chain head went backwards, potential reorg detected from height" + ); + todo!("handle reorg, maybe just a warning???") + } + + if latest_height_fetched == chain_head { + tracing::debug!( + chain_head, + latest_height_fetched, + "the parent has yet to produce a new block" + ); + return Ok(()); + } + + loop { + if self.store_full()? { + tracing::debug!("exceeded cache size limit"); + break; + } + + first_non_null_parent_hash = match self + .poll_next(latest_height_fetched + 1, first_non_null_parent_hash) + .await + { + Ok(h) => h, + Err(Error::ParentChainReorgDetected) => { + tracing::warn!("potential reorg detected, clear cache and retry"); + todo!(); + // break; + } + Err(e) => return Err(anyhow!(e)), + }; + + latest_height_fetched += 1; + + if latest_height_fetched == chain_head { + tracing::debug!("reached the tip of the chain"); + break; + } else if !self.config.sync_many { + break; + } + } + + Ok(()) + } +} + +impl ParentPoll + where + S: ParentViewStore + Send + Sync + 'static, + P: Send + Sync + 'static + ParentQueryProxy, +{ + pub fn new(config: ParentSyncerConfig, proxy: P, store: S, last_finalized: Checkpoint) -> Self { + let (tx, _) = broadcast::channel(config.broadcast_channel_size); + Self { + config, + parent_proxy: proxy, + store, + event_broadcast: tx, + last_finalized, + } + } + + /// Get the latest non null block data stored + fn latest_nonnull_data(&self) -> anyhow::Result<(BlockHeight, BlockHash)> { + let Some(latest_height) = self.store.max_parent_view_height()? else { + return Ok(( + self.last_finalized.target_height(), + self.last_finalized.target_hash().clone(), + )); + }; + + let start = self.last_finalized.target_height() + 1; + for h in (start..=latest_height).rev() { + let Some(p) = self.store.get(h)? else { + continue; + }; + + // if parent hash of the proposal is null, it means the + let Some(p) = p.payload else { + continue; + }; + + return Ok((h, p.parent_hash)); + } + + // this means the votes stored are all null blocks, return last committed finality + Ok(( + self.last_finalized.target_height(), + self.last_finalized.target_hash().clone(), + )) + } + + fn store_full(&self) -> anyhow::Result { + let Some(h) = self.store.max_parent_view_height()? else { + return Ok(false); + }; + Ok(h - self.last_finalized.target_height() > self.config.max_store_blocks) + } + + async fn finalized_chain_head(&self) -> anyhow::Result> { + let parent_chain_head_height = self.parent_proxy.get_chain_head_height().await?; + // sanity check + if parent_chain_head_height < self.config.chain_head_delay { + tracing::debug!("latest height not more than the chain head delay"); + return Ok(None); + } + + // we consider the chain head finalized only after the `chain_head_delay` + Ok(Some( + parent_chain_head_height - self.config.chain_head_delay, + )) + } + + /// Poll the next block height. Returns finalized and executed block data. + async fn poll_next( + &mut self, + height: BlockHeight, + parent_block_hash: BlockHash, + ) -> Result { + tracing::debug!( + height, + parent_block_hash = hex::encode(&parent_block_hash), + "polling height with parent hash" + ); + + let block_hash_res = match self.parent_proxy.get_block_hash(height).await { + Ok(res) => res, + Err(e) => { + let err = e.to_string(); + if is_null_round_str(&err) { + tracing::debug!( + height, + "detected null round at height, inserted None to cache" + ); + + self.store.store(ParentBlockView::null_block(height))?; + + // self.store.store(ParentView::null_block(height))?; + + emit(ParentFinalityAcquired { + source: "Parent syncer", + is_null: true, + block_height: height, + block_hash: None, + commitment_hash: None, + num_msgs: 0, + num_validator_changes: 0, + }); + + // Null block received, no block hash for the current height being polled. + // Return the previous parent hash as the non-null block hash. + return Ok(parent_block_hash); + } + return Err(Error::CannotQueryParent( + format!("get_block_hash: {e}"), + height, + )); + } + }; + + if block_hash_res.parent_block_hash != parent_block_hash { + tracing::warn!( + height, + parent_hash = hex::encode(&block_hash_res.parent_block_hash), + previous_hash = hex::encode(&parent_block_hash), + "parent block hash diff than previous hash", + ); + return Err(Error::ParentChainReorgDetected); + } + + let view = fetch_data(&self.parent_proxy, height, block_hash_res.block_hash).await?; + + self.store.store(view.clone())?; + let commitment = + deduce_new_observation(&self.store, &self.last_finalized, &self.config.observation)?; + // if there is an error, ignore, we can always try next loop + let _ = self + .event_broadcast + .send(TopDownSyncEvent::NewProposal(Box::new(commitment))); + + let payload = view.payload.as_ref().unwrap(); + emit(ParentFinalityAcquired { + source: "Parent syncer", + is_null: false, + block_height: height, + block_hash: Some(HexEncodableBlockHash(payload.parent_hash.clone())), + // TODO Karel, Willes - when we introduce commitment hash, we should add it here + commitment_hash: None, + num_msgs: payload.xnet_msgs.len(), + num_validator_changes: payload.validator_changes.len(), + }); + + Ok(view.payload.unwrap().parent_hash) + } +} + +#[instrument(skip(parent_proxy))] +async fn fetch_data

( + parent_proxy: &P, + height: BlockHeight, + block_hash: BlockHash, +) -> Result + where + P: ParentQueryProxy + Send + Sync + 'static, +{ + let changes_res = parent_proxy + .get_validator_changes(height) + .map_err(|e| Error::CannotQueryParent(format!("get_validator_changes: {e}"), height)); + + let topdown_msgs_res = parent_proxy + .get_top_down_msgs(height) + .map_err(|e| Error::CannotQueryParent(format!("get_top_down_msgs: {e}"), height)); + + let (changes_res, topdown_msgs_res) = tokio::join!(changes_res, topdown_msgs_res); + let (changes_res, topdown_msgs_res) = (changes_res?, topdown_msgs_res?); + + if changes_res.block_hash != block_hash { + tracing::warn!( + height, + change_set_hash = hex::encode(&changes_res.block_hash), + block_hash = hex::encode(&block_hash), + "change set block hash does not equal block hash", + ); + return Err(Error::ParentChainReorgDetected); + } + + if topdown_msgs_res.block_hash != block_hash { + tracing::warn!( + height, + topdown_msgs_hash = hex::encode(&topdown_msgs_res.block_hash), + block_hash = hex::encode(&block_hash), + "topdown messages block hash does not equal block hash", + ); + return Err(Error::ParentChainReorgDetected); + } + + Ok(ParentBlockView::nonnull_block( + height, + block_hash, + topdown_msgs_res.value, + changes_res.value, + )) +} \ No newline at end of file diff --git a/fendermint/vm/topdown/src/syncer/store.rs b/fendermint/vm/topdown/src/syncer/store.rs new file mode 100644 index 000000000..21015b89a --- /dev/null +++ b/fendermint/vm/topdown/src/syncer/store.rs @@ -0,0 +1,72 @@ +// Copyright 2022-2024 Protocol Labs +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::syncer::error::Error; +use crate::syncer::payload::ParentBlockView; +use crate::{BlockHeight, SequentialKeyCache}; +use std::sync::{Arc, RwLock}; + +/// Stores the parent view observed of the current node +pub trait ParentViewStore { + /// Store a newly observed parent view + fn store(&self, view: ParentBlockView) -> Result<(), Error>; + + /// Get the parent view at the specified height + fn get(&self, height: BlockHeight) -> Result, Error>; + + /// Purge the parent view at the target height + fn purge(&self, height: BlockHeight) -> Result<(), Error>; + + fn min_parent_view_height(&self) -> Result, Error>; + + fn max_parent_view_height(&self) -> Result, Error>; +} + +#[derive(Clone)] +pub struct InMemoryParentViewStore { + inner: Arc>>, +} + +impl Default for InMemoryParentViewStore { + fn default() -> Self { + Self::new() + } +} + +impl InMemoryParentViewStore { + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(SequentialKeyCache::sequential())), + } + } +} + +impl ParentViewStore for InMemoryParentViewStore { + fn store(&self, view: ParentBlockView) -> Result<(), Error> { + let mut inner = self.inner.write().unwrap(); + inner + .append(view.parent_height, view) + .map_err(|_| Error::NonSequentialParentViewInsert) + } + + fn get(&self, height: BlockHeight) -> Result, Error> { + let inner = self.inner.read().unwrap(); + Ok(inner.get_value(height).cloned()) + } + + fn purge(&self, height: BlockHeight) -> Result<(), Error> { + let mut inner = self.inner.write().unwrap(); + inner.remove_key_below(height + 1); + Ok(()) + } + + fn min_parent_view_height(&self) -> Result, Error> { + let inner = self.inner.read().unwrap(); + Ok(inner.lower_bound()) + } + + fn max_parent_view_height(&self) -> Result, Error> { + let inner = self.inner.read().unwrap(); + Ok(inner.upper_bound()) + } +} \ No newline at end of file