Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzhhuang committed Jul 4, 2024
1 parent 3a07fc7 commit 883ddf4
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 120 deletions.
5 changes: 5 additions & 0 deletions flexidag/src/blockdag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ impl BlockDAG {
header.id(),
reachability::ReachabilityError::DataInconsistency
);
self.set_reindex_root(origin)?;
bail!(
"failed to add a block: {:?} when committing for data inconsistency.",
header.id()
);
}
Err(reachability::ReachabilityError::StoreError(StoreError::KeyNotFound(msg))) => {
if msg == *REINDEX_ROOT_KEY.to_string() {
Expand Down
62 changes: 0 additions & 62 deletions sync/src/store/sync_absent_ancestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,65 +168,3 @@ impl AbsentDagBlockStoreWriter for SyncAbsentBlockStore {
Ok(())
}
}
/////////////////////////////////////////////////////////////////////////////

// #[cfg(test)]
// mod tests {
// use std::ops::DerefMut;

// use super::*;
// use crate::consensusdb::prelude::{FlexiDagStorage, FlexiDagStorageConfig};

// #[test]
// fn test_db_relations_store() {
// let db_tempdir = tempfile::tempdir().unwrap();
// let config = FlexiDagStorageConfig::new();

// let db = FlexiDagStorage::create_from_path(db_tempdir.path(), config)
// .expect("failed to create flexidag storage");
// test_relations_store(db.relations_store.write().deref_mut().clone());
// }

// fn test_relations_store<T: RelationsStore>(store: T) {
// let parents = [
// (1, vec![]),
// (2, vec![1]),
// (3, vec![1]),
// (4, vec![2, 3]),
// (5, vec![1, 4]),
// ];
// for (i, vec) in parents.iter().cloned() {
// store
// .insert(
// i.into(),
// BlockHashes::new(vec.iter().copied().map(Hash::from).collect()),
// )
// .unwrap();
// }

// let expected_children = [
// (1, vec![2, 3, 5]),
// (2, vec![4]),
// (3, vec![4]),
// (4, vec![5]),
// (5, vec![]),
// ];
// for (i, vec) in expected_children {
// assert!(store
// .get_children(i.into())
// .unwrap()
// .iter()
// .copied()
// .eq(vec.iter().copied().map(Hash::from)));
// }

// for (i, vec) in parents {
// assert!(store
// .get_parents(i.into())
// .unwrap()
// .iter()
// .copied()
// .eq(vec.iter().copied().map(Hash::from)));
// }
// }
// }
4 changes: 1 addition & 3 deletions sync/src/tasks/block_sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ where
self.find_absent_ancestor(vec![block_header.clone()])
.await?;

let mut absent_ancestor = vec![];
let sync_dag_store = self.sync_dag_store.clone();
let mut absent_block_iter = sync_dag_store.iter_at_first()?;
loop {
Expand All @@ -476,8 +475,7 @@ where
info!("absent block is empty, continue to sync");
break;
}
absent_ancestor.extend(local_absent_block);
self.execute_absent_block(&mut absent_ancestor)?;
self.execute_absent_block(&mut local_absent_block)?;
}
Err(e) => {
error!("failed to read local absent block, error: {:?}", e);
Expand Down
92 changes: 40 additions & 52 deletions sync/src/tasks/continue_execute_absent_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ impl<'a> ContinueExecuteAbsentBlock<'a> {
parent_id,
)
})?;
let mut executed_children = vec![];
for child in &parent_block.children {
let child_block =
self.local_store
Expand Down Expand Up @@ -84,7 +83,6 @@ impl<'a> ContinueExecuteAbsentBlock<'a> {
executed_block.block.id(),
executed_block.block.header().number()
);
executed_children.push(*child);
self.operator.notify(executed_block)?;
next_parent_blocks.push(*child);
}
Expand Down Expand Up @@ -133,58 +131,48 @@ impl<'a> ContinueExecuteAbsentBlock<'a> {
if absent_ancestor.is_empty() {
return anyhow::Result::Ok(());
}
// let mut process_dag_ancestors = HashMap::new();
let mut max_loop_count = absent_ancestor.len();
loop {
absent_ancestor.retain(|block| {
match self.operator.has_dag_block(block.header().id()) {
Ok(has) => {
if has {
info!("{:?} was already applied", block.header().id());
false // remove the executed block
} else {
true // retain the un-executed block
}
absent_ancestor.retain(|block| {
match self.operator.has_dag_block(block.header().id()) {
Ok(has) => {
if has {
info!("{:?} was already applied", block.header().id());
false // remove the executed block
} else {
true // retain the un-executed block
}
Err(_) => true, // retain the un-executed block
}
});

let result: anyhow::Result<()> = absent_ancestor.iter().try_for_each(|block| {
if self.check_parents_exist(block.header())? {
info!(
"now apply for sync after fetching a dag block: {:?}, number: {:?}",
block.id(),
block.header().number()
);
let executed_block = self.operator.apply(block.clone())?;
info!(
"succeed to apply a dag block: {:?}, number: {:?}",
executed_block.block.id(),
executed_block.block.header().number()
);

self.execute_if_parent_ready_norecursion(executed_block.block.id())?;

self.local_store
.delete_dag_sync_block(executed_block.block.id())?;

self.sync_dag_store.delete_dag_sync_block(
executed_block.block.header().number(),
executed_block.block.id(),
)?;

self.operator.notify(executed_block)?;
}
anyhow::Result::Ok(())
});
result?;

max_loop_count = max_loop_count.saturating_sub(1);
if max_loop_count == 0 {
break;
Err(_) => true, // retain the un-executed block
}
}
Ok(())
});

let result: anyhow::Result<()> = absent_ancestor.iter().try_for_each(|block| {
if self.check_parents_exist(block.header())? {
info!(
"now apply for sync after fetching a dag block: {:?}, number: {:?}",
block.id(),
block.header().number()
);
let executed_block = self.operator.apply(block.clone())?;
info!(
"succeed to apply a dag block: {:?}, number: {:?}",
executed_block.block.id(),
executed_block.block.header().number()
);

self.execute_if_parent_ready_norecursion(executed_block.block.id())?;

self.local_store
.delete_dag_sync_block(executed_block.block.id())?;

self.sync_dag_store.delete_dag_sync_block(
executed_block.block.header().number(),
executed_block.block.id(),
)?;

self.operator.notify(executed_block)?;
}
anyhow::Result::Ok(())
});
result
}
}
4 changes: 1 addition & 3 deletions sync/src/tasks/inner_sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ where
self.sync_dag_store.clone(),
);

let mut absent_ancestor = vec![];
let mut absent_block_iter = self.sync_dag_store.iter_at_first()?;
loop {
let mut local_absent_block = vec![];
Expand All @@ -173,8 +172,7 @@ where
info!("absent block is empty, continue to sync");
break;
}
absent_ancestor.extend(local_absent_block);
block_collector.execute_absent_block(&mut absent_ancestor)?;
block_collector.execute_absent_block(&mut local_absent_block)?;
}
Err(e) => {
error!("failed to read local absent block, error: {:?}", e);
Expand Down
46 changes: 46 additions & 0 deletions sync/src/tasks/tests_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,49 @@ async fn test_sync_dag_blocks() -> Result<()> {

Ok(())
}

#[stest::test(timeout = 600)]
async fn test_continue_sync_dag_blocks() -> Result<()> {
let test_system = super::test_tools::SyncTestSystem::initialize_sync_system()
.await
.expect("failed to init system");

let one_fork_count = 30;
let two_fork_count = 20;

let mut target_node = Arc::new(test_system.target_node);
let local_node = Arc::new(test_system.local_node);
Arc::get_mut(&mut target_node)
.unwrap()
.produce_fork_chain(one_fork_count, two_fork_count)?;

/////
let target_dag_genesis_header_id = target_node.chain().get_block_dag_genesis()?;
let local_dag_genesis_header_id = local_node.chain().get_block_dag_genesis()?;

assert_eq!(target_dag_genesis_header_id, local_dag_genesis_header_id);

let dag_genesis_header = target_node
.get_storage()
.get_block_header_by_hash(target_dag_genesis_header_id)?
.ok_or_else(|| format_err!("dag genesis header should exist."))?;
assert!(
dag_genesis_header.number() == 0,
"dag genesis header number should be 0, but {:?}",
dag_genesis_header.number()
);

// sync, the local and target will be a single chain to be a dag chain
let (local_node, mut target_node) =
sync_block_process(target_node, local_node, &test_system.registry).await?;

Arc::get_mut(&mut target_node)
.unwrap()
.produce_fork_chain(20, 25)?;

Arc::get_mut(&mut target_node).unwrap().produce_block(3)?;

sync_block_process(target_node, local_node, &test_system.registry).await?;

Ok(())
}

0 comments on commit 883ddf4

Please sign in to comment.