Skip to content

Commit

Permalink
fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pythonberg1997 committed Jul 2, 2024
1 parent 14641e6 commit ee9614f
Show file tree
Hide file tree
Showing 30 changed files with 641 additions and 193 deletions.
18 changes: 15 additions & 3 deletions bin/reth/src/commands/db/get.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use crate::utils::DbTool;
use clap::Parser;
use reth_db::{
static_file::{ColumnSelectorOne, ColumnSelectorTwo, HeaderMask, ReceiptMask, TransactionMask},
tables, RawKey, RawTable, Receipts, TableViewer, Transactions,
static_file::{
ColumnSelectorOne, ColumnSelectorTwo, HeaderMask, ReceiptMask, SidecarMask, TransactionMask,
},
tables, RawKey, RawTable, Receipts, Sidecars, TableViewer, Transactions,
};
use reth_db_api::{
database::Database,
table::{Decompress, DupSort, Table},
};
use reth_primitives::{BlockHash, Header, StaticFileSegment};
use reth_primitives::{BlobSidecars, BlockHash, Header, StaticFileSegment};
use reth_provider::StaticFileProviderFactory;
use tracing::error;

Expand Down Expand Up @@ -71,6 +73,10 @@ impl Command {
table_key::<tables::Receipts>(&key)?,
<ReceiptMask<<Receipts as Table>::Value>>::MASK,
),
StaticFileSegment::Sidecars => (
table_key::<tables::Sidecars>(&key)?,
<SidecarMask<BlobSidecars, BlockHash>>::MASK,
),
};

let content = tool.provider_factory.static_file_provider().find_static_file(
Expand Down Expand Up @@ -112,6 +118,12 @@ impl Command {
)?;
println!("{}", serde_json::to_string_pretty(&receipt)?);
}
StaticFileSegment::Sidecars => {
let sc = <<Sidecars as Table>::Value>::decompress(
content[0].as_slice(),
)?;
println!("{}", serde_json::to_string_pretty(&sc)?);
}
}
}
}
Expand Down
1 change: 0 additions & 1 deletion bin/reth/src/commands/stage/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl Command {
tx.clear::<tables::BlockOmmers>()?;
tx.clear::<tables::BlockWithdrawals>()?;
tx.clear::<tables::BlockRequests>()?;
tx.clear::<tables::BlockSidecars>()?;
tx.put::<tables::StageCheckpoints>(
StageId::Bodies.to_string(),
Default::default(),
Expand Down
1 change: 1 addition & 0 deletions crates/consensus/beacon/src/engine/hooks/static_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl<DB: Database + 'static> StaticFileHook<DB> {
headers: Some(finalized_block_number),
receipts: Some(finalized_block_number),
transactions: Some(finalized_block_number),
sidecars: Some(finalized_block_number),
})?;

// Check if the moving data to static files has been requested.
Expand Down
2 changes: 1 addition & 1 deletion crates/primitives/src/alloy_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl TryFrom<alloy_rpc_types::Block> for Block {
withdrawals: block.withdrawals.map(Into::into),
// todo(onbjerg): we don't know if this is added to rpc yet, so for now we leave it as
// empty.
sidecars: None, // TODO(roshan)
sidecars: None,
requests: None,
})
}
Expand Down
3 changes: 1 addition & 2 deletions crates/rpc/rpc/src/eth/api/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ where
block_id: BlockId,
) -> EthResult<Option<Vec<BlockSidecar>>> {
if block_id.is_pending() {
// todo
return Ok(None);
}

Expand All @@ -231,7 +230,7 @@ where
None
};

// If no block and receipts found, return None
// If no block and sidecars found, return None
let Some(sidecars) = sidecars else {
return Ok(None);
};
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc/src/eth/api/pending_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl PendingBlockEnv {
requests_root,
};

// TODO: add sidecars
// sidecars should be queried by `eth_getBlobSidecars`
let sidecars = None;

// seal the block
Expand Down
4 changes: 2 additions & 2 deletions crates/rpc/rpc/src/eth/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl EthStateCache {
provider,
full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
sidecars_cache: SidecarsLruCache::new(max_receipts, "sidecars"),
sidecars_cache: SidecarsLruCache::new(max_blocks, "sidecars"),
evm_env_cache: EnvLruCache::new(max_envs, "evm_env"),
action_tx: to_service.clone(),
action_rx: UnboundedReceiverStream::new(rx),
Expand Down Expand Up @@ -528,7 +528,7 @@ where
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
let res = provider.sidecars_by_block(block_hash.into());
let res = provider.sidecars(&block_hash);

let _ = action_tx
.send(CacheAction::SidecarsResult { block_hash, res });
Expand Down
14 changes: 8 additions & 6 deletions crates/stages/api/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,19 @@ where
// Copies data from database to static files
let lowest_static_file_height = {
let provider = self.provider_factory.provider()?;
let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies]
.into_iter()
.map(|stage| {
provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number))
})
.collect::<Result<Vec<_>, _>>()?;
let stages_checkpoints =
[StageId::Headers, StageId::Execution, StageId::Bodies, StageId::Bodies]
.into_iter()
.map(|stage| {
provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number))
})
.collect::<Result<Vec<_>, _>>()?;

let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: stages_checkpoints[0],
receipts: stages_checkpoints[1],
transactions: stages_checkpoints[2],
sidecars: stages_checkpoints[3],
})?;
static_file_producer.run(targets)?;
stages_checkpoints.into_iter().min().expect("exists")
Expand Down
100 changes: 75 additions & 25 deletions crates/stages/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,13 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
let mut tx_block_cursor = tx.cursor_write::<tables::TransactionBlocks>()?;
let mut ommers_cursor = tx.cursor_write::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = tx.cursor_write::<tables::BlockWithdrawals>()?;
let mut sidecars_cursor = tx.cursor_write::<tables::BlockSidecars>()?;
let mut requests_cursor = tx.cursor_write::<tables::BlockRequests>()?;

// Get id for the next tx_num of zero if there are no transactions.
let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

let static_file_provider = provider.static_file_provider();
let mut static_file_producer =
let mut static_file_producer_tx =
static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;

// Make sure Transactions static file is at the same height. If it's further, this
Expand All @@ -145,11 +144,11 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// stage run. So, our only solution is to unwind the static files and proceed from the
// database expected height.
Ordering::Greater => {
static_file_producer
static_file_producer_tx
.prune_transactions(next_static_file_tx_num - next_tx_num, from_block - 1)?;
// Since this is a database <-> static file inconsistency, we commit the change
// straight away.
static_file_producer.commit()?;
static_file_producer_tx.commit()?;
}
// If static files are behind, then there was some corruption or loss of files. This
// error will trigger an unwind, that will bring the database to the same height as the
Expand All @@ -159,6 +158,38 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
next_static_file_tx_num.saturating_sub(1),
static_file_provider,
provider,
StaticFileSegment::Transactions,
)?)
}
Ordering::Equal => {}
}

let mut static_file_producer_sc =
static_file_provider.get_writer(from_block, StaticFileSegment::Sidecars)?;

// Make sure Sidecars static file is at the same height. If it's further, this
// input execution was interrupted previously and we need to unwind the static file.
let next_static_file_block_num = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Sidecars)
.map(|id| id + 1)
.unwrap_or_default();

