Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stateless_validation] Push get_chunk_validator_assignments call to StateWitnessActor #11090

1 change: 0 additions & 1 deletion chain/client/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ pub fn client_sender_for_network(
tx_status_response: view_client_addr.clone().into_sender(),
announce_account: view_client_addr.into_sender(),
chunk_state_witness: client_addr.clone().into_sender(),
chunk_state_witness_ack: client_addr.clone().into_sender(),
chunk_endorsement: client_addr.into_sender(),
}
}
13 changes: 5 additions & 8 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::debug::BlockProductionTracker;
use crate::debug::PRODUCTION_TIMES_CACHE_SIZE;
use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTracker;
use crate::stateless_validation::chunk_validator::ChunkValidator;
use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker;
use crate::stateless_validation::state_witness_actor::StateWitnessSenderForClient;
use crate::sync::adapter::SyncShardInfo;
use crate::sync::block::BlockSync;
use crate::sync::epoch::EpochSync;
Expand Down Expand Up @@ -167,19 +167,16 @@ pub struct Client {
/// Last time the head was updated, or our head was rebroadcasted. Used to re-broadcast the head
/// again to prevent network from stalling if a large percentage of the network missed a block
last_time_head_progress_made: Instant,

/// Block production timing information. Used only for debug purposes.
/// Stores approval information and production time of the block
pub block_production_info: BlockProductionTracker,
/// Chunk production timing information. Used only for debug purposes.
pub chunk_production_info: lru::LruCache<(BlockHeight, ShardId), ChunkProduction>,

/// Cached precomputed set of TIER1 accounts.
/// See send_network_chain_info().
tier1_accounts_cache: Option<(EpochId, Arc<AccountKeys>)>,
/// Used when it is needed to create flat storage in background for some shards.
flat_storage_creator: Option<FlatStorageCreator>,

/// When the "sync block" was requested.
/// The "sync block" is the last block of the previous epoch, i.e. `prev_hash` of the `sync_hash` block.
pub last_time_sync_block_requested: Option<near_async::time::Utc>,
Expand All @@ -191,9 +188,8 @@ pub struct Client {
pub chunk_inclusion_tracker: ChunkInclusionTracker,
/// Tracks chunk endorsements received from chunk validators. Used to filter out chunks ready for inclusion
pub chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>,
/// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
pub state_witness_tracker: ChunkStateWitnessTracker,

/// Adapter to send request to state_witness_actor to distribute state witness.
pub state_witness_adapter: StateWitnessSenderForClient,
// Optional value used for the Chunk Distribution Network Feature.
chunk_distribution_network: Option<ChunkDistributionNetwork>,
}
Expand Down Expand Up @@ -255,6 +251,7 @@ impl Client {
rng_seed: RngSeed,
snapshot_callbacks: Option<SnapshotCallbacks>,
async_computation_spawner: Arc<dyn AsyncComputationSpawner>,
state_witness_adapter: StateWitnessSenderForClient,
) -> Result<Self, Error> {
let doomslug_threshold_mode = if enable_doomslug {
DoomslugThresholdMode::TwoThirds
Expand Down Expand Up @@ -409,7 +406,7 @@ impl Client {
chunk_validator,
chunk_inclusion_tracker: ChunkInclusionTracker::new(),
chunk_endorsement_tracker,
state_witness_tracker: ChunkStateWitnessTracker::new(clock),
state_witness_adapter,
chunk_distribution_network,
})
}
Expand Down
18 changes: 7 additions & 11 deletions chain/client/src/client_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::debug::new_network_info_view;
use crate::info::{display_sync_status, InfoHelper};
use crate::sync::adapter::{SyncMessage, SyncShardInfo};
use crate::sync::state::{StateSync, StateSyncResult};
use crate::{metrics, StatusResponse};
use crate::{metrics, DistributeStateWitnessRequest, StatusResponse};
use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt, FutureSpawner};
use near_async::messaging::{CanSend, Sender};
use near_async::time::{Clock, Utc};
Expand Down Expand Up @@ -41,8 +41,8 @@ use near_client_primitives::types::{
};
use near_network::client::{
BlockApproval, BlockHeadersResponse, BlockResponse, ChunkEndorsementMessage,
ChunkStateWitnessAckMessage, ChunkStateWitnessMessage, ProcessTxRequest, ProcessTxResponse,
RecvChallenge, SetNetworkInfo, StateResponse,
ChunkStateWitnessMessage, ProcessTxRequest, ProcessTxResponse, RecvChallenge, SetNetworkInfo,
StateResponse,
};
use near_network::types::ReasonForBan;
use near_network::types::{
Expand Down Expand Up @@ -95,6 +95,10 @@ pub struct SyncJobsSenderForClient {
pub resharding: Sender<ReshardingRequest>,
}

pub struct StateWitnessSenderForClient {
pub distribute_chunk_state_witness: Sender<DistributeStateWitnessRequest>,
}

pub struct ClientActions {
clock: Clock,

Expand Down Expand Up @@ -1865,14 +1869,6 @@ impl ClientActionHandler<ChunkStateWitnessMessage> for ClientActions {
}
}

impl ClientActionHandler<ChunkStateWitnessAckMessage> for ClientActions {
type Result = ();

fn handle(&mut self, msg: ChunkStateWitnessAckMessage) -> Self::Result {
self.client.process_chunk_state_witness_ack(msg.0);
}
}

impl ClientActionHandler<ChunkEndorsementMessage> for ClientActions {
type Result = ();

Expand Down
3 changes: 3 additions & 0 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;

use crate::client_actions::{ClientActionHandler, ClientActions, ClientSenderForClient};
use crate::stateless_validation::state_witness_actor::StateWitnessSenderForClient;
use crate::sync_jobs_actions::SyncJobsActions;
use crate::sync_jobs_actor::SyncJobsActor;
use crate::{metrics, Client, ConfigUpdater, SyncAdapter};
Expand Down Expand Up @@ -195,6 +196,7 @@ pub fn start_client(
sender: Option<broadcast::Sender<()>>,
adv: crate::adversarial::Controls,
config_updater: Option<ConfigUpdater>,
state_witness_adapter: StateWitnessSenderForClient,
) -> (Addr<ClientActor>, ArbiterHandle, ReshardingHandle) {
let client_arbiter = Arbiter::new();
let client_arbiter_handle = client_arbiter.handle();
Expand All @@ -215,6 +217,7 @@ pub fn start_client(
random_seed_from_thread(),
snapshot_callbacks,
Arc::new(RayonAsyncComputationSpawner),
state_witness_adapter,
)
.unwrap();
let resharding_handle = client.chain.resharding_handle.clone();
Expand Down
4 changes: 4 additions & 0 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub use near_network::client::{
BlockApproval, BlockResponse, ProcessTxRequest, ProcessTxResponse, SetNetworkInfo,
};
pub use stateless_validation::processing_tracker::{ProcessingDoneTracker, ProcessingDoneWaiter};
pub use stateless_validation::state_witness_actions::StateWitnessActions;
pub use stateless_validation::state_witness_actor::{
DistributeStateWitnessRequest, StateWitnessActor, StateWitnessSenderForClientMessage,
};

pub mod adapter;
pub mod adversarial;
Expand Down
2 changes: 2 additions & 0 deletions chain/client/src/stateless_validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@ pub mod chunk_endorsement_tracker;
pub mod chunk_validator;
pub mod processing_tracker;
mod shadow_validate;
pub mod state_witness_actions;
pub mod state_witness_actor;
mod state_witness_producer;
pub mod state_witness_tracker;
118 changes: 118 additions & 0 deletions chain/client/src/stateless_validation/state_witness_actions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::sync::Arc;

use near_async::messaging::CanSend;
use near_async::time::Clock;
use near_chain::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest};
use near_primitives::stateless_validation::{
ChunkStateWitness, ChunkStateWitnessAck, EncodedChunkStateWitness,
SignedEncodedChunkStateWitness,
};
use near_primitives::validator_signer::ValidatorSigner;

use crate::metrics;

use super::state_witness_actor::DistributeStateWitnessRequest;
use super::state_witness_tracker::ChunkStateWitnessTracker;

pub struct StateWitnessActions {
/// Adapter to send messages to the network.
network_adapter: PeerManagerAdapter,
/// Validator signer to sign the state witness.
my_signer: Arc<dyn ValidatorSigner>,
/// Epoch manager to get the set of chunk validators
epoch_manager: Arc<dyn EpochManagerAdapter>,
/// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
state_witness_tracker: ChunkStateWitnessTracker,
}

impl StateWitnessActions {
pub fn new(
clock: Clock,
network_adapter: PeerManagerAdapter,
my_signer: Arc<dyn ValidatorSigner>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
) -> Self {
Self {
network_adapter,
my_signer,
epoch_manager,
state_witness_tracker: ChunkStateWitnessTracker::new(clock),
}
}

pub fn handle_distribute_state_witness_request(
&mut self,
msg: DistributeStateWitnessRequest,
) -> Result<(), Error> {
let DistributeStateWitnessRequest { state_witness } = msg;

let signed_witness = create_signed_witness(&state_witness, self.my_signer.as_ref())?;

let mut chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
&state_witness.epoch_id,
state_witness.chunk_header.shard_id(),
state_witness.chunk_header.height_created(),
)?
.ordered_chunk_validators();

tracing::debug!(
target: "stateless_validation",
"Sending chunk state witness for chunk {:?} to chunk validators {:?}",
state_witness.chunk_header.chunk_hash(),
chunk_validators,
);

// Record the witness in order to match the incoming acks for measuring round-trip times.
// See process_chunk_state_witness_ack for the handling of the ack messages.
self.state_witness_tracker.record_witness_sent(
&state_witness,
signed_witness.witness_bytes.size_bytes(),
chunk_validators.len(),
);

// Remove ourselves from the list of chunk validators. Network can't send messages to ourselves.
chunk_validators.retain(|validator| validator != self.my_signer.validator_id());

self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitness(chunk_validators, signed_witness),
));

Ok(())
}

