diff --git a/chain/chain/src/tests/garbage_collection.rs b/chain/chain/src/tests/garbage_collection.rs index 202f7da8da7..0a5e33c180f 100644 --- a/chain/chain/src/tests/garbage_collection.rs +++ b/chain/chain/src/tests/garbage_collection.rs @@ -670,7 +670,7 @@ fn test_fork_far_away_from_epoch_end() { do_fork( source_block, state_root, - tries.clone(), + tries, &mut chain, 1, &mut states, diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index cc36867f4d3..5740bb459aa 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -182,7 +182,7 @@ pub struct StartClientResult { pub client_actor: Addr, pub client_arbiter_handle: ArbiterHandle, pub resharding_handle: ReshardingHandle, - pub gc_arbiter_handle: Option, + pub gc_arbiter_handle: ArbiterHandle, } /// Starts client in a separate Arbiter (thread). @@ -228,20 +228,14 @@ pub fn start_client( .unwrap(); let resharding_handle = client.chain.resharding_handle.clone(); - let maybe_gc_arbiter = { - if !client_config.archive { - let (_, gc_arbiter) = start_gc_actor( - runtime.store().clone(), - genesis_height, - client_config.clone(), - runtime, - epoch_manager, - ); - Some(gc_arbiter) - } else { - None - } - }; + let (_, gc_arbiter_handle) = start_gc_actor( + runtime.store().clone(), + genesis_height, + client_config.clone(), + runtime, + epoch_manager, + ); + let client_addr = ClientActor::start_in_arbiter(&client_arbiter_handle, move |ctx| { ClientActor::new( clock, @@ -264,6 +258,6 @@ pub fn start_client( client_actor: client_addr, client_arbiter_handle, resharding_handle, - gc_arbiter_handle: maybe_gc_arbiter, + gc_arbiter_handle, } } diff --git a/chain/client/src/gc_actor.rs b/chain/client/src/gc_actor.rs index 3511e7b558a..7e2147ee30d 100644 --- a/chain/client/src/gc_actor.rs +++ b/chain/client/src/gc_actor.rs @@ -37,35 +37,32 @@ impl GCActor { } } + fn clear_data(&mut self) -> Result<(), near_chain::Error> { + // A RPC node should do regular garbage collection. + if !self.is_archive { + return self.store.clear_data( &self.gc_config, self.runtime_adapter.clone(), self.epoch_manager.clone()) + } + + // An archival node with split storage should perform garbage collection + // on the hot storage. In order to determine if split storage is enabled + // *and* that the migration to split storage is finished we can check + // the store kind. It's only set to hot after the migration is finished. + let store = self.store.store(); + let kind = store.get_db_kind()?; + if kind == Some(DbKind::Hot) { + return self.store.clear_data(&self.gc_config, self.runtime_adapter.clone(), self.epoch_manager.clone()) + } + + // An archival node with legacy storage or in the midst of migration to split + // storage should do the legacy clear_archive_data. + self.store.clear_archive_data(self.gc_config.gc_blocks_limit, self.runtime_adapter.clone()) + } + fn gc(&mut self, ctx: &mut Context) { let timer = metrics::GC_TIME.start_timer(); - if !self.is_archive { - let _span = tracing::debug_span!(target: "client", "gc"); - if let Err(e) = self.store.clear_data( - &self.gc_config, - self.runtime_adapter.clone(), - self.epoch_manager.clone(), - ) { - warn!(target: "client", "Error in gc: {}", e); - } - } else { - let _span = tracing::debug_span!(target: "client", "archive_gc"); - let kind = self.store.store().get_db_kind(); - match kind { - Ok(Some(DbKind::Hot)) => { - if let Err(e) = self.store.clear_data( - &self.gc_config, - self.runtime_adapter.clone(), - self.epoch_manager.clone(), - ) { - warn!(target: "client", "Error in gc: {}", e); - } - } - Err(e) => { - warn!(target: "client", "Error in gc: {}", e); - } - _ => {} - } + if let Err(e) = self.clear_data( + ) { + warn!(target: "garbage collection", "Error in gc: {}", e); } timer.observe_duration(); diff --git a/chain/client/src/tests/query_client.rs b/chain/client/src/tests/query_client.rs index f65b76fda69..ab0300be7cd 100644 --- a/chain/client/src/tests/query_client.rs +++ b/chain/client/src/tests/query_client.rs @@ -278,132 +278,3 @@ fn test_state_request() { near_network::test_utils::wait_or_panic(50000); }); } - -#[test] -/// When querying data which was garbage collected on a node it returns -/// `QueryError::GarbageCollectedBlock`. -fn test_garbage_collection() { - init_test_logger(); - run_actix(async { - let block_prod_time = 100; - let epoch_length = 5; - let target_height = epoch_length * (DEFAULT_GC_NUM_EPOCHS_TO_KEEP + 1); - let vs = ValidatorSchedule::new().num_shards(2).block_producers_per_epoch(vec![vec![ - "test1".parse().unwrap(), - "test2".parse().unwrap(), - ]]); - - setup_mock_all_validators( - Clock::real(), - vs, - vec![PeerInfo::random(), PeerInfo::random()], - true, - block_prod_time, - false, - false, - epoch_length, - true, - vec![false, true], // first validator non-archival, second archival - vec![true, true], - true, - None, - Box::new( - move |conns, - _, - msg: &PeerManagerMessageRequest| - -> (PeerManagerMessageResponse, bool) { - if let NetworkRequests::Block { block } = msg.as_network_requests_ref() { - if block.header().height() > target_height { - let view_client_non_archival = &conns[0].view_client_actor; - let view_client_archival = &conns[1].view_client_actor; - let mut tests = vec![]; - - // Recent data is present on all nodes (archival or not). - let prev_height = block.header().prev_height().unwrap(); - for actor_handles in conns.iter() { - tests.push(actix::spawn( - actor_handles - .view_client_actor - .send( - Query::new( - BlockReference::BlockId(BlockId::Height( - prev_height, - )), - QueryRequest::ViewAccount { - account_id: "test1".parse().unwrap(), - }, - ) - .with_span_context(), - ) - .then(move |res| { - let res = res.unwrap().unwrap(); - match res.kind { - QueryResponseKind::ViewAccount(_) => (), - _ => panic!("Invalid response"), - } - futures::future::ready(()) - }), - )); - } - - // On non-archival node old data is garbage collected. - tests.push(actix::spawn( - view_client_non_archival - .send( - Query::new( - BlockReference::BlockId(BlockId::Height(1)), - QueryRequest::ViewAccount { - account_id: "test1".parse().unwrap(), - }, - ) - .with_span_context(), - ) - .then(move |res| { - let res = res.unwrap(); - match res { - Err(err) => assert!(matches!( - err, - QueryError::GarbageCollectedBlock { .. } - )), - Ok(_) => panic!("Unexpected Ok variant"), - } - futures::future::ready(()) - }), - )); - - // On archival node old data is _not_ garbage collected. - tests.push(actix::spawn( - view_client_archival - .send( - Query::new( - BlockReference::BlockId(BlockId::Height(1)), - QueryRequest::ViewAccount { - account_id: "test1".parse().unwrap(), - }, - ) - .with_span_context(), - ) - .then(move |res| { - let res = res.unwrap().unwrap(); - match res.kind { - QueryResponseKind::ViewAccount(_) => (), - _ => panic!("Invalid response"), - } - futures::future::ready(()) - }), - )); - - actix::spawn(futures::future::join_all(tests).then(|_| { - System::current().stop(); - futures::future::ready(()) - })); - } - } - (NetworkResponses::NoResponse.into(), true) - }, - ), - ); - - near_network::test_utils::wait_or_panic(block_prod_time * target_height * 2 + 2000); - }) -} diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 2e4d5512187..4075a57900c 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -451,21 +451,13 @@ pub fn start_with_config_and_synchronization( tracing::trace!(target: "diagnostic", key = "log", "Starting NEAR node with diagnostic activated"); - let mut arbiters = match gc_arbiter_handle { - Some(handle) => vec![ - client_arbiter_handle, - shards_manager_arbiter_handle, - trie_metrics_arbiter, - state_snapshot_arbiter, - handle, - ], - None => vec![ - client_arbiter_handle, - shards_manager_arbiter_handle, - trie_metrics_arbiter, - state_snapshot_arbiter, - ], - }; + let mut arbiters = vec![ + client_arbiter_handle, + shards_manager_arbiter_handle, + trie_metrics_arbiter, + state_snapshot_arbiter, + gc_arbiter_handle, + ]; if let Some(db_metrics_arbiter) = db_metrics_arbiter { arbiters.push(db_metrics_arbiter); } diff --git a/pytest/tests/sanity/garbage_collection_intense.py b/pytest/tests/sanity/garbage_collection_intense.py new file mode 100644 index 00000000000..4f463144018 --- /dev/null +++ b/pytest/tests/sanity/garbage_collection_intense.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +# Spins up two validating nodes. Deploy a contract that allows for insertion and deletion of keys +# Randomly insert keys or delete keys every block. Let it run until GC kicks in +# Then delete all keys and let garbage collection catch up + +import sys, time +import pathlib +import string, random, json, base64 + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from cluster import start_cluster +from configured_logger import logger +from transaction import sign_deploy_contract_tx, sign_function_call_tx +from utils import load_test_contract, load_binary_file, wait_for_blocks + +EPOCH_LENGTH = 5 +TARGET_HEIGHT = 300 +GAS = 100_000_000_000_000 + +client_config = { + "consensus": { + "min_block_production_delay": { + "secs": 0, + "nanos": 100000000 + }, + "max_block_production_delay": { + "secs": 0, + "nanos": 400000000 + }, + "max_block_wait_delay": { + "secs": 0, + "nanos": 400000000 + } + }, + "gc_step_period": { + "secs": 0, + "nanos": 100000000 + }, + "rpc": { + "polling_config": { + "polling_interval": { + "secs": 0, + "nanos": 10000000 + }, + "polling_timeout": { + "secs": 10, + "nanos": 0 + } + } + } +} + +nodes = start_cluster( + 2, 0, 1, None, + [["epoch_length", EPOCH_LENGTH], ["num_block_producer_seats", 5], + ["num_block_producer_seats_per_shard", [5]], + ["chunk_producer_kickout_threshold", 80], + ["shard_layout", { + "V0": { + "num_shards": 1, + "version": 1, + } + }], ["validators", 0, "amount", "110000000000000000000000000000000"], + [ + "records", 0, "Account", "account", "locked", + "110000000000000000000000000000000" + ], ["total_supply", "3060000000000000000000000000000000"]], { + 0: client_config, + 1: client_config + }) + +# generate 20 keys +keys = ''.join(random.choices(string.ascii_letters, k=20)) +key_refcount = {x: 0 for x in keys} +nonce = 1 +repo_dir = pathlib.Path(__file__).resolve().parents[2] +path = repo_dir / 'tests/loadtest/contract/target/wasm32-unknown-unknown/release/loadtest_contract.wasm' +contract = load_binary_file(path) + +last_block_hash = nodes[0].get_latest_block().hash_bytes +tx = sign_deploy_contract_tx(nodes[0].signer_key, contract, nonce, + last_block_hash) +res = nodes[0].send_tx_and_wait(tx, 2) +nonce += 1 +assert 'SuccessValue' in res['result']['status'] +time.sleep(1) + +while True: + block_id = nodes[1].get_latest_block() + if int(block_id.height) > TARGET_HEIGHT: + break + for key in keys: + block_hash = nodes[1].get_latest_block().hash_bytes + args = {"key": key} + if random.random() > 0.5: + tx = sign_function_call_tx(nodes[0].signer_key, + nodes[0].signer_key.account_id, 'insert_key', + json.dumps(args).encode('utf-8'), GAS, 0, nonce, + block_hash) + else: + tx = sign_function_call_tx(nodes[0].signer_key, + nodes[0].signer_key.account_id, 'delete_key', + json.dumps(args).encode('utf-8'), GAS, 0, nonce, + block_hash) + res = nodes[1].send_tx(tx) + assert 'result' in res, res + nonce += 1 + +# delete all keys +for key in keys: + args = {"key": key} + block_id = nodes[1].get_latest_block() + tx = sign_function_call_tx(nodes[0].signer_key, + nodes[0].signer_key.account_id, 'delete_key', + json.dumps(args).encode('utf-8'), GAS, 0, nonce, + block_id.hash_bytes) + res = nodes[1].send_tx_and_wait(tx, 2) + assert 'result' in res, res + key_refcount[key] -= 1 + nonce += 1 + +# wait for the deletions to be garbage collected +deletion_finish_block_height = int(nodes[1].get_latest_block().height) +wait_for_blocks(nodes[1], target=deletion_finish_block_height + EPOCH_LENGTH * 6) + +# check that querying a garbage collected block gives Error::GarbageCollected +res = nodes[1].json_rpc('query', { + "request_type": "view_account", + "account_id": nodes[0].signer_key.account_id, + "block_id": deletion_finish_block_height + }) +assert res['error']['cause']['name'] == "GARBAGE_COLLECTED_BLOCK", res \ No newline at end of file