refactor: make RuntimeExt: Send (near#11634)
In a world where we have pipelined compilation, instantiation and execution, `VMLogic` will
have to move between threads, which requires that it become `Send`. That in turn has required
some other types to become not only `Send` but also `Sync`, since they are currently stored as
`&` references (a shared reference can be copied freely, so several threads may observe the
referent at once; there are better places than this message to explain in detail why `Sync`
becomes necessary here).
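
As a minimal sketch of the `&`-reference situation, with hypothetical stand-in types rather than the real ones:

```rust
use std::thread;

// Hypothetical stand-ins; the real `TrieUpdate`/`RuntimeExt` are far richer.
struct TrieUpdateLike(u64);
struct RuntimeExtLike<'a> {
    trie_update: &'a TrieUpdateLike,
}

fn main() {
    let update = TrieUpdateLike(0);
    let ext = RuntimeExtLike { trie_update: &update };
    thread::scope(|s| {
        // `&T` is `Send` only when `T: Sync`, because a shared reference can
        // be copied and observed from several threads at once. This spawn
        // compiles because `TrieUpdateLike` (just a `u64`) is `Sync`; wrap the
        // field in `Cell<u64>` and it stops compiling.
        s.spawn(move || {
            let _ = ext.trie_update.0;
        });
    });
}
```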

I'm not sure whether all of these types will keep requiring `Sync`. In particular, the
`TrieUpdate` stored in `RuntimeExt` is now held by reference, but I eventually want to also make
`VMLogic: 'static`, which would require finding some owning pointer structure that works for
`TrieUpdate`... Or I might be able to use scoped threads, in which case we're looking at `Sync`
anyway.
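
For what it's worth, the trade-off between those two options sketches out roughly like this (again with hypothetical stand-in types):

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the real `TrieUpdate`.
struct TrieUpdateLike(u64);

// Option A: an owning pointer makes the value `'static`, so it can be handed
// to an ordinary spawned thread (or a thread pool) with no borrowed lifetimes.
fn run_owned(update: Arc<TrieUpdateLike>) {
    thread::spawn(move || {
        let _ = update.0;
    })
    .join()
    .unwrap();
}

// Option B: scoped threads keep the `&` borrow, but then the borrowed value
// must be `Sync`, which is exactly the requirement this commit introduces.
fn run_scoped(update: &TrieUpdateLike) {
    thread::scope(|s| {
        s.spawn(|| {
            let _ = update.0;
        });
    });
}

fn main() {
    run_owned(Arc::new(TrieUpdateLike(1)));
    run_scoped(&TrieUpdateLike(2));
}
```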

I think the changes here are largely straightforward, but overall things are shaping up to be
pretty involved, eh?

Part of near#11319
nagisa authored Jun 21, 2024
1 parent 5ef1ac3 commit b6f6288
Showing 70 changed files with 300 additions and 358 deletions.
4 changes: 2 additions & 2 deletions Justfile
@@ -93,10 +93,10 @@ check-cargo-fmt:
cargo fmt -- --check

# check clippy lints
check-cargo-clippy:
check-cargo-clippy *FLAGS:
CARGO_TARGET_DIR="target/clippy" \
RUSTFLAGS="-D warnings" \
cargo clippy --all-features --all-targets --locked
cargo clippy --all-features --all-targets --locked {{ FLAGS }}

# check cargo deny lints
check-cargo-deny:
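
With the recipe now taking a variadic `*FLAGS` parameter, extra arguments passed to `just check-cargo-clippy` should get appended to the `cargo clippy` command line, so e.g. forwarding `--fix` would presumably run `cargo clippy --all-features --all-targets --locked --fix` (illustrative; the exact invocation is not taken from this commit).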
12 changes: 6 additions & 6 deletions chain/chain/src/chain.rs
@@ -988,8 +988,8 @@ impl Chain {
if header.next_bp_hash()
!= &Chain::compute_bp_hash(
self.epoch_manager.as_ref(),
header.next_epoch_id().clone(),
header.epoch_id().clone(),
*header.next_epoch_id(),
*header.epoch_id(),
header.prev_hash(),
)?
{
@@ -2080,7 +2080,7 @@ impl Chain {
// Check that we know the epoch of the block before we try to get the header
// (so that a block from unknown epoch doesn't get marked as an orphan)
if !self.epoch_manager.epoch_exists(header.epoch_id()) {
return Err(Error::EpochOutOfBounds(header.epoch_id().clone()));
return Err(Error::EpochOutOfBounds(*header.epoch_id()));
}

if block.chunks().len() != self.epoch_manager.shard_ids(header.epoch_id())?.len() {
@@ -2857,7 +2857,7 @@ impl Chain {
num_parts: u64,
state_parts_task_scheduler: &near_async::messaging::Sender<ApplyStatePartsRequest>,
) -> Result<(), Error> {
let epoch_id = self.get_block_header(&sync_hash)?.epoch_id().clone();
let epoch_id = *self.get_block_header(&sync_hash)?.epoch_id();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
@@ -4267,13 +4267,13 @@ impl Chain {
shard_id: ShardId,
) -> Result<Option<(CryptoHash, ShardId)>, Error> {
let mut block_hash = *block_hash;
let mut epoch_id = self.get_block_header(&block_hash)?.epoch_id().clone();
let mut epoch_id = *self.get_block_header(&block_hash)?.epoch_id();
let mut shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?;
// this corrects all the shard where the original shard will split to if sharding changes
let mut shard_ids = vec![shard_id];

while let Ok(next_block_hash) = self.chain_store.get_next_block_hash(&block_hash) {
let next_epoch_id = self.get_block_header(&next_block_hash)?.epoch_id().clone();
let next_epoch_id = *self.get_block_header(&next_block_hash)?.epoch_id();
if next_epoch_id != epoch_id {
let next_shard_layout = self.epoch_manager.get_shard_layout(&next_epoch_id)?;
if next_shard_layout != shard_layout {
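
Most of the mechanical churn in this commit follows the pattern visible above: `epoch_id().clone()` becomes `*epoch_id()`. A minimal sketch of why the deref suffices, assuming `EpochId` is (or is being made) a `Copy` newtype over a fixed-size hash:

```rust
// Hypothetical simplifications of the real types.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct CryptoHashLike([u8; 32]);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct EpochIdLike(CryptoHashLike);

fn epoch_of(header_epoch_id: &EpochIdLike) -> EpochIdLike {
    // For a `Copy` type, `*reference` is a cheap bitwise copy; no `clone()`
    // call is needed and no allocation or reference counting is involved.
    *header_epoch_id
}

fn main() {
    let id = EpochIdLike(CryptoHashLike([0u8; 32]));
    let copied = epoch_of(&id);
    assert!(copied == id);
}
```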
2 changes: 1 addition & 1 deletion chain/chain/src/chain_update.rs
@@ -550,7 +550,7 @@ impl<'a> ChainUpdate<'a> {
// is also just height, so the very first block to cross the epoch end is guaranteed
// to be the head of the chain, and result in the light client block produced.
let prev = self.chain_store_update.get_previous_header(block.header())?;
let prev_epoch_id = prev.epoch_id().clone();
let prev_epoch_id = *prev.epoch_id();
if block.header().epoch_id() != &prev_epoch_id {
if prev.last_final_block() != &CryptoHash::default() {
let light_client_block = self.create_light_client_block(&prev)?;
5 changes: 2 additions & 3 deletions chain/chain/src/flat_storage_creator.rs
@@ -28,7 +28,6 @@ use near_store::flat::{
use near_store::Store;
use near_store::{Trie, TrieDBStorage, TrieTraversalItem};
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tracing::{debug, info};
@@ -97,7 +96,7 @@ impl FlatStorageShardCreator {
result_sender: Sender<u64>,
) {
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
let trie = Trie::new(Rc::new(trie_storage), state_root, None);
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
let path_begin = trie.find_state_part_boundary(part_id.idx, part_id.total).unwrap();
let path_end = trie.find_state_part_boundary(part_id.idx + 1, part_id.total).unwrap();
let hex_path_begin = Self::nibbles_to_hex(&path_begin);
@@ -199,7 +198,7 @@ impl FlatStorageShardCreator {
let trie_storage = TrieDBStorage::new(store, shard_uid);
let state_root =
*chain_store.get_chunk_extra(&block_hash, &shard_uid)?.state_root();
let trie = Trie::new(Rc::new(trie_storage), state_root, None);
let trie = Trie::new(Arc::new(trie_storage), state_root, None);
let root_node = trie.retrieve_root_node().unwrap();
let num_state_parts =
root_node.memory_usage / STATE_PART_MEMORY_LIMIT.as_u64() + 1;
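
The switch from `Rc` to `Arc` above is the part of this commit that actually lets the trie storage cross a thread boundary: `Rc<T>` is never `Send` or `Sync` because its reference count is updated non-atomically, whereas `Arc<T>` is both whenever `T: Send + Sync`. A small sketch with a hypothetical storage type:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the storage behind the trie.
struct TrieDBStorageLike;

fn main() {
    let storage = Arc::new(TrieDBStorageLike);
    // `Arc<T>` is `Send` when `T: Send + Sync`, so the handle can move to a
    // worker thread (and be cloned for several of them).
    thread::spawn(move || {
        let _storage = storage;
    })
    .join()
    .unwrap();

    // The equivalent `Rc` version would be rejected at compile time:
    //   let rc = std::rc::Rc::new(TrieDBStorageLike);
    //   thread::spawn(move || { let _ = rc; }); // error: `Rc<...>` is not `Send`
}
```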
2 changes: 1 addition & 1 deletion chain/chain/src/garbage_collection.rs
@@ -146,7 +146,7 @@ impl ChainStore {
if gc_stop_height > head.height {
return Err(Error::GCError("gc_stop_height cannot be larger than head.height".into()));
}
let prev_epoch_id = self.get_block_header(&head.prev_block_hash)?.epoch_id().clone();
let prev_epoch_id = *self.get_block_header(&head.prev_block_hash)?.epoch_id();
let epoch_change = prev_epoch_id != head.epoch_id;
let mut fork_tail = self.fork_tail()?;
metrics::TAIL_HEIGHT.set(tail as i64);
2 changes: 1 addition & 1 deletion chain/chain/src/runtime/mod.rs
@@ -1458,7 +1458,7 @@ impl node_runtime::adapter::ViewRuntimeAdapter for NightshadeRuntime {
block_height: height,
prev_block_hash: *prev_block_hash,
block_hash: *block_hash,
epoch_id: epoch_id.clone(),
epoch_id: *epoch_id,
epoch_height,
block_timestamp,
current_protocol_version,
4 changes: 2 additions & 2 deletions chain/chain/src/runtime/tests.rs
@@ -822,7 +822,7 @@ fn test_get_validator_info() {
expected_blocks: &mut [u64; 2],
expected_chunks: &mut [u64; 2],
expected_endorsements: &mut [u64; 2]| {
let epoch_id = env.head.epoch_id.clone();
let epoch_id = env.head.epoch_id;
let height = env.head.height;
let em = env.runtime.epoch_manager.read();
let bp = em.get_block_producer_info(&epoch_id, height).unwrap();
@@ -860,7 +860,7 @@ fn test_get_validator_info() {
);
assert!(env
.epoch_manager
.get_validator_info(ValidatorInfoIdentifier::EpochId(env.head.epoch_id.clone()))
.get_validator_info(ValidatorInfoIdentifier::EpochId(env.head.epoch_id))
.is_err());
env.step_default(vec![]);
update_validator_stats(
2 changes: 1 addition & 1 deletion chain/chain/src/store/latest_witnesses.rs
@@ -171,7 +171,7 @@ impl ChainStore {
let key = LatestWitnessesKey {
height: witness.chunk_header.height_created(),
shard_id: witness.chunk_header.shard_id(),
epoch_id: witness.epoch_id.clone(),
epoch_id: witness.epoch_id,
witness_size: serialized_witness_size,
random_uuid,
};
8 changes: 4 additions & 4 deletions chain/chain/src/store/mod.rs
@@ -361,7 +361,7 @@ pub trait ChainStoreAccess {
.get(shard_id as usize)
.ok_or_else(|| Error::InvalidShardId(shard_id as ShardId))?
{
break Ok(block_header.epoch_id().clone());
break Ok(*block_header.epoch_id());
}
candidate_hash = *block_header.prev_hash();
shard_id = epoch_manager.get_prev_shard_ids(&candidate_hash, vec![shard_id])?[0];
@@ -2237,8 +2237,8 @@ impl<'a> ChainStoreUpdate<'a> {
height,
last_block_hash: *block_hash,
prev_block_hash: *header.prev_hash(),
epoch_id: header.epoch_id().clone(),
next_epoch_id: header.next_epoch_id().clone(),
epoch_id: *header.epoch_id(),
next_epoch_id: *header.next_epoch_id(),
};
chain_store_update.head = Some(tip.clone());
chain_store_update.tail = Some(height);
@@ -2361,7 +2361,7 @@ impl<'a> ChainStoreUpdate<'a> {
.get_all_block_hashes_by_height(block.header().height())?
.as_ref(),
);
map.entry(block.header().epoch_id().clone())
map.entry(*block.header().epoch_id())
.or_insert_with(|| HashSet::new())
.insert(*hash);
store_update.set_ser(
30 changes: 10 additions & 20 deletions chain/chain/src/test_utils/kv_runtime.rs
@@ -268,26 +268,16 @@ impl MockEpochManager {
None => 0,
Some(prev_valset) => prev_valset + 1,
};
(
prev_next_epoch.clone(),
EpochId(prev_hash),
new_valset,
prev_block_header.height() + 1,
)
(*prev_next_epoch, EpochId(prev_hash), new_valset, prev_block_header.height() + 1)
} else {
(
prev_epoch.unwrap().clone(),
prev_next_epoch.clone(),
prev_valset.unwrap(),
prev_epoch_start,
)
(*prev_epoch.unwrap(), *prev_next_epoch, prev_valset.unwrap(), prev_epoch_start)
};

hash_to_next_epoch.insert(prev_hash, next_epoch.clone());
hash_to_epoch.insert(prev_hash, epoch.clone());
hash_to_next_epoch.insert(prev_hash, next_epoch);
hash_to_epoch.insert(prev_hash, epoch);
hash_to_next_epoch_approvals_req.insert(prev_hash, needs_next_epoch_approvals);
hash_to_valset.insert(epoch.clone(), valset);
hash_to_valset.insert(next_epoch.clone(), valset + 1);
hash_to_valset.insert(epoch, valset);
hash_to_valset.insert(next_epoch, valset + 1);
epoch_start_map.insert(prev_hash, epoch_start);

Ok((epoch, valset as usize % self.validators_by_valset.len(), next_epoch))
@@ -309,7 +299,7 @@ impl MockEpochManager {
.read()
.unwrap()
.get(epoch_id)
.ok_or_else(|| EpochError::EpochOutOfBounds(epoch_id.clone()))? as usize
.ok_or_else(|| EpochError::EpochOutOfBounds(*epoch_id))? as usize
% self.validators_by_valset.len())
}

@@ -631,7 +621,7 @@ impl EpochManagerAdapter for MockEpochManager {
}
match (self.get_valset_for_epoch(epoch_id), self.get_valset_for_epoch(other_epoch_id)) {
(Ok(index1), Ok(index2)) => Ok(index1.cmp(&index2)),
_ => Err(EpochError::EpochOutOfBounds(epoch_id.clone())),
_ => Err(EpochError::EpochOutOfBounds(*epoch_id)),
}
}

@@ -773,7 +763,7 @@ impl EpochManagerAdapter for MockEpochManager {
return Ok((validator_stake.clone(), false));
}
}
Err(EpochError::NotAValidator(account_id.clone(), epoch_id.clone()))
Err(EpochError::NotAValidator(account_id.clone(), *epoch_id))
}

fn get_fisherman_by_account_id(
@@ -782,7 +772,7 @@
_last_known_block_hash: &CryptoHash,
account_id: &AccountId,
) -> Result<(ValidatorStake, bool), EpochError> {
Err(EpochError::NotAValidator(account_id.clone(), epoch_id.clone()))
Err(EpochError::NotAValidator(account_id.clone(), *epoch_id))
}

fn get_validator_info(
17 changes: 6 additions & 11 deletions chain/chain/src/tests/garbage_collection.rs
@@ -57,7 +57,7 @@ fn do_fork(
TestBlockBuilder::new(Clock::real(), &prev_block, signer.clone()).build()
} else {
let prev_hash = prev_block.hash();
let epoch_id = prev_block.header().next_epoch_id().clone();
let epoch_id = *prev_block.header().next_epoch_id();
if verbose {
println!(
"Creating block with new epoch id {:?} @{}",
@@ -67,8 +67,8 @@ fn do_fork(
}
let next_bp_hash = Chain::compute_bp_hash(
chain.epoch_manager.as_ref(),
next_epoch_id.clone(),
epoch_id.clone(),
next_epoch_id,
epoch_id,
&prev_hash,
)
.unwrap();
@@ -742,14 +742,9 @@ fn add_block(
TestBlockBuilder::new(Clock::real(), &prev_block, signer).height(height).build()
} else {
let prev_hash = prev_block.hash();
let epoch_id = prev_block.header().next_epoch_id().clone();
let next_bp_hash = Chain::compute_bp_hash(
epoch_manager,
next_epoch_id.clone(),
epoch_id.clone(),
&prev_hash,
)
.unwrap();
let epoch_id = *prev_block.header().next_epoch_id();
let next_bp_hash =
Chain::compute_bp_hash(epoch_manager, next_epoch_id, epoch_id, &prev_hash).unwrap();
TestBlockBuilder::new(Clock::real(), &prev_block, signer)
.height(height)
.epoch_id(epoch_id)
4 changes: 2 additions & 2 deletions chain/chain/src/tests/simple_chain.rs
@@ -74,8 +74,8 @@ fn build_chain_with_orphans() {
last_block.header().block_ordinal() + 1,
last_block.chunks().iter().cloned().collect(),
vec![vec![]; last_block.chunks().len()],
last_block.header().epoch_id().clone(),
last_block.header().next_epoch_id().clone(),
*last_block.header().epoch_id(),
*last_block.header().next_epoch_id(),
None,
vec![],
Ratio::from_integer(0),
2 changes: 1 addition & 1 deletion chain/client/src/chunk_distribution_network.rs
@@ -81,7 +81,7 @@ pub fn request_missing_chunks<C>(
client.clone(),
chunk,
shards_manager_adapter,
epoch_id.clone(),
epoch_id,
ancestor_hash,
);
}
7 changes: 3 additions & 4 deletions chain/client/src/chunk_inclusion_tracker.rs
@@ -125,9 +125,8 @@ impl ChunkInclusionTracker {
}

fn is_banned(&self, epoch_id: &EpochId, chunk_info: &ChunkInfo) -> bool {
let banned = self
.banned_chunk_producers
.contains(&(epoch_id.clone(), chunk_info.chunk_producer.clone()));
let banned =
self.banned_chunk_producers.contains(&(*epoch_id, chunk_info.chunk_producer.clone()));
if banned {
tracing::warn!(
target: "client",
@@ -186,7 +185,7 @@ impl ChunkInclusionTracker {
pub fn get_banned_chunk_producers(&self) -> Vec<(EpochId, Vec<AccountId>)> {
let mut banned_chunk_producers: HashMap<EpochId, Vec<_>> = HashMap::new();
for ((epoch_id, account_id), _) in self.banned_chunk_producers.iter() {
banned_chunk_producers.entry(epoch_id.clone()).or_default().push(account_id.clone());
banned_chunk_producers.entry(*epoch_id).or_default().push(account_id.clone());
}
banned_chunk_producers.into_iter().collect_vec()
}
14 changes: 7 additions & 7 deletions chain/client/src/client.rs
@@ -279,8 +279,8 @@ impl Client {
let epoch_sync = EpochSync::new(
clock.clone(),
network_adapter.clone(),
genesis_block.header().epoch_id().clone(),
genesis_block.header().next_epoch_id().clone(),
*genesis_block.header().epoch_id(),
*genesis_block.header().next_epoch_id(),
epoch_manager
.get_epoch_block_producers_ordered(
genesis_block.header().epoch_id(),
@@ -597,7 +597,7 @@ impl Client {

let prev = self.chain.get_block_header(&prev_hash)?;
let prev_height = prev.height();
let prev_epoch_id = prev.epoch_id().clone();
let prev_epoch_id = *prev.epoch_id();
let prev_next_bp_hash = *prev.next_bp_hash();

// Check and update the doomslug tip here. This guarantees that our endorsement will be in the
@@ -699,7 +699,7 @@ impl Client {
Chain::compute_bp_hash(
self.epoch_manager.as_ref(),
next_epoch_id,
epoch_id.clone(),
epoch_id,
&prev_hash,
)?
} else {
@@ -2114,7 +2114,7 @@ impl Client {
&parent_hash,
account_id,
) {
Ok(_) => next_block_epoch_id.clone(),
Ok(_) => next_block_epoch_id,
Err(EpochError::NotAValidator(_, _)) => {
match self.epoch_manager.get_next_epoch_id_from_prev_block(&parent_hash) {
Ok(next_block_next_epoch_id) => next_block_next_epoch_id,
@@ -2486,7 +2486,7 @@ impl Client {
true,
),
shards_to_split,
BlocksCatchUpState::new(sync_hash, epoch_id.clone()),
BlocksCatchUpState::new(sync_hash, *epoch_id),
)
});

@@ -2761,7 +2761,7 @@ impl Client {
}
}
let account_keys = Arc::new(account_keys);
self.tier1_accounts_cache = Some((tip.epoch_id.clone(), account_keys.clone()));
self.tier1_accounts_cache = Some((tip.epoch_id, account_keys.clone()));
Ok(account_keys)
}

2 changes: 1 addition & 1 deletion chain/client/src/client_actor.rs
@@ -1690,7 +1690,7 @@ impl ClientActorInner {
let me = signer.as_ref().map(|x| x.validator_id().clone());
let block_header = self.client.chain.get_block_header(&sync_hash);
let block_header = unwrap_and_report_state_sync_result!(block_header);
let epoch_id = block_header.epoch_id().clone();
let epoch_id = *block_header.epoch_id();
let shards_to_sync = get_shards_cares_about_this_or_next_epoch(
me.as_ref(),
true,
(Diff truncated; the remaining changed files are not shown here.)
