feat(resharding): implemented error handling and test for it (#10179)
- added a new ReshardingStatus - Failed - and set it when resharding fails
- made a number of delays and timeouts configurable so that the error handling is testable
- added a new nayduck test for checking the error handling
- added a number of new debug, warn and error logs

https://nayduck.near.org/#/run/3270
wacban authored Nov 15, 2023
1 parent 4afba2a commit d782f03
Showing 17 changed files with 395 additions and 41 deletions.
2 changes: 0 additions & 2 deletions chain/chain/src/chain.rs
@@ -2078,8 +2078,6 @@ impl Chain {
"start_process_block_impl",
height = block_height)
.entered();

tracing::debug!(target: "chain", "start process block");
// 0) Before we proceed with any further processing, we first check that the block
// hash and signature matches to make sure the block is indeed produced by the assigned
// block producer. If not, we drop the block immediately
3 changes: 3 additions & 0 deletions chain/chain/src/metrics.rs
@@ -148,6 +148,8 @@ pub(crate) enum ReshardingStatus {
BuildingState,
/// The resharding is finished.
Finished,
/// The resharding failed. Manual recovery is necessary!
Failed,
}

impl From<ReshardingStatus> for i64 {
@@ -158,6 +160,7 @@ impl From<ReshardingStatus> for i64 {
ReshardingStatus::Scheduled => 0,
ReshardingStatus::BuildingState => 1,
ReshardingStatus::Finished => 2,
ReshardingStatus::Failed => -1,
}
}
}
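For illustration, a minimal sketch of how a per-shard status gauge like this can be wired up with the prometheus crate; the metric name, label value, and direct use of `register_int_gauge_vec!` are assumptions for the sketch (nearcore registers metrics through its own helpers):

```rust
use prometheus::{register_int_gauge_vec, IntGaugeVec};

fn main() {
    // Hypothetical gauge mirroring RESHARDING_STATUS; one time series per shard.
    let status: IntGaugeVec = register_int_gauge_vec!(
        "near_resharding_status",
        "Status of the current resharding process",
        &["shard_uid"]
    )
    .unwrap();

    // On failure the gauge is set to -1, so any negative reading can drive an alert,
    // while 0..=2 track normal progress per the From impl above.
    status.with_label_values(&["s3.v1"]).set(-1);
}
```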
47 changes: 34 additions & 13 deletions chain/chain/src/resharding.rs
@@ -31,8 +31,6 @@ use std::sync::Arc;
use std::time::Duration;
use tracing::debug;

const MAX_RESHARDING_POLL_TIME: Duration = Duration::from_secs(5 * 60 * 60); // 5 hrs

/// StateSplitRequest has all the information needed to start a resharding job. This message is sent
/// from ClientActor to SyncJobsActor. We do not want to stall the ClientActor with a long running
/// resharding job. The SyncJobsActor is helpful for handling such long running jobs.
@@ -68,7 +66,8 @@ impl Debug for StateSplitRequest {
.field("prev_prev_hash", &self.prev_prev_hash)
.field("shard_uid", &self.shard_uid)
.field("state_root", &self.state_root)
.field("next_epoch_shard_layout", &self.next_epoch_shard_layout)
.field("next_epoch_shard_layout_version", &self.next_epoch_shard_layout.version())
.field("curr_poll_time", &self.curr_poll_time)
.finish()
}
}
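The hand-written Debug impl above now logs only the shard layout's version rather than the whole layout, which keeps `?state_split_request` log lines compact. A self-contained sketch of the same pattern, with stand-in types:

```rust
use std::fmt;

// Stand-in types for illustration only.
struct ShardLayout {
    version: u32, // the real layout also carries shard boundary accounts, etc.
}
impl ShardLayout {
    fn version(&self) -> u32 {
        self.version
    }
}

struct StateSplitRequest {
    next_epoch_shard_layout: ShardLayout,
}

impl fmt::Debug for StateSplitRequest {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("StateSplitRequest")
            // Log the compact version number instead of the full nested layout.
            .field("next_epoch_shard_layout_version", &self.next_epoch_shard_layout.version())
            .finish()
    }
}

fn main() {
    let req = StateSplitRequest { next_epoch_shard_layout: ShardLayout { version: 2 } };
    println!("{req:?}"); // StateSplitRequest { next_epoch_shard_layout_version: 2 }
}
```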
@@ -200,6 +199,7 @@ impl Chain {
shard_id: ShardId,
state_split_scheduler: &dyn Fn(StateSplitRequest),
) -> Result<(), Error> {
tracing::debug!(target: "resharding", ?shard_id, ?sync_hash, "preprocessing started");
let block_header = self.get_block_header(sync_hash)?;
let shard_layout = self.epoch_manager.get_shard_layout(block_header.epoch_id())?;
let next_epoch_shard_layout =
@@ -233,26 +233,47 @@
/// Checks whether the snapshot is ready for resharding. Returns true if the snapshot is not
/// ready yet and the resharding job needs to be retried or rescheduled.
pub fn retry_build_state_for_split_shards(state_split_request: &StateSplitRequest) -> bool {
let StateSplitRequest { tries, prev_prev_hash, curr_poll_time, .. } = state_split_request;
// Do not retry if we have spent more than MAX_RESHARDING_POLL_TIME
let StateSplitRequest { tries, prev_prev_hash, curr_poll_time, config, .. } =
state_split_request;

// Do not retry if we have spent more than max_poll_time
// The error would be caught in build_state_for_split_shards and propagated to client actor
if curr_poll_time > &MAX_RESHARDING_POLL_TIME {
if curr_poll_time > &config.max_poll_time {
tracing::warn!(target: "resharding", ?curr_poll_time, ?config.max_poll_time, "exceeded max poll time while waiting for snapshot");
return false;
}
tries.get_state_snapshot(prev_prev_hash).is_err_and(|err| match err {
SnapshotError::SnapshotNotFound(_) => true,
SnapshotError::LockWouldBlock => true,
SnapshotError::IncorrectSnapshotRequested(_, _) => false,
SnapshotError::Other(_) => false,
})

let state_snapshot = tries.get_state_snapshot(prev_prev_hash);
if let Err(err) = state_snapshot {
tracing::debug!(target: "resharding", ?err, "state snapshot is not ready");
return match err {
SnapshotError::SnapshotNotFound(_) => true,
SnapshotError::LockWouldBlock => true,
SnapshotError::IncorrectSnapshotRequested(_, _) => false,
SnapshotError::Other(_) => false,
};
}

// The snapshot is Ok, no need to retry.
return false;
}

pub fn build_state_for_split_shards(
state_split_request: StateSplitRequest,
) -> StateSplitResponse {
let shard_id = state_split_request.shard_uid.shard_id();
let shard_uid = state_split_request.shard_uid;
let shard_id = shard_uid.shard_id();
let sync_hash = state_split_request.sync_hash;
let new_state_roots = Self::build_state_for_split_shards_impl(state_split_request);
match &new_state_roots {
Ok(_) => {}
Err(err) => {
tracing::error!(target: "resharding", ?shard_uid, ?err, "Resharding failed, manual recovery is necessary!");
RESHARDING_STATUS
.with_label_values(&[&shard_uid.to_string()])
.set(ReshardingStatus::Failed.into());
}
}
StateSplitResponse { shard_id, sync_hash, new_state_roots }
}
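The retry decision above distinguishes transient snapshot errors from permanent ones. A self-contained sketch of that policy, with the snapshot error and result stubbed out:

```rust
use std::time::Duration;

// Stubs standing in for the real storage types.
enum SnapshotError {
    SnapshotNotFound,
    LockWouldBlock,
    IncorrectSnapshotRequested,
    Other,
}

fn should_retry(
    snapshot: Result<(), SnapshotError>,
    curr_poll_time: Duration,
    max_poll_time: Duration,
) -> bool {
    // Give up once the poll budget is exhausted; the error is then
    // reported by the resharding job itself rather than retried forever.
    if curr_poll_time > max_poll_time {
        return false;
    }
    match snapshot {
        // Snapshot is ready: proceed with resharding, no retry needed.
        Ok(()) => false,
        // Transient conditions: the snapshot may appear shortly, retry.
        Err(SnapshotError::SnapshotNotFound) | Err(SnapshotError::LockWouldBlock) => true,
        // Permanent errors: retrying cannot help, let the failure surface.
        Err(SnapshotError::IncorrectSnapshotRequested) | Err(SnapshotError::Other) => false,
    }
}

fn main() {
    let budget = Duration::from_secs(60);
    assert!(should_retry(Err(SnapshotError::SnapshotNotFound), Duration::ZERO, budget));
    assert!(!should_retry(Err(SnapshotError::Other), Duration::ZERO, budget));
    assert!(!should_retry(Ok(()), Duration::ZERO, budget));
}
```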

1 change: 1 addition & 0 deletions chain/client/src/client.rs
@@ -2218,6 +2218,7 @@ impl Client {

let (state_sync, shards_to_split, blocks_catch_up_state) =
self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| {
tracing::debug!(target: "client", ?sync_hash, "inserting new state sync");
notify_state_sync = true;
(
StateSync::new(
2 changes: 1 addition & 1 deletion chain/client/src/sync/state.rs
@@ -1041,7 +1041,7 @@ impl StateSync {
shard_id,
state_split_scheduler,
)?;
tracing::debug!(target: "sync", %shard_id, %sync_hash, ?me, "State sync split scheduled");
tracing::debug!(target: "sync", %shard_id, %sync_hash, ?me, "resharding scheduled");
*shard_sync_download =
ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSplitApplying };
Ok(())
24 changes: 18 additions & 6 deletions chain/client/src/sync_jobs_actor.rs
@@ -1,5 +1,6 @@
use crate::ClientActor;
use actix::AsyncContext;
use std::time::Duration;

use near_chain::chain::{
do_apply_chunks, ApplyStatePartsRequest, ApplyStatePartsResponse, BlockCatchUpRequest,
@@ -155,18 +156,29 @@ impl actix::Handler<WithSpanContext<StateSplitRequest>> for SyncJobsActor {
msg: WithSpanContext<StateSplitRequest>,
context: &mut Self::Context,
) -> Self::Result {
let (_span, mut state_split_request) = handler_debug_span!(target: "client", msg);
let (_span, mut state_split_request) = handler_debug_span!(target: "resharding", msg);

// Wait for the initial delay. It should only be used in tests.
let initial_delay = state_split_request.config.initial_delay;
if state_split_request.curr_poll_time == Duration::ZERO && initial_delay > Duration::ZERO {
tracing::debug!(target: "resharding", ?state_split_request, ?initial_delay, "Waiting for the initial delay");
state_split_request.curr_poll_time += initial_delay;
context.notify_later(state_split_request.with_span_context(), initial_delay);
return;
}

if Chain::retry_build_state_for_split_shards(&state_split_request) {
// Actix lets us send a message to ourselves with a delay.
// In case snapshots are not ready yet, we will retry resharding later.
tracing::debug!(target: "client", ?state_split_request, "Snapshot missing, retrying resharding later");
let retry_delay = state_split_request.config.retry_delay;
tracing::debug!(target: "resharding", ?state_split_request, ?retry_delay, "Snapshot missing, retrying resharding later");
state_split_request.curr_poll_time += retry_delay;
context.notify_later(state_split_request.with_span_context(), retry_delay);
} else {
tracing::debug!(target: "client", ?state_split_request, "Starting resharding");
let response = Chain::build_state_for_split_shards(state_split_request);
self.client_addr.do_send(response.with_span_context());
return;
}

tracing::debug!(target: "resharding", ?state_split_request, "Starting resharding");
let response = Chain::build_state_for_split_shards(state_split_request);
self.client_addr.do_send(response.with_span_context());
}
}
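The handler above relies on actix's ability to re-deliver a message to the same actor after a delay (`notify_later`). A minimal, self-contained sketch of that rescheduling pattern; the actor and message names are invented for illustration:

```rust
use actix::prelude::*;
use std::time::Duration;

#[derive(Message)]
#[rtype(result = "()")]
struct Poll {
    elapsed: Duration,
}

struct Retrier {
    retry_delay: Duration,
    max_poll_time: Duration,
}

impl Actor for Retrier {
    type Context = Context<Self>;
}

impl Handler<Poll> for Retrier {
    type Result = ();

    fn handle(&mut self, msg: Poll, ctx: &mut Context<Self>) {
        if msg.elapsed <= self.max_poll_time {
            // Re-deliver the message to ourselves after retry_delay,
            // accumulating the time spent polling so far.
            let next = Poll { elapsed: msg.elapsed + self.retry_delay };
            ctx.notify_later(next, self.retry_delay);
        } else {
            // Poll budget exhausted: stop instead of retrying forever.
            System::current().stop();
        }
    }
}

fn main() {
    let system = System::new();
    system.block_on(async {
        Retrier {
            retry_delay: Duration::from_millis(10),
            max_poll_time: Duration::from_millis(50),
        }
        .start()
        .do_send(Poll { elapsed: Duration::ZERO });
    });
    system.run().unwrap();
}
```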
14 changes: 14 additions & 0 deletions core/chain-configs/src/client_config.rs
@@ -177,6 +177,15 @@ pub struct StateSplitConfig {
/// The delay between attempts to start resharding while waiting for the
/// state snapshot to become available.
pub retry_delay: Duration,

/// The delay between when the resharding request is received and when the
/// actor actually starts working on it. This delay should only be used in
/// tests.
pub initial_delay: Duration,

/// The maximum time that the actor will wait for the snapshot to be ready
/// before starting resharding. Do not wait indefinitely, since we want to
/// report the error early enough for the node maintainer to have time to recover.
pub max_poll_time: Duration,
}

impl Default for StateSplitConfig {
@@ -187,6 +196,11 @@ impl Default for StateSplitConfig {
batch_size: bytesize::ByteSize::kb(500),
batch_delay: Duration::from_millis(100),
retry_delay: Duration::from_secs(10),
initial_delay: Duration::from_secs(0),
// The snapshot typically is available within a minute from the
// epoch start. Set the default higher in case we need to wait for
// state sync.
max_poll_time: Duration::from_secs(2 * 60 * 60), // 2 hours
}
}
}
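A sketch of how a test might tighten these knobs, assuming `StateSplitConfig` is re-exported from the near-chain-configs crate root (an assumption; the field names match the struct in this diff and the values are illustrative):

```rust
use near_chain_configs::StateSplitConfig;
use std::time::Duration;

fn main() {
    let config = StateSplitConfig {
        // Give the test a window to corrupt the snapshot before work starts.
        initial_delay: Duration::from_secs(2),
        // Fail fast instead of polling for the default two hours.
        max_poll_time: Duration::from_secs(5),
        // The remaining fields (batch_size, batch_delay, retry_delay)
        // keep their defaults.
        ..StateSplitConfig::default()
    };
    assert!(config.initial_delay < config.max_poll_time);
}
```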
4 changes: 4 additions & 0 deletions nightly/pytest-sanity.txt
@@ -150,3 +150,7 @@ pytest --timeout=600 sanity/split_storage.py --features nightly
# Test for resharding
pytest --timeout=120 sanity/resharding.py
pytest --timeout=120 sanity/resharding.py --features nightly

# Test for resharding error handling
pytest --timeout=120 sanity/resharding_error_handling.py
pytest --timeout=120 sanity/resharding_error_handling.py --features nightly
25 changes: 25 additions & 0 deletions pytest/lib/cluster.py
@@ -825,6 +825,7 @@ def apply_config_changes(node_dir, client_config_change):
'consensus.block_fetch_horizon',
'consensus.min_block_production_delay',
'consensus.state_sync_timeout',
'expected_shutdown',
'log_summary_period',
'max_gas_burnt_view',
'rosetta_rpc',
@@ -982,3 +983,27 @@ def get_binary_protocol_version(config) -> typing.Optional[int]:
if tokens[i] == "protocol" and i + 1 < n:
return int(tokens[i + 1])
return None


def corrupt_state_snapshot(config, node_dir, shard_layout_version):
near_root = config['near_root']
binary_name = config.get('binary_name', 'neard')
binary_path = os.path.join(near_root, binary_name)

cmd = [
binary_path,
"--home",
node_dir,
"database",
"corrupt-state-snapshot",
"--shard-layout-version",
str(shard_layout_version),
]

env = os.environ.copy()
env["RUST_BACKTRACE"] = "1"
env["RUST_LOG"] = "db=warn,db_opener=warn," + env.get("RUST_LOG", "debug")

out = subprocess.check_output(cmd, text=True, env=env)

return out
9 changes: 9 additions & 0 deletions pytest/lib/resharding_lib.py
@@ -113,3 +113,12 @@ def get_target_num_shards(binary_protocol_version):
return 4

assert False


def get_epoch_offset(binary_protocol_version):
if binary_protocol_version >= V2_PROTOCOL_VERSION:
return 1
if binary_protocol_version >= V1_PROTOCOL_VERSION:
return 0

assert False
38 changes: 23 additions & 15 deletions pytest/lib/utils.py
@@ -144,25 +144,33 @@ def get_all_metrics(self) -> str:
f"Could not fetch metrics from {self.addr}: {response}")
return response.content.decode('utf-8')

def get_metric_all_values(
self, metric_name: str) -> typing.List[typing.Tuple[str, str]]:
for family in text_string_to_metric_families(self.get_all_metrics()):
if family.name == metric_name:
return [
(sample.labels, sample.value) for sample in family.samples
]
return []

def get_metric_value(
self,
metric_name: str,
labels: typing.Optional[typing.Dict[str, str]] = None
) -> typing.Optional[str]:
for family in text_string_to_metric_families(self.get_all_metrics()):
if family.name == metric_name:
all_samples = [sample for sample in family.samples]
if not labels:
if len(all_samples) > 1:
raise AssertionError(
f"Too many metric values ({len(all_samples)}) for {metric_name} - please specify a label"
)
if not all_samples:
return None
return all_samples[0].value
for sample in all_samples:
if sample.labels == labels:
return sample.value
all_samples = self.get_metric_all_values(metric_name)
if not labels:
if len(all_samples) > 1:
raise AssertionError(
f"Too many metric values ({len(all_samples)}) for {metric_name} - please specify a label"
)
if not all_samples:
return None
(sample_labels, sample_value) = all_samples[0]
return sample_value
for (sample_labels, sample_value) in all_samples:
if sample_labels == labels:
return sample_value
return None

def get_int_metric_value(
Expand All @@ -172,7 +180,7 @@ def get_int_metric_value(
) -> typing.Optional[int]:
"""Helper function to return the integer value of the metric (as function above returns strings)."""
value = self.get_metric_value(metric_name, labels)
if not value:
if value is None:
return None
return round(float(value))

9 changes: 6 additions & 3 deletions pytest/tests/sanity/resharding.py
@@ -15,7 +15,7 @@
from configured_logger import logger
from cluster import get_binary_protocol_version, init_cluster, load_config, spin_up_node
from utils import MetricsTracker, poll_blocks
from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version
from resharding_lib import append_shard_layout_config_changes, get_epoch_offset, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version


class ReshardingTest(unittest.TestCase):
@@ -36,6 +36,8 @@ def setUp(self) -> None:
self.target_num_shards = get_target_num_shards(
self.binary_protocol_version)

self.epoch_offset = get_epoch_offset(self.binary_protocol_version)

def __get_genesis_config_changes(self):
genesis_config_changes = [
["epoch_length", self.epoch_length],
@@ -62,7 +64,7 @@ def __get_client_config_changes(self, num_nodes):
# retry often to start resharding as fast as possible
"retry_delay": {
"secs": 0,
"nanos": 500_000_000
"nanos": 100_000_000
}
}
}
@@ -114,7 +116,8 @@ def test_resharding(self):
# after the block is processed. If there is some delay the shard
# layout may change and the assertions below will fail.

if height <= 2 * self.epoch_length + 1:
# TODO(resharding) Why is epoch offset needed here?
if height <= 2 * self.epoch_length + self.epoch_offset:
self.assertEqual(version, self.genesis_shard_layout_version)
self.assertEqual(num_shards, self.genesis_num_shards)
else: