Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenwang1996 committed Apr 14, 2024
1 parent 9e42635 commit ac6adfe
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 193 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/tests/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ fn test_fork_far_away_from_epoch_end() {
do_fork(
source_block,
state_root,
tries.clone(),
tries,
&mut chain,
1,
&mut states,
Expand Down
26 changes: 10 additions & 16 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub struct StartClientResult {
pub client_actor: Addr<ClientActor>,
pub client_arbiter_handle: ArbiterHandle,
pub resharding_handle: ReshardingHandle,
pub gc_arbiter_handle: Option<ArbiterHandle>,
pub gc_arbiter_handle: ArbiterHandle,
}

/// Starts client in a separate Arbiter (thread).
Expand Down Expand Up @@ -228,20 +228,14 @@ pub fn start_client(
.unwrap();
let resharding_handle = client.chain.resharding_handle.clone();

let maybe_gc_arbiter = {
if !client_config.archive {
let (_, gc_arbiter) = start_gc_actor(
runtime.store().clone(),
genesis_height,
client_config.clone(),
runtime,
epoch_manager,
);
Some(gc_arbiter)
} else {
None
}
};
let (_, gc_arbiter_handle) = start_gc_actor(
runtime.store().clone(),
genesis_height,
client_config.clone(),
runtime,
epoch_manager,
);

let client_addr = ClientActor::start_in_arbiter(&client_arbiter_handle, move |ctx| {
ClientActor::new(
clock,
Expand All @@ -264,6 +258,6 @@ pub fn start_client(
client_actor: client_addr,
client_arbiter_handle,
resharding_handle,
gc_arbiter_handle: maybe_gc_arbiter,
gc_arbiter_handle,
}
}
54 changes: 29 additions & 25 deletions chain/client/src/gc_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,39 @@ impl GCActor {
}
}

fn gc(&mut self, ctx: &mut Context<GCActor>) {
let timer = metrics::GC_TIME.start_timer();
fn clear_data(&mut self) -> Result<(), near_chain::Error> {
// An RPC node should do regular garbage collection.
if !self.is_archive {
let _span = tracing::debug_span!(target: "client", "gc");
if let Err(e) = self.store.clear_data(
return self.store.clear_data(
&self.gc_config,
self.runtime_adapter.clone(),
self.epoch_manager.clone(),
);
}

// An archival node with split storage should perform garbage collection
// on the hot storage. In order to determine if split storage is enabled
// *and* that the migration to split storage is finished we can check
// the store kind. It's only set to hot after the migration is finished.
let store = self.store.store();
let kind = store.get_db_kind()?;
if kind == Some(DbKind::Hot) {
return self.store.clear_data(
&self.gc_config,
self.runtime_adapter.clone(),
self.epoch_manager.clone(),
) {
warn!(target: "client", "Error in gc: {}", e);
}
} else {
let _span = tracing::debug_span!(target: "client", "archive_gc");
let kind = self.store.store().get_db_kind();
match kind {
Ok(Some(DbKind::Hot)) => {
if let Err(e) = self.store.clear_data(
&self.gc_config,
self.runtime_adapter.clone(),
self.epoch_manager.clone(),
) {
warn!(target: "client", "Error in gc: {}", e);
}
}
Err(e) => {
warn!(target: "client", "Error in gc: {}", e);
}
_ => {}
}
);
}

// An archival node with legacy storage or in the midst of migration to split
// storage should do the legacy clear_archive_data.
self.store.clear_archive_data(self.gc_config.gc_blocks_limit, self.runtime_adapter.clone())
}

fn gc(&mut self, ctx: &mut Context<GCActor>) {
let timer = metrics::GC_TIME.start_timer();
if let Err(e) = self.clear_data() {
warn!(target: "garbage collection", "Error in gc: {}", e);
}

timer.observe_duration();
Expand Down
138 changes: 2 additions & 136 deletions chain/client/src/tests/query_client.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
use crate::test_utils::{setup_mock_all_validators, setup_no_network, setup_only_view};
use crate::test_utils::{setup_no_network, setup_only_view};
use crate::{
GetBlock, GetBlockWithMerkleTree, GetExecutionOutcomesForBlock, Query, QueryError, Status,
TxStatus,
GetBlock, GetBlockWithMerkleTree, GetExecutionOutcomesForBlock, Query, Status, TxStatus,
};
use actix::System;
use futures::{future, FutureExt};
use near_actix_test_utils::run_actix;
use near_async::messaging::IntoMultiSender;
use near_async::time::{Clock, Duration};
use near_chain::test_utils::ValidatorSchedule;
use near_chain_configs::DEFAULT_GC_NUM_EPOCHS_TO_KEEP;
use near_crypto::{InMemorySigner, KeyType};
use near_network::client::{
BlockResponse, ProcessTxRequest, ProcessTxResponse, StateRequestHeader,
};
use near_network::test_utils::MockPeerManagerAdapter;
use near_network::types::PeerInfo;
use near_network::types::{
NetworkRequests, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse,
};
use near_o11y::testonly::init_test_logger;
use near_o11y::WithSpanContextExt;
use near_primitives::block::{Block, BlockHeader};
Expand Down Expand Up @@ -278,132 +273,3 @@ fn test_state_request() {
near_network::test_utils::wait_or_panic(50000);
});
}

#[test]
/// When querying data which was garbage collected on a node it returns
/// `QueryError::GarbageCollectedBlock`.
///
/// The test sets up two mock validators, the first non-archival and the second
/// archival, and waits until a block above `target_height` is broadcast. It then
/// issues `ViewAccount` queries for account `test1`:
/// - at the previous block height on every node, expecting a successful response;
/// - at height 1 on the non-archival node, expecting `GarbageCollectedBlock`;
/// - at height 1 on the archival node, expecting a successful response.
///
/// The actix system is stopped once all spawned queries have completed.
fn test_garbage_collection() {
init_test_logger();
run_actix(async {
let block_prod_time = 100;
let epoch_length = 5;
// The checks start only after a block above this height is seen, i.e. after one
// epoch more than `DEFAULT_GC_NUM_EPOCHS_TO_KEEP` epochs' worth of heights.
// NOTE(review): presumably this ensures height 1 is outside the GC window — confirm.
let target_height = epoch_length * (DEFAULT_GC_NUM_EPOCHS_TO_KEEP + 1);
let vs = ValidatorSchedule::new().num_shards(2).block_producers_per_epoch(vec![vec![
"test1".parse().unwrap(),
"test2".parse().unwrap(),
]]);

// NOTE(review): the meaning of the positional bool/None arguments below is defined by
// `setup_mock_all_validators`, which is not visible here — check its signature.
setup_mock_all_validators(
Clock::real(),
vs,
vec![PeerInfo::random(), PeerInfo::random()],
true,
block_prod_time,
false,
false,
epoch_length,
true,
vec![false, true], // first validator non-archival, second archival
vec![true, true],
true,
None,
Box::new(
move |conns,
_,
msg: &PeerManagerMessageRequest|
-> (PeerManagerMessageResponse, bool) {
// Only block broadcasts are inspected; every message receives the same
// `NoResponse` reply at the end of this closure.
if let NetworkRequests::Block { block } = msg.as_network_requests_ref() {
if block.header().height() > target_height {
let view_client_non_archival = &conns[0].view_client_actor;
let view_client_archival = &conns[1].view_client_actor;
let mut tests = vec![];

// Recent data is present on all nodes (archival or not).
let prev_height = block.header().prev_height().unwrap();
for actor_handles in conns.iter() {
tests.push(actix::spawn(
actor_handles
.view_client_actor
.send(
Query::new(
BlockReference::BlockId(BlockId::Height(
prev_height,
)),
QueryRequest::ViewAccount {
account_id: "test1".parse().unwrap(),
},
)
.with_span_context(),
)
.then(move |res| {
// First unwrap: the actor message was delivered;
// second unwrap: the query itself succeeded.
let res = res.unwrap().unwrap();
match res.kind {
QueryResponseKind::ViewAccount(_) => (),
_ => panic!("Invalid response"),
}
futures::future::ready(())
}),
));
}

// On non-archival node old data is garbage collected.
tests.push(actix::spawn(
view_client_non_archival
.send(
Query::new(
BlockReference::BlockId(BlockId::Height(1)),
QueryRequest::ViewAccount {
account_id: "test1".parse().unwrap(),
},
)
.with_span_context(),
)
.then(move |res| {
// Delivery must succeed, but the query result must be the
// `GarbageCollectedBlock` error.
let res = res.unwrap();
match res {
Err(err) => assert!(matches!(
err,
QueryError::GarbageCollectedBlock { .. }
)),
Ok(_) => panic!("Unexpected Ok variant"),
}
futures::future::ready(())
}),
));

// On archival node old data is _not_ garbage collected.
tests.push(actix::spawn(
view_client_archival
.send(
Query::new(
BlockReference::BlockId(BlockId::Height(1)),
QueryRequest::ViewAccount {
account_id: "test1".parse().unwrap(),
},
)
.with_span_context(),
)
.then(move |res| {
let res = res.unwrap().unwrap();
match res.kind {
QueryResponseKind::ViewAccount(_) => (),
_ => panic!("Invalid response"),
}
futures::future::ready(())
}),
));

// Stop the actix system once every spawned check has finished.
// NOTE(review): the join results are discarded (`|_|`), so confirm that a
// panic inside a spawned check actually fails the test.
actix::spawn(futures::future::join_all(tests).then(|_| {
System::current().stop();
futures::future::ready(())
}));
}
}
// NOTE(review): the meaning of the `true` flag is defined by the caller of this
// closure, which is not visible here.
(NetworkResponses::NoResponse.into(), true)
},
),
);

// Timeout scales with the number of blocks that must be produced before the checks run.
// NOTE(review): presumably milliseconds, and presumably panics on expiry — confirm
// against `wait_or_panic`.
near_network::test_utils::wait_or_panic(block_prod_time * target_height * 2 + 2000);
})
}
22 changes: 7 additions & 15 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,21 +451,13 @@ pub fn start_with_config_and_synchronization(

tracing::trace!(target: "diagnostic", key = "log", "Starting NEAR node with diagnostic activated");

let mut arbiters = match gc_arbiter_handle {
Some(handle) => vec![
client_arbiter_handle,
shards_manager_arbiter_handle,
trie_metrics_arbiter,
state_snapshot_arbiter,
handle,
],
None => vec![
client_arbiter_handle,
shards_manager_arbiter_handle,
trie_metrics_arbiter,
state_snapshot_arbiter,
],
};
let mut arbiters = vec![
client_arbiter_handle,
shards_manager_arbiter_handle,
trie_metrics_arbiter,
state_snapshot_arbiter,
gc_arbiter_handle,
];
if let Some(db_metrics_arbiter) = db_metrics_arbiter {
arbiters.push(db_metrics_arbiter);
}
Expand Down
Loading

0 comments on commit ac6adfe

Please sign in to comment.