
feat: move garbage collection into a separate thread #11022

Merged 14 commits on Apr 19, 2024
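This PR moves garbage collection off the client's block-processing path: `clear_data`, `clear_archive_data`, and `reset_data_pre_state_sync` migrate from `Chain` to `ChainStore` and take their dependencies (`RuntimeAdapter`, `EpochManagerAdapter`) as explicit `Arc` parameters, with thin `Chain` wrappers kept for tests. Below is a minimal, std-only sketch of the shape this enables; all names are illustrative stand-ins, and the GCActor mentioned in the new doc comment is not part of this diff:

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Illustrative stand-ins; the real traits live in nearcore.
trait RuntimeAdapter: Send + Sync {}
trait EpochManagerAdapter: Send + Sync {}

// Stand-in for near-chain's ChainStore.
struct ChainStore;

impl ChainStore {
    fn clear_data(
        &mut self,
        _runtime_adapter: Arc<dyn RuntimeAdapter>,
        _epoch_manager: Arc<dyn EpochManagerAdapter>,
    ) {
        // garbage-collection work would happen here
    }
}

// A dedicated thread owns the store handle plus cheap Arc clones of the
// adapters, so GC runs off the client's hot path.
fn spawn_gc(
    mut store: ChainStore,
    runtime_adapter: Arc<dyn RuntimeAdapter>,
    epoch_manager: Arc<dyn EpochManagerAdapter>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || loop {
        store.clear_data(runtime_adapter.clone(), epoch_manager.clone());
        thread::sleep(Duration::from_secs(1));
    })
}

fn main() {
    struct Runtime;
    impl RuntimeAdapter for Runtime {}
    struct Epochs;
    impl EpochManagerAdapter for Epochs {}
    let handle = spawn_gc(ChainStore, Arc::new(Runtime), Arc::new(Epochs));
    drop(handle); // detach; the GC loop keeps running in the background
}
```

Because the adapters sit behind `Arc`, the client and the GC thread hold cheap handles to the same runtime and epoch manager.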
86 changes: 61 additions & 25 deletions chain/chain/src/garbage_collection.rs
@@ -15,7 +15,7 @@ use near_store::flat::store_helper;
use near_store::{DBCol, KeyForStateChanges, ShardTries, ShardUId};

use crate::types::RuntimeAdapter;
use crate::{metrics, Chain, ChainStoreAccess, ChainStoreUpdate};
use crate::{metrics, Chain, ChainStore, ChainStoreAccess, ChainStoreUpdate};

#[derive(Clone)]
pub enum GCMode {
@@ -34,7 +34,23 @@ impl fmt::Debug for GCMode {
}
}

/// Both functions here are only used for testing; they are convenient wrappers
/// that let us do correctness integration testing without fully spinning up GCActor
impl Chain {
pub fn clear_data(&mut self, gc_config: &GCConfig) -> Result<(), Error> {
let runtime_adapter = self.runtime_adapter.clone();
let epoch_manager = self.epoch_manager.clone();
self.mut_chain_store().clear_data(gc_config, runtime_adapter, epoch_manager)
}

pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
let runtime_adapter = self.runtime_adapter.clone();
let epoch_manager = self.epoch_manager.clone();
self.mut_chain_store().reset_data_pre_state_sync(sync_hash, runtime_adapter, epoch_manager)
}
}

impl ChainStore {
// GC CONTRACT
// ===
//
@@ -110,18 +126,23 @@ impl Chain {
// and the Trie is updated to contain only Genesis data.
// 4. State Sync Clearing happens in `reset_data_pre_state_sync()`.
//
pub fn clear_data(&mut self, tries: ShardTries, gc_config: &GCConfig) -> Result<(), Error> {
pub fn clear_data(
&mut self,
gc_config: &GCConfig,
runtime_adapter: Arc<dyn RuntimeAdapter>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "garbage_collection", "clear_data").entered();

let head = self.chain_store().head()?;
let tail = self.chain_store().tail()?;
let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash);
let tries = runtime_adapter.get_tries();
let head = self.head()?;
let tail = self.tail()?;
let gc_stop_height = runtime_adapter.get_gc_stop_height(&head.last_block_hash);
if gc_stop_height > head.height {
return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into()));
}
let prev_epoch_id = self.get_block_header(&head.prev_block_hash)?.epoch_id().clone();
let epoch_change = prev_epoch_id != head.epoch_id;
let mut fork_tail = self.chain_store().fork_tail()?;
let mut fork_tail = self.fork_tail()?;
metrics::TAIL_HEIGHT.set(tail as i64);
metrics::FORK_TAIL_HEIGHT.set(fork_tail as i64);
metrics::CHUNK_TAIL_HEIGHT.set(self.chain_store().chunk_tail()? as i64);
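The new signature is the core of the refactor: callers no longer thread `ShardTries` through, since the store derives them from the runtime adapter. A hedged before/after sketch (variable names like `chain_store` are illustrative, not from this diff):

```rust
// Before this PR, the caller supplied the tries:
//     chain.clear_data(tries, &gc_config)?;
// After, the store derives everything it needs from the passed-in adapters:
//     chain_store.clear_data(&gc_config, runtime_adapter.clone(), epoch_manager.clone())?;
// with the tries obtained internally via `runtime_adapter.get_tries()`.
```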
@@ -130,7 +151,7 @@
// If head doesn't change on the epoch boundary, we may update the fork tail several times,
// but that is fine: it doesn't affect correctness, and we limit the number of heights
// that fork cleaning goes through, so it doesn't slow down the client either.
let mut chain_store_update = self.mut_chain_store().store_update();
let mut chain_store_update = self.store_update();
chain_store_update.update_fork_tail(gc_stop_height);
chain_store_update.commit()?;
fork_tail = gc_stop_height;
@@ -141,11 +162,16 @@
let gc_fork_clean_step = gc_config.gc_fork_clean_step;
let stop_height = tail.max(fork_tail.saturating_sub(gc_fork_clean_step));
for height in (stop_height..fork_tail).rev() {
self.clear_forks_data(tries.clone(), height, &mut gc_blocks_remaining)?;
self.clear_forks_data(
tries.clone(),
height,
&mut gc_blocks_remaining,
epoch_manager.clone(),
)?;
if gc_blocks_remaining == 0 {
return Ok(());
}
let mut chain_store_update = self.mut_chain_store().store_update();
let mut chain_store_update = self.store_update();
chain_store_update.update_fork_tail(height);
chain_store_update.commit()?;
}
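The loop above bounds each GC pass to `gc_fork_clean_step` heights and persists the shrinking fork tail after every height. A self-contained worked example of the window arithmetic, with made-up values:

```rust
fn main() {
    let tail: u64 = 100;
    let fork_tail: u64 = 180;
    let gc_fork_clean_step: u64 = 50;
    // Each clear_data call steps at most gc_fork_clean_step heights backwards,
    // clamped so the window never reaches below the tail.
    let stop_height = tail.max(fork_tail.saturating_sub(gc_fork_clean_step));
    assert_eq!(stop_height, 130);
    // Heights are visited newest-first; the persisted fork tail shrinks as we go.
    for height in (stop_height..fork_tail).rev() {
        assert!(height >= stop_height && height < fork_tail);
    }
}
```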
@@ -162,9 +188,9 @@
.flatten()
.cloned()
.collect::<Vec<_>>();
let epoch_manager = self.epoch_manager.clone();
let runtime = self.runtime_adapter.clone();
let mut chain_store_update = self.mut_chain_store().store_update();
let epoch_manager = epoch_manager.clone();
let runtime = runtime_adapter.clone();
let mut chain_store_update = self.store_update();
if let Some(block_hash) = blocks_current_height.first() {
let prev_hash = *chain_store_update.get_block_header(block_hash)?.prev_hash();
let prev_block_refcount = chain_store_update.get_block_refcount(&prev_hash)?;
@@ -205,16 +231,20 @@
/// storage, archival nodes do garbage collect that data.
///
/// `gc_height_limit` limits how many heights the function will process.
pub fn clear_archive_data(&mut self, gc_height_limit: BlockHeightDelta) -> Result<(), Error> {
pub fn clear_archive_data(
&mut self,
gc_height_limit: BlockHeightDelta,
runtime_adapter: Arc<dyn RuntimeAdapter>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "chain", "clear_archive_data").entered();

let head = self.chain_store().head()?;
let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash);
let head = self.head()?;
let gc_stop_height = runtime_adapter.get_gc_stop_height(&head.last_block_hash);
if gc_stop_height > head.height {
return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into()));
}

let mut chain_store_update = self.mut_chain_store().store_update();
let mut chain_store_update = self.store_update();
chain_store_update.clear_redundant_chunk_data(gc_stop_height, gc_height_limit)?;
metrics::CHUNK_TAIL_HEIGHT.set(chain_store_update.chunk_tail()? as i64);
metrics::GC_STOP_HEIGHT.set(gc_stop_height as i64);
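For archival nodes, only redundant chunk data is trimmed, bounded by `gc_height_limit` per call. A hypothetical call-site fragment under the new signature (the `chain`, `gc_config`, and `runtime_adapter` bindings are assumed, not shown in this diff):

```rust
// The old signature took only the height limit and read the runtime adapter off `self`.
chain.mut_chain_store().clear_archive_data(gc_config.gc_blocks_limit, runtime_adapter.clone())?;
```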
@@ -226,6 +256,7 @@
tries: ShardTries,
height: BlockHeight,
gc_blocks_remaining: &mut NumBlocks,
epoch_manager: Arc<dyn EpochManagerAdapter>,
) -> Result<(), Error> {
let blocks_current_height = self
.chain_store()
@@ -244,8 +275,8 @@
// because a shorter chain cannot be the canonical one,
// so it and all of its ancestors may be safely deleted
// while no other sibling blocks rely on it.
let epoch_manager = self.epoch_manager.clone();
let mut chain_store_update = self.mut_chain_store().store_update();
let epoch_manager = epoch_manager.clone();
let mut chain_store_update = self.store_update();
if chain_store_update.get_block_refcount(&current_hash)? == 0 {
let prev_hash =
*chain_store_update.get_block_header(&current_hash)?.prev_hash();
@@ -270,7 +301,12 @@
Ok(())
}

pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
pub fn reset_data_pre_state_sync(
&mut self,
sync_hash: CryptoHash,
runtime_adapter: Arc<dyn RuntimeAdapter>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "sync", "reset_data_pre_state_sync").entered();
let head = self.head()?;
// Get header we were syncing into.
Expand All @@ -292,8 +328,8 @@ impl Chain {
.cloned()
.collect::<Vec<_>>();
for block_hash in blocks_current_height {
let epoch_manager = self.epoch_manager.clone();
let mut chain_store_update = self.mut_chain_store().store_update();
let epoch_manager = epoch_manager.clone();
let mut chain_store_update = self.store_update();
if !tail_prev_block_cleaned {
let prev_block_hash =
*chain_store_update.get_block_header(&block_hash)?.prev_hash();
@@ -316,16 +352,16 @@
}

// Clear Chunks data
let mut chain_store_update = self.mut_chain_store().store_update();
let mut chain_store_update = self.store_update();
// The largest height of chunk we have in storage is head.height + 1
let chunk_height = std::cmp::min(head.height + 2, sync_height);
chain_store_update.clear_chunk_data_and_headers(chunk_height)?;
chain_store_update.commit()?;

// clear all trie data

let tries = self.runtime_adapter.get_tries();
let mut chain_store_update = self.mut_chain_store().store_update();
let tries = runtime_adapter.get_tries();
let mut chain_store_update = self.store_update();
let mut store_update = tries.store_update();
store_update.delete_all(DBCol::State);
chain_store_update.merge(store_update);
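Since the largest chunk height in storage is `head.height + 1`, clamping at `head.height + 2` clears every stored chunk without reaching past the sync height. A tiny worked example with made-up values:

```rust
fn main() {
    let head_height: u64 = 100;
    let sync_height: u64 = 500;
    // Chunks exist up to head.height + 1, so clearing below head.height + 2
    // covers all of them, while never reaching past the sync height.
    let chunk_height = std::cmp::min(head_height + 2, sync_height);
    assert_eq!(chunk_height, 102);
}
```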
30 changes: 10 additions & 20 deletions chain/chain/src/tests/garbage_collection.rs
@@ -190,9 +190,7 @@ fn gc_fork_common(simple_chains: Vec<SimpleChain>, max_changes: usize) {
}

// GC execution
chain1
.clear_data(tries1.clone(), &GCConfig { gc_blocks_limit: 1000, ..GCConfig::default() })
.unwrap();
chain1.clear_data(&GCConfig { gc_blocks_limit: 1000, ..GCConfig::default() }).unwrap();

let tries2 = get_chain_with_num_shards(Clock::real(), num_shards).runtime_adapter.get_tries();

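With the wrapper in place, the tests no longer construct or pass `ShardTries`; a before/after of the call above:

```rust
// Before: tests had to thread ShardTries explicitly:
//     chain1.clear_data(tries1.clone(), &GCConfig { gc_blocks_limit: 1000, ..GCConfig::default() })
// After: the Chain test wrapper clones its own runtime/epoch-manager handles,
// so tests pass only the GC config:
//     chain1.clear_data(&GCConfig { gc_blocks_limit: 1000, ..GCConfig::default() })
```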
@@ -636,14 +634,11 @@ fn test_fork_far_away_from_epoch_end() {

// GC execution
chain
.clear_data(
tries.clone(),
&GCConfig {
gc_blocks_limit: 100,
gc_fork_clean_step: fork_clean_step,
..GCConfig::default()
},
)
.clear_data(&GCConfig {
gc_blocks_limit: 100,
gc_fork_clean_step: fork_clean_step,
..GCConfig::default()
})
.expect("Clear data failed");

// The run above would clear just the first 5 blocks from the beginning, but shouldn't clear any forks
@@ -675,7 +670,7 @@ fn test_fork_far_away_from_epoch_end() {
do_fork(
source_block,
state_root,
tries.clone(),
tries,
&mut chain,
1,
&mut states,
Expand All @@ -685,7 +680,7 @@ fn test_fork_far_away_from_epoch_end() {
);
}
chain
.clear_data(tries, &GCConfig { gc_blocks_limit: 100, ..GCConfig::default() })
.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() })
.expect("Clear data failed");
// And now all these blocks should be safely removed.
for i in 6..50 {
@@ -722,8 +717,7 @@ fn test_clear_old_data() {
);
}

let trie = chain.runtime_adapter.get_tries();
chain.clear_data(trie, &GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }).unwrap();
chain.clear_data(&GCConfig { gc_blocks_limit: 100, ..GCConfig::default() }).unwrap();

for i in 0..=max_height {
println!("height = {} hash = {}", i, blocks[i].hash());
@@ -905,13 +899,9 @@ fn test_clear_old_data_too_many_heights_common(gc_blocks_limit: NumBlocks) {
prev_block = block.clone();
}

let trie = chain.runtime_adapter.get_tries();

for iter in 0..10 {
println!("ITERATION #{:?}", iter);
assert!(chain
.clear_data(trie.clone(), &GCConfig { gc_blocks_limit, ..GCConfig::default() })
.is_ok());
assert!(chain.clear_data(&GCConfig { gc_blocks_limit, ..GCConfig::default() }).is_ok());

// epoch didn't change so no data is garbage collected.
for i in 0..1000 {
37 changes: 0 additions & 37 deletions chain/client/src/client.rs
@@ -58,7 +58,6 @@ use near_network::types::{AccountKeys, ChainInfo, PeerManagerMessageRequest, Set
use near_network::types::{
HighestHeightPeerInfo, NetworkRequests, PeerManagerAdapter, ReasonForBan,
};
use near_o11y::log_assert;
use near_o11y::WithSpanContextExt;
use near_pool::InsertTransactionResult;
use near_primitives::block::{Approval, ApprovalInner, ApprovalMessage, Block, BlockHeader, Tip};
@@ -83,7 +82,6 @@ use near_primitives::utils::MaybeValidated;
use near_primitives::validator_signer::ValidatorSigner;
use near_primitives::version::PROTOCOL_VERSION;
use near_primitives::views::{CatchupStatusView, DroppedReason};
use near_store::metadata::DbKind;
use near_store::ShardUId;
use std::cmp::max;
use std::collections::{HashMap, HashSet};
@@ -1575,18 +1573,6 @@ impl Client {
};
self.chain.blocks_with_missing_chunks.prune_blocks_below_height(last_finalized_height);

{
let _span = tracing::debug_span!(
target: "client",
"garbage_collection",
block_hash = ?block.hash(),
height = block.header().height())
.entered();
let _gc_timer = metrics::GC_TIME.start_timer();
let result = self.clear_data();
log_assert!(result.is_ok(), "Can't clear old data, {:?}", result);
}

// send_network_chain_info should be called whenever the chain head changes.
// See send_network_chain_info() for more details.
if let Err(err) = self.send_network_chain_info() {
@@ -2543,29 +2529,6 @@ impl Client {
};
Ok(result)
}

fn clear_data(&mut self) -> Result<(), near_chain::Error> {
// An RPC node should do regular garbage collection.
if !self.config.archive {
let tries = self.runtime_adapter.get_tries();
return self.chain.clear_data(tries, &self.config.gc);
}

// An archival node with split storage should perform garbage collection
// on the hot storage. In order to determine if split storage is enabled
// *and* that the migration to split storage is finished we can check
// the store kind. It's only set to hot after the migration is finished.
let store = self.chain.chain_store().store();
let kind = store.get_db_kind()?;
if kind == Some(DbKind::Hot) {
let tries = self.runtime_adapter.get_tries();
return self.chain.clear_data(tries, &self.config.gc);
}

// An archival node with legacy storage or in the midst of migration to split
// storage should do the legacy clear_archive_data.
self.chain.clear_archive_data(self.config.gc.gc_blocks_limit)
}
}

/* implements functions used to communicate with network */
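The deleted `Client::clear_data` encoded a three-way dispatch between regular GC and legacy archival trimming; presumably an equivalent decision now lives with the GC actor, which this diff does not show. A self-contained restatement of that decision tree, with illustrative names:

```rust
// Mirrors near_store::metadata::DbKind only loosely; names are illustrative.
#[derive(PartialEq)]
enum DbKind {
    Hot,
    Archive,
}

enum GcAction {
    // Regular GC of chain data (ChainStore::clear_data).
    ClearData,
    // Legacy archival trimming (ChainStore::clear_archive_data).
    ClearArchiveData,
}

fn pick_gc_action(archive: bool, kind: Option<DbKind>) -> GcAction {
    if !archive {
        // An RPC node does regular garbage collection.
        return GcAction::ClearData;
    }
    if kind == Some(DbKind::Hot) {
        // Split-storage archival node whose migration has finished:
        // garbage-collect the hot storage.
        return GcAction::ClearData;
    }
    // Legacy archival storage, or still migrating to split storage:
    // only clear redundant chunk data.
    GcAction::ClearArchiveData
}

fn main() {
    // An RPC node (archive = false) always does regular GC:
    assert!(matches!(pick_gc_action(false, None), GcAction::ClearData));
    // A legacy archival node only trims redundant chunk data:
    assert!(matches!(pick_gc_action(true, Some(DbKind::Archive)), GcAction::ClearArchiveData));
}
```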
7 changes: 6 additions & 1 deletion chain/client/src/client_actions.rs
@@ -1531,7 +1531,12 @@ impl ClientActions {
unwrap_and_report!(self
.client
.chain
.reset_data_pre_state_sync(sync_hash));
.mut_chain_store()
.reset_data_pre_state_sync(
sync_hash,
self.client.runtime_adapter.clone(),
self.client.epoch_manager.clone()
));
}
self.client.sync_status.update(SyncStatus::StateSync(
StateSyncStatus { sync_hash, sync_status: HashMap::default() },