Skip to content

Commit

Permalink
feat(pytest): reenable store validator (#11127)
Browse files Browse the repository at this point in the history
Re-enable store validator tests that were disabled in #11022. It works
by first sending a signal to GC actor to stop garbage collection, then
run the store validator, and send a signal to re-enable gc afterwards.

Nayduck run https://nayduck.nearone.org/#/run/58
  • Loading branch information
bowenwang1996 authored Apr 23, 2024
1 parent 16e2321 commit 8954a36
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 8 deletions.
5 changes: 4 additions & 1 deletion chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;

use crate::client_actions::{ClientActionHandler, ClientActions, ClientSenderForClient};
use crate::gc_actor::GCActor;
use crate::start_gc_actor;
use crate::stateless_validation::state_witness_actor::StateWitnessSenderForClient;
use crate::sync_jobs_actions::SyncJobsActions;
Expand Down Expand Up @@ -181,6 +182,7 @@ fn wait_until_genesis(genesis_time: &Utc) {

pub struct StartClientResult {
pub client_actor: Addr<ClientActor>,
pub gc_actor: Addr<GCActor>,
pub client_arbiter_handle: ArbiterHandle,
pub resharding_handle: ReshardingHandle,
pub gc_arbiter_handle: ArbiterHandle,
Expand Down Expand Up @@ -231,7 +233,7 @@ pub fn start_client(
.unwrap();
let resharding_handle = client.chain.resharding_handle.clone();

let (_, gc_arbiter_handle) = start_gc_actor(
let (gc_actor, gc_arbiter_handle) = start_gc_actor(
runtime.store().clone(),
genesis_height,
client_config.clone(),
Expand Down Expand Up @@ -262,5 +264,6 @@ pub fn start_client(
client_arbiter_handle,
resharding_handle,
gc_arbiter_handle,
gc_actor,
}
}
39 changes: 35 additions & 4 deletions chain/client/src/gc_actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::metrics;
#[cfg(feature = "test_features")]
use actix::Handler;
use actix::{Actor, Addr, Arbiter, ArbiterHandle, AsyncContext, Context};
use near_chain::{types::RuntimeAdapter, ChainStore, ChainStoreAccess};
use near_chain_configs::{ClientConfig, GCConfig};
Expand All @@ -17,6 +19,8 @@ pub struct GCActor {
epoch_manager: Arc<dyn EpochManagerAdapter>,
gc_config: GCConfig,
is_archive: bool,
/// In some tests we may want to temporarily disable GC
no_gc: bool,
}

impl GCActor {
Expand All @@ -34,6 +38,7 @@ impl GCActor {
gc_config,
epoch_manager,
is_archive,
no_gc: false,
}
}

Expand Down Expand Up @@ -67,12 +72,14 @@ impl GCActor {
}

fn gc(&mut self, ctx: &mut Context<GCActor>) {
let timer = metrics::GC_TIME.start_timer();
if let Err(e) = self.clear_data() {
warn!(target: "garbage collection", "Error in gc: {}", e);
if !self.no_gc {
let timer = metrics::GC_TIME.start_timer();
if let Err(e) = self.clear_data() {
warn!(target: "garbage collection", "Error in gc: {}", e);
}
timer.observe_duration();
}

timer.observe_duration();
ctx.run_later(self.gc_config.gc_step_period, move |act, ctx| {
act.gc(ctx);
});
Expand All @@ -87,6 +94,30 @@ impl Actor for GCActor {
}
}

#[cfg(feature = "test_features")]
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub enum NetworkAdversarialMessage {
StopGC,
ResumeGC,
}

#[cfg(feature = "test_features")]
impl Handler<NetworkAdversarialMessage> for GCActor {
type Result = ();

fn handle(&mut self, msg: NetworkAdversarialMessage, _ctx: &mut Self::Context) -> Self::Result {
match msg {
NetworkAdversarialMessage::StopGC => {
self.no_gc = true;
}
NetworkAdversarialMessage::ResumeGC => {
self.no_gc = false;
}
}
}
}

pub fn start_gc_actor(
store: Store,
genesis_height: BlockHeight,
Expand Down
2 changes: 2 additions & 0 deletions chain/jsonrpc/jsonrpc-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub fn start_all_with_validity_period_and_no_epoch_sync(
actor_handles.client_actor.clone().with_auto_span_context().into_multi_sender(),
actor_handles.view_client_actor.clone().with_auto_span_context().into_multi_sender(),
noop().into_multi_sender(),
#[cfg(feature = "test_features")]
noop().into_multi_sender(),
Arc::new(DummyEntityDebugHandler {}),
);
(actor_handles.view_client_actor, addr)
Expand Down
30 changes: 28 additions & 2 deletions chain/jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,24 @@ pub struct ViewClientSenderForRpc(
#[cfg(feature = "test_features")] Sender<near_client::NetworkAdversarialMessage>,
);

#[cfg(feature = "test_features")]
#[derive(Clone, near_async::MultiSend, near_async::MultiSenderFrom)]
pub struct GCSenderForRpc(
AsyncSender<
near_client::gc_actor::NetworkAdversarialMessage,
ActixResult<near_client::gc_actor::NetworkAdversarialMessage>,
>,
);

#[derive(Clone, near_async::MultiSend, near_async::MultiSenderFrom)]
pub struct PeerManagerSenderForRpc(AsyncSender<GetDebugStatus, ActixResult<GetDebugStatus>>);

struct JsonRpcHandler {
client_sender: ClientSenderForRpc,
view_client_sender: ViewClientSenderForRpc,
peer_manager_sender: PeerManagerSenderForRpc,
#[cfg(feature = "test_features")]
gc_sender: GCSenderForRpc,
polling_config: RpcPollingConfig,
genesis_config: GenesisConfig,
enable_debug_rpc: bool,
Expand Down Expand Up @@ -1249,8 +1260,15 @@ impl JsonRpcHandler {
}
}

/// First, stop GC by sending a message to GC Actor. Then run store validator inside Client.
/// After store validator is done, resume GC by sending another message to GC Actor.
/// This ensures that store validator is not run concurrently with another thread that may modify storage.
async fn adv_check_store(&self, _params: Value) -> Result<Value, RpcError> {
match self
self.gc_sender
.send_async(near_client::gc_actor::NetworkAdversarialMessage::StopGC)
.await
.map_err(|_| RpcError::server_error::<String>(None))?;
let ret_val = match self
.client_sender
.send_async(near_client::NetworkAdversarialMessage::AdvCheckStorageConsistency)
.await
Expand All @@ -1260,7 +1278,12 @@ impl JsonRpcHandler {
None => Err(RpcError::server_error::<String>(None)),
},
_ => Err(RpcError::server_error::<String>(None)),
}
}?;
self.gc_sender
.send_async(near_client::gc_actor::NetworkAdversarialMessage::ResumeGC)
.await
.map_err(|_| RpcError::server_error::<String>(None))?;
Ok(ret_val)
}
}

Expand Down Expand Up @@ -1458,6 +1481,7 @@ pub fn start_http(
client_sender: ClientSenderForRpc,
view_client_sender: ViewClientSenderForRpc,
peer_manager_sender: PeerManagerSenderForRpc,
#[cfg(feature = "test_features")] gc_sender: GCSenderForRpc,
entity_debug_handler: Arc<dyn EntityDebugHandler>,
) -> Vec<(&'static str, actix_web::dev::ServerHandle)> {
let RpcConfig {
Expand Down Expand Up @@ -1485,6 +1509,8 @@ pub fn start_http(
enable_debug_rpc,
debug_pages_src_path: debug_pages_src_path.clone().map(Into::into),
entity_debug_handler: entity_debug_handler.clone(),
#[cfg(feature = "test_features")]
gc_sender: gc_sender.clone(),
}))
.app_data(web::JsonConfig::default().limit(limits_config.json_payload_max_size))
.wrap(middleware::Logger::default())
Expand Down
6 changes: 6 additions & 0 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ pub fn start_with_config_and_synchronization(
client_arbiter_handle,
resharding_handle,
gc_arbiter_handle,
#[cfg(feature = "test_features")]
gc_actor,
#[cfg(not(feature = "test_features"))]
gc_actor: _,
} = start_client(
Clock::real(),
config.client_config.clone(),
Expand Down Expand Up @@ -450,6 +454,8 @@ pub fn start_with_config_and_synchronization(
client_actor.clone().with_auto_span_context().into_multi_sender(),
view_client.clone().with_auto_span_context().into_multi_sender(),
network_actor.into_multi_sender(),
#[cfg(feature = "test_features")]
gc_actor.into_multi_sender(),
Arc::new(entity_debug_handler),
));
}
Expand Down
2 changes: 1 addition & 1 deletion pytest/lib/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def __init__(self):
self._proxy_local_stopped = None
self.proxy = None
self.store_tests = 0
self.is_check_store = False
self.is_check_store = True

def change_config(self, overrides: typing.Dict[str, typing.Any]) -> None:
"""Change client config.json of a node by applying given overrides.
Expand Down
3 changes: 3 additions & 0 deletions pytest/tests/sanity/garbage_collection_archival.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,6 @@
"block_id": block_hash
})
assert 'error' in res, res

nodes[1].check_store()
nodes[2].check_store()
4 changes: 4 additions & 0 deletions pytest/tests/sanity/garbage_collection_intense.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
assert 'SuccessValue' in res['result']['status']
time.sleep(1)

nodes[1].stop_checking_store()

while True:
block_id = nodes[1].get_latest_block()
if int(block_id.height) > TARGET_HEIGHT:
Expand Down Expand Up @@ -132,3 +134,5 @@
"block_id": deletion_finish_block_height
})
assert res['error']['cause']['name'] == "GARBAGE_COLLECTED_BLOCK", res

nodes[1].check_store()

0 comments on commit 8954a36

Please sign in to comment.