feat(resharding): implemented error handling and test for it #10179

Merged 4 commits on Nov 15, 2023
2 changes: 0 additions & 2 deletions chain/chain/src/chain.rs
@@ -2146,8 +2146,6 @@ impl Chain {
"start_process_block_impl",
height = block_height)
.entered();

tracing::debug!(target: "chain", "start process block");
// 0) Before we proceed with any further processing, we first check that the block
// hash and signature match to make sure the block is indeed produced by the assigned
// block producer. If not, we drop the block immediately.
3 changes: 3 additions & 0 deletions chain/chain/src/metrics.rs
@@ -148,6 +148,8 @@ pub(crate) enum ReshardingStatus {
BuildingState,
/// The resharding is finished.
Finished,
/// The resharding failed. Manual recovery is necessary!
Failed,
}

impl From<ReshardingStatus> for i64 {
@@ -158,6 +160,7 @@ impl From<ReshardingStatus> for i64 {
ReshardingStatus::Scheduled => 0,
ReshardingStatus::BuildingState => 1,
ReshardingStatus::Finished => 2,
ReshardingStatus::Failed => -1,
}
}
}
47 changes: 34 additions & 13 deletions chain/chain/src/resharding.rs
@@ -31,8 +31,6 @@ use std::sync::Arc;
use std::time::Duration;
use tracing::debug;

const MAX_RESHARDING_POLL_TIME: Duration = Duration::from_secs(5 * 60 * 60); // 5 hrs

/// StateSplitRequest has all the information needed to start a resharding job. This message is sent
/// from ClientActor to SyncJobsActor. We do not want to stall the ClientActor with a long running
/// resharding job. The SyncJobsActor is helpful for handling such long running jobs.
@@ -68,7 +66,8 @@ impl Debug for StateSplitRequest {
.field("prev_prev_hash", &self.prev_prev_hash)
.field("shard_uid", &self.shard_uid)
.field("state_root", &self.state_root)
.field("next_epoch_shard_layout", &self.next_epoch_shard_layout)
.field("next_epoch_shard_layout_version", &self.next_epoch_shard_layout.version())
.field("curr_poll_time", &self.curr_poll_time)
.finish()
}
}
@@ -200,6 +199,7 @@ impl Chain {
shard_id: ShardId,
state_split_scheduler: &dyn Fn(StateSplitRequest),
) -> Result<(), Error> {
tracing::debug!(target: "resharding", ?shard_id, ?sync_hash, "preprocessing started");
let block_header = self.get_block_header(sync_hash)?;
let shard_layout = self.epoch_manager.get_shard_layout(block_header.epoch_id())?;
let next_epoch_shard_layout =
@@ -233,26 +233,47 @@
/// Checks whether the snapshot is ready for resharding. Returns true if the snapshot is not
/// ready and the resharding job needs to be retried/rescheduled.
pub fn retry_build_state_for_split_shards(state_split_request: &StateSplitRequest) -> bool {
let StateSplitRequest { tries, prev_prev_hash, curr_poll_time, .. } = state_split_request;
// Do not retry if we have spent more than MAX_RESHARDING_POLL_TIME
let StateSplitRequest { tries, prev_prev_hash, curr_poll_time, config, .. } =
state_split_request;

// Do not retry if we have spent more than max_poll_time
// The error would be caught in build_state_for_split_shards and propagated to client actor
if curr_poll_time > &MAX_RESHARDING_POLL_TIME {
if curr_poll_time > &config.max_poll_time {
tracing::warn!(target: "resharding", ?curr_poll_time, ?config.max_poll_time, "exceeded max poll time while waiting for snapshot");
return false;
}
tries.get_state_snapshot(prev_prev_hash).is_err_and(|err| match err {
SnapshotError::SnapshotNotFound(_) => true,
SnapshotError::LockWouldBlock => true,
SnapshotError::IncorrectSnapshotRequested(_, _) => false,
SnapshotError::Other(_) => false,
})

let state_snapshot = tries.get_state_snapshot(prev_prev_hash);
if let Err(err) = state_snapshot {
Contributor: Was the old code too convoluted to understand?

Contributor (Author): not really, I just wanted to add that debug line in the middle of it

tracing::debug!(target: "resharding", ?err, "state snapshot is not ready");
return match err {
SnapshotError::SnapshotNotFound(_) => true,
SnapshotError::LockWouldBlock => true,
SnapshotError::IncorrectSnapshotRequested(_, _) => false,
SnapshotError::Other(_) => false,
};
}

// The snapshot is Ok, no need to retry.
return false;
}

pub fn build_state_for_split_shards(
state_split_request: StateSplitRequest,
) -> StateSplitResponse {
let shard_id = state_split_request.shard_uid.shard_id();
let shard_uid = state_split_request.shard_uid;
let shard_id = shard_uid.shard_id();
let sync_hash = state_split_request.sync_hash;
let new_state_roots = Self::build_state_for_split_shards_impl(state_split_request);
match &new_state_roots {
Ok(_) => {}
Err(err) => {
tracing::error!(target: "resharding", ?shard_uid, ?err, "Resharding failed, manual recovery is necessary!");
RESHARDING_STATUS
.with_label_values(&[&shard_uid.to_string()])
.set(ReshardingStatus::Failed.into());
}
}
StateSplitResponse { shard_id, sync_hash, new_state_roots }
}
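For context, a minimal, self-contained sketch of how a receiver can branch on the `Result` carried by `StateSplitResponse`. The types below are simplified stand-ins for illustration only; the real response type and its handling in ClientActor live elsewhere in this crate.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real nearcore types; only the shape matters here.
type ShardId = u64;
type StateRoot = [u8; 32];

struct StateSplitResponse {
    shard_id: ShardId,
    new_state_roots: Result<HashMap<ShardId, StateRoot>, String>,
}

fn on_state_split_response(response: StateSplitResponse) {
    match response.new_state_roots {
        // Success: catchup continues with one state root per child shard.
        Ok(roots) => {
            println!("shard {} split into {} children", response.shard_id, roots.len());
        }
        // Failure: the Failed status was already recorded in the metric by
        // build_state_for_split_shards; the node maintainer must recover manually.
        Err(err) => {
            eprintln!("resharding of shard {} failed: {}", response.shard_id, err);
        }
    }
}

fn main() {
    on_state_split_response(StateSplitResponse {
        shard_id: 0,
        new_state_roots: Err("snapshot poll timed out".to_string()),
    });
}
```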

1 change: 1 addition & 0 deletions chain/client/src/client.rs
@@ -2388,6 +2388,7 @@ impl Client {

let (state_sync, shards_to_split, blocks_catch_up_state) =
self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| {
tracing::debug!(target: "client", ?sync_hash, "inserting new state sync");
notify_state_sync = true;
(
StateSync::new(
2 changes: 1 addition & 1 deletion chain/client/src/sync/state.rs
@@ -1041,7 +1041,7 @@ impl StateSync {
shard_id,
state_split_scheduler,
)?;
tracing::debug!(target: "sync", %shard_id, %sync_hash, ?me, "State sync split scheduled");
tracing::debug!(target: "sync", %shard_id, %sync_hash, ?me, "resharding scheduled");
Contributor: aah, how did we miss this in the past?

*shard_sync_download =
ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSplitApplying };
Ok(())
24 changes: 18 additions & 6 deletions chain/client/src/sync_jobs_actor.rs
@@ -1,5 +1,6 @@
use crate::ClientActor;
use actix::AsyncContext;
use std::time::Duration;

use near_chain::chain::{
do_apply_chunks, ApplyStatePartsRequest, ApplyStatePartsResponse, BlockCatchUpRequest,
@@ -155,18 +156,29 @@ impl actix::Handler<WithSpanContext<StateSplitRequest>> for SyncJobsActor {
msg: WithSpanContext<StateSplitRequest>,
context: &mut Self::Context,
) -> Self::Result {
let (_span, mut state_split_request) = handler_debug_span!(target: "client", msg);
let (_span, mut state_split_request) = handler_debug_span!(target: "resharding", msg);

// Wait for the initial delay. It should only be used in tests.
let initial_delay = state_split_request.config.initial_delay;
if state_split_request.curr_poll_time == Duration::ZERO && initial_delay > Duration::ZERO {
tracing::debug!(target: "resharding", ?state_split_request, ?initial_delay, "Waiting for the initial delay");
state_split_request.curr_poll_time += initial_delay;
context.notify_later(state_split_request.with_span_context(), initial_delay);
return;
}

if Chain::retry_build_state_for_split_shards(&state_split_request) {
// The actix implementation lets us send a message to ourselves with a delay.
// In case snapshots are not ready yet, we will retry resharding later.
tracing::debug!(target: "client", ?state_split_request, "Snapshot missing, retrying resharding later");
let retry_delay = state_split_request.config.retry_delay;
tracing::debug!(target: "resharding", ?state_split_request, ?retry_delay, "Snapshot missing, retrying resharding later");
state_split_request.curr_poll_time += retry_delay;
context.notify_later(state_split_request.with_span_context(), retry_delay);
} else {
tracing::debug!(target: "client", ?state_split_request, "Starting resharding");
let response = Chain::build_state_for_split_shards(state_split_request);
self.client_addr.do_send(response.with_span_context());
return;
}

tracing::debug!(target: "resharding", ?state_split_request, "Starting resharding");
let response = Chain::build_state_for_split_shards(state_split_request);
self.client_addr.do_send(response.with_span_context());
}
}
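The retry branch above leans on actix's `notify_later`, which re-delivers a message to the same actor after a delay. A standalone sketch of that pattern, with a hypothetical `Poll` message standing in for `StateSplitRequest`:

```rust
use actix::prelude::*;
use std::time::Duration;

#[derive(Message)]
#[rtype(result = "()")]
struct Poll {
    attempts: u32,
}

struct Poller;

impl Actor for Poller {
    type Context = Context<Self>;
}

impl Handler<Poll> for Poller {
    type Result = ();

    fn handle(&mut self, msg: Poll, ctx: &mut Self::Context) {
        if msg.attempts < 3 {
            // Not ready yet: re-send the message to ourselves after a delay,
            // the same way SyncJobsActor reschedules StateSplitRequest.
            ctx.notify_later(Poll { attempts: msg.attempts + 1 }, Duration::from_millis(100));
        } else {
            // Ready: do the actual work and shut down the example system.
            println!("done after {} attempts", msg.attempts);
            System::current().stop();
        }
    }
}

fn main() {
    let system = System::new();
    system.block_on(async {
        Poller.start().do_send(Poll { attempts: 0 });
    });
    system.run().unwrap();
}
```

The same mechanism implements both delays: `initial_delay` postpones the very first attempt (tests only), while `retry_delay` spaces out snapshot polls until `max_poll_time` runs out.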
14 changes: 14 additions & 0 deletions core/chain-configs/src/client_config.rs
@@ -177,6 +177,15 @@ pub struct StateSplitConfig {
/// The delay between attempts to start resharding while waiting for the
/// state snapshot to become available.
pub retry_delay: Duration,

/// The delay between when the resharding request is received and when the actor
/// actually starts working on it. This delay should only be used in tests.
pub initial_delay: Duration,

/// The maximum time that the actor will wait for the snapshot to be ready
/// before starting resharding. Do not wait indefinitely, since we want to
/// report the error early enough for the node maintainer to have time to recover.
pub max_poll_time: Duration,
}

impl Default for StateSplitConfig {
@@ -187,6 +196,11 @@ impl Default for StateSplitConfig {
batch_size: bytesize::ByteSize::kb(500),
batch_delay: Duration::from_millis(100),
retry_delay: Duration::from_secs(10),
initial_delay: Duration::from_secs(0),
// The snapshot typically is available within a minute from the
// epoch start. Set the default higher in case we need to wait for
// state sync.
max_poll_time: Duration::from_secs(2 * 60 * 60), // 2 hours
}
}
}
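A standalone sketch of how the three timing knobs relate, using a hypothetical mirror struct rather than the real `StateSplitConfig`; the default values match the ones above, and the override mimics what the pytest resharding tests do through client config changes:

```rust
use std::time::Duration;

// Hypothetical mirror of the timing knobs added above, for illustration only.
#[derive(Debug)]
struct ReshardingTiming {
    initial_delay: Duration, // test-only delay before the first attempt
    retry_delay: Duration,   // wait between snapshot polls
    max_poll_time: Duration, // give up and report failure after this long
}

impl Default for ReshardingTiming {
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_secs(0),
            retry_delay: Duration::from_secs(10),
            max_poll_time: Duration::from_secs(2 * 60 * 60), // 2 hours
        }
    }
}

fn main() {
    // A test might poll more aggressively, as sanity/resharding.py does.
    let test_timing =
        ReshardingTiming { retry_delay: Duration::from_millis(100), ..Default::default() };
    println!("{test_timing:?}");
}
```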
4 changes: 4 additions & 0 deletions nightly/pytest-sanity.txt
@@ -150,3 +150,7 @@ pytest --timeout=600 sanity/split_storage.py --features nightly
# Test for resharding
pytest --timeout=120 sanity/resharding.py
pytest --timeout=120 sanity/resharding.py --features nightly

# Test for resharding error handling
pytest --timeout=120 sanity/resharding_error_handling.py
pytest --timeout=120 sanity/resharding_error_handling.py --features nightly
25 changes: 25 additions & 0 deletions pytest/lib/cluster.py
@@ -825,6 +825,7 @@ def apply_config_changes(node_dir, client_config_change):
'consensus.block_fetch_horizon',
'consensus.min_block_production_delay',
'consensus.state_sync_timeout',
'expected_shutdown',
'log_summary_period',
'max_gas_burnt_view',
'rosetta_rpc',
@@ -982,3 +983,27 @@ def get_binary_protocol_version(config) -> typing.Optional[int]:
if tokens[i] == "protocol" and i + 1 < n:
return int(tokens[i + 1])
return None


# Corrupt the node's state snapshot by invoking the `neard database
# corrupt-state-snapshot` subcommand; used by the resharding error handling test.
def corrupt_state_snapshot(config, node_dir, shard_layout_version):
near_root = config['near_root']
binary_name = config.get('binary_name', 'neard')
binary_path = os.path.join(near_root, binary_name)

cmd = [
binary_path,
"--home",
node_dir,
"database",
"corrupt-state-snapshot",
"--shard-layout-version",
str(shard_layout_version),
]

env = os.environ.copy()
env["RUST_BACKTRACE"] = "1"
env["RUST_LOG"] = "db=warn,db_opener=warn," + env.get("RUST_LOG", "debug")

out = subprocess.check_output(cmd, text=True, env=env)

return out
9 changes: 9 additions & 0 deletions pytest/lib/resharding_lib.py
@@ -113,3 +113,12 @@ def get_target_num_shards(binary_protocol_version):
return 4

assert False


def get_epoch_offset(binary_protocol_version):
if binary_protocol_version >= V2_PROTOCOL_VERSION:
return 1
if binary_protocol_version >= V1_PROTOCOL_VERSION:
return 0

assert False
38 changes: 23 additions & 15 deletions pytest/lib/utils.py
@@ -144,25 +144,33 @@ def get_all_metrics(self) -> str:
f"Could not fetch metrics from {self.addr}: {response}")
return response.content.decode('utf-8')

def get_metric_all_values(
self, metric_name: str) -> typing.List[typing.Tuple[typing.Dict[str, str], float]]:
for family in text_string_to_metric_families(self.get_all_metrics()):
if family.name == metric_name:
return [
(sample.labels, sample.value) for sample in family.samples
]
return []

def get_metric_value(
self,
metric_name: str,
labels: typing.Optional[typing.Dict[str, str]] = None
) -> typing.Optional[str]:
for family in text_string_to_metric_families(self.get_all_metrics()):
if family.name == metric_name:
all_samples = [sample for sample in family.samples]
if not labels:
if len(all_samples) > 1:
raise AssertionError(
f"Too many metric values ({len(all_samples)}) for {metric_name} - please specify a label"
)
if not all_samples:
return None
return all_samples[0].value
for sample in all_samples:
if sample.labels == labels:
return sample.value
all_samples = self.get_metric_all_values(metric_name)
if not labels:
if len(all_samples) > 1:
raise AssertionError(
f"Too many metric values ({len(all_samples)}) for {metric_name} - please specify a label"
)
if not all_samples:
return None
(sample_labels, sample_value) = all_samples[0]
return sample_value
for (sample_labels, sample_value) in all_samples:
if sample_labels == labels:
return sample_value
return None

def get_int_metric_value(
@@ -172,7 +180,7 @@ def get_int_metric_value(
) -> typing.Optional[int]:
"""Helper function to return the integer value of the metric (as function above returns strings)."""
value = self.get_metric_value(metric_name, labels)
if not value:
if value is None:
return None
return round(float(value))

9 changes: 6 additions & 3 deletions pytest/tests/sanity/resharding.py
@@ -15,7 +15,7 @@
from configured_logger import logger
from cluster import get_binary_protocol_version, init_cluster, load_config, spin_up_node
from utils import MetricsTracker, poll_blocks
from resharding_lib import append_shard_layout_config_changes, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version
from resharding_lib import append_shard_layout_config_changes, get_epoch_offset, get_genesis_num_shards, get_genesis_shard_layout_version, get_target_num_shards, get_target_shard_layout_version


class ReshardingTest(unittest.TestCase):
@@ -36,6 +36,8 @@ def setUp(self) -> None:
self.target_num_shards = get_target_num_shards(
self.binary_protocol_version)

self.epoch_offset = get_epoch_offset(self.binary_protocol_version)

def __get_genesis_config_changes(self):
genesis_config_changes = [
["epoch_length", self.epoch_length],
Expand All @@ -62,7 +64,7 @@ def __get_client_config_changes(self, num_nodes):
# retry often to start resharding as fast as possible
"retry_delay": {
"secs": 0,
"nanos": 500_000_000
"nanos": 100_000_000
}
}
}
@@ -114,7 +116,8 @@ def test_resharding(self):
# after the block is processed. If there is some delay the shard
# layout may change and the assertions below will fail.

if height <= 2 * self.epoch_length + 1:
# TODO(resharding) Why is epoch offset needed here?
if height <= 2 * self.epoch_length + self.epoch_offset:
self.assertEqual(version, self.genesis_shard_layout_version)
self.assertEqual(num_shards, self.genesis_num_shards)
else: