Skip to content

Commit

Permalink
feat: add caching for get_block (#346)
Browse files Browse the repository at this point in the history
* feat: cached provider

* fix: optimize hash fetching

* fix: rename provider

* fix: optimize

* fix: comment

* fix: clippy
  • Loading branch information
atanmarko authored Jul 8, 2024
1 parent 043ab68 commit 341c322
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uint = "0.9.5"
url = "2.5.2"
lru = "0.12.3"

# zero-bin related dependencies
ops = { path = "zero_bin/ops" }
Expand Down
1 change: 1 addition & 0 deletions mpt_trie/src/trie_hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ mod tests {
const NUM_INSERTS_FOR_ETH_TRIE_CRATE_MASSIVE_TEST: usize = 1000;
const NODES_PER_BRANCH_FOR_HASH_REPLACEMENT_TEST: usize = 200;

#[allow(dead_code)]
#[derive(Copy, Clone, Debug)]
struct U256Rlpable(U256);

Expand Down
12 changes: 7 additions & 5 deletions zero_bin/leader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ pub(crate) async fn client_main(
block_interval: BlockInterval,
mut params: ProofParams,
) -> Result<()> {
let cached_provider = rpc::provider::CachedProvider::new(build_http_retry_provider(
rpc_params.rpc_url.clone(),
rpc_params.backoff,
rpc_params.max_retries,
));

let prover_input = rpc::prover_input(
&build_http_retry_provider(
rpc_params.rpc_url,
rpc_params.backoff,
rpc_params.max_retries,
),
&cached_provider,
block_interval,
params.checkpoint_block_number.into(),
rpc_params.rpc_type,
Expand Down
1 change: 1 addition & 0 deletions zero_bin/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ futures = { workspace = true }
url = { workspace = true }
__compat_primitive_types = { workspace = true }
tower = { workspace = true, features = ["retry"] }
lru = { workspace = true }

# Local dependencies
compat = { workspace = true }
Expand Down
12 changes: 8 additions & 4 deletions zero_bin/rpc/src/jerigon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use trace_decoder::trace_protocol::{
};

use super::fetch_other_block_data;
use crate::provider::CachedProvider;

/// Transaction traces retrieved from Erigon zeroTracer.
#[derive(Debug, Deserialize)]
Expand All @@ -23,7 +24,7 @@ pub struct ZeroTxResult {
pub struct ZeroBlockWitness(TrieCompact);

pub async fn block_prover_input<ProviderT, TransportT>(
provider: ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
target_block_id: BlockId,
checkpoint_state_trie_root: B256,
) -> anyhow::Result<BlockProverInput>
Expand All @@ -32,20 +33,23 @@ where
TransportT: Transport + Clone,
{
// Grab trace information
let tx_results = provider
let tx_results = cached_provider
.as_provider()
.raw_request::<_, Vec<ZeroTxResult>>(
"debug_traceBlockByNumber".into(),
(target_block_id, json!({"tracer": "zeroTracer"})),
)
.await?;

// Grab block witness info (packed as combined trie pre-images)
let block_witness = provider
let block_witness = cached_provider
.as_provider()
.raw_request::<_, ZeroBlockWitness>("eth_getWitness".into(), vec![target_block_id])
.await?;

let other_data =
fetch_other_block_data(provider, target_block_id, checkpoint_state_trie_root).await?;
fetch_other_block_data(cached_provider, target_block_id, checkpoint_state_trie_root)
.await?;

// Assemble
Ok(BlockProverInput {
Expand Down
48 changes: 30 additions & 18 deletions zero_bin/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ use zero_bin_common::block_interval::BlockInterval;

pub mod jerigon;
pub mod native;
pub mod provider;
pub mod retry;

use crate::provider::CachedProvider;

const PREVIOUS_HASHES_COUNT: usize = 256;

/// The RPC type.
Expand All @@ -28,7 +31,7 @@ pub enum RpcType {

/// Obtain the prover input for a given block interval
pub async fn prover_input<ProviderT, TransportT>(
provider: &ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
block_interval: BlockInterval,
checkpoint_block_id: BlockId,
rpc_type: RpcType,
Expand All @@ -38,10 +41,9 @@ where
TransportT: Transport + Clone,
{
// Grab interval checkpoint block state trie
let checkpoint_state_trie_root = provider
let checkpoint_state_trie_root = cached_provider
.get_block(checkpoint_block_id, BlockTransactionsKind::Hashes)
.await?
.context("block does not exist")?
.header
.state_root;

Expand All @@ -52,10 +54,12 @@ where
let block_id = BlockId::Number(BlockNumberOrTag::Number(block_num));
let block_prover_input = match rpc_type {
RpcType::Jerigon => {
jerigon::block_prover_input(&provider, block_id, checkpoint_state_trie_root).await?
jerigon::block_prover_input(cached_provider, block_id, checkpoint_state_trie_root)
.await?
}
RpcType::Native => {
native::block_prover_input(&provider, block_id, checkpoint_state_trie_root).await?
native::block_prover_input(cached_provider, block_id, checkpoint_state_trie_root)
.await?
}
};

Expand All @@ -68,35 +72,43 @@ where

/// Fetches other block data
async fn fetch_other_block_data<ProviderT, TransportT>(
provider: ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
target_block_id: BlockId,
checkpoint_state_trie_root: B256,
) -> anyhow::Result<OtherBlockData>
where
ProviderT: Provider<TransportT>,
TransportT: Transport + Clone,
{
let target_block = provider
let target_block = cached_provider
.get_block(target_block_id, BlockTransactionsKind::Hashes)
.await?
.context("target block does not exist")?;
.await?;
let target_block_number = target_block
.header
.number
.context("target block is missing field `number`")?;
let chain_id = provider.get_chain_id().await?;
let chain_id = cached_provider.as_provider().get_chain_id().await?;

// Each request yields two hashes (the block's own hash and its parent hash),
// so for one block we fetch 128 previous blocks to get hashes instead of 256.
// But for two consecutive blocks (odd and even) we would still fetch 256
// previous blocks in total. To overcome this, we add an offset so that we
// always start fetching from an odd block number, and later skip the extra
// leading entry (the target block's own hash) for an odd `target_block_number`.
let odd_offset: i128 = target_block_number as i128 % 2;

let previous_block_numbers =
std::iter::successors(Some(target_block_number as i128 - 1), |&it| Some(it - 1))
.take(PREVIOUS_HASHES_COUNT)
.filter(|i| *i >= 0)
.collect::<Vec<_>>();
std::iter::successors(Some(target_block_number as i128 - 1 + odd_offset), |&it| {
Some(it - 1)
})
.take(PREVIOUS_HASHES_COUNT)
.filter(|i| *i >= 0)
.collect::<Vec<_>>();
let concurrency = previous_block_numbers.len();
let collected_hashes = futures::stream::iter(
previous_block_numbers
.chunks(2) // we get hash for previous and current block with one request
.map(|block_numbers| {
let provider = &provider;
let cached_provider = &cached_provider;
let block_num = &block_numbers[0];
let previos_block_num = if block_numbers.len() > 1 {
Some(block_numbers[1])
Expand All @@ -105,11 +117,10 @@ where
None
};
async move {
let block = provider
let block = cached_provider
.get_block((*block_num as u64).into(), BlockTransactionsKind::Hashes)
.await
.context("couldn't get block")?
.context("no such block")?;
.context("couldn't get block")?;
anyhow::Ok([
(block.header.hash, Some(*block_num)),
(Some(block.header.parent_hash), previos_block_num),
Expand All @@ -126,6 +137,7 @@ where
collected_hashes
.into_iter()
.flatten()
.skip(odd_offset as usize)
.for_each(|(hash, block_num)| {
if let (Some(hash), Some(block_num)) = (hash, block_num) {
// Most recent previous block hash is expected at the end of the array
Expand Down
9 changes: 8 additions & 1 deletion zero_bin/rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io;

use alloy::rpc::types::eth::BlockId;
use clap::{Parser, ValueHint};
use rpc::provider::CachedProvider;
use rpc::{retry::build_http_retry_provider, RpcType};
use tracing_subscriber::{prelude::*, EnvFilter};
use url::Url;
Expand Down Expand Up @@ -53,9 +54,15 @@ impl Cli {
checkpoint_block_number.unwrap_or((start_block - 1).into());
let block_interval = BlockInterval::Range(start_block..end_block + 1);

let cached_provider = CachedProvider::new(build_http_retry_provider(
rpc_url.clone(),
backoff,
max_retries,
));

// Retrieve prover input from the Erigon node
let prover_input = rpc::prover_input(
&build_http_retry_provider(rpc_url, backoff, max_retries),
&cached_provider,
block_interval,
checkpoint_block_number,
rpc_type,
Expand Down
21 changes: 11 additions & 10 deletions zero_bin/rpc/src/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ use alloy::{
rpc::types::eth::{BlockId, BlockTransactionsKind},
transports::Transport,
};
use anyhow::Context as _;
use futures::try_join;
use prover::BlockProverInput;
use trace_decoder::trace_protocol::BlockTrace;

use crate::provider::CachedProvider;

mod state;
mod txn;

type CodeDb = HashMap<__compat_primitive_types::H256, Vec<u8>>;

/// Fetches the prover input for the given BlockId.
pub async fn block_prover_input<ProviderT, TransportT>(
provider: &ProviderT,
provider: &CachedProvider<ProviderT, TransportT>,
block_number: BlockId,
checkpoint_state_trie_root: B256,
) -> anyhow::Result<BlockProverInput>
Expand All @@ -27,8 +28,8 @@ where
TransportT: Transport + Clone,
{
let (block_trace, other_data) = try_join!(
process_block_trace(&provider, block_number),
crate::fetch_other_block_data(&provider, block_number, checkpoint_state_trie_root,)
process_block_trace(provider, block_number),
crate::fetch_other_block_data(provider, block_number, checkpoint_state_trie_root,)
)?;

Ok(BlockProverInput {
Expand All @@ -39,20 +40,20 @@ where

/// Processes the block with the given block number and returns the block trace.
async fn process_block_trace<ProviderT, TransportT>(
provider: &ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
block_number: BlockId,
) -> anyhow::Result<BlockTrace>
where
ProviderT: Provider<TransportT>,
TransportT: Transport + Clone,
{
let block = provider
let block = cached_provider
.get_block(block_number, BlockTransactionsKind::Full)
.await?
.context("target block does not exist")?;
.await?;

let (code_db, txn_info) = txn::process_transactions(&block, provider).await?;
let trie_pre_images = state::process_state_witness(provider, block, &txn_info).await?;
let (code_db, txn_info) =
txn::process_transactions(&block, cached_provider.as_provider()).await?;
let trie_pre_images = state::process_state_witness(cached_provider, block, &txn_info).await?;

Ok(BlockTrace {
txn_info,
Expand Down
17 changes: 10 additions & 7 deletions zero_bin/rpc/src/native/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ use trace_decoder::trace_protocol::{
SeparateTriePreImages, TrieDirect, TxnInfo,
};

use crate::provider::CachedProvider;
use crate::Compat;

/// Processes the state witness for the given block.
pub async fn process_state_witness<ProviderT, TransportT>(
provider: &ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
block: Block,
txn_infos: &[TxnInfo],
) -> anyhow::Result<BlockTraceTriePreImages>
Expand All @@ -32,15 +33,15 @@ where
.header
.number
.context("Block number not returned with block")?;
let prev_state_root = provider
let prev_state_root = cached_provider
.get_block((block_number - 1).into(), BlockTransactionsKind::Hashes)
.await?
.context("Failed to get previous block")?
.header
.state_root;

let (state, storage_proofs) =
generate_state_witness(prev_state_root, state_access, provider, block_number).await?;
generate_state_witness(prev_state_root, state_access, cached_provider, block_number)
.await?;

Ok(BlockTraceTriePreImages::Separate(SeparateTriePreImages {
state: SeparateTriePreImage::Direct(TrieDirect(state.build())),
Expand Down Expand Up @@ -97,7 +98,7 @@ pub fn process_states_access(
async fn generate_state_witness<ProviderT, TransportT>(
prev_state_root: B256,
accounts_state: HashMap<Address, HashSet<StorageKey>>,
provider: &ProviderT,
cached_provider: &CachedProvider<ProviderT, TransportT>,
block_number: u64,
) -> anyhow::Result<(
PartialTrieBuilder<HashedPartialTrie>,
Expand All @@ -111,7 +112,7 @@ where
let mut storage_proofs = HashMap::<B256, PartialTrieBuilder<HashedPartialTrie>>::new();

let (account_proofs, next_account_proofs) =
fetch_proof_data(accounts_state, provider, block_number).await?;
fetch_proof_data(accounts_state, cached_provider, block_number).await?;

// Insert account proofs
for (address, proof) in account_proofs.into_iter() {
Expand Down Expand Up @@ -146,7 +147,7 @@ where
/// Fetches the proof data for the given accounts and associated storage keys.
async fn fetch_proof_data<ProviderT, TransportT>(
accounts_state: HashMap<Address, HashSet<StorageKey>>,
provider: &ProviderT,
provider: &CachedProvider<ProviderT, TransportT>,
block_number: u64,
) -> anyhow::Result<(
Vec<(Address, EIP1186AccountProofResponse)>,
Expand All @@ -161,6 +162,7 @@ where
.into_iter()
.map(|(address, keys)| async move {
let proof = provider
.as_provider()
.get_proof(address, keys.into_iter().collect())
.block_id((block_number - 1).into())
.await
Expand All @@ -173,6 +175,7 @@ where
.into_iter()
.map(|(address, keys)| async move {
let proof = provider
.as_provider()
.get_proof(address, keys.into_iter().collect())
.block_id(block_number.into())
.await
Expand Down
2 changes: 1 addition & 1 deletion zero_bin/rpc/src/native/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ where
.as_transactions()
.context("No transactions in block")?
.iter()
.map(|tx| super::txn::process_transaction(provider, tx))
.map(|tx| process_transaction(provider, tx))
.collect::<FuturesOrdered<_>>()
.try_fold(
(HashMap::new(), Vec::new()),
Expand Down
Loading

0 comments on commit 341c322

Please sign in to comment.