Skip to content

Commit

Permalink
fix: fix pending compact block memory bloat on abnormal flow
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Oct 19, 2021
1 parent ebf1d51 commit 21a09cd
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 37 deletions.
7 changes: 6 additions & 1 deletion sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ckb_logger::{self, debug_target};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_traits::HeaderProvider;
use ckb_types::{core, packed, prelude::*};
use ckb_util::shrink_to_fit;
use ckb_verification::{HeaderError, HeaderVerifier};
use ckb_verification_traits::Verifier;
use std::collections::HashMap;
Expand Down Expand Up @@ -204,7 +205,11 @@ impl<'a> CompactBlockProcess<'a> {
// into database
match ret {
ReconstructionResult::Block(block) => {
pending_compact_blocks.remove(&block_hash);
pending_compact_blocks.retain(|_, (v, _)| {
Unpack::<u64>::unpack(&v.header().as_reader().raw().number())
> block.number()
});
shrink_to_fit!(pending_compact_blocks, 20);
self.relayer
.accept_block(self.nc.as_ref(), self.peer, block);
return Status::ok();
Expand Down
24 changes: 12 additions & 12 deletions sync/src/relayer/tests/block_transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ fn test_accept_block() {
.uncles(vec![uncle.as_uncle().data()].pack())
.build();

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);

let process = BlockTransactionsProcess::new(
block_transactions.as_reader(),
Expand Down Expand Up @@ -136,8 +136,8 @@ fn test_unknown_request() {
.transactions(vec![tx2.data()].pack())
.build();

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);

let process = BlockTransactionsProcess::new(
block_transactions.as_reader(),
Expand Down Expand Up @@ -200,8 +200,8 @@ fn test_invalid_transaction_root() {
.transactions(vec![tx2.data()].pack())
.build();

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);

let process = BlockTransactionsProcess::new(
block_transactions.as_reader(),
Expand Down Expand Up @@ -295,8 +295,8 @@ fn test_collision_and_send_missing_indexes() {
.transactions(vec![tx2.data()].pack())
.build();

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);

let process = BlockTransactionsProcess::new(
block_transactions.as_reader(),
Expand Down Expand Up @@ -338,8 +338,8 @@ fn test_collision_and_send_missing_indexes() {
.transactions(vec![tx2.data(), tx3.data()].pack())
.build();

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);

let process = BlockTransactionsProcess::new(
new_block_transactions.as_reader(),
Expand Down Expand Up @@ -402,8 +402,8 @@ fn test_missing() {
.transactions(vec![tx2.data()].pack())
.build();

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);

let process = BlockTransactionsProcess::new(
block_transactions.as_reader(),
Expand Down
64 changes: 40 additions & 24 deletions sync/src/relayer/tests/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ fn test_in_block_status_map() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 1.into();

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -103,8 +103,8 @@ fn test_unknow_parent() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 1.into();

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -156,8 +156,8 @@ fn test_accept_not_a_better_block() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 1.into();

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -190,8 +190,8 @@ fn test_already_in_flight() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 1.into();

// Already in flight
Expand Down Expand Up @@ -232,8 +232,8 @@ fn test_already_pending() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 1.into();

// Already in pending
Expand Down Expand Up @@ -283,8 +283,8 @@ fn test_header_invalid() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 1.into();

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -337,8 +337,8 @@ fn test_inflight_blocks_reach_limit() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 100.into();

// in_flight_blocks is full
Expand Down Expand Up @@ -399,8 +399,8 @@ fn test_send_missing_indexes() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 100.into();

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -470,6 +470,19 @@ fn test_accept_block() {
.uncle(uncle.as_uncle())
.build();

let mock_block = BlockBuilder::default().number(4.pack()).build();
let mock_compact_block = CompactBlock::build_from_block(&mock_block, &Default::default());
{
let mut pending_compact_blocks = relayer.shared.state().pending_compact_blocks();
pending_compact_blocks.insert(
mock_block.header().hash(),
(
mock_compact_block,
HashMap::from_iter(vec![(1.into(), (vec![1], vec![0]))]),
),
);
}

let uncle_hash = uncle.hash();
{
let db_txn = relayer.shared().shared().store().begin_transaction();
Expand All @@ -484,8 +497,8 @@ fn test_accept_block() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 100.into();

let compact_block_process = CompactBlockProcess::new(
Expand All @@ -495,6 +508,9 @@ fn test_accept_block() {
peer_index,
);
assert_eq!(compact_block_process.execute(), Status::ok(),);

let pending_compact_blocks = relayer.shared.state().pending_compact_blocks();
assert!(pending_compact_blocks.is_empty());
}

#[test]
Expand All @@ -516,8 +532,8 @@ fn test_ignore_a_too_old_block() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 1.into();

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -552,8 +568,8 @@ fn test_invalid_transaction_root() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 100.into();

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -627,8 +643,8 @@ fn test_collision() {
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);

let mock_protocal_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocal_context);
let mock_protocol_context = MockProtocolContext::new(SupportProtocols::Relay);
let nc = Arc::new(mock_protocol_context);
let peer_index: PeerIndex = 100.into();

let compact_block_process = CompactBlockProcess::new(
Expand Down

0 comments on commit 21a09cd

Please sign in to comment.