diff --git a/Cargo.lock b/Cargo.lock index 6c0b65143cf..41c002191e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4955,6 +4955,7 @@ dependencies = [ "lru 0.12.3", "memoffset 0.8.0", "near-crypto", + "near-o11y", "near-parameters", "near-primitives-core", "near-stdx", @@ -4969,6 +4970,7 @@ dependencies = [ "parity-wasm 0.41.0", "parity-wasm 0.42.2", "prefix-sum-vec", + "prometheus", "pwasm-utils", "rand", "ripemd", diff --git a/chain/chain/src/tests/garbage_collection.rs b/chain/chain/src/tests/garbage_collection.rs index 0a5e33c180f..03990df105b 100644 --- a/chain/chain/src/tests/garbage_collection.rs +++ b/chain/chain/src/tests/garbage_collection.rs @@ -547,20 +547,14 @@ fn test_gc_pine_small() { fn test_gc_pine() { for max_changes in 1..=20 { let mut chains = vec![SimpleChain { from: 0, length: 101, is_removed: false }]; - for i in 1..50 { - chains.push(SimpleChain { from: i, length: 1, is_removed: true }); - } - for i in 50..100 { - chains.push(SimpleChain { from: i, length: 1, is_removed: false }); + for i in 1..100 { + chains.push(SimpleChain { from: i, length: 1, is_removed: i < 60 }); } gc_fork_common(chains, max_changes); let mut chains = vec![SimpleChain { from: 0, length: 101, is_removed: false }]; - for i in 1..40 { - chains.push(SimpleChain { from: i, length: 11, is_removed: true }); - } - for i in 40..90 { - chains.push(SimpleChain { from: i, length: 11, is_removed: false }); + for i in 1..90 { + chains.push(SimpleChain { from: i, length: 11, is_removed: i < 50 }); } gc_fork_common(chains, max_changes); } diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index c1b0bcc4bf8..8ea7b508049 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -7,7 +7,7 @@ use crate::debug::BlockProductionTracker; use crate::debug::PRODUCTION_TIMES_CACHE_SIZE; use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTracker; use crate::stateless_validation::chunk_validator::ChunkValidator; -use crate::stateless_validation::state_witness_actor::StateWitnessSenderForClient; +use crate::stateless_validation::partial_witness::partial_witness_actor::PartialWitnessSenderForClient; use crate::sync::adapter::SyncShardInfo; use crate::sync::block::BlockSync; use crate::sync::epoch::EpochSync; @@ -186,8 +186,8 @@ pub struct Client { pub chunk_inclusion_tracker: ChunkInclusionTracker, /// Tracks chunk endorsements received from chunk validators. Used to filter out chunks ready for inclusion pub chunk_endorsement_tracker: Arc, - /// Adapter to send request to state_witness_actor to distribute state witness. - pub state_witness_adapter: StateWitnessSenderForClient, + /// Adapter to send request to partial_witness_actor to distribute state witness. + pub partial_witness_adapter: PartialWitnessSenderForClient, // Optional value used for the Chunk Distribution Network Feature. 
chunk_distribution_network: Option, } @@ -249,7 +249,7 @@ impl Client { rng_seed: RngSeed, snapshot_callbacks: Option, async_computation_spawner: Arc, - state_witness_adapter: StateWitnessSenderForClient, + partial_witness_adapter: PartialWitnessSenderForClient, ) -> Result { let doomslug_threshold_mode = if enable_doomslug { DoomslugThresholdMode::TwoThirds @@ -404,7 +404,7 @@ impl Client { chunk_validator, chunk_inclusion_tracker: ChunkInclusionTracker::new(), chunk_endorsement_tracker, - state_witness_adapter, + partial_witness_adapter, chunk_distribution_network, }) } diff --git a/chain/client/src/client_actions.rs b/chain/client/src/client_actions.rs index a095be0ab21..a37604ca458 100644 --- a/chain/client/src/client_actions.rs +++ b/chain/client/src/client_actions.rs @@ -96,7 +96,7 @@ pub struct SyncJobsSenderForClient { #[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)] #[multi_send_message_derive(Debug)] -pub struct ClientSenderForStateWitness { +pub struct ClientSenderForPartialWitness { pub receive_chunk_state_witness: Sender, } diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index c5cfd3236da..8bffa280c20 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -35,7 +35,7 @@ use tokio::sync::broadcast; use crate::client_actions::{ClientActionHandler, ClientActions, ClientSenderForClient}; use crate::gc_actor::GCActor; use crate::start_gc_actor; -use crate::stateless_validation::state_witness_actor::StateWitnessSenderForClient; +use crate::stateless_validation::partial_witness::partial_witness_actor::PartialWitnessSenderForClient; use crate::sync_jobs_actions::SyncJobsActions; use crate::sync_jobs_actor::SyncJobsActor; use crate::{metrics, Client, ConfigUpdater, SyncAdapter}; @@ -206,7 +206,7 @@ pub fn start_client( sender: Option>, adv: crate::adversarial::Controls, config_updater: Option, - state_witness_adapter: StateWitnessSenderForClient, + partial_witness_adapter: PartialWitnessSenderForClient, ) -> StartClientResult { let client_arbiter = Arbiter::new(); let client_arbiter_handle = client_arbiter.handle(); @@ -228,7 +228,7 @@ pub fn start_client( random_seed_from_thread(), snapshot_callbacks, Arc::new(RayonAsyncComputationSpawner), - state_witness_adapter, + partial_witness_adapter, ) .unwrap(); let resharding_handle = client.chain.resharding_handle.clone(); diff --git a/chain/client/src/lib.rs b/chain/client/src/lib.rs index 628664ed36b..f72adaebc9c 100644 --- a/chain/client/src/lib.rs +++ b/chain/client/src/lib.rs @@ -21,11 +21,11 @@ pub use near_client_primitives::debug::DebugStatus; pub use near_network::client::{ BlockApproval, BlockResponse, ProcessTxRequest, ProcessTxResponse, SetNetworkInfo, }; -pub use stateless_validation::processing_tracker::{ProcessingDoneTracker, ProcessingDoneWaiter}; -pub use stateless_validation::state_witness_actions::StateWitnessActions; -pub use stateless_validation::state_witness_actor::{ - DistributeStateWitnessRequest, StateWitnessActor, StateWitnessSenderForClientMessage, +pub use stateless_validation::partial_witness::partial_witness_actions::PartialWitnessActions; +pub use stateless_validation::partial_witness::partial_witness_actor::{ + DistributeStateWitnessRequest, PartialWitnessActor, PartialWitnessSenderForClientMessage, }; +pub use stateless_validation::processing_tracker::{ProcessingDoneTracker, ProcessingDoneWaiter}; pub mod adapter; pub mod adversarial; diff --git a/chain/client/src/stateless_validation/mod.rs 
b/chain/client/src/stateless_validation/mod.rs index b889b761955..55798f17716 100644 --- a/chain/client/src/stateless_validation/mod.rs +++ b/chain/client/src/stateless_validation/mod.rs @@ -1,9 +1,7 @@ pub mod chunk_endorsement_tracker; pub mod chunk_validator; -mod partial_witness_tracker; +pub mod partial_witness; pub mod processing_tracker; mod shadow_validate; -pub mod state_witness_actions; -pub mod state_witness_actor; mod state_witness_producer; pub mod state_witness_tracker; diff --git a/chain/client/src/stateless_validation/partial_witness/mod.rs b/chain/client/src/stateless_validation/partial_witness/mod.rs new file mode 100644 index 00000000000..5c5e5e3271d --- /dev/null +++ b/chain/client/src/stateless_validation/partial_witness/mod.rs @@ -0,0 +1,3 @@ +pub mod partial_witness_actions; +pub mod partial_witness_actor; +mod partial_witness_tracker; diff --git a/chain/client/src/stateless_validation/state_witness_actions.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actions.rs similarity index 97% rename from chain/client/src/stateless_validation/state_witness_actions.rs rename to chain/client/src/stateless_validation/partial_witness/partial_witness_actions.rs index 40fdaa3f344..df2ca0b7131 100644 --- a/chain/client/src/stateless_validation/state_witness_actions.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actions.rs @@ -16,14 +16,14 @@ use near_primitives::stateless_validation::{ use near_primitives::types::{AccountId, EpochId}; use near_primitives::validator_signer::ValidatorSigner; -use crate::client_actions::ClientSenderForStateWitness; +use crate::client_actions::ClientSenderForPartialWitness; use crate::metrics; +use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker; +use super::partial_witness_actor::DistributeStateWitnessRequest; use super::partial_witness_tracker::{PartialEncodedStateWitnessTracker, RsMap}; -use super::state_witness_actor::DistributeStateWitnessRequest; -use super::state_witness_tracker::ChunkStateWitnessTracker; -pub struct StateWitnessActions { +pub struct PartialWitnessActions { /// Adapter to send messages to the network. network_adapter: PeerManagerAdapter, /// Validator signer to sign the state witness. 
@@ -39,11 +39,11 @@ pub struct StateWitnessActions { rs_map: RsMap, } -impl StateWitnessActions { +impl PartialWitnessActions { pub fn new( clock: Clock, network_adapter: PeerManagerAdapter, - client_sender: ClientSenderForStateWitness, + client_sender: ClientSenderForPartialWitness, my_signer: Arc, epoch_manager: Arc, ) -> Self { diff --git a/chain/client/src/stateless_validation/state_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs similarity index 87% rename from chain/client/src/stateless_validation/state_witness_actor.rs rename to chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index 074851c0406..7e07a62666f 100644 --- a/chain/client/src/stateless_validation/state_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -17,25 +17,25 @@ use near_primitives::stateless_validation::ChunkStateWitness; use near_primitives::types::EpochId; use near_primitives::validator_signer::ValidatorSigner; -use crate::client_actions::ClientSenderForStateWitness; +use crate::client_actions::ClientSenderForPartialWitness; -use super::state_witness_actions::StateWitnessActions; +use super::partial_witness_actions::PartialWitnessActions; -pub struct StateWitnessActor { - pub actions: StateWitnessActions, +pub struct PartialWitnessActor { + pub actions: PartialWitnessActions, } -impl StateWitnessActor { +impl PartialWitnessActor { pub fn spawn( clock: Clock, network_adapter: PeerManagerAdapter, - client_sender: ClientSenderForStateWitness, + client_sender: ClientSenderForPartialWitness, my_signer: Arc, epoch_manager: Arc, ) -> (actix::Addr, actix::ArbiterHandle) { let arbiter = actix::Arbiter::new().handle(); let addr = Self::start_in_arbiter(&arbiter, |_ctx| Self { - actions: StateWitnessActions::new( + actions: PartialWitnessActions::new( clock, network_adapter, client_sender, @@ -47,7 +47,7 @@ impl StateWitnessActor { } } -impl actix::Actor for StateWitnessActor { +impl actix::Actor for PartialWitnessActor { type Context = actix::Context; } @@ -61,11 +61,11 @@ pub struct DistributeStateWitnessRequest { #[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)] #[multi_send_message_derive(Debug)] -pub struct StateWitnessSenderForClient { +pub struct PartialWitnessSenderForClient { pub distribute_chunk_state_witness: Sender, } -impl actix::Handler> for StateWitnessActor { +impl actix::Handler> for PartialWitnessActor { type Result = (); #[perf] @@ -81,7 +81,7 @@ impl actix::Handler> for StateWit } } -impl actix::Handler> for StateWitnessActor { +impl actix::Handler> for PartialWitnessActor { type Result = (); fn handle( @@ -94,7 +94,7 @@ impl actix::Handler> for StateWitne } } -impl actix::Handler> for StateWitnessActor { +impl actix::Handler> for PartialWitnessActor { type Result = (); fn handle( @@ -110,7 +110,7 @@ impl actix::Handler> for Stat } impl actix::Handler> - for StateWitnessActor + for PartialWitnessActor { type Result = (); diff --git a/chain/client/src/stateless_validation/partial_witness_tracker.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs similarity index 98% rename from chain/client/src/stateless_validation/partial_witness_tracker.rs rename to chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs index e53bb2ffd4b..0313d10f28b 100644 --- a/chain/client/src/stateless_validation/partial_witness_tracker.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs @@ 
-11,7 +11,7 @@ use near_primitives::stateless_validation::{EncodedChunkStateWitness, PartialEnc use near_primitives::types::{BlockHeight, ShardId}; use reed_solomon_erasure::galois_8::ReedSolomon; -use crate::client_actions::ClientSenderForStateWitness; +use crate::client_actions::ClientSenderForPartialWitness; /// Max number of chunks to keep in the witness tracker cache. We reach here only after validation /// of the partial_witness so the LRU cache size need not be too large. @@ -142,7 +142,7 @@ impl CacheEntry { /// recreate the full state witness. pub struct PartialEncodedStateWitnessTracker { /// Sender to send the encoded state witness to the client actor. - client_sender: ClientSenderForStateWitness, + client_sender: ClientSenderForPartialWitness, /// Epoch manager to get the set of chunk validators epoch_manager: Arc, /// Keeps track of state witness parts received from chunk producers. @@ -153,7 +153,7 @@ pub struct PartialEncodedStateWitnessTracker { impl PartialEncodedStateWitnessTracker { pub fn new( - client_sender: ClientSenderForStateWitness, + client_sender: ClientSenderForPartialWitness, epoch_manager: Arc, ) -> Self { Self { diff --git a/chain/client/src/stateless_validation/state_witness_producer.rs b/chain/client/src/stateless_validation/state_witness_producer.rs index 8023ef6f492..9ba32b25ef0 100644 --- a/chain/client/src/stateless_validation/state_witness_producer.rs +++ b/chain/client/src/stateless_validation/state_witness_producer.rs @@ -15,9 +15,10 @@ use near_primitives::stateless_validation::{ use near_primitives::types::{AccountId, EpochId}; use crate::stateless_validation::chunk_validator::send_chunk_endorsement_to_block_producers; -use crate::stateless_validation::state_witness_actor::DistributeStateWitnessRequest; use crate::Client; +use super::partial_witness::partial_witness_actor::DistributeStateWitnessRequest; + impl Client { /// Distributes the chunk state witness to chunk validators that are /// selected to validate this chunk. 
@@ -65,7 +66,7 @@ impl Client { ); } - self.state_witness_adapter.send(DistributeStateWitnessRequest { + self.partial_witness_adapter.send(DistributeStateWitnessRequest { epoch_id: epoch_id.clone(), chunk_header, state_witness, diff --git a/chain/client/src/test_utils/mock_state_witness_adapter.rs b/chain/client/src/test_utils/mock_partial_witness_adapter.rs similarity index 66% rename from chain/client/src/test_utils/mock_state_witness_adapter.rs rename to chain/client/src/test_utils/mock_partial_witness_adapter.rs index fcdeee084ac..e006f0ad479 100644 --- a/chain/client/src/test_utils/mock_state_witness_adapter.rs +++ b/chain/client/src/test_utils/mock_partial_witness_adapter.rs @@ -3,20 +3,20 @@ use std::sync::{Arc, RwLock}; use near_async::messaging::CanSend; -use crate::stateless_validation::state_witness_actor::DistributeStateWitnessRequest; +use crate::stateless_validation::partial_witness::partial_witness_actor::DistributeStateWitnessRequest; #[derive(Clone, Default)] -pub struct MockStateWitnessAdapter { +pub struct MockPartialWitnessAdapter { distribution_request: Arc>>, } -impl CanSend for MockStateWitnessAdapter { +impl CanSend for MockPartialWitnessAdapter { fn send(&self, msg: DistributeStateWitnessRequest) { self.distribution_request.write().unwrap().push_back(msg); } } -impl MockStateWitnessAdapter { +impl MockPartialWitnessAdapter { pub fn pop_distribution_request(&self) -> Option { self.distribution_request.write().unwrap().pop_front() } diff --git a/chain/client/src/test_utils/mod.rs b/chain/client/src/test_utils/mod.rs index 9ee619953bc..1e986bc23bb 100644 --- a/chain/client/src/test_utils/mod.rs +++ b/chain/client/src/test_utils/mod.rs @@ -1,6 +1,6 @@ pub mod block_stats; pub mod client; -mod mock_state_witness_adapter; +mod mock_partial_witness_adapter; pub mod peer_manager_mock; pub mod setup; pub mod test_env; diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index 3665c66e2fc..9826203a3a9 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -4,8 +4,8 @@ use super::block_stats::BlockStats; use super::peer_manager_mock::PeerManagerMock; -use crate::stateless_validation::state_witness_actor::{ - StateWitnessActor, StateWitnessSenderForClient, +use crate::stateless_validation::partial_witness::partial_witness_actor::{ + PartialWitnessActor, PartialWitnessSenderForClient, }; use crate::{start_view_client, Client, ClientActor, SyncAdapter, SyncStatus, ViewClientActor}; use actix::{Actor, Addr, AsyncContext, Context}; @@ -177,14 +177,14 @@ pub fn setup( SyncAdapter::actix_actor_maker(), ))); - let (state_witness_addr, _) = StateWitnessActor::spawn( + let (partial_witness_addr, _) = PartialWitnessActor::spawn( clock.clone(), network_adapter.clone(), noop().into_multi_sender(), signer.clone(), epoch_manager.clone(), ); - let state_witness_adapter = state_witness_addr.with_auto_span_context(); + let partial_witness_adapter = partial_witness_addr.with_auto_span_context(); let client = Client::new( clock.clone(), config.clone(), @@ -200,7 +200,7 @@ pub fn setup( TEST_SEED, None, Arc::new(RayonAsyncComputationSpawner), - state_witness_adapter.into_multi_sender(), + partial_witness_adapter.into_multi_sender(), ) .unwrap(); let client_actor = ClientActor::new( @@ -974,7 +974,7 @@ pub fn setup_client_with_runtime( archive: bool, save_trie_changes: bool, snapshot_callbacks: Option, - state_witness_adapter: StateWitnessSenderForClient, + partial_witness_adapter: PartialWitnessSenderForClient, 
validator_signer: Arc, ) -> Client { let mut config = ClientConfig::test( @@ -1008,7 +1008,7 @@ pub fn setup_client_with_runtime( rng_seed, snapshot_callbacks, Arc::new(RayonAsyncComputationSpawner), - state_witness_adapter, + partial_witness_adapter, ) .unwrap(); client.sync_status = SyncStatus::NoSync; diff --git a/chain/client/src/test_utils/test_env.rs b/chain/client/src/test_utils/test_env.rs index 730ec43d18a..0a015dcc298 100644 --- a/chain/client/src/test_utils/test_env.rs +++ b/chain/client/src/test_utils/test_env.rs @@ -42,7 +42,7 @@ use once_cell::sync::OnceCell; use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex}; -use super::mock_state_witness_adapter::MockStateWitnessAdapter; +use super::mock_partial_witness_adapter::MockPartialWitnessAdapter; use super::setup::setup_client_with_runtime; use super::test_env_builder::TestEnvBuilder; use super::TEST_SEED; @@ -58,7 +58,7 @@ pub struct TestEnv { pub validators: Vec, pub network_adapters: Vec>, pub client_adapters: Vec>, - pub state_witness_adapters: Vec, + pub partial_witness_adapters: Vec, pub shards_manager_adapters: Vec, pub clients: Vec, pub(crate) account_indices: AccountIndices, @@ -346,16 +346,16 @@ impl TestEnv { }; let mut witness_processing_done_waiters: Vec = Vec::new(); - // Here we are completely bypassing the state_witness_actor and directly distributing the state witness to the + // Here we are completely bypassing the partial_witness_actor and directly distributing the state witness to the // clients. Ideally the route should have been the following: - // [client] ----(DistributeStateWitnessRequest)----> [state_witness_actor] - // [state_witness_actor] ----(PartialEncodedStateWitness + Forward)----> [state_witness_actor] - // [state_witness_actor] ----(ProcessChunkStateWitnessMessage)----> [client] + // [client] ----(DistributeStateWitnessRequest)----> [partial_witness_actor] + // [partial_witness_actor] ----(PartialEncodedStateWitness + Forward)----> [partial_witness_actor] + // [partial_witness_actor] ----(ProcessChunkStateWitnessMessage)----> [client] // But we go directly from processing DistributeStateWitnessRequest to sending it to all the chunk validators. - // Validation of state witness is done in the state_witness_actor which should be tested by test_loop. - let state_witness_adapters = self.state_witness_adapters.clone(); - for (client_idx, state_witness_adapter) in state_witness_adapters.iter().enumerate() { - while let Some(request) = state_witness_adapter.pop_distribution_request() { + // Validation of state witness is done in the partial_witness_actor which should be tested by test_loop. 
+ let partial_witness_adapters = self.partial_witness_adapters.clone(); + for (client_idx, partial_witness_adapter) in partial_witness_adapters.iter().enumerate() { + while let Some(request) = partial_witness_adapter.pop_distribution_request() { let DistributeStateWitnessRequest { epoch_id, chunk_header, state_witness } = request; let (encoded_witness, _) = @@ -625,7 +625,7 @@ impl TestEnv { self.archive, self.save_trie_changes, None, - self.clients[idx].state_witness_adapter.clone(), + self.clients[idx].partial_witness_adapter.clone(), self.clients[idx].validator_signer.clone().unwrap(), ) } diff --git a/chain/client/src/test_utils/test_env_builder.rs b/chain/client/src/test_utils/test_env_builder.rs index 13157c16694..5c8497fa328 100644 --- a/chain/client/src/test_utils/test_env_builder.rs +++ b/chain/client/src/test_utils/test_env_builder.rs @@ -1,4 +1,4 @@ -use super::mock_state_witness_adapter::MockStateWitnessAdapter; +use super::mock_partial_witness_adapter::MockPartialWitnessAdapter; use super::setup::{setup_client_with_runtime, setup_synchronous_shards_manager}; use super::test_env::TestEnv; use super::{AccountIndices, TEST_SEED}; @@ -511,8 +511,8 @@ impl TestEnvBuilder { let client_adapters = (0..num_clients) .map(|_| Arc::new(MockClientAdapterForShardsManager::default())) .collect_vec(); - let state_witness_adapters = - (0..num_clients).map(|_| MockStateWitnessAdapter::default()).collect_vec(); + let partial_witness_adapters = + (0..num_clients).map(|_| MockPartialWitnessAdapter::default()).collect_vec(); let shards_manager_adapters = (0..num_clients) .map(|i| { let clock = clock.clone(); @@ -537,7 +537,7 @@ impl TestEnvBuilder { .map(|i| { let account_id = clients[i].clone(); let network_adapter = network_adapters[i].clone(); - let state_witness_adapter = state_witness_adapters[i].clone(); + let partial_witness_adapter = partial_witness_adapters[i].clone(); let shards_manager_adapter = shards_manager_adapters[i].clone(); let epoch_manager = epoch_managers[i].clone(); let shard_tracker = shard_trackers[i].clone(); @@ -576,7 +576,7 @@ impl TestEnvBuilder { self.archive, self.save_trie_changes, Some(snapshot_callbacks), - state_witness_adapter.into_multi_sender(), + partial_witness_adapter.into_multi_sender(), validator_signer, ) }) @@ -588,7 +588,7 @@ impl TestEnvBuilder { validators, network_adapters, client_adapters, - state_witness_adapters, + partial_witness_adapters, shards_manager_adapters, clients, account_indices: AccountIndices( diff --git a/chain/client/src/test_utils/test_loop.rs b/chain/client/src/test_utils/test_loop.rs index 187ddd7d11f..5f4376315f8 100644 --- a/chain/client/src/test_utils/test_loop.rs +++ b/chain/client/src/test_utils/test_loop.rs @@ -1,10 +1,11 @@ pub mod client_actions; -pub mod state_witness_actions; +pub mod partial_witness_actions; pub mod sync_actor; pub mod sync_jobs_actions; -use crate::client_actions::ClientSenderForStateWitnessMessage; -use crate::client_actions::{ClientActionHandler, ClientActions}; +use crate::client_actions::{ + ClientActionHandler, ClientActions, ClientSenderForPartialWitnessMessage, +}; use near_async::messaging::{CanSend, SendAsync}; use near_async::test_loop::delay_sender::DelaySender; use near_async::test_loop::event_handler::{LoopEventHandler, TryIntoOrSelf}; @@ -18,8 +19,8 @@ use near_network::client::{ }; use near_network::state_witness::{ ChunkStateWitnessAckMessage, PartialEncodedStateWitnessForwardMessage, - PartialEncodedStateWitnessMessage, StateWitnessSenderForNetwork, - 
StateWitnessSenderForNetworkMessage, + PartialEncodedStateWitnessMessage, PartialWitnessSenderForNetwork, + PartialWitnessSenderForNetworkMessage, }; use near_network::test_loop::SupportsRoutingLookup; use near_network::types::{NetworkRequests, PeerManagerMessageRequest}; @@ -89,7 +90,7 @@ pub fn route_network_messages_to_client< Event: TryIntoOrSelf + From + From - + From, + + From, >( sender: DelaySender<(usize, Event)>, network_delay: Duration, @@ -117,7 +118,7 @@ pub fn route_network_messages_to_client< sender .with_additional_delay(network_delay) .for_index(idx) - .into_wrapped_multi_sender::() + .into_wrapped_multi_sender::() }) .collect::>(); @@ -350,10 +351,10 @@ impl + AsRef> ClientQueries for Vec { } } -pub fn forward_messages_from_state_witness_actor_to_client( -) -> LoopEventHandler { +pub fn forward_messages_from_partial_witness_actor_to_client( +) -> LoopEventHandler { LoopEventHandler::new_simple(|msg, client_actions: &mut ClientActions| match msg { - ClientSenderForStateWitnessMessage::_receive_chunk_state_witness(msg) => { + ClientSenderForPartialWitnessMessage::_receive_chunk_state_witness(msg) => { client_actions.handle(msg) } }) diff --git a/chain/client/src/test_utils/test_loop/partial_witness_actions.rs b/chain/client/src/test_utils/test_loop/partial_witness_actions.rs new file mode 100644 index 00000000000..aab8470ab8b --- /dev/null +++ b/chain/client/src/test_utils/test_loop/partial_witness_actions.rs @@ -0,0 +1,32 @@ +use crate::{PartialWitnessActions, PartialWitnessSenderForClientMessage}; +use near_async::test_loop::event_handler::LoopEventHandler; +use near_network::state_witness::PartialWitnessSenderForNetworkMessage; + +pub fn forward_messages_from_network_to_partial_witness_actor( +) -> LoopEventHandler { + LoopEventHandler::new_simple(|msg, partial_witness_actions: &mut PartialWitnessActions| { + match msg { + PartialWitnessSenderForNetworkMessage::_chunk_state_witness_ack(msg) => { + partial_witness_actions.handle_chunk_state_witness_ack(msg.0); + } + PartialWitnessSenderForNetworkMessage::_partial_encoded_state_witness(msg) => { + partial_witness_actions.handle_partial_encoded_state_witness(msg.0).unwrap(); + } + PartialWitnessSenderForNetworkMessage::_partial_encoded_state_witness_forward(msg) => { + partial_witness_actions + .handle_partial_encoded_state_witness_forward(msg.0) + .unwrap(); + } + } + }) +} + +pub fn forward_messages_from_client_to_partial_witness_actor( +) -> LoopEventHandler { + LoopEventHandler::new_simple(|msg, state_partial_actions: &mut PartialWitnessActions| match msg + { + PartialWitnessSenderForClientMessage::_distribute_chunk_state_witness(msg) => { + state_partial_actions.handle_distribute_state_witness_request(msg).unwrap(); + } + }) +} diff --git a/chain/client/src/test_utils/test_loop/state_witness_actions.rs b/chain/client/src/test_utils/test_loop/state_witness_actions.rs deleted file mode 100644 index 2083e46ae67..00000000000 --- a/chain/client/src/test_utils/test_loop/state_witness_actions.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::{StateWitnessActions, StateWitnessSenderForClientMessage}; -use near_async::test_loop::event_handler::LoopEventHandler; -use near_network::state_witness::StateWitnessSenderForNetworkMessage; - -pub fn forward_messages_from_network_to_state_witness_actor( -) -> LoopEventHandler { - LoopEventHandler::new_simple(|msg, state_witness_actions: &mut StateWitnessActions| match msg { - StateWitnessSenderForNetworkMessage::_chunk_state_witness_ack(msg) => { - 
state_witness_actions.handle_chunk_state_witness_ack(msg.0); - } - StateWitnessSenderForNetworkMessage::_partial_encoded_state_witness(msg) => { - state_witness_actions.handle_partial_encoded_state_witness(msg.0).unwrap(); - } - StateWitnessSenderForNetworkMessage::_partial_encoded_state_witness_forward(msg) => { - state_witness_actions.handle_partial_encoded_state_witness_forward(msg.0).unwrap(); - } - }) -} - -pub fn forward_messages_from_client_to_state_witness_actor( -) -> LoopEventHandler { - LoopEventHandler::new_simple(|msg, state_witness_actions: &mut StateWitnessActions| match msg { - StateWitnessSenderForClientMessage::_distribute_chunk_state_witness(msg) => { - state_witness_actions.handle_distribute_state_witness_request(msg).unwrap(); - } - }) -} diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 315f70facc9..b41499c8172 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -1021,7 +1021,7 @@ impl PeerActor { None } RoutedMessageBody::ChunkStateWitnessAck(ack) => { - network_state.state_witness_adapter.send(ChunkStateWitnessAckMessage(ack)); + network_state.partial_witness_adapter.send(ChunkStateWitnessAckMessage(ack)); None } RoutedMessageBody::ChunkEndorsement(endorsement) => { @@ -1030,13 +1030,13 @@ impl PeerActor { } RoutedMessageBody::PartialEncodedStateWitness(witness) => { network_state - .state_witness_adapter + .partial_witness_adapter .send(PartialEncodedStateWitnessMessage(witness)); None } RoutedMessageBody::PartialEncodedStateWitnessForward(witness) => { network_state - .state_witness_adapter + .partial_witness_adapter .send(PartialEncodedStateWitnessForwardMessage(witness)); None } diff --git a/chain/network/src/peer/testonly.rs b/chain/network/src/peer/testonly.rs index b3bbaae6855..21436cd213f 100644 --- a/chain/network/src/peer/testonly.rs +++ b/chain/network/src/peer/testonly.rs @@ -13,7 +13,7 @@ use crate::peer_manager::peer_store; use crate::private_actix::SendMessage; use crate::shards_manager::ShardsManagerRequestFromNetwork; use crate::state_witness::{ - StateWitnessSenderForNetworkInput, StateWitnessSenderForNetworkMessage, + PartialWitnessSenderForNetworkInput, PartialWitnessSenderForNetworkMessage, }; use crate::store; use crate::tcp; @@ -46,7 +46,7 @@ pub(crate) enum Event { ShardsManager(ShardsManagerRequestFromNetwork), Client(ClientSenderForNetworkInput), Network(peer_manager_actor::Event), - StateWitness(StateWitnessSenderForNetworkInput), + PartialWitness(PartialWitnessSenderForNetworkInput), } pub(crate) struct PeerHandle { @@ -125,8 +125,8 @@ impl PeerHandle { }); let state_witness_sender = Sender::from_fn({ let send = send.clone(); - move |event: StateWitnessSenderForNetworkMessage| { - send.send(Event::StateWitness(event.into_input())); + move |event: PartialWitnessSenderForNetworkMessage| { + send.send(Event::PartialWitness(event.into_input())); } }); let network_state = Arc::new(NetworkState::new( diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index a0fd81d3745..d66b8208120 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -18,7 +18,7 @@ use crate::routing::route_back_cache::RouteBackCache; use crate::routing::NetworkTopologyChange; use crate::shards_manager::ShardsManagerRequestFromNetwork; use crate::snapshot_hosts::{SnapshotHostInfoError, SnapshotHostsCache}; -use 
crate::state_witness::StateWitnessSenderForNetwork; +use crate::state_witness::PartialWitnessSenderForNetwork; use crate::stats::metrics; use crate::store; use crate::tcp; @@ -96,7 +96,7 @@ pub(crate) struct NetworkState { pub genesis_id: GenesisId, pub client: ClientSenderForNetwork, pub shards_manager_adapter: Sender, - pub state_witness_adapter: StateWitnessSenderForNetwork, + pub partial_witness_adapter: PartialWitnessSenderForNetwork, /// Network-related info about the chain. pub chain_info: ArcSwap>, @@ -165,7 +165,7 @@ impl NetworkState { genesis_id: GenesisId, client: ClientSenderForNetwork, shards_manager_adapter: Sender, - state_witness_adapter: StateWitnessSenderForNetwork, + partial_witness_adapter: PartialWitnessSenderForNetwork, whitelist_nodes: Vec, ) -> Self { Self { @@ -182,7 +182,7 @@ impl NetworkState { genesis_id, client, shards_manager_adapter, - state_witness_adapter, + partial_witness_adapter, chain_info: Default::default(), tier2: connection::Pool::new(config.node_id()), tier1: connection::Pool::new(config.node_id()), diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index cdd4eaf4814..21aa76d2853 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -11,7 +11,7 @@ use crate::peer_manager::connection; use crate::peer_manager::network_state::{NetworkState, WhitelistNode}; use crate::peer_manager::peer_store; use crate::shards_manager::ShardsManagerRequestFromNetwork; -use crate::state_witness::StateWitnessSenderForNetwork; +use crate::state_witness::PartialWitnessSenderForNetwork; use crate::stats::metrics; use crate::store; use crate::tcp; @@ -208,7 +208,7 @@ impl PeerManagerActor { config: config::NetworkConfig, client: ClientSenderForNetwork, shards_manager_adapter: Sender, - state_witness_adapter: StateWitnessSenderForNetwork, + partial_witness_adapter: PartialWitnessSenderForNetwork, genesis_id: GenesisId, ) -> anyhow::Result> { let config = config.verify().context("config")?; @@ -239,7 +239,7 @@ impl PeerManagerActor { genesis_id, client, shards_manager_adapter, - state_witness_adapter, + partial_witness_adapter, whitelist_nodes, )); arbiter.spawn({ diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs index 17a0e26cb86..282e3f9575a 100644 --- a/chain/network/src/peer_manager/testonly.rs +++ b/chain/network/src/peer_manager/testonly.rs @@ -19,8 +19,8 @@ use crate::peer_manager::network_state::NetworkState; use crate::peer_manager::peer_manager_actor::Event as PME; use crate::shards_manager::ShardsManagerRequestFromNetwork; use crate::snapshot_hosts::SnapshotHostsCache; -use crate::state_witness::StateWitnessSenderForNetworkInput; -use crate::state_witness::StateWitnessSenderForNetworkMessage; +use crate::state_witness::PartialWitnessSenderForNetworkInput; +use crate::state_witness::PartialWitnessSenderForNetworkMessage; use crate::tcp; use crate::test_utils; use crate::testonly::actix::ActixSystem; @@ -73,7 +73,7 @@ pub enum Event { ShardsManager(ShardsManagerRequestFromNetwork), Client(ClientSenderForNetworkInput), PeerManager(PME), - StateWitness(StateWitnessSenderForNetworkInput), + PartialWitness(PartialWitnessSenderForNetworkInput), } pub(crate) struct ActorHandler { @@ -627,8 +627,8 @@ pub(crate) async fn start( }); let state_witness_sender = Sender::from_fn({ let send = send.clone(); - move |event: StateWitnessSenderForNetworkMessage| { - 
send.send(Event::StateWitness(event.into_input())); + move |event: PartialWitnessSenderForNetworkMessage| { + send.send(Event::PartialWitness(event.into_input())); } }); PeerManagerActor::spawn( diff --git a/chain/network/src/state_witness.rs b/chain/network/src/state_witness.rs index 8f5b73fd340..35677d8818d 100644 --- a/chain/network/src/state_witness.rs +++ b/chain/network/src/state_witness.rs @@ -17,7 +17,7 @@ pub struct PartialEncodedStateWitnessForwardMessage(pub PartialEncodedStateWitne #[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)] #[multi_send_message_derive(Debug)] #[multi_send_input_derive(Debug, Clone, PartialEq, Eq)] -pub struct StateWitnessSenderForNetwork { +pub struct PartialWitnessSenderForNetwork { pub chunk_state_witness_ack: Sender, pub partial_encoded_state_witness: Sender, pub partial_encoded_state_witness_forward: Sender, diff --git a/core/primitives/src/congestion_info.rs b/core/primitives/src/congestion_info.rs index 535c04b1791..743e084f815 100644 --- a/core/primitives/src/congestion_info.rs +++ b/core/primitives/src/congestion_info.rs @@ -1,9 +1,87 @@ +use crate::errors::RuntimeError; use borsh::{BorshDeserialize, BorshSerialize}; use near_primitives_core::types::{Gas, ShardId}; -/// The CongestionInfo stores information about the congestion of a shard. It is -/// used by other shards to throttle the transactions and receipts to prevent -/// unbounded growth of the queues and buffers in the system. +const PGAS: Gas = 10u64.pow(15); +const TGAS: Gas = 10u64.pow(12); + +// The following constants have been defined in +// [NEP-539](https://github.com/near/NEPs/pull/539) after extensive fine-tuning +// and discussions. + +/// How much gas in delayed receipts of a shard is 100% incoming congestion. +/// +/// Based on incoming congestion levels, a shard reduces the gas it spends on +/// accepting new transactions instead of working on incoming receipts. Plus, +/// incoming congestion contributes to overall congestion, which reduces how +/// much other shards are allowed to forward to this shard. +const MAX_CONGESTION_INCOMING_GAS: Gas = 20 * PGAS; + +/// How much gas in outgoing buffered receipts of a shard is 100% congested. +/// +/// Outgoing congestion contributes to overall congestion, which reduces how +/// much other shards are allowed to forward to this shard. +const MAX_CONGESTION_OUTGOING_GAS: Gas = 2 * PGAS; + +/// How much memory space of all delayed and buffered receipts in a shard is +/// considered 100% congested. +/// +/// Memory congestion contributes to overall congestion, which reduces how much +/// other shards are allowed to forward to this shard. +/// +/// This threshold limits memory requirements of validators to a degree but it +/// is not a hard guarantee. +const MAX_CONGESTION_MEMORY_CONSUMPTION: u64 = bytesize::ByteSize::mb(1000u64).0; + +/// The maximum amount of gas attached to receipts a shard can forward to +/// another shard per chunk. +/// +/// The actual gas forwarding allowance is a linear interpolation between +/// [`MIN_OUTGOING_GAS`] and [`MAX_OUTGOING_GAS`], or 0 if the receiver is +/// fully congested. +const MAX_OUTGOING_GAS: Gas = 300 * PGAS; + +/// The minimum gas each shard can send to a shard that is not fully congested. +/// +/// The actual gas forwarding allowance is a linear interpolation between +/// [`MIN_OUTGOING_GAS`] and [`MAX_OUTGOING_GAS`], or 0 if the receiver is +/// fully congested. 
+const MIN_OUTGOING_GAS: Gas = 1 * PGAS;
+
+/// How much gas the chosen allowed shard can send to a 100% congested shard.
+///
+/// This amount is the absolute minimum of new workload a congested shard has to
+/// accept every round. It ensures deadlocks are provably impossible. But in
+/// ideal conditions, the gradual reduction of new workload entering the system
+/// combined with gradually limited forwarding to congested shards should
+/// prevent shards from becoming 100% congested in the first place.
+const RED_GAS: Gas = 1 * PGAS;
+
+/// The maximum amount of gas in a chunk spent on converting new transactions to
+/// receipts.
+///
+/// The actual transaction gas limit is a linear interpolation between
+/// [`MIN_TX_GAS`] and [`MAX_TX_GAS`], based on the incoming congestion of the
+/// local shard. Additionally, transactions can be rejected if the receiving
+/// remote shard is congested beyond [`REJECT_TX_CONGESTION_THRESHOLD`], based
+/// on its general congestion level.
+const MAX_TX_GAS: Gas = 500 * TGAS;
+
+/// The minimum amount of gas in a chunk spent on converting new transactions
+/// to receipts, as long as the receiving shard is not congested.
+///
+/// The actual transaction gas limit is a linear interpolation between
+/// [`MIN_TX_GAS`] and [`MAX_TX_GAS`], based on the incoming congestion of the
+/// local shard. Additionally, transactions can be rejected if the receiving
+/// remote shard is congested beyond [`REJECT_TX_CONGESTION_THRESHOLD`], based
+/// on its general congestion level.
+const MIN_TX_GAS: Gas = 20 * TGAS;
+
+/// How much congestion a shard can tolerate before it stops all shards from
+/// accepting new transactions with the receiver set to the congested shard.
+const REJECT_TX_CONGESTION_THRESHOLD: f64 = 0.25;
+
+/// Stores the congestion level of a shard.
 ///
 /// The CongestionInfo is a part of the ChunkHeader. It is versioned and each
 /// version should not be changed. Rather a new version with the desired changes
@@ -57,18 +135,375 @@ impl Default for CongestionInfo {
 impl CongestionInfo {
     /// How much gas another shard can send to us in the next block.
-    pub fn outgoing_limit(&self, _sender_shard: ShardId) -> Gas {
-        todo!()
+    pub fn outgoing_limit(&self, sender_shard: ShardId) -> Gas {
+        match self {
+            CongestionInfo::V1(inner) => inner.outgoing_limit(sender_shard),
+        }
+    }
+
+    /// How much gas we accept for executing new transactions going to any
+    /// uncongested shards.
+    pub fn process_tx_limit(&self) -> Gas {
+        match self {
+            CongestionInfo::V1(inner) => inner.process_tx_limit(),
+        }
+    }
+
+    /// Whether we can accept new transactions with the receiver set to this shard.
+    pub fn shard_accepts_transactions(&self) -> bool {
+        match self {
+            CongestionInfo::V1(inner) => inner.shard_accepts_transactions(),
+        }
+    }
+
+    /// Congestion level in the range [0.0,1.0].
+    pub fn congestion_level(&self) -> f64 {
+        match self {
+            CongestionInfo::V1(inner) => inner.congestion_level(),
+        }
+    }
+
+    pub fn add_receipt_bytes(&mut self, bytes: u64) -> Result<(), RuntimeError> {
+        match self {
+            CongestionInfo::V1(inner) => {
+                inner.receipt_bytes = inner
+                    .receipt_bytes
+                    .checked_add(bytes)
+                    .ok_or_else(|| RuntimeError::UnexpectedIntegerOverflow)?;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn remove_receipt_bytes(&mut self, bytes: u64) -> Result<(), RuntimeError> {
+        match self {
+            CongestionInfo::V1(inner) => {
+                inner.receipt_bytes = inner
+                    .receipt_bytes
+                    .checked_sub(bytes)
+                    .ok_or_else(|| RuntimeError::UnexpectedIntegerOverflow)?;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn add_delayed_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
+        match self {
+            CongestionInfo::V1(inner) => {
+                inner.delayed_receipts_gas = inner
+                    .delayed_receipts_gas
+                    .checked_add(gas as u128)
+                    .ok_or_else(|| RuntimeError::UnexpectedIntegerOverflow)?;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn remove_delayed_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
+        match self {
+            CongestionInfo::V1(inner) => {
+                inner.delayed_receipts_gas = inner
+                    .delayed_receipts_gas
+                    .checked_sub(gas as u128)
+                    .ok_or_else(|| RuntimeError::UnexpectedIntegerOverflow)?;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn add_buffered_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
+        match self {
+            CongestionInfo::V1(inner) => {
+                inner.buffered_receipts_gas = inner
+                    .buffered_receipts_gas
+                    .checked_add(gas as u128)
+                    .ok_or_else(|| RuntimeError::UnexpectedIntegerOverflow)?;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn remove_buffered_receipt_gas(&mut self, gas: Gas) -> Result<(), RuntimeError> {
+        match self {
+            CongestionInfo::V1(inner) => {
+                inner.buffered_receipts_gas = inner
+                    .buffered_receipts_gas
+                    .checked_sub(gas as u128)
+                    .ok_or_else(|| RuntimeError::UnexpectedIntegerOverflow)?;
+            }
+        }
+        Ok(())
+    }
+}
+
+impl CongestionInfoV1 {
+    /// How much gas another shard can send to us in the next block.
+    pub fn outgoing_limit(&self, sender_shard: ShardId) -> Gas {
+        let congestion = self.congestion_level();
+
+        // note: using float equality is okay here because
+        // `clamped_f64_fraction` clamps to exactly 1.0.
+        if congestion == 1.0 {
+            // Red traffic light: reduce to minimum speed
+            if sender_shard == self.allowed_shard as u64 {
+                RED_GAS
+            } else {
+                0
+            }
+        } else {
+            mix(MAX_OUTGOING_GAS, MIN_OUTGOING_GAS, congestion)
+        }
+    }
+
+    fn congestion_level(&self) -> f64 {
+        let incoming_congestion = self.incoming_congestion();
+        let outgoing_congestion = self.outgoing_congestion();
+        let memory_congestion = self.memory_congestion();
+
+        incoming_congestion.max(outgoing_congestion).max(memory_congestion)
+    }
+
+    fn incoming_congestion(&self) -> f64 {
+        clamped_f64_fraction(self.delayed_receipts_gas, MAX_CONGESTION_INCOMING_GAS)
+    }
+    fn outgoing_congestion(&self) -> f64 {
+        clamped_f64_fraction(self.buffered_receipts_gas, MAX_CONGESTION_OUTGOING_GAS)
+    }
+    fn memory_congestion(&self) -> f64 {
+        clamped_f64_fraction(self.receipt_bytes as u128, MAX_CONGESTION_MEMORY_CONSUMPTION)
     }
 
     /// How much gas we accept for executing new transactions going to any
     /// uncongested shards.
     pub fn process_tx_limit(&self) -> Gas {
-        todo!()
+        mix(MAX_TX_GAS, MIN_TX_GAS, self.incoming_congestion())
     }
 
     /// Whether we can accept new transaction with the receiver set to this shard.
     pub fn shard_accepts_transactions(&self) -> bool {
-        todo!()
+        self.congestion_level() < REJECT_TX_CONGESTION_THRESHOLD
+    }
+}
+
+/// Returns `value / max` clamped to the range [0,1].
+#[inline]
+fn clamped_f64_fraction(value: u128, max: u64) -> f64 {
+    assert!(max > 0);
+    if max as u128 <= value {
+        1.0
+    } else {
+        value as f64 / max as f64
+    }
+}
+
+/// Linearly interpolate between two values.
+///
+/// Computes `left * (1.0 - ratio) + right * ratio` and rounds the result to
+/// the nearest integer. See the notes in the body on f64 precision for values
+/// at the upper end of the u64 range.
+fn mix(left: u64, right: u64, ratio: f64) -> u64 {
+    debug_assert!(ratio >= 0.0);
+    debug_assert!(ratio <= 1.0);
+
+    // Note on precision: f64 is only precise to 53 binary digits. That is
+    // enough to represent ~9 PGAS without error. Precision above that is
+    // rounded according to the IEEE 754-2008 standard which Rust's f64
+    // implements.
+    // For example, a value of 100 Pgas is rounded to steps of 8 gas.
+    let left_part = left as f64 * (1.0 - ratio);
+    let right_part = right as f64 * ratio;
+    // Accumulated error is doubled again, up to 16 gas for 100 Pgas.
+    let total = left_part + right_part;
+
+    // Conversion is safe because left and right were both u64 and the result is
+    // between the two. Even with precision errors, we cannot breach the
+    // boundaries.
+    return total.round() as u64;
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_mix() {
+        assert_eq!(500, mix(0, 1000, 0.5));
+        assert_eq!(0, mix(0, 0, 0.3));
+        assert_eq!(1000, mix(1000, 1000, 0.1));
+        assert_eq!(60, mix(50, 80, 0.33));
+    }
+
+    #[test]
+    fn test_mix_edge_cases() {
+        // at `u64::MAX` we should see no precision errors
+        assert_eq!(u64::MAX, mix(u64::MAX, u64::MAX, 0.33));
+        assert_eq!(u64::MAX, mix(u64::MAX, u64::MAX, 0.63));
+        assert_eq!(u64::MAX, mix(u64::MAX, u64::MAX, 0.99));
+
+        // precision errors must be consistent
+        assert_eq!(u64::MAX, mix(u64::MAX - 1, u64::MAX, 0.25));
+        assert_eq!(u64::MAX, mix(u64::MAX - 255, u64::MAX, 0.25));
+        assert_eq!(u64::MAX, mix(u64::MAX - 1023, u64::MAX, 0.25));
+
+        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 1024, u64::MAX, 0.25));
+        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 1500, u64::MAX, 0.25));
+        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 2047, u64::MAX, 0.25));
+        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 2048, u64::MAX, 0.25));
+        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 2049, u64::MAX, 0.25));
+        assert_eq!(u64::MAX - 2047, mix(u64::MAX - 3000, u64::MAX, 0.25));
+
+        assert_eq!(u64::MAX - 4095, mix(u64::MAX - 4000, u64::MAX, 0.25));
+    }
+
+    #[test]
+    fn test_clamped_f64_fraction() {
+        assert_eq!(0.0, clamped_f64_fraction(0, 10));
+        assert_eq!(0.5, clamped_f64_fraction(5, 10));
+        assert_eq!(1.0, clamped_f64_fraction(10, 10));
+
+        assert_eq!(0.0, clamped_f64_fraction(0, 1));
+        assert_eq!(0.0, clamped_f64_fraction(0, u64::MAX));
+
+        assert_eq!(0.5, clamped_f64_fraction(1, 2));
+        assert_eq!(0.5, clamped_f64_fraction(100, 200));
+        assert_eq!(0.5, clamped_f64_fraction(u64::MAX as u128 / 2, u64::MAX));
+
+        // test clamp
+        assert_eq!(1.0, clamped_f64_fraction(11, 10));
+        assert_eq!(1.0, clamped_f64_fraction(u128::MAX, 10));
+        assert_eq!(1.0, clamped_f64_fraction(u128::MAX, u64::MAX));
+    }
+
+    /// Default congestion info should be no congestion => maximally permissive.
+    #[test]
+    fn test_default_congestion() {
+        let inner_congestion_info = CongestionInfoV1::default();
+
+        assert_eq!(0.0, inner_congestion_info.memory_congestion());
+        assert_eq!(0.0, inner_congestion_info.incoming_congestion());
+        assert_eq!(0.0, inner_congestion_info.outgoing_congestion());
+        assert_eq!(0.0, inner_congestion_info.congestion_level());
+
+        let congestion_info = CongestionInfo::V1(inner_congestion_info);
+        assert_eq!(MAX_OUTGOING_GAS, congestion_info.outgoing_limit(0));
+        assert_eq!(MAX_TX_GAS, congestion_info.process_tx_limit());
+        assert!(congestion_info.shard_accepts_transactions());
+    }
+
+    #[test]
+    fn test_memory_congestion() {
+        let mut congestion_info = CongestionInfo::default();
+
+        congestion_info.add_receipt_bytes(MAX_CONGESTION_MEMORY_CONSUMPTION).unwrap();
+        congestion_info.add_receipt_bytes(500).unwrap();
+        congestion_info.remove_receipt_bytes(500).unwrap();
+
+        assert_eq!(1.0, congestion_info.congestion_level());
+        // fully congested, no more forwarding allowed
+        assert_eq!(0, congestion_info.outgoing_limit(1));
+        assert!(!congestion_info.shard_accepts_transactions());
+        // processing to other shards is not restricted by memory congestion
+        assert_eq!(MAX_TX_GAS, congestion_info.process_tx_limit());
+
+        // remove half the congestion
+        congestion_info.remove_receipt_bytes(MAX_CONGESTION_MEMORY_CONSUMPTION / 2).unwrap();
+        assert_eq!(0.5, congestion_info.congestion_level());
+        assert_eq!(
+            (0.5 * MIN_OUTGOING_GAS as f64 + 0.5 * MAX_OUTGOING_GAS as f64) as u64,
+            congestion_info.outgoing_limit(1)
+        );
+        // at 50%, still no new transactions are allowed
+        assert!(!congestion_info.shard_accepts_transactions());
+
+        // reduce congestion to 1/8
+        congestion_info.remove_receipt_bytes(3 * MAX_CONGESTION_MEMORY_CONSUMPTION / 8).unwrap();
+        assert_eq!(0.125, congestion_info.congestion_level());
+        assert_eq!(
+            (0.125 * MIN_OUTGOING_GAS as f64 + 0.875 * MAX_OUTGOING_GAS as f64) as u64,
+            congestion_info.outgoing_limit(1)
+        );
+        // at 12.5%, new transactions are allowed (threshold is 0.25)
+        assert!(congestion_info.shard_accepts_transactions());
+    }
+
+    #[test]
+    fn test_incoming_congestion() {
+        let mut congestion_info = CongestionInfo::default();
+
+        congestion_info.add_delayed_receipt_gas(MAX_CONGESTION_INCOMING_GAS).unwrap();
+        congestion_info.add_delayed_receipt_gas(500).unwrap();
+        congestion_info.remove_delayed_receipt_gas(500).unwrap();
+
+        assert_eq!(1.0, congestion_info.congestion_level());
+        // fully congested, no more forwarding allowed
+        assert_eq!(0, congestion_info.outgoing_limit(1));
+        assert!(!congestion_info.shard_accepts_transactions());
+        // processing to other shards is restricted by own incoming congestion
+        assert_eq!(MIN_TX_GAS, congestion_info.process_tx_limit());
+
+        // remove half the congestion
+        congestion_info.remove_delayed_receipt_gas(MAX_CONGESTION_INCOMING_GAS / 2).unwrap();
+        assert_eq!(0.5, congestion_info.congestion_level());
+        assert_eq!(
+            (0.5 * MIN_OUTGOING_GAS as f64 + 0.5 * MAX_OUTGOING_GAS as f64) as u64,
+            congestion_info.outgoing_limit(1)
+        );
+        // at 50%, still no new transactions to us are allowed
+        assert!(!congestion_info.shard_accepts_transactions());
+        // but we accept new transactions to other shards
+        assert_eq!(
+            (0.5 * MIN_TX_GAS as f64 + 0.5 * MAX_TX_GAS as f64) as u64,
+            congestion_info.process_tx_limit()
+        );
+
+        // reduce congestion to 1/8
+        congestion_info.remove_delayed_receipt_gas(3 * MAX_CONGESTION_INCOMING_GAS / 8).unwrap();
+        assert_eq!(0.125, congestion_info.congestion_level());
+        assert_eq!(
+            (0.125 * MIN_OUTGOING_GAS as f64 + 0.875 * MAX_OUTGOING_GAS as f64) as u64,
+            congestion_info.outgoing_limit(1)
+        );
+        // at 12.5%, new transactions are allowed (threshold is 0.25)
+        assert!(congestion_info.shard_accepts_transactions());
+        assert_eq!(
+            (0.125 * MIN_TX_GAS as f64 + 0.875 * MAX_TX_GAS as f64) as u64,
+            congestion_info.process_tx_limit()
+        );
+    }
+
+    #[test]
+    fn test_outgoing_congestion() {
+        let mut congestion_info = CongestionInfo::default();
+
+        congestion_info.add_buffered_receipt_gas(MAX_CONGESTION_OUTGOING_GAS).unwrap();
+        congestion_info.add_buffered_receipt_gas(500).unwrap();
+        congestion_info.remove_buffered_receipt_gas(500).unwrap();
+
+        assert_eq!(1.0, congestion_info.congestion_level());
+        // fully congested, no more forwarding allowed
+        assert_eq!(0, congestion_info.outgoing_limit(1));
+        assert!(!congestion_info.shard_accepts_transactions());
+        // processing to other shards is not restricted by own outgoing congestion
+        assert_eq!(MAX_TX_GAS, congestion_info.process_tx_limit());
+
+        // remove half the congestion
+        congestion_info.remove_buffered_receipt_gas(MAX_CONGESTION_OUTGOING_GAS / 2).unwrap();
+        assert_eq!(0.5, congestion_info.congestion_level());
+        assert_eq!(
+            (0.5 * MIN_OUTGOING_GAS as f64 + 0.5 * MAX_OUTGOING_GAS as f64) as u64,
+            congestion_info.outgoing_limit(1)
+        );
+        // at 50%, still no new transactions to us are allowed
+        assert!(!congestion_info.shard_accepts_transactions());
+
+        // reduce congestion to 1/8
+        congestion_info.remove_buffered_receipt_gas(3 * MAX_CONGESTION_OUTGOING_GAS / 8).unwrap();
+        assert_eq!(0.125, congestion_info.congestion_level());
+        assert_eq!(
+            (0.125 * MIN_OUTGOING_GAS as f64 + 0.875 * MAX_OUTGOING_GAS as f64) as u64,
+            congestion_info.outgoing_limit(1)
+        );
+        // at 12.5%, new transactions are allowed (threshold is 0.25)
+        assert!(congestion_info.shard_accepts_transactions());
+    }
 }
diff --git a/core/primitives/src/receipt.rs b/core/primitives/src/receipt.rs
index 30d2a71bcf2..55698145566 100644
--- a/core/primitives/src/receipt.rs
+++ b/core/primitives/src/receipt.rs
@@ -8,7 +8,7 @@ use near_fmt::AbbrBytes;
 use serde_with::base64::Base64;
 use serde_with::serde_as;
 use std::borrow::Borrow;
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::fmt;
 use std::io;
 use std::io::{Error, ErrorKind};
@@ -284,5 +284,15 @@ impl From for DelayedReceiptIndices {
     }
 }
 
+/// Stores indices for a persistent queue for buffered receipts that couldn't be
+/// forwarded.
+///
+/// This is the singleton value stored in the `BUFFERED_RECEIPT_INDICES` trie
+/// column.
+#[derive(Default, BorshSerialize, BorshDeserialize, Clone, PartialEq, Debug)]
+pub struct BufferedReceiptIndices {
+    pub shard_buffers: BTreeMap<ShardId, TrieQueueIndices>,
+}
+
 /// Map of shard to list of receipts to send to it.
 pub type ReceiptResult = HashMap<ShardId, Vec<Receipt>>;
diff --git a/core/primitives/src/shard_layout.rs b/core/primitives/src/shard_layout.rs
index a08ffa78b1f..87ac2778f9e 100644
--- a/core/primitives/src/shard_layout.rs
+++ b/core/primitives/src/shard_layout.rs
@@ -489,8 +489,11 @@ impl<'de> serde::de::Visitor<'de> for ShardUIdVisitor {
 
 #[cfg(test)]
 mod tests {
+    use crate::epoch_manager::{AllEpochConfig, EpochConfig, ValidatorSelectionConfig};
     use crate::shard_layout::{account_id_to_shard_id, ShardLayout, ShardLayoutV1, ShardUId};
     use near_primitives_core::types::{AccountId, ShardId};
+    use near_primitives_core::version::ProtocolFeature;
+    use near_vm_runner::logic::ProtocolVersion;
     use rand::distributions::Alphanumeric;
     use rand::rngs::StdRng;
     use rand::{Rng, SeedableRng};
@@ -517,6 +520,34 @@ mod tests {
         version: ShardVersion,
     }
 
+    impl ShardLayout {
+        /// Constructor for tests that need a shard layout for a specific protocol version.
+        pub fn for_protocol_version(protocol_version: ProtocolVersion) -> Self {
+            // none of the epoch config fields matter, we only need the shard layout
+            // constructed through [`AllEpochConfig::for_protocol_version()`].
+            let genesis_epoch_config = EpochConfig {
+                epoch_length: 0,
+                num_block_producer_seats: 0,
+                num_block_producer_seats_per_shard: vec![],
+                avg_hidden_validator_seats_per_shard: vec![],
+                block_producer_kickout_threshold: 0,
+                chunk_producer_kickout_threshold: 0,
+                validator_max_kickout_stake_perc: 0,
+                online_min_threshold: 0.into(),
+                online_max_threshold: 0.into(),
+                fishermen_threshold: 0,
+                minimum_stake_divisor: 0,
+                protocol_upgrade_stake_threshold: 0.into(),
+                shard_layout: ShardLayout::get_simple_nightshade_layout(),
+                validator_selection_config: ValidatorSelectionConfig::default(),
+            };
+
+            let all_epoch_config = AllEpochConfig::new(true, genesis_epoch_config, "test-chain");
+            let latest_epoch_config = all_epoch_config.for_protocol_version(protocol_version);
+            latest_epoch_config.shard_layout
+        }
+    }
+
     #[test]
     fn test_shard_layout_v0() {
         let num_shards = 4;
@@ -719,4 +750,24 @@ mod tests {
         }
         "###);
     }
+
+    #[test]
+    fn test_shard_layout_for_protocol_version() {
+        assert_eq!(
+            ShardLayout::get_simple_nightshade_layout(),
+            ShardLayout::for_protocol_version(ProtocolFeature::SimpleNightshade.protocol_version())
+        );
+        assert_eq!(
+            ShardLayout::get_simple_nightshade_layout_v2(),
+            ShardLayout::for_protocol_version(
+                ProtocolFeature::SimpleNightshadeV2.protocol_version()
+            )
+        );
+        assert_eq!(
+            ShardLayout::get_simple_nightshade_layout_v3(),
+            ShardLayout::for_protocol_version(
+                ProtocolFeature::SimpleNightshadeV3.protocol_version()
+            )
+        );
+    }
 }
diff --git a/core/primitives/src/trie_key.rs b/core/primitives/src/trie_key.rs
index 648103e862a..d2cb3ba9d1b 100644
--- a/core/primitives/src/trie_key.rs
+++ b/core/primitives/src/trie_key.rs
@@ -1,11 +1,9 @@
-use std::mem::size_of;
-
-use borsh::{BorshDeserialize, BorshSerialize};
-
-use near_crypto::PublicKey;
-
 use crate::hash::CryptoHash;
 use crate::types::AccountId;
+use borsh::{BorshDeserialize, BorshSerialize};
+use near_crypto::PublicKey;
+use near_primitives_core::types::ShardId;
+use std::mem::size_of;
 
 pub(crate) const ACCOUNT_DATA_SEPARATOR: u8 = b',';
 // The use of `ACCESS_KEY` as a separator is a historical artefact.
@@ -50,8 +48,18 @@ pub mod col {
     /// This column id is used when storing the postponed PromiseYield receipts
     /// (`primitives::receipt::Receipt`).
pub const PROMISE_YIELD_RECEIPT: u8 = 12; - /// All columns except those used for the delayed receipts queue and the yielded promises - /// queue, which are both global state for the shard. + /// Indices of outgoing receipts. A singleton per shard. + /// (`primitives::receipt::BufferedReceiptIndices`) + pub const BUFFERED_RECEIPT_INDICES: u8 = 13; + /// Outgoing receipts that need to be buffered due to congestion + + /// backpressure on the receiving shard. + /// (`primitives::receipt::Receipt`). + pub const BUFFERED_RECEIPT: u8 = 14; + /// All columns except those used for the delayed receipts queue, the yielded promises + /// queue, and the outgoing receipts buffer, which are global state for the shard. + + // NOTE: NEW_COLUMN = 15 will be the last unique nibble in the trie! + // Consider demultiplexing on 15 and using 2-nibble prefixes. pub const COLUMNS_WITH_ACCOUNT_ID_IN_KEY: [(u8, &str); 9] = [ (ACCOUNT, "Account"), (CONTRACT_CODE, "ContractCode"), @@ -109,6 +117,14 @@ pub enum TrieKey { /// Used to store the postponed promise yield receipt `primitives::receipt::Receipt` /// for a given receiver's `AccountId` and a given `data_id`. PromiseYieldReceipt { receiver_id: AccountId, data_id: CryptoHash }, + /// Used to store indices of the buffered receipts queues per shard. + /// NOTE: It is a singleton per shard, holding indices for all outgoing shards. + BufferedReceiptIndices, + /// Used to store a buffered receipt `primitives::receipt::Receipt` for a + /// given index `u64` and receiving shard. There is one unique queue + /// per ordered shard pair. The trie for shard X stores all queues for pairs + /// (X,*) without (X,X). + BufferedReceipt { receiving_shard: ShardId, index: u64 }, } /// Provides `len` function. @@ -178,6 +194,12 @@ impl TrieKey { + ACCOUNT_DATA_SEPARATOR.len() + key.len() } + TrieKey::BufferedReceiptIndices => col::BUFFERED_RECEIPT_INDICES.len(), + TrieKey::BufferedReceipt { index, .. } => { + col::BUFFERED_RECEIPT.len() + + std::mem::size_of::() + + std::mem::size_of_val(index) + } } } @@ -250,6 +272,14 @@ impl TrieKey { buf.push(ACCOUNT_DATA_SEPARATOR); buf.extend(data_id.as_ref()); } + TrieKey::BufferedReceiptIndices => buf.push(col::BUFFERED_RECEIPT_INDICES), + TrieKey::BufferedReceipt { index, receiving_shard } => { + buf.push(col::BUFFERED_RECEIPT); + // Use u16 for shard id to reduce depth in trie. + assert!(*receiving_shard <= u16::MAX as u64, "Shard ID too big."); + buf.extend(&(*receiving_shard as u16).to_le_bytes()); + buf.extend(&index.to_le_bytes()); + } }; debug_assert_eq!(expected_len, buf.len() - start_len); } @@ -276,6 +306,8 @@ impl TrieKey { TrieKey::PromiseYieldIndices => None, TrieKey::PromiseYieldTimeout { .. } => None, TrieKey::PromiseYieldReceipt { receiver_id, .. } => Some(receiver_id.clone()), + TrieKey::BufferedReceiptIndices => None, + TrieKey::BufferedReceipt { .. 
} => None, } } } @@ -491,7 +523,9 @@ pub mod trie_key_parsers { #[cfg(test)] mod tests { + use crate::shard_layout::ShardLayout; use near_crypto::KeyType; + use near_primitives_core::version::PROTOCOL_VERSION; use super::*; @@ -794,4 +828,14 @@ mod tests { ); } } + + #[test] + fn test_shard_id_u16_optimization() { + let shard_layout = ShardLayout::for_protocol_version(PROTOCOL_VERSION); + let max_id = shard_layout.shard_ids().max().unwrap(); + assert!( + max_id <= u16::MAX as u64, + "buffered receipt trie key optimization broken, must fit in a u16" + ); + } } diff --git a/core/primitives/src/types.rs b/core/primitives/src/types.rs index 5b53b86dc2d..6aeba6ba155 100644 --- a/core/primitives/src/types.rs +++ b/core/primitives/src/types.rs @@ -379,6 +379,8 @@ impl StateChanges { TrieKey::PromiseYieldIndices => {} TrieKey::PromiseYieldTimeout { .. } => {} TrieKey::PromiseYieldReceipt { .. } => {} + TrieKey::BufferedReceiptIndices => {} + TrieKey::BufferedReceipt { .. } => {} } } diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 96ece091992..a19fe484946 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -24,8 +24,8 @@ use near_primitives::account::{AccessKey, Account}; pub use near_primitives::errors::{MissingTrieValueContext, StorageError}; use near_primitives::hash::CryptoHash; use near_primitives::receipt::{ - DelayedReceiptIndices, PromiseYieldIndices, PromiseYieldTimeout, Receipt, ReceiptEnum, - ReceivedData, + BufferedReceiptIndices, DelayedReceiptIndices, PromiseYieldIndices, PromiseYieldTimeout, + Receipt, ReceiptEnum, ReceivedData, }; pub use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::{trie_key_parsers, TrieKey}; @@ -902,6 +902,12 @@ pub fn has_promise_yield_receipt( trie.contains_key(&TrieKey::PromiseYieldReceipt { receiver_id, data_id }) } +pub fn get_buffered_receipt_indices( + trie: &dyn TrieAccess, +) -> Result { + Ok(get(trie, &TrieKey::BufferedReceiptIndices)?.unwrap_or_default()) +} + pub fn set_access_key( state_update: &mut TrieUpdate, account_id: AccountId, diff --git a/core/store/src/trie/receipts_column_helper.rs b/core/store/src/trie/receipts_column_helper.rs index 394192345c7..7e868f8626d 100644 --- a/core/store/src/trie/receipts_column_helper.rs +++ b/core/store/src/trie/receipts_column_helper.rs @@ -1,13 +1,13 @@ use crate::{get, set, TrieAccess, TrieUpdate}; use near_primitives::errors::{IntegerOverflowError, StorageError}; -use near_primitives::receipt::{Receipt, TrieQueueIndices}; +use near_primitives::receipt::{BufferedReceiptIndices, Receipt, TrieQueueIndices}; use near_primitives::trie_key::TrieKey; +use near_primitives::types::ShardId; /// Read-only iterator over receipt queues stored in the state trie. /// -/// This iterator currently only supports delayed receipts but is already -/// written general to work with the new queues that are going to be added for -/// congestion control. +/// This iterator currently supports delayed receipts and buffered outgoing +/// receipts. pub struct ReceiptIterator<'a> { indices: std::ops::Range, trie_queue: &'a dyn TrieQueue, @@ -27,6 +27,29 @@ pub struct DelayedReceiptQueue { indices: TrieQueueIndices, } +/// Type-safe access to outgoing receipt buffers from this shard to all other +/// shards. Only use one at a time! +/// +/// Call [`ShardsOutgoingReceiptBuffer::to_shard`] to access queue operations on +/// a buffer to a specific shard.
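+///
+/// Illustrative usage (not part of this diff; assumes a `state_update:
+/// TrieUpdate`, a `receipt: Receipt`, and the `TrieQueue` trait in scope):
+///
+/// ```ignore
+/// let mut buffers = ShardsOutgoingReceiptBuffer::load(&trie)?;
+/// let mut to_shard_2 = buffers.to_shard(2);
+/// to_shard_2.push(&mut state_update, &receipt)?;
+/// // A second `buffers.to_shard(3)` while `to_shard_2` is alive would not
+/// // compile: the exclusive borrow keeps the shared indices consistent.
+/// ```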
+pub struct ShardsOutgoingReceiptBuffer { + shards_indices: BufferedReceiptIndices, +} + +/// Type-safe access to buffered receipts to a specific shard. +/// +/// Construct this from a parent `ShardsOutgoingReceiptBuffer` by calling +/// [`ShardsOutgoingReceiptBuffer::to_shard`]. Modifications are written back to +/// the TrieUpdate immediately on every update. +/// +/// Due to the shared indices, modifying two `OutgoingReceiptBuffer` instances +/// independently would lead to inconsistencies. The mutable borrow ensures at +/// compile time that this does not happen. +pub struct OutgoingReceiptBuffer<'parent> { + shard_id: ShardId, + parent: &'parent mut ShardsOutgoingReceiptBuffer, +} + /// Common code for persistent queues stored in the trie. /// /// Here we use a trait to share code between different implementations of the @@ -149,6 +172,46 @@ impl TrieQueue for DelayedReceiptQueue { } } +impl ShardsOutgoingReceiptBuffer { + pub fn load(trie: &dyn TrieAccess) -> Result { + let shards_indices = crate::get_buffered_receipt_indices(trie)?; + Ok(Self { shards_indices }) + } + + pub fn to_shard(&mut self, shard_id: ShardId) -> OutgoingReceiptBuffer { + OutgoingReceiptBuffer { shard_id, parent: self } + } + + pub fn write_indices(&self, state_update: &mut TrieUpdate) { + set(state_update, TrieKey::BufferedReceiptIndices, &self.shards_indices); + } +} + +impl TrieQueue for OutgoingReceiptBuffer<'_> { + fn load_indices(&self, trie: &dyn TrieAccess) -> Result { + let all_indices: BufferedReceiptIndices = + get(trie, &TrieKey::BufferedReceiptIndices)?.unwrap_or_default(); + let indices = all_indices.shard_buffers.get(&self.shard_id).cloned().unwrap_or_default(); + Ok(indices) + } + + fn indices(&self) -> TrieQueueIndices { + self.parent.shards_indices.shard_buffers.get(&self.shard_id).cloned().unwrap_or_default() + } + + fn indices_mut(&mut self) -> &mut TrieQueueIndices { + self.parent.shards_indices.shard_buffers.entry(self.shard_id).or_default() + } + + fn write_indices(&self, state_update: &mut TrieUpdate) { + self.parent.write_indices(state_update); + } + + fn trie_key(&self, index: u64) -> TrieKey { + TrieKey::BufferedReceipt { receiving_shard: self.shard_id, index } + } +} + impl<'a> Iterator for ReceiptIterator<'a> { type Item = Result; fn next(&mut self) -> @@ -192,28 +255,125 @@ mod tests { #[track_caller] fn check_delayed_receipt_queue(input_receipts: &[Receipt]) { let mut trie = init_state(); - let mut queue = DelayedReceiptQueue::load(&trie).expect("creating queue must not fail"); + // load a queue to fill it with receipts + { + let mut queue = DelayedReceiptQueue::load(&trie).expect("creating queue must not fail"); + check_push_to_receipt_queue(input_receipts, &mut trie, &mut queue); + } + + // drop queue and load another one to see if values are persisted + { + let mut queue = DelayedReceiptQueue::load(&trie).expect("creating queue must not fail"); + check_receipt_queue_contains_receipts(input_receipts, &mut trie, &mut queue); + } + } + + #[test] + fn test_outgoing_receipt_buffer_separately() { + // empty queues + check_outgoing_receipt_buffer_separately(&[]); + + // with random receipts + let mut rng = rand::thread_rng(); + check_outgoing_receipt_buffer_separately(&gen_receipts(&mut rng, 1)); + check_outgoing_receipt_buffer_separately(&gen_receipts(&mut rng, 10)); + check_outgoing_receipt_buffer_separately(&gen_receipts(&mut rng, 1000)); + } + + /// Check if inserting, reading, and popping from the outgoing buffers + /// works, loading one buffer at a time.
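+ /// Loading fresh buffers between the write phase and the read phase also
+ /// exercises persistence: nothing is cached across the two loads, so the
+ /// receipts must round-trip through the trie.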
+ #[track_caller] + fn check_outgoing_receipt_buffer_separately(input_receipts: &[Receipt]) { + let mut trie = init_state(); + for id in 0..2u32 { + // load a buffer to fill it with receipts + { + let mut buffers = ShardsOutgoingReceiptBuffer::load(&trie) + .expect("creating buffers must not fail"); + let mut buffer = buffers.to_shard(ShardId::from(id)); + check_push_to_receipt_queue(input_receipts, &mut trie, &mut buffer); + } + + // drop queue and load another one to see if values are persisted + { + let mut buffers = ShardsOutgoingReceiptBuffer::load(&trie) + .expect("creating buffers must not fail"); + let mut buffer = buffers.to_shard(ShardId::from(id)); + check_receipt_queue_contains_receipts(input_receipts, &mut trie, &mut buffer); + } + } + } + + /// Check if inserting, reading, and popping from the outgoing buffers + /// works, loading buffers to all shards together. + #[test] + fn test_outgoing_receipt_buffer_combined() { + // empty queues + check_outgoing_receipt_buffer_combined(&[]); + + // with random receipts + let mut rng = rand::thread_rng(); + check_outgoing_receipt_buffer_combined(&gen_receipts(&mut rng, 1)); + check_outgoing_receipt_buffer_combined(&gen_receipts(&mut rng, 10)); + check_outgoing_receipt_buffer_combined(&gen_receipts(&mut rng, 1000)); + } + + #[track_caller] + fn check_outgoing_receipt_buffer_combined(input_receipts: &[Receipt]) { + let mut trie = init_state(); + // load shard_buffers once and hold on to it for the entire duration + let mut shard_buffers = + ShardsOutgoingReceiptBuffer::load(&trie).expect("creating buffers must not fail"); + for id in 0..2u32 { + // load a buffer to fill it with receipts + { + let mut buffer = shard_buffers.to_shard(ShardId::from(id)); + check_push_to_receipt_queue(input_receipts, &mut trie, &mut buffer); + } + + // drop queue and load another one to see if values are persisted + { + let mut buffer = shard_buffers.to_shard(ShardId::from(id)); + check_receipt_queue_contains_receipts(input_receipts, &mut trie, &mut buffer); + } + } + } + + /// Add given receipts to the receipts queue, then use `ReceiptIterator` to + /// read them back and assert it has the same receipts in the same order. + #[track_caller] + fn check_push_to_receipt_queue( + input_receipts: &[Receipt], + trie: &mut TrieUpdate, + queue: &mut impl TrieQueue, + ) { for receipt in input_receipts { - queue.push(&mut trie, receipt).expect("pushing must not fail"); + queue.push(trie, receipt).expect("pushing must not fail"); } let iterated_receipts: Vec = - queue.iter(&trie).collect::>().expect("iterating should not fail"); + queue.iter(trie).collect::>().expect("iterating should not fail"); // check 1: receipts should be in queue and contained in the iterator assert_eq!(input_receipts, iterated_receipts, "receipts were not recorded in queue"); + } - // check 2: drop queue and load another one to see if values are persisted - #[allow(clippy::drop_non_drop)] - drop(queue); - let mut queue = DelayedReceiptQueue::load(&trie).expect("creating queue must not fail"); + /// Assert receipts are in the queue and accessible from an iterator and + /// from popping one by one. 
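+ /// Together with `check_push_to_receipt_queue`, this covers the whole
+ /// queue lifecycle: push, persist, iterate, and drain in FIFO order.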
+ #[track_caller] + fn check_receipt_queue_contains_receipts( + input_receipts: &[Receipt], + trie: &mut TrieUpdate, + queue: &mut impl TrieQueue, + ) { + // check 2: assert newly loaded queue still contains the receipts let iterated_receipts: Vec = - queue.iter(&trie).collect::>().expect("iterating should not fail"); + queue.iter(trie).collect::>().expect("iterating should not fail"); assert_eq!(input_receipts, iterated_receipts, "receipts were not persisted correctly"); // check 3: pop receipts from queue and check if all are returned in the right order let mut popped = vec![]; - while let Some(receipt) = queue.pop(&mut trie).expect("pop must not fail") { + while let Some(receipt) = queue.pop(trie).expect("pop must not fail") { popped.push(receipt); } assert_eq!(input_receipts, popped, "receipts were not popped correctly"); diff --git a/core/store/src/trie/resharding.rs b/core/store/src/trie/resharding.rs index 0d71dcec6ac..9bd445f57a7 100644 --- a/core/store/src/trie/resharding.rs +++ b/core/store/src/trie/resharding.rs @@ -103,6 +103,9 @@ impl ShardTries { None => trie_update.remove(trie_key), } } + // TODO(congestion_control) + TrieKey::BufferedReceiptIndices => todo!(), + TrieKey::BufferedReceipt { .. } => todo!(), } } for (_, update) in trie_updates.iter_mut() { diff --git a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs index b6804b68070..538a7bdef3f 100644 --- a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs +++ b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs @@ -33,7 +33,7 @@ use near_chunks::test_loop::{ }; use near_chunks::ShardsManager; use near_client::client_actions::{ - ClientActions, ClientSenderForClientMessage, ClientSenderForStateWitnessMessage, + ClientActions, ClientSenderForClientMessage, ClientSenderForPartialWitnessMessage, SyncJobsSenderForClientMessage, }; use near_client::sync::sync_actor::SyncActor; @@ -46,9 +46,9 @@ use near_client::test_utils::test_loop::client_actions::{ forward_client_messages_from_shards_manager, forward_client_messages_from_sync_adapter, forward_client_messages_from_sync_jobs_to_client_actions, }; -use near_client::test_utils::test_loop::state_witness_actions::{ - forward_messages_from_client_to_state_witness_actor, - forward_messages_from_network_to_state_witness_actor, +use near_client::test_utils::test_loop::partial_witness_actions::{ + forward_messages_from_client_to_partial_witness_actor, + forward_messages_from_network_to_partial_witness_actor, }; use near_client::test_utils::test_loop::sync_actor::{ forward_sync_actor_messages_from_client, forward_sync_actor_messages_from_network, @@ -59,11 +59,12 @@ use near_client::test_utils::test_loop::sync_jobs_actions::{ forward_sync_jobs_messages_from_sync_jobs_to_sync_jobs_actions, }; use near_client::test_utils::test_loop::{ - forward_messages_from_state_witness_actor_to_client, print_basic_client_info_before_each_event, + forward_messages_from_partial_witness_actor_to_client, + print_basic_client_info_before_each_event, }; use near_client::test_utils::test_loop::{route_network_messages_to_client, ClientQueries}; use near_client::{ - Client, StateWitnessActions, StateWitnessSenderForClientMessage, SyncAdapter, SyncMessage, + Client, PartialWitnessActions, PartialWitnessSenderForClientMessage, SyncAdapter, SyncMessage, }; use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; use near_epoch_manager::EpochManager; @@ 
-72,7 +73,7 @@ use near_network::client::{ }; use near_network::shards_manager::ShardsManagerRequestFromNetwork; use near_network::state_sync::StateSyncResponse; -use near_network::state_witness::StateWitnessSenderForNetworkMessage; +use near_network::state_witness::PartialWitnessSenderForNetworkMessage; use near_network::types::{PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo}; use near_primitives::network::PeerId; use near_primitives::shard_layout::{ShardLayout, ShardUId}; @@ -101,7 +102,7 @@ struct TestData { pub client: ClientActions, pub sync_jobs: SyncJobsActions, pub shards_manager: ShardsManager, - pub state_witness: StateWitnessActions, + pub partial_witness: PartialWitnessActions, pub sync_actors: TestSyncActors, pub state_sync_dumper: StateSyncDumper, pub state_snapshot: StateSnapshotActions, @@ -173,12 +174,12 @@ enum TestEvent { ), /// Calls to the network component to set chain info. SetChainInfo(SetChainInfo), - /// Message from Client to StateWitnessActor. - StateWitnessSenderForClient(StateWitnessSenderForClientMessage), - /// Message from Network to StateWitnessActor. - StateWitnessSenderForNetwork(StateWitnessSenderForNetworkMessage), - /// Message from StateWitnessActor to Client. - ClientSenderForStateWitness(ClientSenderForStateWitnessMessage), + /// Message from Client to PartialWitnessActor. + PartialWitnessSenderForClient(PartialWitnessSenderForClientMessage), + /// Message from Network to PartialWitnessActor. + PartialWitnessSenderForNetwork(PartialWitnessSenderForNetworkMessage), + /// Message from PartialWitnessActor to Client. + ClientSenderForPartialWitness(ClientSenderForPartialWitnessMessage), } const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000; @@ -363,7 +364,7 @@ fn test_client_with_multi_test_loop() { builder .sender() .for_index(idx) - .into_wrapped_multi_sender::(), + .into_wrapped_multi_sender::(), ) .unwrap(); @@ -402,13 +403,13 @@ fn test_client_with_multi_test_loop() { ) .unwrap(); - let state_witness_actions = StateWitnessActions::new( + let partial_witness_actions = PartialWitnessActions::new( builder.clock(), builder.sender().for_index(idx).into_multi_sender(), builder .sender() .for_index(idx) - .into_wrapped_multi_sender::(), + .into_wrapped_multi_sender::(), validator_signer, epoch_manager.clone(), ); @@ -435,7 +436,7 @@ fn test_client_with_multi_test_loop() { client: client_actions, sync_jobs: sync_jobs_actions, shards_manager, - state_witness: state_witness_actions, + partial_witness: partial_witness_actions, sync_actors, state_sync_dumper, state_snapshot, @@ -469,7 +470,7 @@ fn test_client_with_multi_test_loop() { ); test.register_handler(forward_client_messages_from_shards_manager().widen().for_index(idx)); test.register_handler( - forward_messages_from_state_witness_actor_to_client().widen().for_index(idx), + forward_messages_from_partial_witness_actor_to_client().widen().for_index(idx), ); test.register_handler(forward_client_messages_from_sync_adapter().widen().for_index(idx)); @@ -506,12 +507,12 @@ fn test_client_with_multi_test_loop() { // Messages to the network layer; multi-node messages are handled below. test.register_handler(ignore_events::().widen().for_index(idx)); - // Messages to StateWitnessActor. + // Messages to PartialWitnessActor.
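+ // Both directions are wired here: requests from the client asking the
+ // actor to distribute a state witness, and witness messages arriving
+ // from the network for the actor to handle.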
test.register_handler( - forward_messages_from_client_to_state_witness_actor().widen().for_index(idx), + forward_messages_from_client_to_partial_witness_actor().widen().for_index(idx), ); test.register_handler( - forward_messages_from_network_to_state_witness_actor().widen().for_index(idx), + forward_messages_from_network_to_partial_witness_actor().widen().for_index(idx), ); } // Handles network routing. Outgoing messages are handled by emitting incoming messages to the diff --git a/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs b/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs index 76cd44a8df6..c22e41d1258 100644 --- a/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs +++ b/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs @@ -128,9 +128,9 @@ fn setup_orphan_witness_test() -> OrphanWitnessTestEnv { // The witness isn't processed on `excluded_validator` to give users of // `setup_orphan_witness_test()` full control over the events. let mut encoded_witness_opt = None; - let state_witness_adapter = - env.state_witness_adapters[env.get_client_index(&block2_chunk_producer)].clone(); - while let Some(request) = state_witness_adapter.pop_distribution_request() { + let partial_witness_adapter = + env.partial_witness_adapters[env.get_client_index(&block2_chunk_producer)].clone(); + while let Some(request) = partial_witness_adapter.pop_distribution_request() { let DistributeStateWitnessRequest { epoch_id, chunk_header, state_witness } = request; let (encoded_witness, _) = EncodedChunkStateWitness::encode(&state_witness).unwrap(); let chunk_validators = env diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs index e81f50a838d..63308c22fc5 100644 --- a/integration-tests/src/tests/network/runner.rs +++ b/integration-tests/src/tests/network/runner.rs @@ -8,7 +8,7 @@ use near_chain::{Chain, ChainGenesis}; use near_chain_configs::{ClientConfig, Genesis, GenesisConfig}; use near_chunks::shards_manager_actor::start_shards_manager; use near_client::adapter::client_sender_for_network; -use near_client::{start_client, start_view_client, StateWitnessActor, SyncAdapter}; +use near_client::{start_client, start_view_client, PartialWitnessActor, SyncAdapter}; use near_epoch_manager::shard_tracker::ShardTracker; use near_epoch_manager::EpochManager; use near_network::actix::ActixSystem; @@ -128,7 +128,7 @@ fn setup_network_node( runtime.store().clone(), client_config.chunk_request_retry_period, ); - let (state_witness_actor, _) = StateWitnessActor::spawn( + let (partial_witness_actor, _) = PartialWitnessActor::spawn( Clock::real(), network_adapter.as_multi_sender(), client_actor.clone().with_auto_span_context().into_multi_sender(), @@ -142,7 +142,7 @@ fn setup_network_node( config, client_sender_for_network(client_actor, view_client_actor), shards_manager_adapter.as_sender(), - state_witness_actor.with_auto_span_context().into_multi_sender(), + partial_witness_actor.with_auto_span_context().into_multi_sender(), genesis_id, ) .unwrap(); diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index e6aba5de83e..bcabb698dbd 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -24,8 +24,8 @@ use near_chunks::shards_manager_actor::start_shards_manager; use near_client::adapter::client_sender_for_network; use near_client::sync::adapter::SyncAdapter; use near_client::{ - start_client, start_view_client, ClientActor, ConfigUpdater, StartClientResult, - 
StateWitnessActor, ViewClientActor, + start_client, start_view_client, ClientActor, ConfigUpdater, PartialWitnessActor, + StartClientResult, ViewClientActor, }; use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; use near_epoch_manager::EpochManager; @@ -324,7 +324,7 @@ pub fn start_with_config_and_synchronization( let network_adapter = LateBoundSender::new(); let shards_manager_adapter = LateBoundSender::new(); let client_adapter_for_shards_manager = LateBoundSender::new(); - let client_adapter_for_state_witness_actor = LateBoundSender::new(); + let client_adapter_for_partial_witness_actor = LateBoundSender::new(); let adv = near_client::adversarial::Controls::new(config.client_config.archive); let view_client = start_view_client( @@ -353,16 +353,16 @@ pub fn start_with_config_and_synchronization( ); let snapshot_callbacks = SnapshotCallbacks { make_snapshot_callback, delete_snapshot_callback }; - let (state_witness_actor, state_witness_arbiter) = if config.validator_signer.is_some() { + let (partial_witness_actor, partial_witness_arbiter) = if config.validator_signer.is_some() { let my_signer = config.validator_signer.clone().unwrap(); - let (state_witness_actor, state_witness_arbiter) = StateWitnessActor::spawn( + let (partial_witness_actor, partial_witness_arbiter) = PartialWitnessActor::spawn( Clock::real(), network_adapter.as_multi_sender(), - client_adapter_for_state_witness_actor.as_multi_sender(), + client_adapter_for_partial_witness_actor.as_multi_sender(), my_signer, epoch_manager.clone(), ); - (Some(state_witness_actor), Some(state_witness_arbiter)) + (Some(partial_witness_actor), Some(partial_witness_arbiter)) } else { (None, None) }; @@ -393,7 +393,7 @@ pub fn start_with_config_and_synchronization( shutdown_signal, adv, config_updater, - state_witness_actor + partial_witness_actor .clone() .map(|actor| actor.with_auto_span_context().into_multi_sender()) .unwrap_or_else(|| noop().into_multi_sender()), @@ -402,7 +402,7 @@ pub fn start_with_config_and_synchronization( client_adapter_for_sync.bind(client_actor.clone().with_auto_span_context()) }; client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context()); - client_adapter_for_state_witness_actor.bind(client_actor.clone().with_auto_span_context()); + client_adapter_for_partial_witness_actor.bind(client_actor.clone().with_auto_span_context()); let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager( epoch_manager.clone(), shard_tracker.clone(), @@ -443,7 +443,7 @@ pub fn start_with_config_and_synchronization( config.network_config, client_sender_for_network(client_actor.clone(), view_client.clone()), shards_manager_adapter.as_sender(), - state_witness_actor + partial_witness_actor .map(|actor| actor.with_auto_span_context().into_multi_sender()) .unwrap_or_else(|| noop().into_multi_sender()), genesis_id, @@ -500,8 +500,8 @@ pub fn start_with_config_and_synchronization( if let Some(db_metrics_arbiter) = db_metrics_arbiter { arbiters.push(db_metrics_arbiter); } - if let Some(state_witness_arbiter) = state_witness_arbiter { - arbiters.push(state_witness_arbiter); + if let Some(partial_witness_arbiter) = partial_witness_arbiter { + arbiters.push(partial_witness_arbiter); } Ok(NearNode { diff --git a/nightly/pytest-sanity.txt b/nightly/pytest-sanity.txt index aaa7842d769..53d60cfc0c4 100644 --- a/nightly/pytest-sanity.txt +++ b/nightly/pytest-sanity.txt @@ -127,8 +127,15 @@ pytest sanity/proxy_example.py pytest sanity/proxy_example.py --features nightly pytest 
sanity/rpc_tx_submission.py pytest sanity/rpc_tx_submission.py --features nightly -pytest sanity/state_sync_fail.py -pytest sanity/state_sync_fail.py --features nightly + +# The state sync fail test checks that state sync fails during or after +# resharding. Currently it's disabled because resharding and congestion control +# are not fully integrated. There are two steps to be taken: +# TODO(congestion_control) - fix integration with resharding and enable fail test +# TODO(resharding) - fix integration with state sync and adjust fail test +# TODO(#9519) +# pytest sanity/state_sync_fail.py +# pytest sanity/state_sync_fail.py --features nightly pytest sanity/restart.py pytest sanity/restart.py --features nightly @@ -161,8 +168,8 @@ pytest --timeout=120 sanity/resharding_rpc_tx.py pytest --timeout=120 sanity/resharding_restart.py pytest --timeout=120 sanity/resharding_error_handling.py -# Tests for resharding in nightly are disabled because resharding is not -# compatible with stateless validation. +# TODO(resharding) Tests for resharding in nightly are disabled because +# resharding is not compatible with stateless validation. # pytest --timeout=120 sanity/resharding.py --features nightly # pytest --timeout=120 sanity/resharding_rpc_tx.py --features nightly # pytest --timeout=120 sanity/resharding_restart.py --features nightly diff --git a/pytest/lib/configured_logger.py b/pytest/lib/configured_logger.py index 138a50ca21d..b583b5740c7 100644 --- a/pytest/lib/configured_logger.py +++ b/pytest/lib/configured_logger.py @@ -44,6 +44,8 @@ def new_logger( log.addHandler(handler) log.propagate = False + log.info(f"created configured logger, name {name}, level {level}") + return log diff --git a/pytest/lib/messages/block.py b/pytest/lib/messages/block.py index 7da4a828bf5..556ad763c5e 100644 --- a/pytest/lib/messages/block.py +++ b/pytest/lib/messages/block.py @@ -183,6 +183,10 @@ class ShardChunkHeaderInnerV2: pass +class ShardChunkHeaderInnerV3: + pass + + class PartialEncodedChunkPart: pass @@ -257,6 +261,14 @@ class ApprovalInner: pass +class CongestionInfo: + pass + + +class CongestionInfoV1: + pass + + block_schema = [ [ Block, { @@ -605,7 +617,8 @@ class ApprovalInner: 'field': 'enum', 'values': [['V1', ShardChunkHeaderInnerV1], - ['V2', ShardChunkHeaderInnerV2]] + ['V2', ShardChunkHeaderInnerV2], + ['V3', ShardChunkHeaderInnerV3]] } ], [ @@ -650,6 +663,28 @@ class ApprovalInner: ] } ], + [ + ShardChunkHeaderInnerV3, { + 'kind': + 'struct', + 'fields': [ + ['prev_block_hash', [32]], + ['prev_state_root', [32]], + ['outcome_root', [32]], + ['encoded_merkle_root', [32]], + ['encoded_length', 'u64'], + ['height_created', 'u64'], + ['shard_id', 'u64'], + ['gas_used', 'u64'], + ['gas_limit', 'u64'], + ['balance_burnt', 'u128'], + ['outgoing_receipt_root', [32]], + ['tx_root', [32]], + ['validator_proposals', [ValidatorStake]], + ['congestion_info', CongestionInfo], + ] + } + ], [ ShardChunk, { 'kind': 'enum', @@ -797,4 +832,23 @@ class ApprovalInner: ] } ], + [ + CongestionInfo, { + 'kind': 'enum', + 'field': 'enum', + 'values': [['V1', CongestionInfoV1]] + } + ], + [ + CongestionInfoV1, { + 'kind': + 'struct', + 'fields': [ + ['delayed_receipts_gas', 'u128'], + ['buffered_receipts_gas', 'u128'], + ['receipt_bytes', 'u64'], + ['allowed_shard', 'u16'], + ] + } + ] ] diff --git a/pytest/lib/messages/network.py b/pytest/lib/messages/network.py index 72cf4510584..58952d12ab2 100644 --- a/pytest/lib/messages/network.py +++ b/pytest/lib/messages/network.py @@ -170,6 +170,11 @@ class PartialSync: 
['EpochSyncFinalizationRequest', None], # TODO ['EpochSyncFinalizationResponse', None], # TODO ['RoutingTableSyncV2', RoutingTableSyncV2], + ['DistanceVector', None], + ['StateRequestHeader', None], + ['StateRequestPart', None], + ['VersionedStateResponse', None], + ['SyncSnapshotHosts', None], ] } ], diff --git a/pytest/lib/proxy.py b/pytest/lib/proxy.py index 7e81571e2cd..a968db5c9ca 100644 --- a/pytest/lib/proxy.py +++ b/pytest/lib/proxy.py @@ -90,11 +90,16 @@ async def _handle(self, raw_message, *, writer, sender_port_holder, # (or preferably reimplement the test in rust). try: message = BinarySerializer(schema).deserialize( - raw_message, PeerMessage) - except IndexError: - # unparsable message, ignore. + raw_message, + PeerMessage, + ) + except IndexError as err: + # This can happen when new fields are added to any of the + # messages. The new fields need to be added to the schema in the + # messages directory. logging.warn( - f"Warning: could not proxy message {raw_message.hex()}") + f"Warning: could not deserialize and proxy message err: '{err}' message: '{raw_message.hex()}'" + ) return assert BinarySerializer(schema).serialize(message) == raw_message @@ -118,7 +123,7 @@ async def _handle(self, raw_message, *, writer, sender_port_holder, decision = BinarySerializer(schema).serialize(decision) return decision - except: + except Exception as e: # TODO: Remove this if raw_message[0] == 13: # raw_message[0] == 13 is RoutedMessage. Skip leading fields to get to the RoutedMessageBody @@ -136,10 +141,11 @@ async def _handle(self, raw_message, *, writer, sender_port_holder, # Allow the handler determine if the message should be passed even when it couldn't be deserialized return await self.handle(None, sender_ordinal, receiver_ordinal) is not False - logger.info(f"ERROR 13 {int(raw_message[ser.offset])}") + logger.info( + f"ERROR 13 {int(raw_message[ser.offset])} exception={e}") else: - logger.info(f"ERROR {int(raw_message[0])}") + logger.info(f"ERROR {int(raw_message[0])} exception={e}") raise @@ -316,9 +322,9 @@ async def handle_connection(outer_reader, outer_writer, inner_port, outer_port, logging.debug( f"ConnectionRefusedError (handle_connection). port={_MY_PORT} connection_id={connection_id} global_stopped={global_stopped.value} local_stopped={local_stopped.value} error={error.value}" ) - except: + except Exception as e: logging.debug( - f"Other Error (handle_connection). port={_MY_PORT} connection_id={connection_id} global_stopped={global_stopped.value} local_stopped={local_stopped.value} error={error.value}" + f"Other Error (handle_connection). port={_MY_PORT} connection_id={connection_id} global_stopped={global_stopped.value} local_stopped={local_stopped.value} error={error.value} exception={e}" ) error.value = 1 raise diff --git a/pytest/lib/serializer.py b/pytest/lib/serializer.py index 60808ba0cb1..78bc20d3cc6 100644 --- a/pytest/lib/serializer.py +++ b/pytest/lib/serializer.py @@ -1,3 +1,16 @@ +# Small library for borsh serialization and deserialization. 
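+#
+# The subset of borsh implemented here: fixed-width integers are serialized
+# little-endian; dynamic sequences as a u32 length followed by the elements;
+# enums as a single u8 variant ordinal followed by that variant's payload.
+# See https://borsh.io for the full specification.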
+ +import logging +import pathlib +import sys + +sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) + +from configured_logger import new_logger + +logger = new_logger('serializer', level=logging.INFO) + + class BinarySerializer: def __init__(self, schema): @@ -20,6 +33,8 @@ def serialize_num(self, value, n_bytes): assert value == 0 def deserialize_num(self, n_bytes): + logger.debug(f"deserialize_num {n_bytes}") + value = 0 bytes_ = self.read_bytes(n_bytes) for b in bytes_[::-1]: @@ -72,6 +87,8 @@ def serialize_field(self, value, fieldType): assert False, type(fieldType) def deserialize_field(self, fieldType): + logger.debug(f"deserialize_field {fieldType} {type(fieldType)}") + if type(fieldType) == tuple: if len(fieldType) == 0: return None @@ -138,15 +155,33 @@ def serialize_struct(self, obj): def deserialize_struct(self, type_): structSchema = self.schema[type_] + logger.debug(f"deserialize_struct {type_} {structSchema['kind']}") + if structSchema['kind'] == 'struct': ret = type_() for fieldName, fieldType in structSchema['fields']: setattr(ret, fieldName, self.deserialize_field(fieldType)) return ret elif structSchema['kind'] == 'enum': - ret = type_() value_ord = self.deserialize_num(1) + logger.debug( + f"deserialize_struct {type_} {structSchema['kind']} enum value ord {value_ord} struct schema {len(structSchema['values'])}" + ) + value_schema = structSchema['values'][value_ord] + logger.debug( + f"deserialize_struct {type_} {structSchema['kind']} enum value sch {value_schema}" + ) + + if value_schema is None: + raise IndexError(f"value schema missing for type {type_}") + + if value_schema[1] is None: + raise IndexError( + f"value schema not supported for {type_}::{value_schema[0]}" + ) + + ret = type_() setattr(ret, structSchema['field'], value_schema[0]) setattr(ret, value_schema[0], self.deserialize_field(value_schema[1])) diff --git a/pytest/requirements.txt b/pytest/requirements.txt index c5b87763fa1..9e3b6a7c64f 100644 --- a/pytest/requirements.txt +++ b/pytest/requirements.txt @@ -3,6 +3,8 @@ base58 cython deepdiff ed25519 +locust +nearup numpy prometheus-client psutil diff --git a/pytest/tests/sanity/proxy_simple.py b/pytest/tests/sanity/proxy_simple.py index 51a1367a893..e21ed4e9f6e 100755 --- a/pytest/tests/sanity/proxy_simple.py +++ b/pytest/tests/sanity/proxy_simple.py @@ -2,8 +2,8 @@ # Start two nodes. Proxify both nodes # and wait until block at height >= 10 pass through the proxy. 
import sys, time -import multiprocessing import pathlib +from multiprocessing import Value sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) @@ -13,9 +13,6 @@ from peer import * from proxy import ProxyHandler -from multiprocessing import Value -from utils import obj_to_string - TIMEOUT = 30 diff --git a/runtime/near-vm-runner/Cargo.toml b/runtime/near-vm-runner/Cargo.toml index 042bd0bab2c..e5741215ebb 100644 --- a/runtime/near-vm-runner/Cargo.toml +++ b/runtime/near-vm-runner/Cargo.toml @@ -37,13 +37,15 @@ strum.workspace = true tempfile.workspace = true thiserror.workspace = true tracing.workspace = true +prometheus.workspace = true wasm-encoder = { workspace = true, optional = true } wasmparser = { workspace = true, optional = true } wasmtime = { workspace = true, optional = true } near-crypto.workspace = true -near-primitives-core.workspace = true +near-o11y.workspace = true near-parameters.workspace = true +near-primitives-core.workspace = true bytesize.workspace = true # Old versions of pwasm-utils we need to preserve backwards compatibility under @@ -82,6 +84,7 @@ wat.workspace = true [features] nightly_protocol = [ + "near-o11y/nightly_protocol", "near-parameters/nightly_protocol", "near-primitives-core/nightly_protocol", ] @@ -115,6 +118,7 @@ protocol_feature_fix_contract_loading_cost = [ ] nightly = [ + "near-o11y/nightly", "near-parameters/nightly", "near-primitives-core/nightly", "nightly_protocol", diff --git a/runtime/near-vm-runner/src/lib.rs b/runtime/near-vm-runner/src/lib.rs index 0b9911e504f..93138fceee1 100644 --- a/runtime/near-vm-runner/src/lib.rs +++ b/runtime/near-vm-runner/src/lib.rs @@ -10,6 +10,7 @@ mod instrument; pub mod logic; #[cfg(all(feature = "wasmer0_vm", target_arch = "x86_64"))] mod memory; +mod metrics; #[cfg(all(feature = "near_vm", target_arch = "x86_64"))] mod near_vm_runner; #[cfg(feature = "prepare")] @@ -33,6 +34,7 @@ pub use cache::{ NoContractRuntimeCache, }; pub use code::ContractCode; +pub use metrics::Metrics; pub use profile::ProfileDataV3; pub use runner::{run, VM}; diff --git a/runtime/near-vm-runner/src/metrics.rs b/runtime/near-vm-runner/src/metrics.rs new file mode 100644 index 00000000000..a8228449536 --- /dev/null +++ b/runtime/near-vm-runner/src/metrics.rs @@ -0,0 +1,65 @@ +use near_o11y::metrics::{try_create_histogram_vec, HistogramVec}; +use once_cell::sync::Lazy; +use std::{cell::RefCell, time::Duration}; + +thread_local! { + static METRICS: RefCell = const { RefCell::new(Metrics { + near_vm_compilation_time: Duration::new(0, 0), + wasmtime_compilation_time: Duration::new(0, 0), + }) }; +} + +pub static COMPILATION_TIME: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_vm_runner_compilation_seconds", + "Histogram of how long it takes to compile things", + &["vm_kind", "shard_id"], + None, + ) + .unwrap() +}); + +#[derive(Default, Copy, Clone)] +pub struct Metrics { + near_vm_compilation_time: Duration, + wasmtime_compilation_time: Duration, +} + +impl Metrics { + pub fn reset() { + METRICS.with_borrow_mut(|m| *m = Self::default()); + } + + /// Get the current metrics. + /// + /// Note that this is a thread-local operation. 
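+ /// Durations recorded on other threads are not visible here; the intended
+ /// pattern is to `reset`, run the contract, and `get` on the same thread.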
+ pub fn get() -> Metrics { + METRICS.with_borrow(|m| *m) + } + + pub fn report(&mut self, shard_id: &str) { + if !self.near_vm_compilation_time.is_zero() { + COMPILATION_TIME + .with_label_values(&["near_vm", shard_id]) + .observe(self.near_vm_compilation_time.as_secs_f64()); + self.near_vm_compilation_time = Duration::default(); + } + if !self.wasmtime_compilation_time.is_zero() { + COMPILATION_TIME + .with_label_values(&["wasmtime", shard_id]) + .observe(self.wasmtime_compilation_time.as_secs_f64()); + self.wasmtime_compilation_time = Duration::default(); + } + } +} + +#[cfg(any(feature = "near_vm", feature = "wasmtime_vm"))] +pub(crate) fn compilation_duration(kind: near_parameters::vm::VMKind, duration: Duration) { + use near_parameters::vm::VMKind; + METRICS.with_borrow_mut(|m| match kind { + VMKind::Wasmer0 => {} + VMKind::Wasmtime => m.wasmtime_compilation_time += duration, + VMKind::Wasmer2 => {} + VMKind::NearVm => m.near_vm_compilation_time += duration, + }); +} diff --git a/runtime/near-vm-runner/src/near_vm_runner/runner.rs b/runtime/near-vm-runner/src/near_vm_runner/runner.rs index b0a2a206249..2c6cea3498d 100644 --- a/runtime/near-vm-runner/src/near_vm_runner/runner.rs +++ b/runtime/near-vm-runner/src/near_vm_runner/runner.rs @@ -162,6 +162,7 @@ impl NearVM { code: &ContractCode, ) -> Result { let _span = tracing::debug_span!(target: "vm", "NearVM::compile_uncached").entered(); + let start = std::time::Instant::now(); let prepared_code = prepare::prepare_contract(code.code(), &self.config, VMKind::NearVm) .map_err(CompilationError::PrepareError)?; @@ -176,6 +177,7 @@ impl NearVM { tracing::error!(?err, "near_vm failed to compile the prepared code (this is defense-in-depth, the error was recovered from but should be reported to pagoda)"); CompilationError::WasmerCompileError { msg: err.to_string() } })?; + crate::metrics::compilation_duration(VMKind::NearVm, start.elapsed()); Ok(executable) } diff --git a/runtime/near-vm-runner/src/runner.rs b/runtime/near-vm-runner/src/runner.rs index 266634ba313..bdfdff2aebd 100644 --- a/runtime/near-vm-runner/src/runner.rs +++ b/runtime/near-vm-runner/src/runner.rs @@ -58,12 +58,13 @@ pub fn run( fees_config: &RuntimeFeesConfig, promise_results: &[PromiseResult], cache: Option<&dyn ContractRuntimeCache>, -) -> VMResult { +) -> (VMResult, crate::Metrics) { let span = tracing::Span::current(); let vm_kind = wasm_config.vm_kind; let runtime = vm_kind .runtime(wasm_config.clone()) .unwrap_or_else(|| panic!("the {vm_kind:?} runtime has not been enabled at compile time")); + crate::Metrics::reset(); let outcome = runtime.run( account.code_hash(), @@ -74,11 +75,15 @@ pub fn run( fees_config, promise_results, cache, - )?; + ); + let outcome = match outcome { + Ok(o) => o, + e @ Err(_) => return (e, crate::Metrics::get()), + }; span.record("burnt_gas", outcome.burnt_gas); span.record("compute_usage", outcome.compute_usage); - Ok(outcome) + (Ok(outcome), crate::Metrics::get()) } pub trait VM { diff --git a/runtime/near-vm-runner/src/wasmtime_runner.rs b/runtime/near-vm-runner/src/wasmtime_runner.rs index fab63f399e6..7c76b60780b 100644 --- a/runtime/near-vm-runner/src/wasmtime_runner.rs +++ b/runtime/near-vm-runner/src/wasmtime_runner.rs @@ -190,10 +190,12 @@ impl crate::runner::VM for WasmtimeVM { Ok(code) => code, Err(err) => return Ok(VMOutcome::abort(logic, FunctionCallError::from(err))), }; + let start = std::time::Instant::now(); let module = match Module::new(&engine, prepared_code) { Ok(module) => module, Err(err) => return 
Ok(VMOutcome::abort(logic, err.into_vm_error()?)), }; + crate::metrics::compilation_duration(VMKind::Wasmtime, start.elapsed()); let mut linker = Linker::new(&engine); let result = logic.after_loading_executable(code.code().len() as u64); diff --git a/runtime/runtime/src/actions.rs b/runtime/runtime/src/actions.rs index 4763c252592..496854b4607 100644 --- a/runtime/runtime/src/actions.rs +++ b/runtime/runtime/src/actions.rs @@ -119,7 +119,7 @@ pub(crate) fn execute_function_call( if checked_feature!("stable", ChunkNodesCache, protocol_version) { runtime_ext.set_trie_cache_mode(TrieCacheMode::CachingChunk); } - let result_from_cache = near_vm_runner::run( + let (result_from_cache, mut metrics) = near_vm_runner::run( account, None, &function_call.method_name, @@ -130,6 +130,7 @@ pub(crate) fn execute_function_call( promise_results, apply_state.cache.as_deref(), ); + metrics.report(&apply_state.shard_id.to_string()); let result = match result_from_cache { Err(VMRunnerError::CacheError(CacheError::ReadError(err))) if err.kind() == std::io::ErrorKind::NotFound => @@ -157,7 +158,7 @@ pub(crate) fn execute_function_call( if checked_feature!("stable", ChunkNodesCache, protocol_version) { runtime_ext.set_trie_cache_mode(TrieCacheMode::CachingChunk); } - near_vm_runner::run( + let (r, mut metrics) = near_vm_runner::run( account, Some(&code), &function_call.method_name, @@ -167,7 +168,9 @@ pub(crate) fn execute_function_call( &config.fees, promise_results, apply_state.cache.as_deref(), - ) + ); + metrics.report(&apply_state.shard_id.to_string()); + r } res => res, };
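Taken together, the runner changes establish one pattern: `run` clears a thread-local metrics slot, the VM records compilation time into it, and a snapshot is returned next to the `VMResult` so that `execute_function_call` can report it under the shard's label. A self-contained sketch of that thread-local accumulate/snapshot pattern, with simplified names (`record`, `compilation_time`) standing in for the real API:

use std::cell::RefCell;
use std::time::Duration;

thread_local! {
    // One metrics slot per thread, cleared at the start of every run.
    static METRICS: RefCell<Metrics> = const { RefCell::new(Metrics::new()) };
}

#[derive(Copy, Clone)]
struct Metrics {
    compilation_time: Duration,
}

impl Metrics {
    const fn new() -> Self {
        Metrics { compilation_time: Duration::new(0, 0) }
    }
    fn reset() {
        METRICS.with_borrow_mut(|m| *m = Self::new());
    }
    // Called from deep inside the VM without threading a handle through.
    fn record(duration: Duration) {
        METRICS.with_borrow_mut(|m| m.compilation_time += duration);
    }
    // Snapshot taken by the `run` wrapper and handed back to the caller.
    fn get() -> Metrics {
        METRICS.with_borrow(|m| *m)
    }
}

fn main() {
    Metrics::reset();
    Metrics::record(Duration::from_millis(7)); // e.g. after compiling a module
    assert_eq!(Metrics::get().compilation_time, Duration::from_millis(7));
}

The thread-local avoids plumbing a metrics handle through every VM entry point; the trade-off, as the doc comment on `get` notes, is that reset, run, and read must all happen on the same thread.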