/// Handles the state witness ack message from the chunk validator.
/// It computes the round-trip time between sending the state witness and receiving
/// the ack message and updates the corresponding metric with it.
/// Currently we do not raise an error for handling of witness-ack messages,
/// as it is used only for tracking some networking metrics.
pub fn handle_chunk_state_witness_ack(&mut self, witness_ack: ChunkStateWitnessAck) -> () {
self.state_witness_tracker.on_witness_ack_received(witness_ack);
}
}

fn create_signed_witness(
witness: &ChunkStateWitness,
my_signer: &dyn ValidatorSigner,
) -> Result<SignedEncodedChunkStateWitness, Error> {
let shard_id_label = witness.chunk_header.shard_id().to_string();
let encode_timer = metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let (witness_bytes, raw_witness_size) = EncodedChunkStateWitness::encode(&witness)?;
encode_timer.observe_duration();
let signed_witness = SignedEncodedChunkStateWitness {
signature: my_signer.sign_chunk_state_witness(&witness_bytes),
witness_bytes,
};
metrics::CHUNK_STATE_WITNESS_TOTAL_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(signed_witness.witness_bytes.size_bytes() as f64);
metrics::CHUNK_STATE_WITNESS_RAW_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(raw_witness_size as f64);
Ok(signed_witness)
}
79 changes: 79 additions & 0 deletions chain/client/src/stateless_validation/state_witness_actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::sync::Arc;

