Skip to content

Commit

Permalink
refactor: remove sync struct HeaderView
Browse files Browse the repository at this point in the history
  • Loading branch information
quake committed May 12, 2023
1 parent 989f7e6 commit 0c8e91e
Show file tree
Hide file tree
Showing 25 changed files with 515 additions and 337 deletions.
2 changes: 1 addition & 1 deletion resource/ckb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ bootnode_mode = false
support_protocols = ["Ping", "Discovery", "Identify", "Feeler", "DisconnectMessage", "Sync", "Relay", "Time", "Alert", "LightClient", "Filter"]

# [network.sync.header_map]
# memory_limit = "600MB"
# memory_limit = "256MB"

[rpc]
# By default RPC only binds to localhost, thus it only allows accessing from the same machine.
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/module/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ckb_logger::error;
use ckb_reward_calculator::RewardCalculator;
use ckb_shared::{shared::Shared, Snapshot};
use ckb_store::{data_loader_wrapper::AsDataLoader, ChainStore};
use ckb_traits::HeaderProvider;
use ckb_traits::HeaderFieldsProvider;
use ckb_types::core::tx_pool::TransactionWithStatus;
use ckb_types::{
core::{
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/module/stats.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use ckb_jsonrpc_types::{AlertMessage, ChainInfo, DeploymentInfo, DeploymentPos, DeploymentsInfo};
use ckb_network_alert::notifier::Notifier as AlertNotifier;
use ckb_shared::shared::Shared;
use ckb_traits::HeaderProvider;
use ckb_traits::HeaderFieldsProvider;
use ckb_types::prelude::Unpack;
use ckb_util::Mutex;
use jsonrpc_core::Result;
Expand Down
19 changes: 18 additions & 1 deletion store/src/data_loader_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! TODO(doc): @quake
use crate::ChainStore;
use ckb_traits::{CellDataProvider, EpochProvider, HeaderProvider};
use ckb_traits::{
CellDataProvider, EpochProvider, HeaderFields, HeaderFieldsProvider, HeaderProvider,
};
use ckb_types::{
bytes::Bytes,
core::{BlockExt, BlockNumber, EpochExt, HeaderView},
Expand Down Expand Up @@ -56,6 +58,21 @@ where
}
}

impl<T> HeaderFieldsProvider for DataLoaderWrapper<T>
where
T: ChainStore,
{
fn get_header_fields(&self, hash: &Byte32) -> Option<HeaderFields> {
self.0.get_block_header(hash).map(|header| HeaderFields {
number: header.number(),
epoch: header.epoch(),
parent_hash: header.data().raw().parent_hash(),
timestamp: header.timestamp(),
hash: header.hash(),
})
}
}

impl<T> EpochProvider for DataLoaderWrapper<T>
where
T: ChainStore,
Expand Down
35 changes: 25 additions & 10 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ckb_chain_spec::consensus::Consensus;
use ckb_logger::{self, debug_target};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_systemtime::unix_time_as_millis;
use ckb_traits::HeaderProvider;
use ckb_traits::{HeaderFields, HeaderFieldsProvider};
use ckb_types::{
core::{EpochNumberWithFraction, HeaderView},
packed::{self, Byte32, CompactBlock},
Expand Down Expand Up @@ -167,11 +167,11 @@ impl<'a> CompactBlockProcess<'a> {
}

struct CompactBlockMedianTimeView<'a> {
fn_get_pending_header: Box<dyn Fn(packed::Byte32) -> Option<HeaderView> + 'a>,
fn_get_pending_header: Box<dyn Fn(packed::Byte32) -> Option<HeaderFields> + 'a>,
}

impl<'a> HeaderProvider for CompactBlockMedianTimeView<'a> {
fn get_header(&self, hash: &packed::Byte32) -> Option<HeaderView> {
impl<'a> HeaderFieldsProvider for CompactBlockMedianTimeView<'a> {
fn get_header_fields(&self, hash: &packed::Byte32) -> Option<HeaderFields> {
// Note: don't query store because we already did that in `fn_get_pending_header -> get_header_view`.
(self.fn_get_pending_header)(hash.to_owned())
}
Expand Down Expand Up @@ -233,7 +233,7 @@ fn contextual_check(
if status.contains(BlockStatus::BLOCK_STORED) {
// update last common header and best known
let parent = shared
.get_header_view(&compact_block_header.data().raw().parent_hash(), Some(true))
.get_header_index_view(&compact_block_header.data().raw().parent_hash(), true)
.expect("parent block must exist");

let header_index = HeaderIndex::new(
Expand All @@ -253,9 +253,9 @@ fn contextual_check(
}

let store_first = tip.number() + 1 >= compact_block_header.number();
let parent = shared.get_header_view(
let parent = shared.get_header_index_view(
&compact_block_header.data().raw().parent_hash(),
Some(store_first),
store_first,
);
if parent.is_none() {
debug_target!(
Expand Down Expand Up @@ -287,11 +287,26 @@ fn contextual_check(
|block_hash| {
pending_compact_blocks
.get(&block_hash)
.map(|(compact_block, _, _)| compact_block.header().into_view())
.map(|(compact_block, _, _)| {
let header = compact_block.header().into_view();
HeaderFields {
hash: header.hash(),
number: header.number(),
epoch: header.epoch(),
timestamp: header.timestamp(),
parent_hash: header.parent_hash(),
}
})
.or_else(|| {
shared
.get_header_view(&block_hash, None)
.map(|header_view| header_view.into_inner())
.get_header_index_view(&block_hash, false)
.map(|header| HeaderFields {
hash: header.hash(),
number: header.number(),
epoch: header.epoch(),
timestamp: header.timestamp(),
parent_hash: header.parent_hash(),
})
})
}
};
Expand Down
6 changes: 3 additions & 3 deletions sync/src/relayer/tests/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ fn test_accept_block() {
fn test_ignore_a_too_old_block() {
let (relayer, _) = build_chain(1804);

let active_chain = relayer.shared.active_chain();
let parent = active_chain.tip_header();
let parent = active_chain.get_ancestor(&parent.hash(), 2).unwrap();
let snapshot = relayer.shared.shared().snapshot();
let parent = snapshot.tip_header();
let parent = snapshot.get_ancestor(&parent.hash(), 2).unwrap();

let too_old_block = new_header_builder(relayer.shared.shared(), &parent).build();

Expand Down
23 changes: 7 additions & 16 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::types::{ActiveChain, BlockNumberAndHash, HeaderIndex, IBDState};
use crate::types::{ActiveChain, BlockNumberAndHash, HeaderIndex, HeaderIndexView, IBDState};
use ckb_constant::sync::{
BLOCK_DOWNLOAD_WINDOW, CHECK_POINT_WINDOW, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
};
use ckb_logger::{debug, trace};
use ckb_network::PeerIndex;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::{core, packed};
use ckb_types::packed;
use std::cmp::min;

pub struct BlockFetcher<'a> {
Expand Down Expand Up @@ -86,15 +86,10 @@ impl<'a> BlockFetcher<'a> {
while let Some(hash) = state.peers().take_unknown_last(self.peer) {
// Here we need to first try search from headermap, if not, fallback to search from the db.
// if not search from db, it can stuck here when the headermap may have been removed just as the block was downloaded
if let Some(header) = self.synchronizer.shared.get_header_view(&hash, None) {
let header_index = HeaderIndex::new(
header.number(),
header.hash(),
header.total_difficulty().clone(),
);
if let Some(header) = self.synchronizer.shared.get_header_index_view(&hash, false) {
state
.peers()
.may_set_best_known_header(self.peer, header_index);
.may_set_best_known_header(self.peer, header.as_header_index());
} else {
state.peers().insert_unknown_header_hash(self.peer, hash);
break;
Expand Down Expand Up @@ -155,7 +150,7 @@ impl<'a> BlockFetcher<'a> {
// So we can skip the search of this space directly
self.synchronizer
.peers()
.set_last_common_header(self.peer, (&header).into());
.set_last_common_header(self.peer, header.number_and_hash());
end = min(best_known.number(), header.number() + BLOCK_DOWNLOAD_WINDOW);
break;
} else if status.contains(BlockStatus::BLOCK_RECEIVED) {
Expand All @@ -171,11 +166,7 @@ impl<'a> BlockFetcher<'a> {
header = self
.synchronizer
.shared
.get_header_view(
&parent_hash,
Some(status.contains(BlockStatus::BLOCK_STORED)),
)?
.into_inner();
.get_header_index_view(&parent_hash, false)?;
}

// Move `start` forward
Expand Down Expand Up @@ -209,7 +200,7 @@ impl<'a> BlockFetcher<'a> {
Some(
fetch
.chunks(INIT_BLOCKS_IN_TRANSIT_PER_PEER)
.map(|headers| headers.iter().map(core::HeaderView::hash).collect())
.map(|headers| headers.iter().map(HeaderIndexView::hash).collect())
.collect(),
)
}
Expand Down
17 changes: 9 additions & 8 deletions sync/src/synchronizer/headers_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ckb_constant::sync::MAX_HEADERS_LEN;
use ckb_error::Error;
use ckb_logger::{debug, log_enabled, warn, Level};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_traits::HeaderProvider;
use ckb_traits::HeaderFieldsProvider;
use ckb_types::{core, packed, prelude::*};
use ckb_verification::{HeaderError, HeaderVerifier};
use ckb_verification_traits::Verifier;
Expand Down Expand Up @@ -209,14 +209,14 @@ impl<'a> HeadersProcess<'a> {
}
}

pub struct HeaderAcceptor<'a, DL: HeaderProvider> {
pub struct HeaderAcceptor<'a, DL: HeaderFieldsProvider> {
header: &'a core::HeaderView,
active_chain: ActiveChain,
peer: PeerIndex,
verifier: HeaderVerifier<'a, DL>,
}

impl<'a, DL: HeaderProvider> HeaderAcceptor<'a, DL> {
impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
pub fn new(
header: &'a core::HeaderView,
peer: PeerIndex,
Expand Down Expand Up @@ -283,15 +283,16 @@ impl<'a, DL: HeaderProvider> HeaderAcceptor<'a, DL> {
// type should we return?
let status = self.active_chain.get_block_status(&self.header.hash());
if status.contains(BlockStatus::HEADER_VALID) {
let header_view = shared
.get_header_view(
let header_index = shared
.get_header_index_view(
&self.header.hash(),
Some(status.contains(BlockStatus::BLOCK_STORED)),
status.contains(BlockStatus::BLOCK_STORED),
)
.expect("header with HEADER_VALID should exist");
.expect("header with HEADER_VALID should exist")
.as_header_index();
state
.peers()
.may_set_best_known_header(self.peer, header_view.as_header_index());
.may_set_best_known_header(self.peer, header_index);
return result;
}

Expand Down
16 changes: 10 additions & 6 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub(crate) use self::headers_process::HeadersProcess;
pub(crate) use self::in_ibd_process::InIBDProcess;

use crate::block_status::BlockStatus;
use crate::types::{HeadersSyncController, IBDState, Peers, SyncShared};
use crate::types::{HeaderIndexView, HeadersSyncController, IBDState, Peers, SyncShared};
use crate::utils::{metric_ckb_message_bytes, send_message_to, MetricDirection};
use crate::{Status, StatusCode};

Expand Down Expand Up @@ -305,7 +305,7 @@ impl Synchronizer {
self.shared().state().peers()
}

fn better_tip_header(&self) -> core::HeaderView {
fn better_tip_header(&self) -> HeaderIndexView {
let (header, total_difficulty) = {
let active_chain = self.shared.active_chain();
(
Expand All @@ -316,9 +316,9 @@ impl Synchronizer {
let best_known = self.shared.state().shared_best_header();
// is_better_chain
if total_difficulty > *best_known.total_difficulty() {
header
(header, total_difficulty).into()
} else {
best_known.into_inner()
best_known
}
}

Expand Down Expand Up @@ -387,7 +387,11 @@ impl Synchronizer {
continue;
}
} else {
active_chain.send_getheaders_to_peer(nc, *peer, (&better_tip_header).into());
active_chain.send_getheaders_to_peer(
nc,
*peer,
better_tip_header.number_and_hash(),
);
}
}

Expand Down Expand Up @@ -512,7 +516,7 @@ impl Synchronizer {
}

debug!("start sync peer={}", peer);
active_chain.send_getheaders_to_peer(nc, peer, (&tip).into());
active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
}
}

Expand Down
5 changes: 3 additions & 2 deletions sync/src/tests/synchronizer/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,14 @@ fn test_get_ancestor() {
assert!(tip.is_some());
assert!(header.is_some());
assert!(noop.is_none());
assert_eq!(tip.unwrap(), shared.snapshot().tip_header().to_owned());
assert_eq!(tip.unwrap().hash(), shared.snapshot().tip_header().hash());
assert_eq!(
header.unwrap(),
header.unwrap().hash(),
shared
.store()
.get_block_header(&shared.store().get_block_hash(100).unwrap())
.unwrap()
.hash()
);
}

Expand Down
10 changes: 5 additions & 5 deletions sync/src/tests/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use std::{
sync::atomic::{AtomicUsize, Ordering::Relaxed},
};

use crate::types::{HeaderView, TtlFilter, FILTER_TTL};
use crate::types::{HeaderIndexView, TtlFilter, FILTER_TTL};

const SKIPLIST_LENGTH: u64 = 10_000;

#[test]
fn test_get_ancestor_use_skip_list() {
let mut header_map: HashMap<Byte32, HeaderView> = HashMap::default();
let mut header_map: HashMap<Byte32, HeaderIndexView> = HashMap::default();
let mut hashes: BTreeMap<BlockNumber, Byte32> = BTreeMap::default();

let mut parent_hash = None;
Expand All @@ -31,7 +31,7 @@ fn test_get_ancestor_use_skip_list() {
hashes.insert(number, header.hash());
parent_hash = Some(header.hash());

let mut view = HeaderView::new(header, U256::zero());
let mut view: HeaderIndexView = (header, U256::zero()).into();
view.build_skip(0, |hash, _| header_map.get(hash).cloned(), |_, _| None);
header_map.insert(view.hash(), view);
}
Expand All @@ -40,15 +40,15 @@ fn test_get_ancestor_use_skip_list() {
if *number > 0 {
let skip_view = header_map
.get(hash)
.and_then(|view| header_map.get(view.skip_hash.as_ref().unwrap()))
.and_then(|view| header_map.get(view.skip_hash().unwrap()))
.unwrap();
assert_eq!(
Some(skip_view.hash()).as_ref(),
hashes.get(&skip_view.number())
);
assert!(skip_view.number() < *number);
} else {
assert!(header_map[hash].skip_hash.is_none());
assert!(header_map[hash].skip_hash().is_none());
}
}

Expand Down
10 changes: 5 additions & 5 deletions sync/src/types/header_map/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::path;

use ckb_types::packed::Byte32;

use crate::types::HeaderView;
use crate::types::HeaderIndexView;

pub(crate) trait KeyValueBackend {
fn new<P>(tmpdir: Option<P>) -> Self
Expand All @@ -15,9 +15,9 @@ pub(crate) trait KeyValueBackend {
}

fn contains_key(&self, key: &Byte32) -> bool;
fn get(&self, key: &Byte32) -> Option<HeaderView>;
fn insert(&self, value: &HeaderView) -> Option<()>;
fn insert_batch(&self, values: &[HeaderView]);
fn remove(&self, key: &Byte32) -> Option<HeaderView>;
fn get(&self, key: &Byte32) -> Option<HeaderIndexView>;
fn insert(&self, value: &HeaderIndexView) -> Option<()>;
fn insert_batch(&self, values: &[HeaderIndexView]);
fn remove(&self, key: &Byte32) -> Option<HeaderIndexView>;
fn remove_no_return(&self, key: &Byte32);
}
Loading

0 comments on commit 0c8e91e

Please sign in to comment.