diff --git a/flexidag/src/blockdag.rs b/flexidag/src/blockdag.rs index 35284f42da..6010ab6169 100644 --- a/flexidag/src/blockdag.rs +++ b/flexidag/src/blockdag.rs @@ -171,6 +171,11 @@ impl BlockDAG { header.id(), reachability::ReachabilityError::DataInconsistency ); + self.set_reindex_root(origin)?; + bail!( + "failed to add a block: {:?} when committing for data inconsistency.", + header.id() + ); } Err(reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg))) => { if msg == *REINDEX_ROOT_KEY.to_string() { diff --git a/sync/src/store/sync_absent_ancestor.rs b/sync/src/store/sync_absent_ancestor.rs index 64b1598a88..c3a6cea28b 100644 --- a/sync/src/store/sync_absent_ancestor.rs +++ b/sync/src/store/sync_absent_ancestor.rs @@ -168,65 +168,3 @@ impl AbsentDagBlockStoreWriter for SyncAbsentBlockStore { Ok(()) } } -///////////////////////////////////////////////////////////////////////////// - -// #[cfg(test)] -// mod tests { -// use std::ops::DerefMut; - -// use super::*; -// use crate::consensusdb::prelude::{FlexiDagStorage, FlexiDagStorageConfig}; - -// #[test] -// fn test_db_relations_store() { -// let db_tempdir = tempfile::tempdir().unwrap(); -// let config = FlexiDagStorageConfig::new(); - -// let db = FlexiDagStorage::create_from_path(db_tempdir.path(), config) -// .expect("failed to create flexidag storage"); -// test_relations_store(db.relations_store.write().deref_mut().clone()); -// } - -// fn test_relations_store(store: T) { -// let parents = [ -// (1, vec![]), -// (2, vec![1]), -// (3, vec![1]), -// (4, vec![2, 3]), -// (5, vec![1, 4]), -// ]; -// for (i, vec) in parents.iter().cloned() { -// store -// .insert( -// i.into(), -// BlockHashes::new(vec.iter().copied().map(Hash::from).collect()), -// ) -// .unwrap(); -// } - -// let expected_children = [ -// (1, vec![2, 3, 5]), -// (2, vec![4]), -// (3, vec![4]), -// (4, vec![5]), -// (5, vec![]), -// ]; -// for (i, vec) in expected_children { -// assert!(store -// .get_children(i.into()) -// .unwrap() -// .iter() -// .copied() -// .eq(vec.iter().copied().map(Hash::from))); -// } - -// for (i, vec) in parents { -// assert!(store -// .get_parents(i.into()) -// .unwrap() -// .iter() -// .copied() -// .eq(vec.iter().copied().map(Hash::from))); -// } -// } -// } diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index 6ae4d2c2a7..597f9e96dd 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -464,7 +464,6 @@ where self.find_absent_ancestor(vec![block_header.clone()]) .await?; - let mut absent_ancestor = vec![]; let sync_dag_store = self.sync_dag_store.clone(); let mut absent_block_iter = sync_dag_store.iter_at_first()?; loop { @@ -476,8 +475,7 @@ where info!("absent block is empty, continue to sync"); break; } - absent_ancestor.extend(local_absent_block); - self.execute_absent_block(&mut absent_ancestor)?; + self.execute_absent_block(&mut local_absent_block)?; } Err(e) => { error!("failed to read local absent block, error: {:?}", e); diff --git a/sync/src/tasks/continue_execute_absent_block.rs b/sync/src/tasks/continue_execute_absent_block.rs index c07b1fac1f..aa98621079 100644 --- a/sync/src/tasks/continue_execute_absent_block.rs +++ b/sync/src/tasks/continue_execute_absent_block.rs @@ -50,7 +50,6 @@ impl<'a> ContinueExecuteAbsentBlock<'a> { parent_id, ) })?; - let mut executed_children = vec![]; for child in &parent_block.children { let child_block = self.local_store @@ -84,7 +83,6 @@ impl<'a> ContinueExecuteAbsentBlock<'a> { executed_block.block.id(), executed_block.block.header().number() ); - executed_children.push(*child); self.operator.notify(executed_block)?; next_parent_blocks.push(*child); } @@ -133,58 +131,48 @@ impl<'a> ContinueExecuteAbsentBlock<'a> { if absent_ancestor.is_empty() { return anyhow::Result::Ok(()); } - // let mut process_dag_ancestors = HashMap::new(); - let mut max_loop_count = absent_ancestor.len(); - loop { - absent_ancestor.retain(|block| { - match self.operator.has_dag_block(block.header().id()) { - Ok(has) => { - if has { - info!("{:?} was already applied", block.header().id()); - false // remove the executed block - } else { - true // retain the un-executed block - } + absent_ancestor.retain(|block| { + match self.operator.has_dag_block(block.header().id()) { + Ok(has) => { + if has { + info!("{:?} was already applied", block.header().id()); + false // remove the executed block + } else { + true // retain the un-executed block } - Err(_) => true, // retain the un-executed block } - }); - - let result: anyhow::Result<()> = absent_ancestor.iter().try_for_each(|block| { - if self.check_parents_exist(block.header())? { - info!( - "now apply for sync after fetching a dag block: {:?}, number: {:?}", - block.id(), - block.header().number() - ); - let executed_block = self.operator.apply(block.clone())?; - info!( - "succeed to apply a dag block: {:?}, number: {:?}", - executed_block.block.id(), - executed_block.block.header().number() - ); - - self.execute_if_parent_ready_norecursion(executed_block.block.id())?; - - self.local_store - .delete_dag_sync_block(executed_block.block.id())?; - - self.sync_dag_store.delete_dag_sync_block( - executed_block.block.header().number(), - executed_block.block.id(), - )?; - - self.operator.notify(executed_block)?; - } - anyhow::Result::Ok(()) - }); - result?; - - max_loop_count = max_loop_count.saturating_sub(1); - if max_loop_count == 0 { - break; + Err(_) => true, // retain the un-executed block } - } - Ok(()) + }); + + let result: anyhow::Result<()> = absent_ancestor.iter().try_for_each(|block| { + if self.check_parents_exist(block.header())? { + info!( + "now apply for sync after fetching a dag block: {:?}, number: {:?}", + block.id(), + block.header().number() + ); + let executed_block = self.operator.apply(block.clone())?; + info!( + "succeed to apply a dag block: {:?}, number: {:?}", + executed_block.block.id(), + executed_block.block.header().number() + ); + + self.execute_if_parent_ready_norecursion(executed_block.block.id())?; + + self.local_store + .delete_dag_sync_block(executed_block.block.id())?; + + self.sync_dag_store.delete_dag_sync_block( + executed_block.block.header().number(), + executed_block.block.id(), + )?; + + self.operator.notify(executed_block)?; + } + anyhow::Result::Ok(()) + }); + result } } diff --git a/sync/src/tasks/inner_sync_task.rs b/sync/src/tasks/inner_sync_task.rs index a280f0e414..cec54ca413 100644 --- a/sync/src/tasks/inner_sync_task.rs +++ b/sync/src/tasks/inner_sync_task.rs @@ -161,7 +161,6 @@ where self.sync_dag_store.clone(), ); - let mut absent_ancestor = vec![]; let mut absent_block_iter = self.sync_dag_store.iter_at_first()?; loop { let mut local_absent_block = vec![]; @@ -173,8 +172,7 @@ where info!("absent block is empty, continue to sync"); break; } - absent_ancestor.extend(local_absent_block); - block_collector.execute_absent_block(&mut absent_ancestor)?; + block_collector.execute_absent_block(&mut local_absent_block)?; } Err(e) => { error!("failed to read local absent block, error: {:?}", e); diff --git a/sync/src/tasks/tests_dag.rs b/sync/src/tasks/tests_dag.rs index 5ce9294e34..2efcc036e9 100644 --- a/sync/src/tasks/tests_dag.rs +++ b/sync/src/tasks/tests_dag.rs @@ -122,3 +122,49 @@ async fn test_sync_dag_blocks() -> Result<()> { Ok(()) } + +#[stest::test(timeout = 600)] +async fn test_continue_sync_dag_blocks() -> Result<()> { + let test_system = super::test_tools::SyncTestSystem::initialize_sync_system() + .await + .expect("failed to init system"); + + let one_fork_count = 30; + let two_fork_count = 20; + + let mut target_node = Arc::new(test_system.target_node); + let local_node = Arc::new(test_system.local_node); + Arc::get_mut(&mut target_node) + .unwrap() + .produce_fork_chain(one_fork_count, two_fork_count)?; + + ///// + let target_dag_genesis_header_id = target_node.chain().get_block_dag_genesis()?; + let local_dag_genesis_header_id = local_node.chain().get_block_dag_genesis()?; + + assert_eq!(target_dag_genesis_header_id, local_dag_genesis_header_id); + + let dag_genesis_header = target_node + .get_storage() + .get_block_header_by_hash(target_dag_genesis_header_id)? + .ok_or_else(|| format_err!("dag genesis header should exist."))?; + assert!( + dag_genesis_header.number() == 0, + "dag genesis header number should be 0, but {:?}", + dag_genesis_header.number() + ); + + // sync, the local and target will be a single chain to be a dag chain + let (local_node, mut target_node) = + sync_block_process(target_node, local_node, &test_system.registry).await?; + + Arc::get_mut(&mut target_node) + .unwrap() + .produce_fork_chain(20, 25)?; + + Arc::get_mut(&mut target_node).unwrap().produce_block(3)?; + + sync_block_process(target_node, local_node, &test_system.registry).await?; + + Ok(()) +}