Skip to content

Commit

Permalink
add log and fix for the robot advice
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzhhuang committed Jul 5, 2024
1 parent 0e96775 commit 08c03b2
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 24 deletions.
2 changes: 1 addition & 1 deletion chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ impl BlockChain {
.dag
.commit(header.to_owned(), self.get_block_dag_origin()?);
match result {
anyhow::Result::Ok(_) => (),
anyhow::Result::Ok(_) => info!("finish to commit dag block: {:?}", block_id),
Err(e) => {
if let Some(StoreError::KeyAlreadyExists(_)) = e.downcast_ref::<StoreError>() {
info!("dag block already exist, ignore");
Expand Down
17 changes: 16 additions & 1 deletion flexidag/src/blockdag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{process_key_already_error, reachability};
use anyhow::{bail, Ok};
use starcoin_config::{temp_dir, RocksdbConfig};
use starcoin_crypto::{HashValue as Hash, HashValue};
use starcoin_logger::prelude::info;
use starcoin_logger::prelude::{debug, info};
use starcoin_types::block::BlockHeader;
use starcoin_types::{
blockhash::{BlockHashes, KType},
Expand Down Expand Up @@ -123,8 +123,18 @@ impl BlockDAG {
}

pub fn commit(&mut self, header: BlockHeader, origin: HashValue) -> anyhow::Result<()> {
info!(
"start to commit header: {:?}, number: {:?}",
header.id(),
header.number()
);
// Generate ghostdag data
let parents = header.parents();
debug!(
"start to get the ghost data from block: {:?}, number: {:?}",
header.id(),
header.number()
);
let ghostdata = match self.ghostdata_by_hash(header.id())? {
None => {
// It must be the dag genesis if header is a format for a single chain
Expand All @@ -145,6 +155,11 @@ impl BlockDAG {
)?;

// Update reachability store
debug!(
"start to update reachability data for block: {:?}, number: {:?}",
header.id(),
header.number()
);
let reachability_store = self.storage.reachability_store.clone();
let mut merge_set = ghostdata
.unordered_mergeset_without_selected_parent()
Expand Down
43 changes: 30 additions & 13 deletions sync/src/store/sync_dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::format_err;
use starcoin_config::{temp_dir, RocksdbConfig, StorageConfig};
use starcoin_crypto::HashValue;
use starcoin_dag::consensusdb::prelude::StoreError;
use starcoin_logger::prelude::error;
use starcoin_storage::db_storage::{DBStorage, SchemaIterator};
use starcoin_types::block::{Block, BlockNumber};

Expand Down Expand Up @@ -60,13 +61,16 @@ impl SyncDagStore {
db_path: P,
config: SyncDagStoreConfig,
) -> anyhow::Result<Self> {
let db = Arc::new(DBStorage::open_with_cfs(
db_path,
vec![SYNC_ABSENT_BLOCK_CF],
false,
config.rocksdb_config,
None,
)?);
let db = Arc::new(
DBStorage::open_with_cfs(
db_path,
vec![SYNC_ABSENT_BLOCK_CF],
false,
config.rocksdb_config,
None,
)
.map_err(|e| format_err!("Failed to open database: {:?}", e))?,
);

Ok(Self {
absent_dag_store: SyncAbsentBlockStore::new(db, config.cache_size),
Expand Down Expand Up @@ -102,10 +106,12 @@ impl SyncDagStore {
}
Err(e) => match e {
StoreError::KeyNotFound(_) => {
self.absent_dag_store.save_absent_block(vec![DagSyncBlock {
block: Some(block.clone()),
children: vec![],
}])?;
self.absent_dag_store
.save_absent_block(vec![DagSyncBlock {
block: Some(block.clone()),
children: vec![],
}])
.map_err(|e| format_err!("Failed to save absent block: {:?}", e))?;
Ok(())
}
_ => Err(format_err!(
Expand Down Expand Up @@ -136,6 +142,13 @@ impl SyncDagStore {
) -> anyhow::Result<DagSyncBlock, StoreError> {
self.absent_dag_store
.get_absent_block_by_id(number, block_id)
.map_err(|e| {
error!(
"Failed to get DAG sync block with number: {}, block_id: {}. Error: {:?}",
number, block_id, e
);
e
})
}

pub fn update_children(
Expand All @@ -144,12 +157,16 @@ impl SyncDagStore {
parent_id: HashValue,
child_id: HashValue,
) -> anyhow::Result<()> {
let mut syn_dag = self.get_dag_sync_block(parent_number, parent_id)?;
let mut syn_dag = self
.get_dag_sync_block(parent_number, parent_id)
.map_err(|e| format_err!("Failed to get DAG sync block for update: {:?}", e))?;
if syn_dag.children.contains(&child_id) {
return Ok(());
}
syn_dag.children.push(child_id);
self.absent_dag_store.save_absent_block(vec![syn_dag])?;
self.absent_dag_store
.save_absent_block(vec![syn_dag])
.map_err(|e| format_err!("Failed to save updated DAG sync block: {:?}", e))?;
Ok(())
}
}
16 changes: 12 additions & 4 deletions sync/src/tasks/block_sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::store::sync_dag_store::SyncDagStore;
use crate::tasks::continue_execute_absent_block::ContinueExecuteAbsentBlock;
use crate::tasks::{BlockConnectedEvent, BlockConnectedEventHandle, BlockFetcher, BlockLocalStore};
use crate::verified_rpc_client::RpcVerifyError;
use anyhow::{format_err, Result};
use anyhow::{format_err, Context, Result};
use bcs_ext::BCSCodec;
use futures::future::BoxFuture;
use futures::FutureExt;
Expand Down Expand Up @@ -209,7 +209,10 @@ where
H: BlockConnectedEventHandle + 'static,
{
fn has_dag_block(&self, block_id: HashValue) -> anyhow::Result<bool> {
self.chain.dag().has_dag_block(block_id)
self.chain
.dag()
.has_dag_block(block_id)
.context("Failed to check if DAG block exists")
}

fn apply(&mut self, block: Block) -> anyhow::Result<ExecutedBlock> {
Expand All @@ -229,6 +232,7 @@ where
BlockConnectAction::ConnectNewBlock,
self.check_enough_by_info(block_info)?,
)
.context("Failed to notify connected block")
}
}

Expand Down Expand Up @@ -439,8 +443,11 @@ where
.await?;

let sync_dag_store = self.sync_dag_store.clone();
let mut absent_block_iter = sync_dag_store.iter_at_first()?;
let mut absent_block_iter = sync_dag_store
.iter_at_first()
.context("Failed to create iterator for sync_dag_store")?;
loop {
debug!("start to read local absent block and try to execute the dag if its parents are ready.");
let mut local_absent_block = vec![];
match self.read_local_absent_block(&mut absent_block_iter, &mut local_absent_block)
{
Expand All @@ -449,7 +456,8 @@ where
info!("absent block is empty, continue to sync");
break;
}
self.execute_absent_block(&mut local_absent_block)?;
self.execute_absent_block(&mut local_absent_block)
.context("Failed to execute absent block")?;
}
Err(e) => {
error!("failed to read local absent block, error: {:?}", e);
Expand Down
12 changes: 10 additions & 2 deletions sync/src/tasks/continue_execute_absent_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::anyhow;
use starcoin_chain_api::ExecutedBlock;
use starcoin_crypto::HashValue;
use starcoin_logger::prelude::{error, info};
use starcoin_logger::prelude::{debug, error, info};
use starcoin_storage::Store;
use starcoin_types::block::{Block, BlockHeader};
use stream_task::CollectorState;
Expand Down Expand Up @@ -135,7 +135,6 @@ impl<'a> ContinueExecuteAbsentBlock<'a> {
match self.operator.has_dag_block(block.header().id()) {
Ok(has) => {
if has {
info!("{:?} was already applied", block.header().id());
false // remove the executed block
} else {
true // retain the un-executed block
Expand All @@ -161,12 +160,21 @@ impl<'a> ContinueExecuteAbsentBlock<'a> {
executed_block.block.header().number()
);

let block_id = executed_block.block.id();
let block_number = executed_block.block.header().number();

debug!("start to execute the children blocks if the parent: {:?}, number: {:?} was executed.", block_id, block_number);
self.execute_if_parent_ready_norecursion(executed_block.block.id())?;
debug!("finish to execute the children blocks if the parent: {:?}, number: {:?} was executed.", block_id, block_number);

debug!("delete from the local store after the block was executed: {:?}, number: {:?}", block_id, block_number);
self.local_store
.delete_dag_sync_block(executed_block.block.id())?;

debug!("notify the collector after the block was executed: {:?}, number: {:?}", block_id, block_number);
self.operator.notify(executed_block)?;

debug!("finish to process the block: {:?}, number: {:?}", block_id, block_number);
}
// delete the block anyway
self.sync_dag_store
Expand Down
3 changes: 2 additions & 1 deletion sync/src/tasks/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ impl SyncNodeMocker {
None,
);
let peer_selector = PeerSelector::new(vec![peer_info], PeerStrategy::default(), None);
let sync_dag_store = SyncDagStore::create_for_testing()?;
let sync_dag_store = SyncDagStore::create_for_testing()
.context("Failed to create SyncDagStore for testing")?;
Ok(Self::new_inner(
peer_id,
chain,
Expand Down
3 changes: 1 addition & 2 deletions types/src/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,8 +864,7 @@ impl Block {

pub fn random() -> Self {
let body = BlockBody::sample();
let mut header = BlockHeader::random();
header.body_hash = body.hash();
let header = BlockHeader::random();

Self { header, body }
}
Expand Down

0 comments on commit 08c03b2

Please sign in to comment.