From 6be2e0e8e3a92543cde0a1703708ada12a033b13 Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Wed, 6 Apr 2022 19:41:10 +0200 Subject: [PATCH] chain: gc partial chunks on archival nodes (#6439) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Start garbage collecting ColPartialChunks and ColInvalidChunks on archival nodes. The former is quite sizeable column and its data can be recovered from ColChunks. The latter is only needed when operating at head. Note that this is likely insufficient for the garbage collection to happen in reasonable time (since with current default options we’re garbage collecting only two heights at a time). It’s best to clean out the two columns. Issue: https://github.com/near/nearcore/issues/6242 --- chain/chain/src/chain.rs | 26 +++ chain/chain/src/store.rs | 47 ++++- chain/chunks/src/chunk_cache.rs | 2 - chain/client/src/client.rs | 25 ++- nightly/pytest-sanity.txt | 6 +- pytest/lib/cluster.py | 10 +- pytest/tests/sanity/block_sync_archival.py | 233 +++++++++++---------- 7 files changed, 221 insertions(+), 128 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 7289a958af3..ced050a31d9 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -868,6 +868,32 @@ impl Chain { Ok(()) } + /// Garbage collect data which archival node doesn’t need to keep. + /// + /// Normally, archival nodes keep all the data from the genesis block and + /// don’t run garbage collection. On the other hand, for better performance + /// the storage contains some data duplication, i.e. values in some of the + /// columns can be recomputed from data in different columns. To save on + /// storage, archival nodes do garbage collect that data. + /// + /// `gc_height_limit` limits how many heights will the function process. + pub fn clear_archive_data(&mut self, gc_height_limit: BlockHeightDelta) -> Result<(), Error> { + let _d = DelayDetector::new(|| "GC".into()); + + let head = self.store.head()?; + let gc_stop_height = self.runtime_adapter.get_gc_stop_height(&head.last_block_hash); + if gc_stop_height > head.height { + return Err(ErrorKind::GCError( + "gc_stop_height cannot be larger than head.height".into(), + ) + .into()); + } + + let mut chain_store_update = self.store.store_update(); + chain_store_update.clear_redundant_chunk_data(gc_stop_height, gc_height_limit)?; + chain_store_update.commit() + } + pub fn clear_forks_data( &mut self, tries: ShardTries, diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index f45588c7660..d02344f939e 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -27,9 +27,9 @@ use near_primitives::transaction::{ use near_primitives::trie_key::{trie_key_parsers, TrieKey}; use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::{ - AccountId, BlockExtra, BlockHeight, EpochId, GCCount, NumBlocks, ShardId, StateChanges, - StateChangesExt, StateChangesForSplitStates, StateChangesKinds, StateChangesKindsExt, - StateChangesRequest, + AccountId, BlockExtra, BlockHeight, BlockHeightDelta, EpochId, GCCount, NumBlocks, ShardId, + StateChanges, StateChangesExt, StateChangesForSplitStates, StateChangesKinds, + StateChangesKindsExt, StateChangesRequest, }; use near_primitives::utils::{get_block_shard_id, index_to_bytes, to_timestamp}; use near_primitives::views::LightClientBlockView; @@ -2037,6 +2037,47 @@ impl<'a> ChainStoreUpdate<'a> { Ok(()) } + /// Clears chunk data which can be computed from other data in the storage. + /// + /// We are storing PartialEncodedChunk objects in the ColPartialChunks in + /// the storage. However, those objects can be computed from data in + /// ColChunks and as such are redundant. For performance reasons we want to + /// keep that data when operating at head of the chain but the data can be + /// safely removed from archival storage. + /// + /// `gc_stop_height` indicates height starting from which no data should be + /// garbage collected. Roughly speaking this represents start of the ‘hot’ + /// data that we want to keep. + /// + /// `gt_height_limit` indicates limit of how many non-empty heights to + /// process. This limit means that the method may stop garbage collection + /// before reaching `gc_stop_height`. + pub fn clear_redundant_chunk_data( + &mut self, + gc_stop_height: BlockHeight, + gc_height_limit: BlockHeightDelta, + ) -> Result<(), Error> { + let mut height = self.chunk_tail()?; + let mut remaining = gc_height_limit; + while height < gc_stop_height && remaining > 0 { + let chunk_hashes = self.chain_store.get_all_chunk_hashes_by_height(height)?; + height += 1; + if !chunk_hashes.is_empty() { + remaining -= 1; + for chunk_hash in chunk_hashes { + let chunk_header_hash = chunk_hash.into(); + self.gc_col(ColPartialChunks, &chunk_header_hash); + // Data in ColInvalidChunks isn’t technically redundant (it + // cannot be calculated from other data) but it is data we + // don’t need for anything so it can be deleted as well. + self.gc_col(ColInvalidChunks, &chunk_header_hash); + } + } + } + self.update_chunk_tail(height); + Ok(()) + } + fn get_shard_uids_to_gc( &mut self, runtime_adapter: &dyn RuntimeAdapter, diff --git a/chain/chunks/src/chunk_cache.rs b/chain/chunks/src/chunk_cache.rs index 02419b4c2ed..1808b1f6c8e 100644 --- a/chain/chunks/src/chunk_cache.rs +++ b/chain/chunks/src/chunk_cache.rs @@ -25,8 +25,6 @@ use tracing::warn; // Users of the data structure are responsible for adding chunk to this map at the right time. /// A chunk is out of horizon if its height + HEIGHT_HORIZON < largest_seen_height -// Note: If this number increases, make sure `LONG_TARGET_HEIGHT` in -// block_sync_archival.py is updated as well. const HEIGHT_HORIZON: BlockHeightDelta = 1024; /// A chunk is out of horizon if its height > HEIGHT_HORIZON + largest_seen_height const MAX_HEIGHTS_AHEAD: BlockHeightDelta = 5; diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 56b26f997eb..19a24874a6f 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1103,17 +1103,20 @@ impl Client { self.chain.get_block_header(last_final_block).map_or(0, |header| header.height()) }; self.chain.blocks_with_missing_chunks.prune_blocks_below_height(last_finalized_height); - if !self.config.archive { - let timer = metrics::GC_TIME.start_timer(); - if let Err(err) = self - .chain - .clear_data(self.runtime_adapter.get_tries(), self.config.gc_blocks_limit) - { - error!(target: "client", "Can't clear old data, {:?}", err); - debug_assert!(false); - }; - timer.observe_duration(); - } + + let timer = metrics::GC_TIME.start_timer(); + let gc_blocks_limit = self.config.gc_blocks_limit; + let result = if self.config.archive { + self.chain.clear_archive_data(gc_blocks_limit) + } else { + let tries = self.runtime_adapter.get_tries(); + self.chain.clear_data(tries, gc_blocks_limit) + }; + if let Err(err) = result { + error!(target: "client", "Can't clear old data, {:?}", err); + debug_assert!(false); + }; + timer.observe_duration(); if self.runtime_adapter.is_next_block_epoch_start(block.hash()).unwrap_or(false) { let next_epoch_protocol_version = unwrap_or_return!(self diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt index b4c773bfdeb..957f3ff550b 100644 --- a/nightly/pytest-sanity.txt +++ b/nightly/pytest-sanity.txt @@ -60,10 +60,8 @@ pytest sanity/rpc_light_client_execution_outcome_proof.py pytest sanity/rpc_light_client_execution_outcome_proof.py --features nightly_protocol,nightly_protocol_features pytest --timeout=240 sanity/block_sync.py pytest --timeout=240 sanity/block_sync.py --features nightly_protocol,nightly_protocol_features -pytest --timeout=5m sanity/block_sync_archival.py -pytest --timeout=5m sanity/block_sync_archival.py --features nightly_protocol,nightly_protocol_features -pytest --timeout=30m sanity/block_sync_archival.py --long-run -pytest --timeout=30m sanity/block_sync_archival.py --long-run --features nightly_protocol,nightly_protocol_features +pytest --timeout=10m sanity/block_sync_archival.py +pytest --timeout=10m sanity/block_sync_archival.py --features nightly_protocol,nightly_protocol_features pytest --timeout=240 sanity/validator_switch.py pytest --timeout=240 sanity/validator_switch.py --features nightly_protocol,nightly_protocol_features pytest --timeout=240 sanity/rpc_state_changes.py diff --git a/pytest/lib/cluster.py b/pytest/lib/cluster.py index 8c6f967a369..0bab493474e 100644 --- a/pytest/lib/cluster.py +++ b/pytest/lib/cluster.py @@ -466,9 +466,17 @@ def start(self, *, boot_node: BootNode = None, skip_starting_proxy=False): logger.error( '=== failed to start node, rpc does not ready in 10 seconds') - def kill(self): + def kill(self, *, gentle=False): + """Kills the process. If `gentle` sends SIGINT before killing.""" if self._proxy_local_stopped is not None: self._proxy_local_stopped.value = 1 + if self._process and gentle: + self._process.send_signal(signal.SIGINT) + try: + self._process.wait(5) + self._process = None + except subprocess.TimeoutExpired: + pass if self._process: self._process.kill() self._process.wait(5) diff --git a/pytest/tests/sanity/block_sync_archival.py b/pytest/tests/sanity/block_sync_archival.py index bc344e3b127..f07b2744cc5 100755 --- a/pytest/tests/sanity/block_sync_archival.py +++ b/pytest/tests/sanity/block_sync_archival.py @@ -1,17 +1,30 @@ #!/usr/bin/env python3 """Tests that archival node can sync up history from another archival node. -Initialises a cluster with three archival nodes: one validator and two -observers. Starts the validator with one observer keeping in sync. Once -several epochs worth of blocks are generated kills the validator (so that no new -blocks are generated) and starts the second observer making sure that it -properly synchronises the full history. - -When called with --long-run the test will generate enough blocks so that entries -in EncodedChunksCache start being evicted. That it, it’ll generate more than -HEIGHT_HORIZON blocks defined in chunk_cache.rs. Since that number is 1024, -this may take a while to run but to help with that the validator will be run -with much shorter min_block_production_delay. +The overview of this test is that it starts archival nodes which need to sync +their state from already running archival nodes. The test can be divided into +two stages: + +1. The test first starts a validator and an observer node (let’s call it Fred). + Both configured as archival nodes. It then waits for several epochs worth of + blocks to be generated and received by the observer node. Once that happens, + the test kills the validator node so that no new blocks are generated. + + At this stage, the test verifies that Fred can sync correctly and that the + boot node serves all partial chunks requests from its in-memory cache (which + is determined by looking at Prometheus metrics). + +2. The test then restarts Fred so that its in-memory cache is cleared. It + finally starts a new observer (let’s call it Barney) and points it at Fred as + a boot node. The test waits for Barney to synchronise with Fred and then + verifies that all the blocks have been correctly fetched. + + At this stage, the test verifies that Barney synchronises correctly and that + Fred serves all requests from storage (since it's in-memory cache has been + cleared). This is again done through Prometheus metrics and in addition the + test verifies that data from ColChunks and ColPartialChunks was used. This + also implies that Fred correctly performed ColPartialChunks garbage + collection. """ import argparse @@ -30,35 +43,19 @@ import utils EPOCH_LENGTH = 5 -SHORT_TARGET_HEIGHT = 20 * EPOCH_LENGTH -# This must be greater than HEIGHT_HORIZON in chunk_cache.rs -LONG_TARGET_HEIGHT = 1500 +TARGET_HEIGHT = 20 * EPOCH_LENGTH _DurationMaybe = typing.Optional[datetime.timedelta] class Cluster: - def __init__(self, - *, - min_block_production_delay: _DurationMaybe = None, - max_block_production_delay: _DurationMaybe = None): + def __init__(self): node_config = { 'archive': True, 'tracked_shards': [0], } - for key, duration in (('min_block_production_delay', - min_block_production_delay), - ('max_block_production_delay', - max_block_production_delay)): - if duration: - secs, td = divmod(duration, datetime.timedelta(seconds=1)) - node_config.setdefault('consensus', {})[key] = { - 'secs': secs, - 'nanos': td.microseconds * 1000, - } - self._config = cluster.load_config() self._near_root, self._node_dirs = cluster.init_cluster( num_nodes=1, @@ -71,20 +68,20 @@ def __init__(self, 0: node_config, 1: node_config, 2: node_config, + 3: node_config }) self._nodes = [None] * len(self._node_dirs) - def start_node(self, - ordinal: int, - *, - boot_node_index: int = 0) -> cluster.BaseNode: + def start_node( + self, ordinal: int, *, + boot_node: typing.Optional[cluster.BaseNode]) -> cluster.BaseNode: assert self._nodes[ordinal] is None self._nodes[ordinal] = node = cluster.spin_up_node( self._config, self._near_root, self._node_dirs[ordinal], ordinal, - boot_node=self._nodes[boot_node_index], + boot_node=boot_node, single_node=not ordinal) return node @@ -97,11 +94,63 @@ def __exit__(self, *_): node.cleanup() -def get_all_blocks(node: cluster.BaseNode, *, - head: cluster.BlockId) -> typing.Sequence[cluster.BlockId]: +# TODO(#6458): Move this to separate file and merge with metrics module. +def get_metrics(node_name: str, + node: cluster.BootNode) -> typing.Dict[str, int]: + """Fetches partial encoded chunk request count metrics from node. + + Args: + node_name: Node’s name used when logging the counters. This is purely + for debugging. + node: Node to fetch metrics from. + + Returns: + A `{key: count}` dictionary where key is in ‘method/success’ format. + The values correspond to the + near_partial_encoded_chunk_request_processing_time_count Prometheus + metric. + """ + url = 'http://{}:{}/metrics'.format(*node.rpc_addr()) + response = requests.get(url) + response.raise_for_status() + + metric_name = 'near_partial_encoded_chunk_request_processing_time' + histogram = next( + (metric + for metric in prometheus_client.parser.text_string_to_metric_families( + response.content.decode('utf8')) + if metric.name == metric_name), None) + if not histogram: + return {} + + counts = dict((sample.labels['method'] + '/' + sample.labels['success'], + int(sample.value)) + for sample in histogram.samples + if sample.name.endswith('_count')) + logger.info(f'{node_name} counters: ' + '; '.join( + f'{key}: {count}' for key, count in sorted(counts.items()))) + return counts + + +def assert_metrics(metrics: typing.Dict[str, int], + allowed_non_zero: typing.Sequence[str]) -> None: + """Asserts that only given keys are non-zero. + + Args: + metrics: Metrics as returned by get_metrics() function. + allowed_non_zero: Keys that are expected to be non-zero in the metrics. + """ + for key in allowed_non_zero: + assert metrics.get(key), f'Expected {key} to be non-zero' + for key, count in metrics.items(): + ok = key in allowed_non_zero or not count + assert ok, f'Expected {key} to be zero but got {count}' + + +def get_all_blocks(node: cluster.BaseNode) -> typing.Sequence[cluster.BlockId]: """Returns all blocks from given head down to genesis block.""" ids = [] - block_hash = head.hash + block_hash = node.get_latest_block().hash while block_hash != '11111111111111111111111111111111': block = node.get_block(block_hash) assert 'result' in block, block @@ -111,76 +160,46 @@ def get_all_blocks(node: cluster.BaseNode, *, return list(reversed(ids)) -def main(argv: typing.Sequence[str]) -> None: - parser = argparse.ArgumentParser(description='Run an end-to-end test') - parser.add_argument('--long-run', action='store_true') - opts = parser.parse_args(argv[1:]) - - target_height = SHORT_TARGET_HEIGHT - min_delay = None - max_delay = None - if opts.long_run: - min_delay = datetime.timedelta(milliseconds=1) - max_delay = datetime.timedelta(milliseconds=10) - target_height = LONG_TARGET_HEIGHT - - with Cluster(min_block_production_delay=min_delay, - max_block_production_delay=max_delay) as cluster: - # Start the validator and the first observer. Wait until the observer - # synchronises a few epoch’s worth of blocks to be generated and then - # kill validator so no more blocks are generated. - boot = cluster.start_node(0) - fred = cluster.start_node(1) - utils.wait_for_blocks(fred, target=target_height, poll_interval=1) - boot.kill() - latest = fred.get_latest_block() - - # Start the second observer node and wait for it to catch up with the - # first observer. - barney = cluster.start_node(2, boot_node_index=1) - utils.wait_for_blocks(barney, target=latest.height, poll_interval=1) - - # Verify that observer got all the blocks. Note that get_all_blocks - # verifies that the node has full chain from head to genesis block. - fred_blocks = get_all_blocks(fred, head=latest) - barney_blocks = get_all_blocks(barney, head=latest) - if barney_blocks != fred_blocks: - for a, b in zip(fred_blocks, barney_blocks): - if a != b: - logger.error(f'{a} != {b}') - assert False - - # Get near_partial_encoded_chunk_request_processing_time metric - response = requests.get('http://{}:{}/metrics'.format(*fred.rpc_addr())) - response.raise_for_status() - histogram = next( - metric for metric in prometheus_client.parser. - text_string_to_metric_families(response.content.decode('utf8')) if - metric.name == 'near_partial_encoded_chunk_request_processing_time') - counts = dict((sample.labels['method'] + '/' + sample.labels['success'], - int(sample.value)) - for sample in histogram.samples - if sample.name.endswith('_count')) - logger.info('Counters: ' + '; '.join( - f'{key}: {count}' for key, count in sorted(counts.items()))) - - # In ‘short’ run (i.e. without --long-run flag) we expect all requests - # to be served from in-memory cache. In --long-run we expect chunks to - # be removed from in-memory cache causing some of the requests to be - # served from partial chunks. - if opts.long_run: - keys = ('cache/ok', 'partial/ok') - else: - keys = ('cache/ok',) - for key in keys: - counts.setdefault(key, 0) - for key, count in counts.items(): - if key in keys: - assert count, f'Expected {key} counter to be non-zero' - else: - assert not count, ( - f'Expected {key} counter to be zero but got {count}') +def run_test(cluster: Cluster) -> None: + # Start the validator and the first observer. Wait until the observer + # synchronises a few epoch’s worth of blocks to be generated and then kill + # validator so no more blocks are generated. + boot = cluster.start_node(0, boot_node=None) + fred = cluster.start_node(1, boot_node=boot) + utils.wait_for_blocks(fred, target=TARGET_HEIGHT, poll_interval=1) + metrics = get_metrics('boot', boot) + boot.kill() + + # We didn’t generate enough blocks to fill boot’s in-memory cache which + # means all Fred’s requests should be served from it. + assert_metrics(metrics, ('cache/ok',)) + + # Restart Fred so that its cache is cleared. Then start the second + # observer, Barney, and wait for it to sync up. + fred_blocks = get_all_blocks(fred) + fred.kill(gentle=True) + fred.start() + + barney = cluster.start_node(2, boot_node=fred) + utils.wait_for_blocks(barney, + target=fred_blocks[-1].height, + poll_interval=1) + barney_blocks = get_all_blocks(barney) + if fred_blocks != barney_blocks: + for f, b in zip(fred_blocks, barney_blocks): + if f != b: + logger.error(f'{f} != {b}') + assert False + + # Since Fred’s in-memory cache is clear, all Barney’s requests are served + # from storage. Since ColPartialChunks is garbage collected, some of the + # requests are served from ColChunks. + assert_metrics(get_metrics('fred', fred), ( + 'chunk/ok', + 'partial/ok', + )) if __name__ == '__main__': - main(sys.argv) + with Cluster() as cl: + run_test(cl)