Skip to content

Commit

Permalink
[stateless_validation] Push get_chunk_validator_assignments call to S…
Browse files Browse the repository at this point in the history
…tateWitnessActor (#11090)

Part 4

Minor change, this PR adds the epoch_manager to StateWitnessActor which
would later be used to get the set of validators to send the parts to.

This probably completes the setup for StateWitnessActor after which we
can start adding in the encoding details.

NOTE: I couldn't create stacked PRs. Please review [these
commits](2c201fa)
only.
  • Loading branch information
Shreyan Gupta authored Apr 18, 2024
1 parent 0542c38 commit 2bbde59
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 27 deletions.
19 changes: 18 additions & 1 deletion chain/client/src/stateless_validation/state_witness_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use near_async::messaging::CanSend;
use near_async::time::Clock;
use near_chain::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest};
use near_primitives::stateless_validation::{
ChunkStateWitness, ChunkStateWitnessAck, EncodedChunkStateWitness,
Expand All @@ -20,6 +21,8 @@ pub struct StateWitnessActions {
network_adapter: PeerManagerAdapter,
/// Validator signer to sign the state witness.
my_signer: Arc<dyn ValidatorSigner>,
/// Epoch manager to get the set of chunk validators
epoch_manager: Arc<dyn EpochManagerAdapter>,
/// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
state_witness_tracker: ChunkStateWitnessTracker,
}
Expand All @@ -29,10 +32,12 @@ impl StateWitnessActions {
clock: Clock,
network_adapter: PeerManagerAdapter,
my_signer: Arc<dyn ValidatorSigner>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
) -> Self {
Self {
network_adapter,
my_signer,
epoch_manager,
state_witness_tracker: ChunkStateWitnessTracker::new(clock),
}
}
Expand All @@ -41,10 +46,19 @@ impl StateWitnessActions {
&mut self,
msg: DistributeStateWitnessRequest,
) -> Result<(), Error> {
let DistributeStateWitnessRequest { chunk_validators, state_witness } = msg;
let DistributeStateWitnessRequest { state_witness } = msg;

let signed_witness = create_signed_witness(&state_witness, self.my_signer.as_ref())?;

let mut chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
&state_witness.epoch_id,
state_witness.chunk_header.shard_id(),
state_witness.chunk_header.height_created(),
)?
.ordered_chunk_validators();

tracing::debug!(
target: "stateless_validation",
"Sending chunk state witness for chunk {:?} to chunk validators {:?}",
Expand All @@ -60,6 +74,9 @@ impl StateWitnessActions {
chunk_validators.len(),
);

// Remove ourselves from the list of chunk validators. Network can't send messages to ourselves.
chunk_validators.retain(|validator| validator != self.my_signer.validator_id());

self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitness(chunk_validators, signed_witness),
));
Expand Down
6 changes: 3 additions & 3 deletions chain/client/src/stateless_validation/state_witness_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use actix::Actor;
use near_async::messaging::Sender;
use near_async::time::Clock;
use near_async::{MultiSend, MultiSendMessage, MultiSenderFrom};
use near_epoch_manager::EpochManagerAdapter;
use near_network::state_witness::ChunkStateWitnessAckMessage;
use near_network::types::PeerManagerAdapter;
use near_o11y::{handler_debug_span, WithSpanContext};
use near_performance_metrics_macros::perf;
use near_primitives::stateless_validation::ChunkStateWitness;
use near_primitives::types::AccountId;
use near_primitives::validator_signer::ValidatorSigner;