match next_static_file_block_num.cmp(&from_block) {
Ordering::Greater => {
static_file_producer_sc.prune_sidecars(next_static_file_block_num - from_block)?;
// Since this is a database <-> static file inconsistency, we commit the change
// straight away.
static_file_producer_sc.commit()?
}
// If static files are behind, then there was some corruption or loss of files. This
// error will trigger an unwind, that will bring the database to the same height as the
// static files.
Ordering::Less => {
return Err(missing_static_data_error(
next_static_file_tx_num.saturating_sub(1),
static_file_provider,
provider,
StaticFileSegment::Sidecars,
)?)
}
Ordering::Equal => {}
Expand All @@ -183,7 +214,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {

// Increment block on static file header.
if block_number > 0 {
let appended_block_number = static_file_producer
let appended_block_number = static_file_producer_tx
.increment_block(StaticFileSegment::Transactions, block_number)?;

if appended_block_number != block_number {
Expand All @@ -206,7 +237,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {

// Write transactions
for transaction in block.body {
let appended_tx_number = static_file_producer
let appended_tx_number = static_file_producer_tx
.append_transaction(next_tx_num, transaction.into())?;

if appended_tx_number != next_tx_num {
Expand All @@ -223,6 +254,14 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
next_tx_num += 1;
}

// Write sidecars
let sidecars = block.sidecars.unwrap_or_default();
static_file_producer_sc.append_sidecars(
sidecars,
block_number,
block.header.hash(),
)?;

// Write ommers if any
if !block.ommers.is_empty() {
ommers_cursor
Expand All @@ -237,13 +276,6 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
}
}

// Write sidecars if any
if let Some(sidecars) = block.sidecars {
if !sidecars.is_empty() {
sidecars_cursor.append(block_number, sidecars)?;
}
}

// Write requests if any
if let Some(requests) = block.requests {
if !requests.0.is_empty() {
Expand Down Expand Up @@ -285,7 +317,6 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
let mut body_cursor = tx.cursor_write::<tables::BlockBodyIndices>()?;
let mut ommers_cursor = tx.cursor_write::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = tx.cursor_write::<tables::BlockWithdrawals>()?;
let mut sidecars_cursor = tx.cursor_write::<tables::BlockSidecars>()?;
let mut requests_cursor = tx.cursor_write::<tables::BlockRequests>()?;
// Cursors to unwind transitions
let mut tx_block_cursor = tx.cursor_write::<tables::TransactionBlocks>()?;
Expand All @@ -306,11 +337,6 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
withdrawals_cursor.delete_current()?;
}

// Delete the sidecars entry if any
if sidecars_cursor.seek_exact(number)?.is_some() {
sidecars_cursor.delete_current()?;
}

// Delete the requests entry if any
if requests_cursor.seek_exact(number)?.is_some() {
requests_cursor.delete_current()?;
Expand All @@ -327,7 +353,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
rev_walker.delete_current()?;
}

let mut static_file_producer =
let mut static_file_producer_tx =
static_file_provider.latest_writer(StaticFileSegment::Transactions)?;

// Unwind from static files. Get the current last expected transaction from DB, and match it
Expand All @@ -345,13 +371,39 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
static_file_tx_num,
static_file_provider,
provider,
StaticFileSegment::Transactions,
)?)
}

// Unwinds static file
static_file_producer
static_file_producer_tx
.prune_transactions(static_file_tx_num.saturating_sub(db_tx_num), input.unwind_to)?;

let mut static_file_producer_sc =
static_file_provider.latest_writer(StaticFileSegment::Sidecars)?;

// Unwind from static files. Get the current last expected block from DB, and match it
// on static file
let db_block_num = body_cursor.last()?.map(|(block_num, _)| block_num).unwrap_or_default();
let static_file_block_num: u64 = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Sidecars)
.unwrap_or_default();

// If there are more blocks on database, then we are missing static file data and we
// need to unwind further.
if db_block_num > static_file_block_num {
return Err(missing_static_data_error(
static_file_block_num,
static_file_provider,
provider,
StaticFileSegment::Sidecars,
)?)
}

// Unwinds static file
static_file_producer_sc
.prune_sidecars(static_file_block_num.saturating_sub(db_block_num))?;

Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(input.unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
Expand All @@ -363,6 +415,7 @@ fn missing_static_data_error<DB: Database>(
last_tx_num: TxNumber,
static_file_provider: &StaticFileProvider,
provider: &DatabaseProviderRW<DB>,
segment: StaticFileSegment,
) -> Result<StageError, ProviderError> {
let mut last_block = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Transactions)
Expand All @@ -384,10 +437,7 @@ fn missing_static_data_error<DB: Database>(

let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());

Ok(StageError::MissingStaticFileData {
block: missing_block,
segment: StaticFileSegment::Transactions,
})
Ok(StageError::MissingStaticFileData { block: missing_block, segment })
}

// TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know
Expand Down
Loading

0 comments on commit ee9614f

Please sign in to comment.