Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(perf): replace sync struct HeaderView with HeaderIndexView #3970

Merged
merged 1 commit into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion resource/ckb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ bootnode_mode = false
support_protocols = ["Ping", "Discovery", "Identify", "Feeler", "DisconnectMessage", "Sync", "Relay", "Time", "Alert", "LightClient", "Filter"]

# [network.sync.header_map]
# memory_limit = "600MB"
# memory_limit = "256MB"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the default memory limit to smaller one, it can keep about the same amount of header data in memory as before.


[rpc]
# By default RPC only binds to localhost, thus it only allows accessing from the same machine.
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/module/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ckb_logger::error;
use ckb_reward_calculator::RewardCalculator;
use ckb_shared::{shared::Shared, Snapshot};
use ckb_store::{data_loader_wrapper::AsDataLoader, ChainStore};
use ckb_traits::HeaderProvider;
use ckb_traits::HeaderFieldsProvider;
use ckb_types::core::tx_pool::TransactionWithStatus;
use ckb_types::{
core::{
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/module/stats.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use ckb_jsonrpc_types::{AlertMessage, ChainInfo, DeploymentInfo, DeploymentPos, DeploymentsInfo};
use ckb_network_alert::notifier::Notifier as AlertNotifier;
use ckb_shared::shared::Shared;
use ckb_traits::HeaderProvider;
use ckb_traits::HeaderFieldsProvider;
use ckb_types::prelude::Unpack;
use ckb_util::Mutex;
use jsonrpc_core::Result;
Expand Down
19 changes: 18 additions & 1 deletion store/src/data_loader_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! TODO(doc): @quake
use crate::ChainStore;
use ckb_traits::{CellDataProvider, EpochProvider, HeaderProvider};
use ckb_traits::{
CellDataProvider, EpochProvider, HeaderFields, HeaderFieldsProvider, HeaderProvider,
};
use ckb_types::{
bytes::Bytes,
core::{BlockExt, BlockNumber, EpochExt, HeaderView},
Expand Down Expand Up @@ -56,6 +58,21 @@ where
}
}

impl<T> HeaderFieldsProvider for DataLoaderWrapper<T>
where
T: ChainStore,
{
fn get_header_fields(&self, hash: &Byte32) -> Option<HeaderFields> {
self.0.get_block_header(hash).map(|header| HeaderFields {
number: header.number(),
epoch: header.epoch(),
parent_hash: header.data().raw().parent_hash(),
timestamp: header.timestamp(),
hash: header.hash(),
})
}
}

impl<T> EpochProvider for DataLoaderWrapper<T>
where
T: ChainStore,
Expand Down
35 changes: 25 additions & 10 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ckb_chain_spec::consensus::Consensus;
use ckb_logger::{self, debug_target};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_systemtime::unix_time_as_millis;
use ckb_traits::HeaderProvider;
use ckb_traits::{HeaderFields, HeaderFieldsProvider};
use ckb_types::{
core::{EpochNumberWithFraction, HeaderView},
packed::{self, Byte32, CompactBlock},
Expand Down Expand Up @@ -167,11 +167,11 @@ impl<'a> CompactBlockProcess<'a> {
}

struct CompactBlockMedianTimeView<'a> {
fn_get_pending_header: Box<dyn Fn(packed::Byte32) -> Option<HeaderView> + 'a>,
fn_get_pending_header: Box<dyn Fn(packed::Byte32) -> Option<HeaderFields> + 'a>,
}

impl<'a> HeaderProvider for CompactBlockMedianTimeView<'a> {
fn get_header(&self, hash: &packed::Byte32) -> Option<HeaderView> {
impl<'a> HeaderFieldsProvider for CompactBlockMedianTimeView<'a> {
fn get_header_fields(&self, hash: &packed::Byte32) -> Option<HeaderFields> {
// Note: don't query store because we already did that in `fn_get_pending_header -> get_header_view`.
(self.fn_get_pending_header)(hash.to_owned())
}
Expand Down Expand Up @@ -233,7 +233,7 @@ fn contextual_check(
if status.contains(BlockStatus::BLOCK_STORED) {
// update last common header and best known
let parent = shared
.get_header_view(&compact_block_header.data().raw().parent_hash(), Some(true))
.get_header_index_view(&compact_block_header.data().raw().parent_hash(), true)
.expect("parent block must exist");

let header_index = HeaderIndex::new(
Expand All @@ -253,9 +253,9 @@ fn contextual_check(
}

let store_first = tip.number() + 1 >= compact_block_header.number();
let parent = shared.get_header_view(
let parent = shared.get_header_index_view(
&compact_block_header.data().raw().parent_hash(),
Some(store_first),
store_first,
);
if parent.is_none() {
debug_target!(
Expand Down Expand Up @@ -287,11 +287,26 @@ fn contextual_check(
|block_hash| {
pending_compact_blocks
.get(&block_hash)
.map(|(compact_block, _, _)| compact_block.header().into_view())
.map(|(compact_block, _, _)| {
let header = compact_block.header().into_view();
HeaderFields {
hash: header.hash(),
number: header.number(),
epoch: header.epoch(),
timestamp: header.timestamp(),
parent_hash: header.parent_hash(),
}
})
.or_else(|| {
shared
.get_header_view(&block_hash, None)
.map(|header_view| header_view.into_inner())
.get_header_index_view(&block_hash, false)
.map(|header| HeaderFields {
hash: header.hash(),
number: header.number(),
epoch: header.epoch(),
timestamp: header.timestamp(),
parent_hash: header.parent_hash(),
})
})
}
};
Expand Down
6 changes: 3 additions & 3 deletions sync/src/relayer/tests/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ fn test_accept_block() {
fn test_ignore_a_too_old_block() {
let (relayer, _) = build_chain(1804);

let active_chain = relayer.shared.active_chain();
let parent = active_chain.tip_header();
let parent = active_chain.get_ancestor(&parent.hash(), 2).unwrap();
let snapshot = relayer.shared.shared().snapshot();
let parent = snapshot.tip_header();
let parent = snapshot.get_ancestor(&parent.hash(), 2).unwrap();

let too_old_block = new_header_builder(relayer.shared.shared(), &parent).build();

Expand Down
23 changes: 7 additions & 16 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::types::{ActiveChain, BlockNumberAndHash, HeaderIndex, IBDState};
use crate::types::{ActiveChain, BlockNumberAndHash, HeaderIndex, HeaderIndexView, IBDState};
use ckb_constant::sync::{
BLOCK_DOWNLOAD_WINDOW, CHECK_POINT_WINDOW, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
};
use ckb_logger::{debug, trace};
use ckb_network::PeerIndex;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::{core, packed};
use ckb_types::packed;
use std::cmp::min;

pub struct BlockFetcher<'a> {
Expand Down Expand Up @@ -86,15 +86,10 @@ impl<'a> BlockFetcher<'a> {
while let Some(hash) = state.peers().take_unknown_last(self.peer) {
// Here we need to first try search from headermap, if not, fallback to search from the db.
// if not search from db, it can stuck here when the headermap may have been removed just as the block was downloaded
if let Some(header) = self.synchronizer.shared.get_header_view(&hash, None) {
let header_index = HeaderIndex::new(
header.number(),
header.hash(),
header.total_difficulty().clone(),
);
if let Some(header) = self.synchronizer.shared.get_header_index_view(&hash, false) {
state
.peers()
.may_set_best_known_header(self.peer, header_index);
.may_set_best_known_header(self.peer, header.as_header_index());
} else {
state.peers().insert_unknown_header_hash(self.peer, hash);
break;
Expand Down Expand Up @@ -155,7 +150,7 @@ impl<'a> BlockFetcher<'a> {
// So we can skip the search of this space directly
self.synchronizer
.peers()
.set_last_common_header(self.peer, (&header).into());
.set_last_common_header(self.peer, header.number_and_hash());
end = min(best_known.number(), header.number() + BLOCK_DOWNLOAD_WINDOW);
break;
} else if status.contains(BlockStatus::BLOCK_RECEIVED) {
Expand All @@ -171,11 +166,7 @@ impl<'a> BlockFetcher<'a> {
header = self
.synchronizer
.shared
.get_header_view(
&parent_hash,
Some(status.contains(BlockStatus::BLOCK_STORED)),
)?
.into_inner();
.get_header_index_view(&parent_hash, false)?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have checked the status if contains BLOCK_STORED in line 148 and returned early, no need to check it again here.

}

// Move `start` forward
Expand Down Expand Up @@ -209,7 +200,7 @@ impl<'a> BlockFetcher<'a> {
Some(
fetch
.chunks(INIT_BLOCKS_IN_TRANSIT_PER_PEER)
.map(|headers| headers.iter().map(core::HeaderView::hash).collect())
.map(|headers| headers.iter().map(HeaderIndexView::hash).collect())
.collect(),
)
}
Expand Down
17 changes: 9 additions & 8 deletions sync/src/synchronizer/headers_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ckb_constant::sync::MAX_HEADERS_LEN;
use ckb_error::Error;
use ckb_logger::{debug, log_enabled, warn, Level};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_traits::HeaderProvider;
use ckb_traits::HeaderFieldsProvider;
use ckb_types::{core, packed, prelude::*};
use ckb_verification::{HeaderError, HeaderVerifier};
use ckb_verification_traits::Verifier;
Expand Down Expand Up @@ -209,14 +209,14 @@ impl<'a> HeadersProcess<'a> {
}
}

pub struct HeaderAcceptor<'a, DL: HeaderProvider> {
pub struct HeaderAcceptor<'a, DL: HeaderFieldsProvider> {
header: &'a core::HeaderView,
active_chain: ActiveChain,
peer: PeerIndex,
verifier: HeaderVerifier<'a, DL>,
}

impl<'a, DL: HeaderProvider> HeaderAcceptor<'a, DL> {
impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
pub fn new(
header: &'a core::HeaderView,
peer: PeerIndex,
Expand Down Expand Up @@ -283,15 +283,16 @@ impl<'a, DL: HeaderProvider> HeaderAcceptor<'a, DL> {
// type should we return?
let status = self.active_chain.get_block_status(&self.header.hash());
if status.contains(BlockStatus::HEADER_VALID) {
let header_view = shared
.get_header_view(
let header_index = shared
.get_header_index_view(
&self.header.hash(),
Some(status.contains(BlockStatus::BLOCK_STORED)),
status.contains(BlockStatus::BLOCK_STORED),
)
.expect("header with HEADER_VALID should exist");
.expect("header with HEADER_VALID should exist")
.as_header_index();
state
.peers()
.may_set_best_known_header(self.peer, header_view.as_header_index());
.may_set_best_known_header(self.peer, header_index);
return result;
}

Expand Down
16 changes: 10 additions & 6 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub(crate) use self::headers_process::HeadersProcess;
pub(crate) use self::in_ibd_process::InIBDProcess;

use crate::block_status::BlockStatus;
use crate::types::{HeadersSyncController, IBDState, Peers, SyncShared};
use crate::types::{HeaderIndexView, HeadersSyncController, IBDState, Peers, SyncShared};
use crate::utils::{metric_ckb_message_bytes, send_message_to, MetricDirection};
use crate::{Status, StatusCode};

Expand Down Expand Up @@ -305,7 +305,7 @@ impl Synchronizer {
self.shared().state().peers()
}

fn better_tip_header(&self) -> core::HeaderView {
fn better_tip_header(&self) -> HeaderIndexView {
let (header, total_difficulty) = {
let active_chain = self.shared.active_chain();
(
Expand All @@ -316,9 +316,9 @@ impl Synchronizer {
let best_known = self.shared.state().shared_best_header();
// is_better_chain
if total_difficulty > *best_known.total_difficulty() {
header
(header, total_difficulty).into()
} else {
best_known.into_inner()
best_known
}
}

Expand Down Expand Up @@ -387,7 +387,11 @@ impl Synchronizer {
continue;
}
} else {
active_chain.send_getheaders_to_peer(nc, *peer, (&better_tip_header).into());
active_chain.send_getheaders_to_peer(
nc,
*peer,
better_tip_header.number_and_hash(),
);
}
}

Expand Down Expand Up @@ -512,7 +516,7 @@ impl Synchronizer {
}

debug!("start sync peer={}", peer);
active_chain.send_getheaders_to_peer(nc, peer, (&tip).into());
active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
}
}

Expand Down
5 changes: 3 additions & 2 deletions sync/src/tests/synchronizer/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,14 @@ fn test_get_ancestor() {
assert!(tip.is_some());
assert!(header.is_some());
assert!(noop.is_none());
assert_eq!(tip.unwrap(), shared.snapshot().tip_header().to_owned());
assert_eq!(tip.unwrap().hash(), shared.snapshot().tip_header().hash());
assert_eq!(
header.unwrap(),
header.unwrap().hash(),
shared
.store()
.get_block_header(&shared.store().get_block_hash(100).unwrap())
.unwrap()
.hash()
);
}

Expand Down
10 changes: 5 additions & 5 deletions sync/src/tests/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use std::{
sync::atomic::{AtomicUsize, Ordering::Relaxed},
};

use crate::types::{HeaderView, TtlFilter, FILTER_TTL};
use crate::types::{HeaderIndexView, TtlFilter, FILTER_TTL};

const SKIPLIST_LENGTH: u64 = 10_000;

#[test]
fn test_get_ancestor_use_skip_list() {
let mut header_map: HashMap<Byte32, HeaderView> = HashMap::default();
let mut header_map: HashMap<Byte32, HeaderIndexView> = HashMap::default();
let mut hashes: BTreeMap<BlockNumber, Byte32> = BTreeMap::default();

let mut parent_hash = None;
Expand All @@ -31,7 +31,7 @@ fn test_get_ancestor_use_skip_list() {
hashes.insert(number, header.hash());
parent_hash = Some(header.hash());

let mut view = HeaderView::new(header, U256::zero());
let mut view: HeaderIndexView = (header, U256::zero()).into();
view.build_skip(0, |hash, _| header_map.get(hash).cloned(), |_, _| None);
header_map.insert(view.hash(), view);
}
Expand All @@ -40,15 +40,15 @@ fn test_get_ancestor_use_skip_list() {
if *number > 0 {
let skip_view = header_map
.get(hash)
.and_then(|view| header_map.get(view.skip_hash.as_ref().unwrap()))
.and_then(|view| header_map.get(view.skip_hash().unwrap()))
.unwrap();
assert_eq!(
Some(skip_view.hash()).as_ref(),
hashes.get(&skip_view.number())
);
assert!(skip_view.number() < *number);
} else {
assert!(header_map[hash].skip_hash.is_none());
assert!(header_map[hash].skip_hash().is_none());
}
}

Expand Down
10 changes: 5 additions & 5 deletions sync/src/types/header_map/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::path;

use ckb_types::packed::Byte32;

use crate::types::HeaderView;
use crate::types::HeaderIndexView;

pub(crate) trait KeyValueBackend {
fn new<P>(tmpdir: Option<P>) -> Self
Expand All @@ -15,9 +15,9 @@ pub(crate) trait KeyValueBackend {
}

fn contains_key(&self, key: &Byte32) -> bool;
fn get(&self, key: &Byte32) -> Option<HeaderView>;
fn insert(&self, value: &HeaderView) -> Option<()>;
fn insert_batch(&self, values: &[HeaderView]);
fn remove(&self, key: &Byte32) -> Option<HeaderView>;
fn get(&self, key: &Byte32) -> Option<HeaderIndexView>;
fn insert(&self, value: &HeaderIndexView) -> Option<()>;
fn insert_batch(&self, values: &[HeaderIndexView]);
fn remove(&self, key: &Byte32) -> Option<HeaderIndexView>;
fn remove_no_return(&self, key: &Byte32);
}
Loading