use super::state_witness_actions::StateWitnessActions;
Expand All @@ -23,10 +23,11 @@ impl StateWitnessActor {
clock: Clock,
network_adapter: PeerManagerAdapter,
my_signer: Arc<dyn ValidatorSigner>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
) -> (actix::Addr<Self>, actix::ArbiterHandle) {
let arbiter = actix::Arbiter::new().handle();
let addr = Self::start_in_arbiter(&arbiter, |_ctx| Self {
actions: StateWitnessActions::new(clock, network_adapter, my_signer),
actions: StateWitnessActions::new(clock, network_adapter, my_signer, epoch_manager),
});
(addr, arbiter)
}
Expand All @@ -39,7 +40,6 @@ impl actix::Actor for StateWitnessActor {
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct DistributeStateWitnessRequest {
pub chunk_validators: Vec<AccountId>,
pub state_witness: ChunkStateWitness,
}

Expand Down
24 changes: 9 additions & 15 deletions chain/client/src/stateless_validation/state_witness_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,6 @@ impl Client {
return Ok(());
}

let chunk_header = chunk.cloned_header();
let mut chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(
epoch_id,
chunk_header.shard_id(),
chunk_header.height_created(),
)?
.ordered_chunk_validators();

let my_signer = self.validator_signer.as_ref().ok_or(Error::NotAValidator)?.clone();
let state_witness = self.create_state_witness(
my_signer.validator_id().clone(),
Expand All @@ -53,7 +43,14 @@ impl Client {
transactions_storage_proof,
)?;

if chunk_validators.contains(my_signer.validator_id()) {
let chunk_header = chunk.cloned_header();
let shard_id = chunk_header.shard_id();
let height = chunk_header.height_created();
if self
.epoch_manager
.get_chunk_validator_assignments(epoch_id, shard_id, height)?
.contains(my_signer.validator_id())
{
// Bypass state witness validation if we created state witness. Endorse the chunk immediately.
send_chunk_endorsement_to_block_producers(
&chunk_header,
Expand All @@ -63,11 +60,8 @@ impl Client {
self.chunk_endorsement_tracker.as_ref(),
);
}
// Remove ourselves from the list of chunk validators. Network can't send messages to ourselves.
chunk_validators.retain(|validator| validator != my_signer.validator_id());

self.state_witness_adapter
.send(DistributeStateWitnessRequest { chunk_validators, state_witness });
self.state_witness_adapter.send(DistributeStateWitnessRequest { state_witness });
Ok(())
}

Expand Down
8 changes: 6 additions & 2 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,12 @@ pub fn setup(
let state_sync_adapter =
Arc::new(RwLock::new(SyncAdapter::new(noop().into_sender(), noop().into_sender())));

let (state_witness_addr, _) =
StateWitnessActor::spawn(clock.clone(), network_adapter.clone(), signer.clone());
let (state_witness_addr, _) = StateWitnessActor::spawn(
clock.clone(),
network_adapter.clone(),
signer.clone(),
epoch_manager.clone(),
);
let state_witness_adapter = state_witness_addr.with_auto_span_context();

let client = Client::new(
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/test_utils/test_env_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ impl TestEnvBuilder {
clock.clone(),
network_adapters[i].clone().as_multi_sender(),
validator_signer.clone(),
epoch_manager.clone().into_adapter(),
));
setup_client_with_runtime(
clock.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ fn test_client_with_multi_test_loop() {
let shards_manager = ShardsManager::new(
builder.clock(),
Some(accounts[idx].clone()),
epoch_manager,
epoch_manager.clone(),
shard_tracker,
builder.sender().for_index(idx).into_sender(),
builder.sender().for_index(idx).into_sender(),
Expand Down Expand Up @@ -322,6 +322,7 @@ fn test_client_with_multi_test_loop() {
builder.clock(),
builder.sender().for_index(idx).into_multi_sender(),
validator_signer,
epoch_manager,
);

let data = TestData {
Expand Down
10 changes: 7 additions & 3 deletions integration-tests/src/tests/network/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,20 @@ fn setup_network_node(
adv,
);
let (shards_manager_actor, _) = start_shards_manager(
epoch_manager,
epoch_manager.clone(),
shard_tracker,
network_adapter.as_sender(),
client_actor.clone().with_auto_span_context().into_sender(),
Some(signer.validator_id().clone()),
runtime.store().clone(),
client_config.chunk_request_retry_period,
);
let (state_witness_actor, _) =
StateWitnessActor::spawn(Clock::real(), network_adapter.as_multi_sender(), signer);
let (state_witness_actor, _) = StateWitnessActor::spawn(
Clock::real(),
network_adapter.as_multi_sender(),
signer,
epoch_manager,
);
shards_manager_adapter.bind(shards_manager_actor.with_auto_span_context());
let peer_manager = PeerManagerActor::spawn(
time::Clock::real(),
Expand Down
8 changes: 6 additions & 2 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,12 @@ pub fn start_with_config_and_synchronization(

let (state_witness_actor, state_witness_arbiter) = if config.validator_signer.is_some() {
let my_signer = config.validator_signer.clone().unwrap();
let (state_witness_actor, state_witness_arbiter) =
StateWitnessActor::spawn(Clock::real(), network_adapter.as_multi_sender(), my_signer);
let (state_witness_actor, state_witness_arbiter) = StateWitnessActor::spawn(
Clock::real(),
network_adapter.as_multi_sender(),
my_signer,
epoch_manager.clone(),
);
(Some(state_witness_actor), Some(state_witness_arbiter))
} else {
(None, None)
Expand Down

0 comments on commit 2bbde59

Please sign in to comment.