chain: gc partial chunks on archival nodes (near#6439)
Start garbage collecting ColPartialChunks and ColInvalidChunks on
archival nodes.  The former is quite a sizeable column and its data can
be recovered from ColChunks.  The latter is only needed when operating
at head.

Note that this is likely insufficient for the garbage collection to
happen in reasonable time (since with current default options we’re
garbage collecting only two heights at a time).  It’s best to clean
out the two columns.

Issue: near#6242
mina86 authored Apr 6, 2022
1 parent bdc9410 commit 6be2e0e
Showing 7 changed files with 221 additions and 128 deletions.
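Before the diffs, a rough back-of-envelope illustrating the note above that only two heights are garbage collected per pass. Every number in the sketch below (backlog size, default limit, block time) is an assumption for illustration, not a value taken from the commit; it only shows why working through an existing backlog purely via per-block GC would take years rather than days, hence the suggestion to clean the two columns out wholesale.

# Back-of-envelope sketch; all numbers are illustrative assumptions.
backlog_heights = 60_000_000      # hypothetical backlog on an old archival node
gc_heights_per_block = 2          # assumed default: two heights cleared per GC pass
chain_growth_per_block = 1        # gc_stop_height itself advances as the chain grows
net_catchup = gc_heights_per_block - chain_growth_per_block

blocks_needed = backlog_heights / net_catchup
seconds_per_block = 1.2           # rough block-time assumption
years = blocks_needed * seconds_per_block / (365 * 24 * 3600)
print(f"clearing the backlog would take roughly {years:.1f} years")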
26 changes: 26 additions & 0 deletions chain/chain/src/chain.rs
@@ -868,6 +868,32 @@ impl Chain {
Ok(())
}

/// Garbage collect data which archival nodes don’t need to keep.
///
/// Normally, archival nodes keep all the data from the genesis block and
/// don’t run garbage collection. On the other hand, for better performance
/// the storage contains some data duplication, i.e. values in some of the
/// columns can be recomputed from data in different columns. To save on
/// storage, archival nodes do garbage collect that data.
///
/// `gc_height_limit` limits how many heights the function will process.
pub fn clear_archive_data(&mut self, gc_height_limit: BlockHeightDelta) -> Result<(), Error> {
let _d = DelayDetector::new(|| "GC".into());

let head = self.store.head()?;
let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash);
if gc_stop_height > head.height {
return Err(ErrorKind::GCError(
"gc_stop_height cannot be larger than head.height".into(),
)
.into());
}

let mut chain_store_update = self.store.store_update();
chain_store_update.clear_redundant_chunk_data(gc_stop_height, gc_height_limit)?;
chain_store_update.commit()
}

pub fn clear_forks_data(
&mut self,
tries: ShardTries,
47 changes: 44 additions & 3 deletions chain/chain/src/store.rs
@@ -27,9 +27,9 @@ use near_primitives::transaction::{
use near_primitives::trie_key::{trie_key_parsers, TrieKey};
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{
AccountId, BlockExtra, BlockHeight, EpochId, GCCount, NumBlocks, ShardId, StateChanges,
StateChangesExt, StateChangesForSplitStates, StateChangesKinds, StateChangesKindsExt,
StateChangesRequest,
AccountId, BlockExtra, BlockHeight, BlockHeightDelta, EpochId, GCCount, NumBlocks, ShardId,
StateChanges, StateChangesExt, StateChangesForSplitStates, StateChangesKinds,
StateChangesKindsExt, StateChangesRequest,
};
use near_primitives::utils::{get_block_shard_id, index_to_bytes, to_timestamp};
use near_primitives::views::LightClientBlockView;
@@ -2037,6 +2037,47 @@ impl<'a> ChainStoreUpdate<'a> {
Ok(())
}

/// Clears chunk data which can be computed from other data in the storage.
///
/// We store PartialEncodedChunk objects in the ColPartialChunks column of
/// the storage. However, those objects can be computed from data in
/// ColChunks and as such are redundant. For performance reasons we want to
/// keep that data when operating at head of the chain but the data can be
/// safely removed from archival storage.
///
/// `gc_stop_height` indicates the height starting from which no data should
/// be garbage collected. Roughly speaking, this represents the start of the
/// ‘hot’ data that we want to keep.
///
/// `gc_height_limit` limits how many non-empty heights to process. This
/// limit means that the method may stop garbage collection before reaching
/// `gc_stop_height`.
pub fn clear_redundant_chunk_data(
&mut self,
gc_stop_height: BlockHeight,
gc_height_limit: BlockHeightDelta,
) -> Result<(), Error> {
let mut height = self.chunk_tail()?;
let mut remaining = gc_height_limit;
while height < gc_stop_height && remaining > 0 {
let chunk_hashes = self.chain_store.get_all_chunk_hashes_by_height(height)?;
height += 1;
if !chunk_hashes.is_empty() {
remaining -= 1;
for chunk_hash in chunk_hashes {
let chunk_header_hash = chunk_hash.into();
self.gc_col(ColPartialChunks, &chunk_header_hash);
// Data in ColInvalidChunks isn’t technically redundant (it
// cannot be calculated from other data) but it is data we
// don’t need for anything so it can be deleted as well.
self.gc_col(ColInvalidChunks, &chunk_header_hash);
}
}
}
self.update_chunk_tail(height);
Ok(())
}

fn get_shard_uids_to_gc(
&mut self,
runtime_adapter: &dyn RuntimeAdapter,
2 changes: 0 additions & 2 deletions chain/chunks/src/chunk_cache.rs
@@ -25,8 +25,6 @@ use tracing::warn;
// Users of the data structure are responsible for adding chunk to this map at the right time.

/// A chunk is out of horizon if its height + HEIGHT_HORIZON < largest_seen_height
// Note: If this number increases, make sure `LONG_TARGET_HEIGHT` in
// block_sync_archival.py is updated as well.
const HEIGHT_HORIZON: BlockHeightDelta = 1024;
/// A chunk is out of horizon if its height > HEIGHT_HORIZON + largest_seen_height
const MAX_HEIGHTS_AHEAD: BlockHeightDelta = 5;
25 changes: 14 additions & 11 deletions chain/client/src/client.rs
@@ -1103,17 +1103,20 @@ impl Client {
self.chain.get_block_header(last_final_block).map_or(0, |header| header.height())
};
self.chain.blocks_with_missing_chunks.prune_blocks_below_height(last_finalized_height);
if !self.config.archive {
let timer = metrics::GC_TIME.start_timer();
if let Err(err) = self
.chain
.clear_data(self.runtime_adapter.get_tries(), self.config.gc_blocks_limit)
{
error!(target: "client", "Can't clear old data, {:?}", err);
debug_assert!(false);
};
timer.observe_duration();
}

let timer = metrics::GC_TIME.start_timer();
let gc_blocks_limit = self.config.gc_blocks_limit;
let result = if self.config.archive {
self.chain.clear_archive_data(gc_blocks_limit)
} else {
let tries = self.runtime_adapter.get_tries();
self.chain.clear_data(tries, gc_blocks_limit)
};
if let Err(err) = result {
error!(target: "client", "Can't clear old data, {:?}", err);
debug_assert!(false);
};
timer.observe_duration();

if self.runtime_adapter.is_next_block_epoch_start(block.hash()).unwrap_or(false) {
let next_epoch_protocol_version = unwrap_or_return!(self
6 changes: 2 additions & 4 deletions nightly/pytest-sanity.txt
@@ -60,10 +60,8 @@ pytest sanity/rpc_light_client_execution_outcome_proof.py
pytest sanity/rpc_light_client_execution_outcome_proof.py --features nightly_protocol,nightly_protocol_features
pytest --timeout=240 sanity/block_sync.py
pytest --timeout=240 sanity/block_sync.py --features nightly_protocol,nightly_protocol_features
pytest --timeout=5m sanity/block_sync_archival.py
pytest --timeout=5m sanity/block_sync_archival.py --features nightly_protocol,nightly_protocol_features
pytest --timeout=30m sanity/block_sync_archival.py --long-run
pytest --timeout=30m sanity/block_sync_archival.py --long-run --features nightly_protocol,nightly_protocol_features
pytest --timeout=10m sanity/block_sync_archival.py
pytest --timeout=10m sanity/block_sync_archival.py --features nightly_protocol,nightly_protocol_features
pytest --timeout=240 sanity/validator_switch.py
pytest --timeout=240 sanity/validator_switch.py --features nightly_protocol,nightly_protocol_features
pytest --timeout=240 sanity/rpc_state_changes.py
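The archival block-sync test gets a single, longer timeout here and loses its separate --long-run variants. For orientation only, below is a hedged sketch of how a test like it can bring up an archival node with the pytest harness; the start_cluster call mirrors other tests in this tree, while the archive and gc_blocks_limit client-config keys are assumptions about the node configuration rather than anything this commit introduces.

# Illustrative sketch only, not part of this commit.  Assumes the
# start_cluster helper from pytest/lib/cluster.py and the `archive` /
# `gc_blocks_limit` client-config keys; adjust to the real test harness.
import pathlib
import sys
import time

sys.path.append(str(pathlib.Path(__file__).resolve().parents[1] / 'lib'))
from cluster import start_cluster

# One validator plus one archival observer.  With this commit the observer
# still keeps full history but garbage collects ColPartialChunks and
# ColInvalidChunks as new blocks are accepted.
nodes = start_cluster(
    1,    # num_nodes (validators)
    1,    # num_observers
    1,    # num_shards
    None, # config
    [],   # genesis_config_changes
    {1: {'archive': True, 'gc_blocks_limit': 2}})  # client_config_changes

time.sleep(60)  # let the chain advance so a few GC passes run
height = nodes[1].get_status()['sync_info']['latest_block_height']
assert height > 0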
10 changes: 9 additions & 1 deletion pytest/lib/cluster.py
@@ -466,9 +466,17 @@ def start(self, *, boot_node: BootNode = None, skip_starting_proxy=False):
logger.error(
'=== failed to start node, rpc does not ready in 10 seconds')

def kill(self):
def kill(self, *, gentle=False):
"""Kills the process. If `gentle` sends SIGINT before killing."""
if self._proxy_local_stopped is not None:
self._proxy_local_stopped.value = 1
if self._process and gentle:
self._process.send_signal(signal.SIGINT)
try:
self._process.wait(5)
self._process = None
except subprocess.TimeoutExpired:
pass
if self._process:
self._process.kill()
self._process.wait(5)
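The new keyword-only gentle flag lets a test stop a node with SIGINT first, giving the process a chance to exit on its own before the hard kill is used as a fallback. A minimal, hypothetical usage sketch follows; node and boot_node come from whatever cluster setup the test uses and are assumed here, not shown in this diff.

# Hypothetical usage of the new keyword-only `gentle` flag; `node` and
# `boot_node` come from the test's cluster setup, which is not shown here.

# Send SIGINT and wait up to five seconds for the process to exit on its own;
# only if that times out does kill() fall back to the hard kill.
node.kill(gentle=True)

# Restart with the start() method whose signature is shown above and query
# status once the node is back up.
node.start(boot_node=boot_node)
latest = node.get_status()['sync_info']['latest_block_height']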