feat: add a metric which counts how many chunks couldn't fit all transactions from the pool (#10422)

When a chunk is produced, Client calls `prepare_transactions()`, which
fetches transactions from the transaction pool and adds them to the
chunk. `prepare_transactions()` adds transactions until some limit is
hit: a limit on gas, time, number of transactions, chunk size, etc.
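
As a rough sketch, the result type now carries this information (the authoritative definitions are in `chain/chain/src/types.rs` below; the `SignedTransaction` import path is assumed here):

```rust
use near_primitives::transaction::SignedTransaction;

/// Which limit stopped `prepare_transactions()` from adding more transactions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrepareTransactionsLimit {
    Gas,
    Size,
    Time,
    ReceiptCount,
}

/// Transactions fetched from the pool and prepared for the new chunk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PreparedTransactions {
    /// Transactions that passed validation and fit within the limits.
    pub transactions: Vec<SignedTransaction>,
    /// `Some(limit)` if preparation stopped because a limit was hit,
    /// `None` if the pool simply ran out of transactions.
    pub limited_by: Option<PrepareTransactionsLimit>,
}
```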

Currently there's no way to know whether a limit was hit, or which one.
Let's add a way to access this information; it's useful for figuring out
what the bottleneck of chunk production is.

This PR adds a new metric which counts how many of the produced chunks
couldn't fit all transactions from the transaction pool due to hitting a
limit. It has two labels: `shard_id` - the chunk's shard, and `limited_by` -
which of the limits was hit in `prepare_transactions()`.
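
For illustration only, here is a minimal stand-alone sketch of registering and bumping such a counter with the `prometheus` and `once_cell` crates; nearcore itself goes through the `near_o11y` helpers shown in `chain/client/src/metrics.rs` below, and `record_limit_hit` is a made-up name:

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounterVec};

// Counter with the same name and labels as the metric added in this PR.
static PRODUCED_CHUNKS_SOME_POOL_TRANSACTIONS_DIDNT_FIT: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "near_produced_chunks_some_pool_transactions_didnt_fit",
        "Produced chunks where some pool transactions didn't fit due to a limit",
        &["shard_id", "limited_by"]
    )
    .unwrap()
});

// Hypothetical helper: the chunk producer would call this when `limited_by` is `Some`.
fn record_limit_hit(shard_id: u64, limited_by: &str) {
    PRODUCED_CHUNKS_SOME_POOL_TRANSACTIONS_DIDNT_FIT
        .with_label_values(&[&shard_id.to_string(), limited_by])
        .inc();
}
```

Since the enum derives `strum::AsRefStr` without renaming, `limit.as_ref()` should yield the variant name verbatim (e.g. `Time`), which becomes the `limited_by` label value.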

The need for this metric was discussed in
#10310 (comment). The
hope is that it'll allow us to figure out what the bottleneck of chunk
production is in scenarios of high chain congestion.
Right now, in cases where `produce_chunk` takes a lot of time, we can only
theorize about why that's the case; there's no observability into
what's going on inside. This metric could help with that.

To test that the metric works, I set up a mocknet network with a build
from this branch and put load on it using Locust.
I reduced `produce_chunk_add_transactions_time_limit` from 200ms to 2ms
to easily trigger the metric. It worked, as can be observed on the
Grafana dashboard:
https://nearinc.grafana.net/d/eada7f01-b2dc-4df8-8a9f-ec4ec411159e/jancio-mocknet-stats?orgId=1&from=1705135200000&to=1705135800000
![Screenshot 2024-01-13 at 15 21 12](https://github.com/near/nearcore/assets/149345204/51ba6a04-c2ff-4773-a8d7-9cfbff8a7fa5)
jancionear authored Jan 17, 2024
1 parent 0fcd21f commit 790d663
Showing 5 changed files with 120 additions and 63 deletions.
6 changes: 3 additions & 3 deletions chain/chain/src/test_utils/kv_runtime.rs
@@ -1,7 +1,7 @@
use super::ValidatorSchedule;
use crate::types::{
ApplyChunkBlockContext, ApplyChunkResult, ApplyChunkShardContext, ApplyResultForResharding,
RuntimeAdapter, RuntimeStorageConfig,
PreparedTransactions, RuntimeAdapter, RuntimeStorageConfig,
};
use crate::BlockHeader;
use borsh::{BorshDeserialize, BorshSerialize};
@@ -1040,12 +1040,12 @@ impl RuntimeAdapter for KeyValueRuntime {
_chain_validate: &mut dyn FnMut(&SignedTransaction) -> bool,
_current_protocol_version: ProtocolVersion,
_time_limit: Option<Duration>,
) -> Result<Vec<SignedTransaction>, Error> {
) -> Result<PreparedTransactions, Error> {
let mut res = vec![];
while let Some(iter) = transactions.next() {
res.push(iter.next().unwrap());
}
Ok(res)
Ok(PreparedTransactions { transactions: res, limited_by: None })
}

fn apply_chunk(
23 changes: 22 additions & 1 deletion chain/chain/src/types.rs
@@ -316,6 +316,27 @@ pub struct ApplyChunkShardContext<'a> {
pub is_first_block_with_chunk_of_version: bool,
}

/// Contains transactions that were fetched from the transaction pool
/// and prepared for adding them to a new chunk that is being produced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PreparedTransactions {
/// Prepared transactions
pub transactions: Vec<SignedTransaction>,
/// Describes which limit was hit when preparing the transactions.
pub limited_by: Option<PrepareTransactionsLimit>,
}

/// Chunk producer prepares transactions from the transaction pool
/// until it hits some limit (too many transactions, too much gas used, etc).
/// This enum describes which limit was hit when preparing transactions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::AsRefStr)]
pub enum PrepareTransactionsLimit {
Gas,
Size,
Time,
ReceiptCount,
}

/// Bridge between the chain and the runtime.
/// Main function is to update state given transactions.
/// Additionally handles validators.
@@ -383,7 +404,7 @@ pub trait RuntimeAdapter: Send + Sync {
chain_validate: &mut dyn FnMut(&SignedTransaction) -> bool,
current_protocol_version: ProtocolVersion,
time_limit: Option<Duration>,
) -> Result<Vec<SignedTransaction>, Error>;
) -> Result<PreparedTransactions, Error>;

/// Returns true if the shard layout will change in the next epoch
/// Current epoch is the epoch of the block after `parent_hash`
99 changes: 55 additions & 44 deletions chain/client/src/client.rs
@@ -29,8 +29,8 @@ use near_chain::orphan::OrphanMissingChunks;
use near_chain::resharding::ReshardingRequest;
use near_chain::state_snapshot_actor::SnapshotCallbacks;
use near_chain::test_utils::format_hash;
use near_chain::types::RuntimeAdapter;
use near_chain::types::{ChainConfig, LatestKnown};
use near_chain::types::{PreparedTransactions, RuntimeAdapter};
use near_chain::{
BlockProcessingArtifact, BlockStatus, Chain, ChainGenesis, ChainStoreAccess,
DoneApplyChunkCallback, Doomslug, DoomslugThresholdMode, Provenance,
@@ -854,20 +854,23 @@ impl Client {
.map_err(|err| Error::ChunkProducer(format!("No chunk extra available: {}", err)))?;

let prev_block_header = self.chain.get_block_header(&prev_block_hash)?;
let transactions = self.prepare_transactions(
let prepared_transactions = self.prepare_transactions(
shard_uid,
chunk_extra.gas_limit(),
*chunk_extra.state_root(),
&prev_block_header,
)?;
#[cfg(feature = "test_features")]
let transactions = Self::maybe_insert_invalid_transaction(
transactions,
prev_block_hash,
self.produce_invalid_tx_in_chunks,
);
let num_filtered_transactions = transactions.len();
let (tx_root, _) = merklize(&transactions);
let prepared_transactions = PreparedTransactions {
transactions: Self::maybe_insert_invalid_transaction(
prepared_transactions.transactions,
prev_block_hash,
self.produce_invalid_tx_in_chunks,
),
limited_by: prepared_transactions.limited_by,
};
let num_filtered_transactions = prepared_transactions.transactions.len();
let (tx_root, _) = merklize(&prepared_transactions.transactions);
let outgoing_receipts = self.chain.get_outgoing_receipts_for_shard(
prev_block_hash,
shard_id,
@@ -889,7 +892,7 @@
chunk_extra.gas_limit(),
chunk_extra.balance_burnt(),
chunk_extra.validator_proposals().collect(),
transactions,
prepared_transactions.transactions,
&outgoing_receipts,
outgoing_receipts_root,
tx_root,
@@ -914,6 +917,12 @@
chunk_production_duration_millis: Some(timer.elapsed().as_millis() as u64),
},
);
if let Some(limit) = prepared_transactions.limited_by {
// When some transactions from the pool didn't fit into the chunk due to a limit, it's reported in a metric.
metrics::PRODUCED_CHUNKS_SOME_POOL_TRANSACTIONS_DIDNT_FIT
.with_label_values(&[&shard_id.to_string(), limit.as_ref()])
.inc();
}

Ok(Some((encoded_chunk, merkle_paths, outgoing_receipts)))
}
@@ -968,49 +977,51 @@
gas_limit: Gas,
state_root: StateRoot,
prev_block_header: &BlockHeader,
) -> Result<Vec<SignedTransaction>, Error> {
) -> Result<PreparedTransactions, Error> {
let Self { chain, sharded_tx_pool, epoch_manager, runtime_adapter: runtime, .. } = self;

let shard_id = shard_uid.shard_id as ShardId;
let next_epoch_id = epoch_manager.get_epoch_id_from_prev_block(prev_block_header.hash())?;
let protocol_version = epoch_manager.get_epoch_protocol_version(&next_epoch_id)?;

let transactions = if let Some(mut iter) = sharded_tx_pool.get_pool_iterator(shard_uid) {
let transaction_validity_period = chain.transaction_validity_period;
runtime.prepare_transactions(
prev_block_header.next_gas_price(),
gas_limit,
&next_epoch_id,
shard_id,
state_root,
// while the height of the next block that includes the chunk might not be prev_height + 1,
// passing it will result in a more conservative check and will not accidentally allow
// invalid transactions to be included.
prev_block_header.height() + 1,
&mut iter,
&mut |tx: &SignedTransaction| -> bool {
chain
.chain_store()
.check_transaction_validity_period(
prev_block_header,
&tx.transaction.block_hash,
transaction_validity_period,
)
.is_ok()
},
protocol_version,
self.config.produce_chunk_add_transactions_time_limit.get(),
)?
} else {
vec![]
};
let prepared_transactions =
if let Some(mut iter) = sharded_tx_pool.get_pool_iterator(shard_uid) {
let transaction_validity_period = chain.transaction_validity_period;
runtime.prepare_transactions(
prev_block_header.next_gas_price(),
gas_limit,
&next_epoch_id,
shard_id,
state_root,
// while the height of the next block that includes the chunk might not be prev_height + 1,
// passing it will result in a more conservative check and will not accidentally allow
// invalid transactions to be included.
prev_block_header.height() + 1,
&mut iter,
&mut |tx: &SignedTransaction| -> bool {
chain
.chain_store()
.check_transaction_validity_period(
prev_block_header,
&tx.transaction.block_hash,
transaction_validity_period,
)
.is_ok()
},
protocol_version,
self.config.produce_chunk_add_transactions_time_limit.get(),
)?
} else {
PreparedTransactions { transactions: Vec::new(), limited_by: None }
};
// Reintroduce valid transactions back to the pool. They will be removed when the chunk is
// included into the block.
let reintroduced_count = sharded_tx_pool.reintroduce_transactions(shard_uid, &transactions);
if reintroduced_count < transactions.len() {
debug!(target: "client", reintroduced_count, num_tx = transactions.len(), "Reintroduced transactions");
let reintroduced_count = sharded_tx_pool
.reintroduce_transactions(shard_uid, &prepared_transactions.transactions);
if reintroduced_count < prepared_transactions.transactions.len() {
debug!(target: "client", reintroduced_count, num_tx = prepared_transactions.transactions.len(), "Reintroduced transactions");
}
Ok(transactions)
Ok(prepared_transactions)
}

pub fn send_challenges(&mut self, challenges: Vec<ChallengeBody>) {
12 changes: 12 additions & 0 deletions chain/client/src/metrics.rs
@@ -22,6 +22,18 @@ pub(crate) static CHUNK_PRODUCED_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});

pub(crate) static PRODUCED_CHUNKS_SOME_POOL_TRANSACTIONS_DIDNT_FIT: Lazy<IntCounterVec> = Lazy::new(
|| {
try_create_int_counter_vec(
"near_produced_chunks_some_pool_transactions_didnt_fit",
"Total number of produced chunks where some transactions from the pool didn't fit in the chunk \
(since starting this node). The limited_by label specifies which limit was hit.",
&["shard_id", "limited_by"],
)
.unwrap()
},
);

pub(crate) static IS_VALIDATOR: Lazy<IntGauge> = Lazy::new(|| {
try_create_int_gauge(
"near_is_validator",
43 changes: 28 additions & 15 deletions nearcore/src/runtime/mod.rs
@@ -6,7 +6,8 @@ use borsh::BorshDeserialize;
use errors::FromStateViewerErrors;
use near_chain::types::{
ApplyChunkBlockContext, ApplyChunkResult, ApplyChunkShardContext, ApplyResultForResharding,
RuntimeAdapter, RuntimeStorageConfig, StorageDataSource, Tip,
PrepareTransactionsLimit, PreparedTransactions, RuntimeAdapter, RuntimeStorageConfig,
StorageDataSource, Tip,
};
use near_chain::Error;
use near_chain_configs::{
@@ -717,12 +718,8 @@ impl RuntimeAdapter for NightshadeRuntime {
chain_validate: &mut dyn FnMut(&SignedTransaction) -> bool,
current_protocol_version: ProtocolVersion,
time_limit: Option<Duration>,
) -> Result<Vec<SignedTransaction>, Error> {
) -> Result<PreparedTransactions, Error> {
let start_time = std::time::Instant::now();
let time_limit_reached = || match time_limit {
Some(limit_duration) => start_time.elapsed() >= limit_duration,
None => false,
};
let shard_uid = self.get_shard_uid_from_epoch_id(shard_id, epoch_id)?;
let mut state_update = self.tries.new_trie_update(shard_uid, state_root);

Expand All @@ -731,7 +728,7 @@ impl RuntimeAdapter for NightshadeRuntime {
let mut total_size = 0u64;
// TODO: Update gas limit for transactions
let transactions_gas_limit = gas_limit / 2;
let mut transactions = vec![];
let mut result = PreparedTransactions { transactions: Vec::new(), limited_by: None };
let mut num_checked_transactions = 0;

let runtime_config = self.runtime_config_store.get_config(current_protocol_version);
@@ -771,11 +768,27 @@
/ (runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_write_value_byte)
+ runtime_config.wasm_config.ext_costs.gas_cost(ExtCosts::storage_read_value_byte));

while total_gas_burnt < transactions_gas_limit
&& total_size < size_limit
&& transactions.len() < new_receipt_count_limit
&& !time_limit_reached()
{
// Add new transactions to the result until some limit is hit or the transactions run out.
loop {
if total_gas_burnt >= transactions_gas_limit {
result.limited_by = Some(PrepareTransactionsLimit::Gas);
break;
}
if total_size >= size_limit {
result.limited_by = Some(PrepareTransactionsLimit::Size);
break;
}
if result.transactions.len() >= new_receipt_count_limit {
result.limited_by = Some(PrepareTransactionsLimit::ReceiptCount);
break;
}
if let Some(time_limit) = &time_limit {
if start_time.elapsed() >= *time_limit {
result.limited_by = Some(PrepareTransactionsLimit::Time);
break;
}
}

if let Some(iter) = pool_iterator.next() {
while let Some(tx) = iter.next() {
num_checked_transactions += 1;
@@ -800,7 +813,7 @@
state_update.commit(StateChangeCause::NotWritableToDisk);
total_gas_burnt += verification_result.gas_burnt;
total_size += tx.get_size();
transactions.push(tx);
result.transactions.push(tx);
break;
}
Err(RuntimeError::InvalidTxError(err)) => {
@@ -818,11 +831,11 @@
break;
}
}
debug!(target: "runtime", "Transaction filtering results {} valid out of {} pulled from the pool", transactions.len(), num_checked_transactions);
debug!(target: "runtime", "Transaction filtering results {} valid out of {} pulled from the pool", result.transactions.len(), num_checked_transactions);
metrics::PREPARE_TX_SIZE
.with_label_values(&[&shard_id.to_string()])
.observe(total_size as f64);
Ok(transactions)
Ok(result)
}

fn get_gc_stop_height(&self, block_hash: &CryptoHash) -> BlockHeight {