use actix::Actor;
use near_async::messaging::Sender;
use near_async::time::Clock;
use near_async::{MultiSend, MultiSendMessage, MultiSenderFrom};
use near_epoch_manager::EpochManagerAdapter;
use near_network::state_witness::ChunkStateWitnessAckMessage;
use near_network::types::PeerManagerAdapter;
use near_o11y::{handler_debug_span, WithSpanContext};
use near_performance_metrics_macros::perf;
use near_primitives::stateless_validation::ChunkStateWitness;
use near_primitives::validator_signer::ValidatorSigner;

use super::state_witness_actions::StateWitnessActions;

pub struct StateWitnessActor {
pub actions: StateWitnessActions,
}

impl StateWitnessActor {
pub fn spawn(
clock: Clock,
network_adapter: PeerManagerAdapter,
my_signer: Arc<dyn ValidatorSigner>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
) -> (actix::Addr<Self>, actix::ArbiterHandle) {
let arbiter = actix::Arbiter::new().handle();
let addr = Self::start_in_arbiter(&arbiter, |_ctx| Self {
actions: StateWitnessActions::new(clock, network_adapter, my_signer, epoch_manager),
});
(addr, arbiter)
}
}

impl actix::Actor for StateWitnessActor {
type Context = actix::Context<Self>;
}

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct DistributeStateWitnessRequest {
pub state_witness: ChunkStateWitness,
}

#[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)]
#[multi_send_message_derive(Debug)]
pub struct StateWitnessSenderForClient {
pub distribute_chunk_state_witness: Sender<DistributeStateWitnessRequest>,
}

impl actix::Handler<WithSpanContext<DistributeStateWitnessRequest>> for StateWitnessActor {
type Result = ();

#[perf]
fn handle(
&mut self,
msg: WithSpanContext<DistributeStateWitnessRequest>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_debug_span!(target: "stateless_validation", msg);
if let Err(err) = self.actions.handle_distribute_state_witness_request(msg) {
tracing::error!(target: "stateless_validation", ?err, "Failed to handle distribute chunk state witness request");
}
}
}

impl actix::Handler<WithSpanContext<ChunkStateWitnessAckMessage>> for StateWitnessActor {
type Result = ();

fn handle(
&mut self,
msg: WithSpanContext<ChunkStateWitnessAckMessage>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_debug_span!(target: "stateless_validation", msg);
self.actions.handle_chunk_state_witness_ack(msg.0);
}
}
Loading
Loading